问题描述
我有一些正在运行的apache Kafka客户代码,并获取用户详细信息和流程并进行相应的更新。
目前,我正在使用kafka偏移量来跟踪我正确处理的记录。
无论何时我的使用者由于任何原因重新启动(一个节点关闭而其他节点获取数据,或使用者重新启动等),它都会首先根据处理的偏移量设置从读取的Kafka偏移量
consumer.seek(//get the offset from db);
并开始轮询
consumer.poll()
现在问题出在某些不同的区域故障测试所致,同一应用程序将在其他地方运行,并将开始处理新数据。
即数据库是全局同步的,但不同区域的kafka群集之间没有同步,因此我得到了偏移量,该偏移量未针对不同区域中的同一主题进行排序。
因此,我最终在其他区域寻求不同的偏移量。
每当发生故障转移时,第一个集群中的数据都不会被诱骗到第二个集群中,根据业务需求这是可以的。
当前问题是当新记录进入第二个群集时,我不应该从第一个群集应用程序设置的偏移量开始,这可以通过将偏移量与Kafka clusterID(name)一起保存来进行管理,因此无论何时我都可以查询偏移量连同簇以获取基于区域的偏移量。
有什么更好的方法来处理这种情况吗?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)