node.js – kafka-node使用者收到offsetOutOfRange错误

我正在使用kafka-node(kafka的节点客户端),使用使用者来检索有关主题的消息.不幸的是,我收到了“offsetoutOfRange”条件(调用了offsetoutOfRange回调).我的应用程序运行良好,直到消费者显着落后于生产者,在最早和最新的偏移之间留下了一个很大的差距.此时我(也许是错误的)假设消费者能够继续接收消息(并希望能够接收到生产者).

我的kafka消费者客户端代码如下:

:
:
var kafka = require('kafka-node');

var zookeeper = "10.0.1.201:2181";
var id = "embClient";

var Consumer = kafka.Consumer;
var client = new kafka.Client(zookeeper,id);
var consumer = new Consumer( client,[ { topic: "test",partition: 0 } ],{ autoCommit: false } );

consumer.on('error',[error callback...]);

consumer.on('offsetoutOfRange',[offset error callback...]);

consumer.on('message',[message callback...]);
:
:

我做错了什么,还是我错过了什么?

如果没有,我有几个问题:

(a)是否有一种公认的“最佳”方式来写客户端以优雅地处理这种情况?

(b)为何会提出这个条件? (我假设一个客户应该能够继续阅读它停止的消息,最终(理想情况下)赶上……)

(c)我是否需要编写代码/逻辑来处理这种情况,并明确地重新定位消费者偏移量以进行读取? (这看起来有点麻烦)……

任何帮助表示赞赏.

解决方法

我相信该应用程序可能会尝试读取Kafka中不再提供的消息. Kafka根据log.retention.*属性删除旧消息.假设您已向Kafka发送1000条消息.由于保留,Kafka删除了前500条消息.如果您的应用尝试阅读消息350,它将失败并且它将引发offsetoutOfRange错误.之所以会发生这种情况,是因为您的消费者非常缓慢,以至于Kafka经纪人已经在消费者处理消息之前删除了消息.或者您的消费者崩溃了,但上次处理的消息的偏移量已保存在某处.

您可以使用Offset class检索最新/最早的可用偏移量(请参阅方法提取)并更新消费者的偏移量.我们使用这种方法.

一般来说,当这种情况发生时应该知道应该做什么并不容易,因为显然有些事情是非常错误的.

希望能帮助到你,卢卡斯

相关文章

这篇文章主要介绍“基于nodejs的ssh2怎么实现自动化部署”的...
本文小编为大家详细介绍“nodejs怎么实现目录不存在自动创建...
这篇“如何把nodejs数据传到前端”文章的知识点大部分人都不...
本文小编为大家详细介绍“nodejs如何实现定时删除文件”,内...
这篇文章主要讲解了“nodejs安装模块卡住不动怎么解决”,文...
今天小编给大家分享一下如何检测nodejs有没有安装成功的相关...