问题描述
我想使用Python创建一个简单的体系结构,以打印在Kinesis中流传输的数据,然后将其发送到Spark Streaming DStream对象。我正在EC2实例中运行所有程序。
我的数据生产者是Kinesis Agent监视/var/documents/
目录。
代理程序日志文件似乎正在解析记录并将它们发送到目的地,但是以某种方式在我打印DStream对象时,什么都没有显示。
我的源代码:
import boto3,random,time
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils,InitialPositionInStream
conf = SparkConf().setAppName("KinesissparkBigDataPipeline")
sc = SparkContext(conf = conf)
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc,2)
def createStream():
"""
Function that creates a DStream Object coming from Kinesis Stream.
Returns:
sparkDStream => DStream object created from records in the Kinesis Stream.
"""
kinesisAppName = ("KinesisstreamTests-%d" % abs(random.randint(0,10000000)))
sparkDStream = KinesisUtils.createStream(
ssc,kinesisAppName,"EntryPoints","https://kinesis.eu-central-1.amazonaws.com","eu-central-1",InitialPositionInStream.LATEST,2
)
return sparkDStream
if __name__ == "__main__":
try:
kinesisstream = createStream()
kinesisstream.pprint()
ssc.start()
time.sleep(60)
ssc.stop()
# ssc.awaitTermination()
except Exception as e:
print(e)
当我运行命令:spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.4 poc_bigdata_pipeline.py
时,输出是:
-------------------------------------------
Time: 2020-11-03 11:09:52
-------------------------------------------
-------------------------------------------
Time: 2020-11-03 11:09:54
-------------------------------------------
...
我在那做错了什么吗?如果我忘记了任何重要信息,请原谅我对此很陌生。
感谢阅读。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)