Kinesis Spark Streaming Integration-无法输出DStream内容

问题描述

我想使用Python创建一个简单的体系结构,以打印在Kinesis中流传输的数据,然后将其发送到Spark Streaming DStream对象。我正在EC2实例中运行所有程序。

我的数据生产者是Kinesis Agent监视/var/documents/目录。 代理程序日志文件似乎正在解析记录并将它们发送到目的地,但是以某种方式在我打印DStream对象时,什么都没有显示

我的源代码

import boto3,random,time
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils,InitialPositionInStream

conf = SparkConf().setAppName("KinesissparkBigDataPipeline")

sc = SparkContext(conf = conf)
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc,2)

def createStream():
    """
    Function that creates a DStream Object coming from Kinesis Stream.

    Returns:
        sparkDStream => DStream object created from records in the Kinesis Stream.
    """
    kinesisAppName = ("KinesisstreamTests-%d" % abs(random.randint(0,10000000)))
    sparkDStream = KinesisUtils.createStream(
            ssc,kinesisAppName,"EntryPoints","https://kinesis.eu-central-1.amazonaws.com","eu-central-1",InitialPositionInStream.LATEST,2
    )
    return sparkDStream

if __name__ == "__main__":
    try:
        kinesisstream = createStream()
        kinesisstream.pprint()

        ssc.start()
        time.sleep(60)
        ssc.stop()
        # ssc.awaitTermination()
    except Exception as e:
        print(e)
当我运行命令:spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.4 poc_bigdata_pipeline.py时,

输出是:

-------------------------------------------
Time: 2020-11-03 11:09:52
-------------------------------------------

-------------------------------------------
Time: 2020-11-03 11:09:54
-------------------------------------------

...

我在那做错了什么吗?如果我忘记了任何重要信息,请原谅我对此很陌生。

感谢阅读。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)