问题描述
我对DStreams主题完全陌生-DStreams的基本抽象,用于批量接收数据流。我想做的是,我有一个包含1000多个记录的文本文件。我需要将文本文件发送到DStreams进行流处理。为此,我用python编写了一个创建DStream的代码,然后以10秒的间隔(批处理)将文本文件的路径传递到DStream中。批处理中没有任何数据。这是代码,
spark = SparkSession.builder.master("local[*]").appName("PysparkStreaming").getorCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc,10) #Batch duration 10 seconds
type(ssc)
lines = ssc.textFileStream('/home/Downloads/Dataset/data.txt') #create DStream
type(lines)
counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda x: (x,1))\
.reduceByKey(lambda a,b: a+b)
type(counts)
counts.pprint()
ssc.start()
ssc.awaitTermination()
我得到的输出是
-------------------------------------------
Time: 2020-10-02 13:57:40
-------------------------------------------
-------------------------------------------
Time: 2020-10-02 13:57:50
-------------------------------------------
-------------------------------------------
Time: 2020-10-02 13:58:00
-------------------------------------------
-------------------------------------------
Time: 2020-10-02 13:58:10
-------------------------------------------
-------------------------------------------
Time: 2020-10-02 13:58:20
-------------------------------------------
-------------------------------------------
Time: 2020-10-02 13:58:30
-------------------------------------------
-------------------------------------------
Time: 2020-10-02 13:58:40
-------------------------------------------
-------------------------------------------
Time: 2020-10-02 13:58:50
-------------------------------------------
-------------------------------------------
我需要的是,在时间间隔内,必须显示文本文件中的数据。但是,批次之间没有任何显示。拜托,我需要您的帮助才能解决此问题...任何人告诉我要批量获取数据我该怎么做。我需要python中的代码。谢谢你。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)