问题描述
我使用spark(3.0.0)结构化流从kafka中读取主题。
我已经使用mapGropusWithState
,然后使用DB sink
来获取我的流数据,因此,我必须使用 update 模式我从Spark官方指南中获得的理解:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
Spark官方指南的以下部分未对files
进行任何说明,也不支持update mode
的{{3}}
console
。
目前,我将其输出到aggregated
,并且希望将数据存储在文件或DB中。
所以我的问题是: 如何在我的情况下将流数据写入db或文件? 我是否必须将数据写入kafka,然后使用kafka connect将它们读回到文件/数据库?
p.s。我按照这些文章进行了- https://stackoverflow.com/questions/62738727/how-to-deduplicate-and-keep-latest-based-on-timestamp-field-in-spark-structured
- https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html
- will also try one more time for below using java api
(https://stackoverflow.com/questions/50933606/spark-streaming-select-record-with-max-timestamp-for-each-id-in-dataframe-pysp)
流式查询。
int main(int argc,char *argv[])
{
QApplication app(argc,argv);
// Startup actions neeeded to display AppView correctly
// Black screen is shown for several seconds
// ...
qquickview AppView;
AppView.setSource(QUrl(QStringLiteral("main.qml")));
AppView.resize(480,800);
AppView.show();
return app.exec();
}
解决方法
我对输出和写入感到困惑。同样,我错误地认为数据库和文件接收器在文档的输出SINK部分中是并行的(因此,在指南的https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks的输出SINKs部分中无法看到数据库接收器。)
我刚刚意识到OUTPUT模式(追加/更新/完成)将执行查询流查询约束。但这与如何写入SINK无关。我还意识到可以通过使用FOREACH SINK来完成DB编写(起初我只是理解这是为了进行额外的转换)。
我发现这些文章/讨论很有用
- https://www.waitingforcode.com/apache-spark-structured-streaming/output-modes-structured-streaming/read#what_is_the_difference_with_SaveMode
- How to write streaming dataframe to PostgreSQL?
- https://linuxize.com/post/how-to-list-databases-tables-in-postgreqsl/
因此,稍后,请再次阅读官方指南,确认每个批次在写入存储时也可以执行自定义逻辑等。