node.js – 我可以限制kafka-node使用者的消费吗?

看起来像我的kafka节点消费者:

var kafka = require('kafka-node');
var consumer = new Consumer(client,[],{
     ...
    });

在某些情况下,获取的消息太多了.
有没有办法限制它(例如,每秒接受不超过1000条消息,可能使用暂停api?)

>我正在使用kafka-node,与Java版本相比,它似乎具有有限的api

解决方法

我有类似的情况,我正在消费来自Kafka的消息,不得不限制消费,因为我的消费者服务依赖于有自己约束的第三方API.

我使用async / queue以及async / cargo的包装程序asyncTimedCargo进行批处理.
货物从kafka-consumer获取所有消息,并在达到大小限制batch_config.batch_size或超时batch_config.batch_timeout时将其发送到队列.
async / queue提供饱和和不饱和的回调,如果队列任务工作者忙,可以使用它来停止使用.这将阻止货物填满,您的应用程序不会耗尽内存.消费将在不满足时恢复.

//cargo-service.js
module.exports = function(key){
    return new asyncTimedCargo(function(tasks,callback) {
        var length = tasks.length;
        var postBody = [];
        for(var i=0;i<length;i++){
            var message ={};
            var task = JSON.parse(tasks[i].value);
            message = task;
            postBody.push(message);
        }
        var postJson = {
            "json": {"request":postBody}
        };
        sms_queue.push(postJson);
        callback();
    },batch_config.batch_size,batch_config.batch_timeout)
};

//kafka-consumer.js
cargo = cargo-service()
consumer.on('message',function (message) {
    if(message && message.value && utils.isValidJsonString(message.value)) {
        var msgObject = JSON.parse(message.value);        
        cargo.push(message);
    }
    else {
        logger.error('Invalid JSON Message');
    }
});

// sms-queue.js
var sms_queue = queue(
retryable({
    times: queue_config.num_retries,errorFilter: function (err) {
        logger.info("inside retry");
        console.log(err);
        if (err) {
            return true;
        }
        else {
            return false;
        }
    }
},function (task,callback) {
// your worker task for queue
  callback()
}),queue_config.queue_worker_threads);

sms_queue.saturated = function() {
    consumer.pause();
    logger.warn('Queue saturated Consumption paused: ' + sms_queue.running());
};
sms_queue.unsaturated = function() {
    consumer.resume();
    logger.info('Queue unsaturated Consumption resumed: ' + sms_queue.running());
};

相关文章

这篇文章主要介绍“基于nodejs的ssh2怎么实现自动化部署”的...
本文小编为大家详细介绍“nodejs怎么实现目录不存在自动创建...
这篇“如何把nodejs数据传到前端”文章的知识点大部分人都不...
本文小编为大家详细介绍“nodejs如何实现定时删除文件”,内...
这篇文章主要讲解了“nodejs安装模块卡住不动怎么解决”,文...
今天小编给大家分享一下如何检测nodejs有没有安装成功的相关...