问题描述
我正在编写一个Spark Streaming应用程序,该程序从Kinesis数据流中读取数据。我能够通过KinesisUtils.createStream函数进行读取,然后将输出DStream转换为Dataframe以处理消息。
我的要求是执行基于时间序列的聚合,即最近10分钟的窗口内的分钟级聚合。目前,我正在使用“数据帧上的结构化流”的窗口功能。代码如下。
def getSparkSessionInstance(sparkConf):
if ('sparkSessionSingletonInstance' not in globals()):
globals()['sparkSessionSingletonInstance'] = SparkSession\
.builder\
.config(conf=sparkConf)\
.getorCreate()
return globals()['sparkSessionSingletonInstance']
def createContext(applicationName,streamName,endpointUrl,regionName,shards):
print("Creating new context")
sc = SparkContext(appName=applicationName)
ssc = StreamingContext(sc,60)
#sqlContext = sqlContext(sc)
print("appname is" + applicationName +
streamName + endpointUrl + regionName)
kinesis_streams = [KinesisUtils.createStream(ssc,applicationName,InitialPositionInStream.LATEST,2) for _ in range(int(shards))]
kinesisstream = ssc.union(*kinesis_streams)
def process(time,rdd):
print("========= %s =========" % time)
try:
# Get the singleton instance of SparkSession
spark = getSparkSessionInstance(rdd.context.getConf())
df = spark.read.json(rdd)
df.printSchema()
print(df.rdd.getNumPartitions())
df = df.withColumn("eventTime",F.to_timestamp(df.headers.timestamp)).withColumn("x",df.args.x)
df.show(truncate=False)
wDf = df.withWatermark("eventTime","10 minutes").groupBy("x",F.window("eventTime","1 minutes","1 minutes")).count()
wDf.show(truncate=False)
except Exception as e:
print(str(e))
pass
kinesis_json = kinesisstream.map(lambda x: json.loads(x))
#kinesis_json.pprint()
kinesis_json.foreachRDD(process)
ssc.checkpoint(checkpoint)
return ssc
if __name__ == "__main__":
if len(sys.argv) != 7:
print(
"Usage: Aggregate.py <app-name> <stream-name> <endpoint-url> <region-name> <kinesis-shards> <checkpoint-directory>",file=sys.stderr)
sys.exit(-1)
appName,shards,checkpoint = sys.argv[1:]
ssc = StreamingContext.getorCreate(checkpoint,lambda: createContext(appName,shards))
ssc.start()
time.sleep(600)
#ssc.awaitTermination()
ssc.stop(stopSparkContext=True,stopGraceFully=True)
我的问题是,如果我以1分钟的批处理间隔读取Kinesis DStream,它仍然能够创建10分钟的水印还是我需要配置10分钟的DStream窗口?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)