如何批量将csv文件加载到DStream中?

问题描述

我对DStreams主题完全陌生-DStreams的基本抽象,用于批量接收数据流。我想做的是,我有一个包含1000多个记录的文本文件。我需要将文本文件发送到DStreams进行流处理。为此,我用python编写了一个创建DStream的代码,然后以10秒的间隔(批处理)将文本文件的路径传递到DStream中。批处理中没有任何数据。这是代码

spark = SparkSession.builder.master("local[*]").appName("PysparkStreaming").getorCreate()

sc = spark.sparkContext
ssc = StreamingContext(sc,10)   #Batch duration 10 seconds
type(ssc)
lines = ssc.textFileStream('/home/Downloads/Dataset/data.txt')  #create DStream
type(lines)

        
counts = lines.flatMap(lambda line: line.split(" "))\
               .map(lambda x: (x,1))\
               .reduceByKey(lambda a,b: a+b)
type(counts)
counts.pprint()

ssc.start()
ssc.awaitTermination()

我得到的输出

-------------------------------------------
Time: 2020-10-02 13:57:40
-------------------------------------------

-------------------------------------------
Time: 2020-10-02 13:57:50
-------------------------------------------

-------------------------------------------
Time: 2020-10-02 13:58:00
-------------------------------------------

-------------------------------------------
Time: 2020-10-02 13:58:10
-------------------------------------------

-------------------------------------------
Time: 2020-10-02 13:58:20
-------------------------------------------

-------------------------------------------
Time: 2020-10-02 13:58:30
-------------------------------------------

-------------------------------------------
Time: 2020-10-02 13:58:40
-------------------------------------------

-------------------------------------------
Time: 2020-10-02 13:58:50
-------------------------------------------

-------------------------------------------

我需要的是,在时间间隔内,必须显示文本文件中的数据。但是,批次之间没有任何显示。拜托,我需要您的帮助才能解决此问题...任何人告诉我要批量获取数据我该怎么做。我需要python中的代码。谢谢你。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)