Kafa + Structured Streaming:并行读取主题

问题描述

我正在使用以下流程构建实时数据管道

Source --> Kafka --> Structured Streaming (2.3) --> Sink

这样做时,我阅读了 3 个主题

Topic 1: sample_t1 (weekly) (written to Sink_Weekly) 
Topic 2: sample_t2 (once a day) (written to Sink_Daily)
Topic 3: sample_t3 (hourly) (Join with Sink_Daily and update Sink_Daily)

TL;DR

目前的方法

目前,我正在阅读如下所示的所有主题,意图是“无论数据何时到达,我都会准备好处理它......无论它来自哪个主题”。

 df = spark 
        .readStream 
        .format("kafka") 
        .option("kafka.bootstrap.servers","localhost:9092") 
        .option("subscribePattern","sample.*") 
        .option("failOnDataLoss","false")
        .load()

query = df.writeStream
    .format("console")
    .foreach(new JoinWithSink(connection)) // Logic to perform joins as I get in the data
    .option("checkpointLocation",path/to/checkpoint/dir)
    .start()

query.awaitTermination()

但是这种方法让我思考这是否是处理不同频率主题的最佳方法

替代解决方案:

我的另一个解决方案是将每个主题作为单独的流查询读取,然后使用流连接来执行用例

df_weekly = spark 
        .readStream 
        .format("kafka") 
        .option("kafka.bootstrap.servers","localhost:9092") 
        .option("subscribe","sample_t1") 
        .option("failOnDataLoss","false")
        .load()


 df_daily = spark 
        .readStream 
        .format("kafka") 
        .option("kafka.bootstrap.servers","sample_t2") 
        .option("failOnDataLoss","false")
        .load()

 df_hourly = spark 
        .readStream 
        .format("kafka") 
        .option("kafka.bootstrap.servers","sample_t3") 
        .option("failOnDataLoss","false")
        .load()

用例:

我希望能够对更快的流查询 (Topic1) 执行连接/查找并相应地更新接收器。

问题:

  1. 使用 subscribePattern 并一次读取所有主题更好还是将主题拆分为不同的流查询更好?
  2. 如何并行运行多个流查询?用例:在 topic1 和 topic2 之间运行连接与在 topic1 和 topic3 之间并行运行?
  3. 如果失败,我如何通过忽略损坏的记录来重新启动流式查询
  4. 以编程方式在 x 小时后停止并重新启动流式查询?我知道我可以在包装脚本上添加触发器或使用 cron.. 有没有更好的方法来停止和重新启动查询

如果您需要更多详细信息,请告诉我,我很乐意提供任何其他详细信息。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)