是否有可能让spark结构化流更新模式写入db?

问题描述

我使用spark(3.0.0)结构化流从kafka中读取主题

我已经使用mapGropusWithState,然后使用DB sink获取我的流数据,因此,我必须使用 update 模式我从Spark官方指南中获得的理解:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes

Spark官方指南的以下部分未对files进行任何说明,也不支持update mode的{​​{3}}

写入console

目前,我将其输出aggregated,并且希望将数据存储在文件或DB中。

所以我的问题是: 如何在我的情况下将流数据写入db或文件? 我是否必须将数据写入kafka,然后使用kafka connect将它们读回到文件/数据库

p.s。我按照这些文章进行了- https://stackoverflow.com/questions/62738727/how-to-deduplicate-and-keep-latest-based-on-timestamp-field-in-spark-structured - https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html - will also try one more time for below using java api (https://stackoverflow.com/questions/50933606/spark-streaming-select-record-with-max-timestamp-for-each-id-in-dataframe-pysp) 流式查询

int main(int argc,char *argv[])
{

QApplication app(argc,argv);

// Startup actions neeeded to display AppView correctly
// Black screen is shown for several seconds
// ...

qquickview AppView;
AppView.setSource(QUrl(QStringLiteral("main.qml")));
AppView.resize(480,800);
AppView.show();
return app.exec();

}

解决方法

我对输出和写入感到困惑。同样,我错误地认为数据库和文件接收器在文档的输出SINK部分中是并行的(因此,在指南的https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks的输出SINKs部分中无法看到数据库接收器。)

我刚刚意识到OUTPUT模式(追加/更新/完成)将执行查询流查询约束。但这与如何写入SINK无关。我还意识到可以通过使用FOREACH SINK来完成DB编写(起初我只是理解这是为了进行额外的转换)。

我发现这些文章/讨论很有用

因此,稍后,请再次阅读官方指南,确认每个批次在写入存储时也可以执行自定义逻辑等。