基于偏移ID跨不同集群的kafka消耗管理

问题描述

我有一些正在运行的apache Kafka客户代码,并获取用户详细信息和流程并进行相应的更新。

目前,我正在使用kafka偏移量来跟踪我正确处理的记录。

无论何时我的使用者由于任何原因重新启动(一个节点关闭而其他节点获取数据,或使用者重新启动等),它都会首先根据处理的偏移量设置从读取的Kafka偏移量

consumer.seek(//get the offset from db);

并开始轮询

consumer.poll()

现在问题出在某些不同的区域故障测试所致,同一应用程序将在其他地方运行,并将开始处理新数据。

数据库是全局同步的,但不同区域的kafka群集之间没有同步,因此我得到了偏移量,该偏移量未针对不同区域中的同一主题进行排序。

因此,我最终在其他区域寻求不同的偏移量。

每当发生故障转移时,第一个集群中的数据都不会被诱骗到第二个集群中,根据业务需求这是可以的。

当前问题是当新记录进入第二个群集时,我不应该从第一个群集应用程序设置的偏移量开始,这可以通过将偏移量与Kafka clusterID(name)一起保存来进行管理,因此无论何时我都可以查询偏移量连同簇以获取基于区域的偏移量。

有什么更好的方法来处理这种情况吗?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)