问题描述
我需要从数据源读取记录并将其写入多个接收器,包括 kafka,以及一些聚合。
这是我的伪代码的样子,
Dataset<Row> dataset = spark.readStream()......
dataset.writeStream().foreachBatch(
// do some processing,including aggregations
// write it to multiple sinks
batch.write().format('kafka').save();
).start().awaitTermination();
当我在 foreach 中尝试一些聚合方法时,默认情况下它采用追加模式并删除旧的聚合。因此输出仅包含当前批次的结果。
我的要求是,当第二批数据到达时,它应该与第一批的结果合并。
例如:对于查询,dataset.groupBy("id").count(value)
如果第一批输入是: {"id":1,"value":1},{"id":1,"value":1}
输出: {"id":1,"value": "2"}
第二批输入: {"id":1,"value":3},"value":2}
输出: {"id":1,"value":5}
预期输出: {"id":1,"value":7}
如何在火花中实现这一点?
提前致谢。
解决方法
以上示例是 DStream 示例,而不是结构化流。您需要将 Spark Structured Stream 视为将数据加载到无界表中。
假设数据源是kafka,这里是Structured Streaming的一个基本例子。请注意,ReadStream 和 WriteStream Api 无法进行模式推断。 Schema 需要来自数据源连接器,在本例中为 Kafka。
val df = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9031")
.option("subscribe","word-count")
.option("startingOffsets","latest")
.option("failOnDataLoss","false")
.load()
val query = df
.selectExpr("CAST(key AS STRING) as key","CAST(value AS STRING) as value")
.as[(String,String)]
.select(from_json($"value",schema = schema).as("data"))
.writeStream
.format("parquet")
.option("path","/parquet/word-count/")
.option("checkpointLocation","/tmp/word-count-chkpnt")
.trigger(ProcessingTime("10 second"))
.outputMode(OutputMode.Append())
.start()
使用 .trigger() 函数创建微批次和 outputMode 来保存每个微批次的结果。在此示例中,我每 10 秒创建一个微批处理 .trigger(ProcessingTime("10 second"))
并将流中的每个事件作为一行附加到镶木地板文件 .outputMode(OutputMode.Append())
在您的情况下,您需要使用 .trigger() 和您选择的微批次间隔,并使用 .outputMode(outputMode.Update())
插入具有值的新键或使用递增值更新现有键。
以下部分是您的聚合逻辑所在。您可以将聚合逻辑分解为单独的数据帧,并将数据帧写入为流而不是链接 以提高可读性。
.selectExpr("CAST(key AS STRING) as key",schema = schema).as("data"))
Another example 的结构化流媒体。