如何使用 MQ 在 Spark Structured Streaming 中添加故障恢复

问题描述

我正在使用 activeMQ 从主题中读取消息

val df = spark
            .readStream
            .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
            .option("brokerUrl",brokerUrl_)
            .option("topic",topicName_)
            .option("persistence","memory")
            .option("cleanSession","true")
            .option("username",username_)
            .option("password",password_)
            .load()

然后我将其写入 CSV 文件

            df
                .writeStream
                .outputMode("append")
                .format("csv")
                .option("checkpoint",checkpointLocation)
                .option("path",path_)
                .option("truncate",value = false)
                .start
                .awaitTermination()

假设我正在向此发送消息,并且在接收消息之前失败,然后在下一次开始时我想从该失败消息开始读取。这能实现吗??

编辑: 通过“向此发送消息”,我的意思是在 ActiveMQ 主题中排队一条消息,如果 spark 应用程序在收到消息之前失败,那么我如何读取失败的消息? 我曾尝试在 checkpoint 中使用 spark.sparkContext.setCheckpointDir(path_of_checkpoint),但由于偏移量不同,应用程序在接收任何新消息时崩溃,我猜 ActiveMQ 不支持kafka 之类的偏移量加载。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)