看起来像我的kafka节点消费者:
var kafka = require('kafka-node'); var consumer = new Consumer(client,[],{ ... });
在某些情况下,获取的消息太多了.
有没有办法限制它(例如,每秒接受不超过1000条消息,可能使用暂停api?)
>我正在使用kafka-node,与Java版本相比,它似乎具有有限的api
解决方法
我有类似的情况,我正在消费来自Kafka的消息,不得不限制消费,因为我的消费者服务依赖于有自己约束的第三方API.
我使用async / queue以及async / cargo的包装程序asyncTimedCargo进行批处理.
货物从kafka-consumer获取所有消息,并在达到大小限制batch_config.batch_size或超时batch_config.batch_timeout时将其发送到队列.
async / queue提供饱和和不饱和的回调,如果队列任务工作者忙,可以使用它来停止使用.这将阻止货物填满,您的应用程序不会耗尽内存.消费将在不满足时恢复.
//cargo-service.js module.exports = function(key){ return new asyncTimedCargo(function(tasks,callback) { var length = tasks.length; var postBody = []; for(var i=0;i<length;i++){ var message ={}; var task = JSON.parse(tasks[i].value); message = task; postBody.push(message); } var postJson = { "json": {"request":postBody} }; sms_queue.push(postJson); callback(); },batch_config.batch_size,batch_config.batch_timeout) }; //kafka-consumer.js cargo = cargo-service() consumer.on('message',function (message) { if(message && message.value && utils.isValidJsonString(message.value)) { var msgObject = JSON.parse(message.value); cargo.push(message); } else { logger.error('Invalid JSON Message'); } }); // sms-queue.js var sms_queue = queue( retryable({ times: queue_config.num_retries,errorFilter: function (err) { logger.info("inside retry"); console.log(err); if (err) { return true; } else { return false; } } },function (task,callback) { // your worker task for queue callback() }),queue_config.queue_worker_threads); sms_queue.saturated = function() { consumer.pause(); logger.warn('Queue saturated Consumption paused: ' + sms_queue.running()); }; sms_queue.unsaturated = function() { consumer.resume(); logger.info('Queue unsaturated Consumption resumed: ' + sms_queue.running()); };