更新更改日志时,Faust 如何增加 Rocksdb 中的偏移量? 我很好奇当我们使用 RocksDB 作为状态后端时,faust 如何在内部更新变更日志

问题描述

我很好奇当我们使用 RocksDB 作为状态后端时,faust 如何在内部更新变更日志。

据我所知,我们将在表更新期间有下一个行为:

使用 _on_changelog_sent 回调将新的变更日志发送到 kafka 变更日志主题(来自关于此回调的文档:这是在 RocksDB 中保持偏移量的原因,以便在启动时我们知道哪些偏移量我们已经有了数据数据库)

但最重要的问题:在kafka日志中成功存储changelog消息后会调用这个回调吗?或者我们可以有这样的情况,当我们向 kafka 发送更改日志消息时,通过回调更新 Rocksdb 中的偏移量,但最终发送到 kafka 会失败? (因为一些 kafka 集群问题)

在这种情况下,rocksdb 中 changelog topic 的最后一个偏移量和 kafka 中的 real highwater 偏移量不一致。

如果 Faust 现在尝试重新启动,那么我们就会因为这种不一致而失败。

我问这个是因为我在生产中遇到这样一个问题,当rocksdb中的偏移量大于kafka变更日志主题中的最后一条消息偏移量时。

我相信 kafka 生产者应该等待确认更改日志事件已保存到 kafka 日志,然后才运行回调以更新 Rocksdb。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)