如何在火花结构化流 foreachBatch 方法中实现聚合?

问题描述

我有一个用例,

我需要从数据源读取记录并将其写入多个接收器,包括 kafka,以及一些聚合。

这是我的伪代码的样子,

Dataset<Row> dataset = spark.readStream()......

dataset.writeStream().foreachBatch(
// do some processing,including aggregations
// write it to multiple sinks
batch.write().format('kafka').save();
).start().awaitTermination();

当我在 foreach 中尝试一些聚合方法时,认情况下它采用追加模式并删除旧的聚合。因此输出仅包含当前批次的结果。

我的要求是,当第二批数据到达时,它应该与第一批的结果合并。

例如:对于查询dataset.groupBy("id").count(value)

如果第一批输入是: {"id":1,"value":1},{"id":1,"value":1}

输出 {"id":1,"value": "2"}

第二批输入: {"id":1,"value":3},"value":2}

输出 {"id":1,"value":5}

预期输出 {"id":1,"value":7}

如何在火花中实现这一点?

提前致谢。

解决方法

以上示例是 DStream 示例,而不是结构化流。您需要将 Spark Structured Stream 视为将数据加载到无界表中。

假设数据源是kafka,这里是Structured Streaming的一个基本例子。请注意,ReadStream 和 WriteStream Api 无法进行模式推断。 Schema 需要来自数据源连接器,在本例中为 Kafka。

val df = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers","localhost:9031")
  .option("subscribe","word-count")
  .option("startingOffsets","latest")
  .option("failOnDataLoss","false")
  .load()

val query = df
  .selectExpr("CAST(key AS STRING) as key","CAST(value AS STRING) as value")
  .as[(String,String)]
  .select(from_json($"value",schema = schema).as("data"))
  .writeStream
  .format("parquet")
  .option("path","/parquet/word-count/")
  .option("checkpointLocation","/tmp/word-count-chkpnt")
  .trigger(ProcessingTime("10 second"))
  .outputMode(OutputMode.Append())
  .start()

使用 .trigger() 函数创建微批次和 outputMode 来保存每个微批次的结果。在此示例中,我每 10 秒创建一个微批处理 .trigger(ProcessingTime("10 second")) 并将流中的每个事件作为一行附加到镶木地板文件 .outputMode(OutputMode.Append())

在您的情况下,您需要使用 .trigger() 和您选择的微批次间隔,并使用 .outputMode(outputMode.Update()) 插入具有值的新键或使用递增值更新现有键。

以下部分是您的聚合逻辑所在。您可以将聚合逻辑分解为单独的数据帧,并将数据帧写入为流而不是链接 以提高可读性。

.selectExpr("CAST(key AS STRING) as key",schema = schema).as("data"))

Another example 的结构化流媒体。