如何删除火花流中的其他结果?

问题描述

我想让单词计数流式传输,只显示我想在Twitter上看到的单词。

所以,我像下面那样制作了电线

import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import twitter4j.Status
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming._

import org.apache.log4j._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.streaming.twitter._
import twitter4j.TwitterFactory
import twitter4j.conf.ConfigurationBuilder
import java.util.Properties
import org.apache.spark.storage.StorageLevel
import twitter4j.auth.OAuthAuthorization

val appName = "TwitterData"

val ssc = new StreamingContext(sc,Seconds(10))
val hashTags = "XRP"
val cb = new ConfigurationBuilder
val prop = new Properties()

cb.setDebugEnabled(true).setoAuthConsumerKey("key number").setoAuthConsumerSecret("key number").setoAuthAccesstoken("key number").setoAuthAccesstokenSecret("key number")

val bld = cb.build()
val tf = new TwitterFactory(bld)
val twitter = tf.getInstance()
val filters = Array(hashTags).toSeq
val auth = new OAuthAuthorization(bld)
val twitterStream = TwitterUtils.createStream(ssc,Some(auth),filters,StorageLevel.MEMORY_ONLY)

twitterStream.cache()

val lines = twitterStream.map(status => status.getText)
lines.print()

val words = lines.flatMap(_.split(" "))
val pairs = words.map(x => {
  if (x == "xrp" || x == "ripple"){
    (x,1)
  } else {
  }
})

pairs.print()

ssc.start()

它可以与Twitter上的Spark Streaming一起正常工作,但按照结果,除了要获得的结果外,我想删除所有空白。

-------------------------------------------
Time: 1603866040000 ms
-------------------------------------------
@RuleXRP I need 15to25 usd per xrp
RT @Grayscale: 10/27/20 UPDATE: Net Assets Under Management,Holdings per Share,and Market Price per Share for our Investment Products.

T....

-------------------------------------------
Time: 1603866040000 ms
-------------------------------------------
()
()
()
()
()
()
(xrp,1)
()
()
()
...

我该怎么做?如果有什么方法可以使我想做的更好,那么我的绳索,请让我知道。我需要你的帮助。 我非常感谢您的建议。 谢谢

解决方法

val pairs = words.map(x => {
  if (x == "xrp" || x == "ripple"){
    (x,1)
  } else {
  }
})

这会映射您的结果

相反,您可以在映射之前使用过滤器,这会减少代码一点:

val pairs = words
  .filter(x => x == "xrp" || x == "ripple")
  .map(x => (x,1))

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...