Problem description
I want to run a streaming word count on Twitter data that shows only the words I'm interested in.
So I wrote the following code:
import java.util.Properties
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.TwitterFactory
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
val appName = "TwitterData"
val ssc = new StreamingContext(sc,Seconds(10))
val hashTags = "XRP"
val cb = new ConfigurationBuilder
val prop = new Properties()
cb.setDebugEnabled(true)
  .setOAuthConsumerKey("key number")
  .setOAuthConsumerSecret("key number")
  .setOAuthAccessToken("key number")
  .setOAuthAccessTokenSecret("key number")
val bld = cb.build()
val tf = new TwitterFactory(bld)
val twitter = tf.getInstance()
val filters = Array(hashTags).toSeq
val auth = new OAuthAuthorization(bld)
val twitterStream = TwitterUtils.createStream(ssc,Some(auth),filters,StorageLevel.MEMORY_ONLY)
twitterStream.cache()
val lines = twitterStream.map(status => status.getText)
lines.print()
val words = lines.flatMap(_.split(" "))
val pairs = words.map(x => {
  if (x == "xrp" || x == "ripple") {
    (x, 1)
  } else {
  }
})
pairs.print()
ssc.start()
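Spark Streaming on Twitter works fine, but as the output below shows, I get a lot of empty entries. I want to remove all of them and keep only the results I'm looking for.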
-------------------------------------------
Time: 1603866040000 ms
-------------------------------------------
@RuleXRP I need 15to25 usd per xrp
RT @Grayscale: 10/27/20 UPDATE: Net Assets Under Management,Holdings per Share,and Market Price per Share for our Investment Products.
T....
-------------------------------------------
Time: 1603866040000 ms
-------------------------------------------
()
()
()
()
()
()
(xrp,1)
()
()
()
...
How can I do this? And if there is a better way to write my code for what I'm trying to do, please let me know. I'd really appreciate your advice. Thanks!
Solution
val pairs = words.map(x => {
  if (x == "xrp" || x == "ripple") {
    (x, 1)
  } else {
  }
})
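This maps every word to some value, including the words you don't want. For those words the else branch is an empty block, which evaluates to () (the Unit value), so each non-matching word shows up as () in the output.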
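To see where the () lines come from, here is a minimal plain-Scala sketch (no Spark needed) that applies the same if/else to an ordinary List. The empty else block evaluates to (), so the mapped collection has element type Any and every non-matching word becomes (). The object name UnitDemo, the helper tag and the sample words are hypothetical, chosen only for illustration.

```scala
object UnitDemo {
  // Same shape as the map in the question: the else branch is an empty block,
  // which evaluates to (), so the overall result type widens to Any.
  def tag(x: String): Any =
    if (x == "xrp" || x == "ripple") {
      (x, 1)
    } else {
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical words standing in for one batch of tweet text.
    val words = List("I", "need", "xrp", "now")
    println(words.map(tag)) // List((), (), (xrp,1), ())
  }
}
```

The DStream version behaves the same way per batch, which is why pairs.print() shows () for every word that does not match.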
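Instead, apply a filter before the map. This drops the unwanted words before they are paired, and it also makes the code a bit shorter: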
val pairs = words
.filter(x => x == "xrp" || x == "ripple")
.map(x => (x,1))
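A minimal sketch of the same filter-then-map logic on a plain Scala List, so the behaviour can be checked locally without a Twitter stream. The counts step sums the 1s per word, which is roughly what calling pairs.reduceByKey(_ + _) on the DStream would do within each batch; that reduceByKey step is an assumption added here and is not part of the original answer. The object name FilterThenMap and the sample sentence are hypothetical.

```scala
object FilterThenMap {
  // Keep only the words of interest, then pair each one with a count of 1.
  def pairs(words: Seq[String]): Seq[(String, Int)] =
    words
      .filter(x => x == "xrp" || x == "ripple")
      .map(x => (x, 1))

  // Local stand-in for reduceByKey(_ + _): sum the 1s for each word.
  def counts(words: Seq[String]): Map[String, Int] =
    pairs(words)
      .groupBy(_._1)
      .map { case (word, ps) => (word, ps.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    // Hypothetical batch of tokens, split on single spaces like the question.
    val batch = "I need 15to25 usd per xrp and more xrp not ripple".split(" ").toList
    println(pairs(batch)) // List((xrp,1), (xrp,1), (ripple,1))
    println(counts(batch))
  }
}
```

Because the filter runs first, no () values are ever produced, and the pair type stays (String, Int) instead of widening to Any.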