我正在使用kafka-node(kafka的节点客户端),使用使用者来检索有关主题的消息.不幸的是,我收到了“offsetoutOfRange”条件(调用了offsetoutOfRange回调).我的应用程序运行良好,直到消费者显着落后于生产者,在最早和最新的偏移之间留下了一个很大的差距.此时我(也许是错误的)假设消费者能够继续接收消息(并希望能够接收到生产者).
我的kafka消费者客户端代码如下:
: : var kafka = require('kafka-node'); var zookeeper = "10.0.1.201:2181"; var id = "embClient"; var Consumer = kafka.Consumer; var client = new kafka.Client(zookeeper,id); var consumer = new Consumer( client,[ { topic: "test",partition: 0 } ],{ autoCommit: false } ); consumer.on('error',[error callback...]); consumer.on('offsetoutOfRange',[offset error callback...]); consumer.on('message',[message callback...]); : :
我做错了什么,还是我错过了什么?
如果没有,我有几个问题:
(a)是否有一种公认的“最佳”方式来写客户端以优雅地处理这种情况?
(b)为何会提出这个条件? (我假设一个客户应该能够继续阅读它停止的消息,最终(理想情况下)赶上……)
(c)我是否需要编写代码/逻辑来处理这种情况,并明确地重新定位消费者偏移量以进行读取? (这看起来有点麻烦)……
任何帮助表示赞赏.
解决方法
我相信该应用程序可能会尝试读取Kafka中不再提供的消息. Kafka根据log.retention.*属性删除旧消息.假设您已向Kafka发送1000条消息.由于保留,Kafka删除了前500条消息.如果您的应用尝试阅读消息350,它将失败并且它将引发offsetoutOfRange错误.之所以会发生这种情况,是因为您的消费者非常缓慢,以至于Kafka经纪人已经在消费者处理消息之前删除了消息.或者您的消费者崩溃了,但上次处理的消息的偏移量已保存在某处.
您可以使用Offset class检索最新/最早的可用偏移量(请参阅方法提取)并更新消费者的偏移量.我们使用这种方法.
一般来说,当这种情况发生时应该知道应该做什么并不容易,因为显然有些事情是非常错误的.
希望能帮助到你,卢卡斯