问题描述
我正在使用以下流程构建实时数据管道
Source --> Kafka --> Structured Streaming (2.3) --> Sink
这样做时,我阅读了 3 个主题。
Topic 1: sample_t1 (weekly) (written to Sink_Weekly)
Topic 2: sample_t2 (once a day) (written to Sink_Daily)
Topic 3: sample_t3 (hourly) (Join with Sink_Daily and update Sink_Daily)
TL;DR
目前的方法:
目前,我正在阅读如下所示的所有主题,意图是“无论数据何时到达,我都会准备好处理它......无论它来自哪个主题”。
df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("subscribePattern","sample.*")
.option("failOnDataLoss","false")
.load()
query = df.writeStream
.format("console")
.foreach(new JoinWithSink(connection)) // Logic to perform joins as I get in the data
.option("checkpointLocation",path/to/checkpoint/dir)
.start()
query.awaitTermination()
替代解决方案:
我的另一个解决方案是将每个主题作为单独的流查询读取,然后使用流连接来执行用例
df_weekly = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("subscribe","sample_t1")
.option("failOnDataLoss","false")
.load()
df_daily = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","sample_t2")
.option("failOnDataLoss","false")
.load()
df_hourly = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","sample_t3")
.option("failOnDataLoss","false")
.load()
用例:
我希望能够对更快的流查询 (Topic1) 执行连接/查找并相应地更新接收器。
问题:
- 使用 subscribePattern 并一次读取所有主题更好还是将主题拆分为不同的流查询更好?
- 如何并行运行多个流查询?用例:在 topic1 和 topic2 之间运行连接与在 topic1 和 topic3 之间并行运行?
- 如果失败,我如何通过忽略损坏的记录来重新启动流式查询?
- 以编程方式在 x 小时后停止并重新启动流式查询?我知道我可以在包装脚本上添加触发器或使用 cron.. 有没有更好的方法来停止和重新启动查询?
如果您需要更多详细信息,请告诉我,我很乐意提供任何其他详细信息。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)