如下所示,将AWS kineses与Spark流集成在一起,一无所获

问题描述

高度赞赏帮助

问题描述: 我正在尝试通过火花流消耗运动学中的数据。但是问题是主题是否为空或没有数据可供消耗,我看到带有时间戳的黑屏为空。但是我期待如果没有任何内容,则模板为空(无数据)消费。 下面是我正在使用的代码并记录消息。

非常有帮助。 预先感谢...

使用Spark 2.4.5,scala 2.11.8和以下软件包 -包org.apache.spark:spark-streaming-kinesis-asl_2.12:2.4.5 --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.5

     import sys

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kinesis import KinesisUtils,InitialPositionInStream


    ssc = StreamingContext(sc,5)

    #ssc = StreamingContext(spark,5)

    conf = SparkConf()
    conf.setAppName("PythonStreamingKinesis")
    #conf.get("spark.app.name")


    kinesisAppName = conf.get("spark.app.name")
    regionName = "us-west-1"
    streamName = "kinesis_data"
    endpointUrl = "https://kinesis.us-west-1.amazonaws.com"
    lines = KinesisUtils.createStream(ssc,kinesisAppName,streamName,endpointUrl,regionName,InitialPositionInStream.TRIM_HORIZON,2)
    lines.pprint()
     ssc.start()

***日志消息***


时间:2020-08-25 05:57:25

20/08/25 05:57:25 INFO JobScheduler:已完成作业流作业1598335045000 ms.1来自作业时间集1598335045000 ms 20/08/25 05:57:25 INFO JobScheduler:总延迟:0.171 s,时间1598335045000 ms(执行:0.100 s) 20/08/25 05:57:25 INFO KinesisBackedBlockRDD:从持久性列表中删除RDD 108 20/08/25 05:57:25 INFO KinesisInputDStream:在NativeMethodAccessorImpl.java:0上的createStream处移除RDD KinesisBackedBlockRDD [108]的块,时间为1598335045000 ms 20/08/25 05:57:25 INFO ReceivedBlockTracker:删除批次:1598335035000 ms 20/08/25 05:57:25 INFO InputInfoTracker:删除旧的批处理元数据:1598335035000 ms [阶段2:>(0 +1)/ 1] 20/08/25 05:57:30 INFO JobScheduler:添加了作业,时间为1598335050000 ms 20/08/25 05:57:30 INFO JobScheduler:从作业时间1598335050000 ms的作业集中开始作业流作业1598335050000 ms.0

时间:2020-08-25 05:57:30

20/08/25 05:57:30 INFO JobScheduler:已完成作业流作业1598335050000 ms。来自作业时间1598335050000 ms的0 20/08/25 05:57:30 INFO JobScheduler:从作业时间1598335050000 ms的作业集中开始作业流作业1598335050000 ms.1

时间:2020-08-25 05:57:30

20/08/25 05:57:30 INFO JobScheduler:完成作业流作业1598335050000 ms。来自作业时间集合1598335050000 ms 20/08/25 05:57:30 INFO JobScheduler:总延迟:0.157 s,时间1598335050000 ms(执行:0.094 s) 20/08/25 05:57:30 INFO KinesisBackedBlockRDD:从持久性列表中删除RDD 116 20/08/25 05:57:30 INFO KinesisInputDStream:在NativeMethodAccessorImpl.java:0上的createStream处移除RDD KinesisBackedBlockRDD [116]的块,时间为1598335050000 ms 20/08/25 05:57:30 INFO ReceivedBlockTracker:删除批次:1598335040000 ms 20/08/25 05:57:30 INFO InputInfoTracker:删除旧的批处理元数据:1598335040000 ms [阶段2:>(0 +1)/ 1] 20/08/25 05:57:35 INFO JobScheduler:添加了作业,时间为1598335055000 ms 20/08/25 05:57:35 INFO JobScheduler:从作业时间1598335055000 ms开始作业流作业1598335055000 ms.0

时间:2020-08-25 05:57:35

20/08/25 05:57:35 INFO JobScheduler:已完成作业流作业1598335055000 ms.0,来自作业时间集合1598335055000 ms 20/08/25 05:57:35 INFO JobScheduler:从作业时间1598335055000 ms开始作业流作业1598335055000 ms.1

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)