How do I use a socket text stream with a 10-second batch interval to read data from a static file in Spark with Python?

Problem description

I have a static file (log_file) with about 10K records on my local (Windows) drive. Its structure is as follows.

"date","time","size","r_version","r_arch","r_os","package","version","country","ip_id"
"2012-10-01","00:30:13",35165,"2.15.1","i686","linux-gnu","quadprog","1.5-4","AU",1

I want to read these log records using a socket text stream with a 10-second batch interval, and then perform a few Spark operations on them using RDD or DataFrame computations. I wrote the code below just to read the data at each interval, split it, and print it as an RDD:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = SparkConf().setMaster("local[*]").setAppName("Assignment4")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)

data = ssc.socketTextStream("file:///SparkL2/log_file.txt", 2222)

linesrdd = data.map(lambda x: x.split(","))

linesrdd.pprint()
ssc.start()
ssc.awaitTermination()

I saved this code and ran spark-submit from the Anaconda command prompt. I get an error in the socketTextStream call, probably because I am not using it correctly.

(base) PS C:\Users\HP> cd c:\SparkL2
(base) PS C:\SparkL2> spark-submit Assignment5.py
20/09/09 21:42:42 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.net.UnknownHostException: file:///SparkL2/log_file.txt
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:196)
        at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)
        at java.net.Socket.connect(Socket.java:606)
        at java.net.Socket.connect(Socket.java:555)
        at java.net.Socket.<init>(Socket.java:451)
        at java.net.Socket.<init>(Socket.java:228)
        at org.apache.spark.streaming.dstream.SocketReceiver.onStart(SocketInputDStream.scala:61)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint.$anonfun$startReceiver$1(ReceiverTracker.scala:596)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint.$anonfun$startReceiver$1$adapted(ReceiverTracker.scala:586)
        at org.apache.spark.SparkContext.$anonfun$submitJob$1(SparkContext.scala:2242)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Can anyone help me with this? I am new to PySpark and want to learn it on my own.

Solution

No verified solution to this question has been posted yet.
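
That said, the stack trace itself points at the cause: ssc.socketTextStream(hostname, port) opens a TCP connection, so its first argument must be a hostname. Spark tried to resolve the file URI file:///SparkL2/log_file.txt as a host name, which is exactly what the UnknownHostException reports. One way to keep the socket-based design is to run a small helper script that replays the file over a local TCP port and point socketTextStream at that port. The sketch below is illustrative: the helper name send_log.py, the port 2222, and the sleep-based throttling are assumptions, not part of the original post.

# send_log.py -- hypothetical helper that serves log_file.txt over TCP,
# giving socketTextStream something to connect to. Run it first, in its
# own terminal, then launch the Spark job.
import socket
import time

HOST, PORT = "localhost", 2222

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((HOST, PORT))
    server.listen(1)
    print("Waiting for Spark to connect on %s:%d ..." % (HOST, PORT))
    conn, _ = server.accept()
    with conn, open(r"C:\SparkL2\log_file.txt") as f:
        for line in f:
            conn.sendall(line.encode("utf-8"))
            time.sleep(0.005)  # spread ~10K lines out so several 10-second batches see data

The streaming job then stays almost identical to the original, with only the socketTextStream arguments corrected:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = SparkConf().setMaster("local[*]").setAppName("Assignment4")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)  # 10-second batch interval

# The first argument is a hostname, not a file path or URI.
data = ssc.socketTextStream("localhost", 2222)

linesrdd = data.map(lambda x: x.split(","))
linesrdd.pprint()

ssc.start()
ssc.awaitTermination()

Note that once the helper has sent the whole file and closed the socket, the receiver will keep trying to reconnect, so stop the job manually when the batches come back empty.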

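
If a socket is not strictly required by the assignment, ssc.textFileStream(directory) is a simpler way to stream file data: it monitors a directory and processes any file that appears in it after the streaming context starts. A minimal sketch, assuming a directory C:\SparkL2\streaming_input (a name invented here) into which log_file.txt is copied once the job is running:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = SparkConf().setMaster("local[*]").setAppName("Assignment4")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)

# Only files created or moved into the directory *after* ssc.start()
# are picked up; a file already sitting there when the job starts is ignored.
lines = ssc.textFileStream("file:///SparkL2/streaming_input")
lines.map(lambda x: x.split(",")).pprint()

ssc.start()
ssc.awaitTermination()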