Kinesis DStreams上的窗口与Pyspark中的结构流数据帧

问题描述

我正在编写一个Spark Streaming应用程序,该程序从Kinesis数据流中读取数据。我能够通过KinesisUtils.createStream函数进行读取,然后将输出DStream转换为Dataframe以处理消息。

我的要求是执行基于时间序列的聚合,即最近10分钟的窗口内的分钟级聚合。目前,我正在使用“数据帧上的结构化流”的窗口功能代码如下。

def getSparkSessionInstance(sparkConf):
    if ('sparkSessionSingletonInstance' not in globals()):
        globals()['sparkSessionSingletonInstance'] = SparkSession\
            .builder\
            .config(conf=sparkConf)\
            .getorCreate()
    return globals()['sparkSessionSingletonInstance']

def createContext(applicationName,streamName,endpointUrl,regionName,shards):
    print("Creating new context")

    sc = SparkContext(appName=applicationName)
    ssc = StreamingContext(sc,60)
    #sqlContext = sqlContext(sc)

    print("appname is" + applicationName +
          streamName + endpointUrl + regionName)

    kinesis_streams = [KinesisUtils.createStream(ssc,applicationName,InitialPositionInStream.LATEST,2) for _ in range(int(shards))]
    kinesisstream = ssc.union(*kinesis_streams)
    
    def process(time,rdd):
        print("========= %s =========" % time)

        try:
            # Get the singleton instance of SparkSession
            spark = getSparkSessionInstance(rdd.context.getConf())

            df = spark.read.json(rdd)
            df.printSchema()
            print(df.rdd.getNumPartitions())
            df = df.withColumn("eventTime",F.to_timestamp(df.headers.timestamp)).withColumn("x",df.args.x)
            df.show(truncate=False)

            wDf = df.withWatermark("eventTime","10 minutes").groupBy("x",F.window("eventTime","1 minutes","1 minutes")).count()
            wDf.show(truncate=False)
        except Exception as e:
            print(str(e))
            pass

    kinesis_json = kinesisstream.map(lambda x: json.loads(x))
    #kinesis_json.pprint()
    kinesis_json.foreachRDD(process)
    ssc.checkpoint(checkpoint)
    return ssc

if __name__ == "__main__":
    if len(sys.argv) != 7:
        print(
            "Usage: Aggregate.py <app-name> <stream-name> <endpoint-url> <region-name> <kinesis-shards> <checkpoint-directory>",file=sys.stderr)
        sys.exit(-1)

    appName,shards,checkpoint = sys.argv[1:]
    ssc = StreamingContext.getorCreate(checkpoint,lambda: createContext(appName,shards))

    ssc.start()
    time.sleep(600)

    #ssc.awaitTermination()
    ssc.stop(stopSparkContext=True,stopGraceFully=True)

我的问题是,如果我以1分钟的批处理间隔读取Kinesis DStream,它仍然能够创建10分钟的水印还是我需要配置10分钟的DStream窗口?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)