Node JS 应用程序因 ERR_SOCKET_CANNOT_SEND 错误而崩溃

问题描述

我有一个 node js 服务,它使用来自 Kafka 的消息并通过转换逻辑的各个步骤对其进行处理。在处理过程中,服务使用 Redis 和 mongo 进行存储和缓存。最后,它通过UDP数据包将转换后的消息发送到另一个目的地。

在启动时,它会在一段时间后开始使用来自 Kafka 的消息,但由于未处理的错误而崩溃:ERR_CANNOT_SEND 无法发送数据(见下图)。 重新启动应用程序可以暂时解决问题。 我最初认为这可能与通过 UDP 套接字进行转发有关,但消费者可以访问转发目的地!

我很感激这里的任何帮助。我有点卡在这里

enter image description here

消费者代码

const readFromKafka =  ({host,topic,source},transformationService) => {
    const logger = createChildLogger(`kafka-consumer-${topic}`);
    const options = {
        // connect directly to kafka broker (instantiates a KafkaClient)
        kafkaHost: host,groupId: `${topic}-group`,protocol: ['roundrobin'],// and so on the  other kafka config.
    };

    logger.info(`starting kafka consumer on ${host} for ${topic}`);
    const consumer = new ConsumerGroup(options,[topic]);
    consumer.on('error',(err) => logger.error(err));
    consumer.on('message',async ({value,offset}) => {
        logger.info(`recieved ${topic}`,value);
        if (value) {
            const final = await transformationService([
                JSON.parse(Buffer.from(value,'binary').toString()),]);
            logger.info('Message recieved',{instanceID: final[0].instanceId,trace: final[1]});
         
        } else {
            logger.error(`invalid message: ${topic} ${value}`);
        }
        return;
    });
    consumer.on('rebalanced',() => {
        logger.info('cosumer is rebalancing');
    });
    return consumer;
};

Consumer Service 启动和错误处理代码

//init is the async function used to initialise the cache and other config and components.
const init = async() =>{
    //initialize cache,configs.
}

//startConsumer is the async function that connects to Kafka,//and add a callback for the onMessage listener which processes the message through the transformation service.
const startConsumer = async ({ ...config}) => {
    //calls to fetch info like topic,transformationService etc.
   //readFromKafka function defn pasted above
    readFromKafka( {topicConfig},transformationService);
};

init()
    .then(startConsumer)
    .catch((err) => {
        logger.error(err);
    });

通过 UDP 套接字转发代码。 以下代码间歇性地抛出未处理的错误,因为这似乎适用于前几千条消息,然后突然崩溃

const udpSender = (msg,destinations) => {
    return Object.values(destinations)
        .map(({id,host,port}) => {
            return new Promise((resolve) => {
                dgram.createSocket('udp4').send(msg,msg.length,port,(err) => {
                    resolve({
                        id,timestamp: Date.Now(),logs: err || 'Sent succesfully',});
                });
            });
        });
};

解决方法

根据我们的意见交流,我认为问题只是您的资源不足。

在应用的整个生命周期中,每次发送消息时都会打开一个全新的套接字。但是,发送该消息后您没有进行任何清理,因此该套接字无限期地保持打开状态。然后您打开的套接字继续堆积,消耗资源,直到您最终用完......某些东西。也许是内存,也许是端口,也许是其他东西,但最终您的应用程序崩溃了。

幸运的是,解决方案并不太复杂:只需重用现有的套接字即可。事实上,您可以根据需要为整个应用程序重用一个套接字,因为内部 socket.send 会为您处理排队,因此无需进行任何智能切换。但是,如果您想要更多的并发性,这里有一个循环队列的快速实现,我们预先创建了一个包含 10 个套接字的池,只要我们想发送消息,就可以从中获取:

const MAX_CONCURRENT_SOCKETS = 10;

var rrIndex = 0;

const rrSocketPool = (() => {
    var arr = [];
    for (let i = 0; i < MAX_CONCURRENT_SOCKETS; i++) {
        let sock = dgram.createSocket('udp4');
        arr.push(sock);
    }
    return arr;
})();

const udpSender = (msg,destinations) => {
    return Object.values(destinations)
        .map(({ id,host,port }) => {
            return new Promise((resolve) => {
                var sock = rrSocketPool[rrIndex];
                rrIndex = (rrIndex + 1) % MAX_CONCURRENT_SOCKETS;
                
                sock.send(msg,msg.length,port,(err) => {
                    resolve({
                        id,timestamp: Date.now(),logs: err || 'Sent succesfully',});
                });
            });
        });
};

请注意,由于一些原因,此实现仍然很幼稚,主要是因为套接字本身仍然没有错误处理,仅在其 .send 方法上。您应该查看文档以获取有关捕获事件(例如 error 事件)的更多信息,特别是如果这是一个应该无限期运行的生产服务器,但基本上是您在 {{1} 中放入的错误处理} 回调只会起作用...如果在调用 .send 时发生错误。如果在发送消息之间,而您的套接字空闲时,发生了一些您无法控制的系统级错误并导致您的套接字中断,则您的套接字可能会发出一个错误事件,该事件将无法处理(就像您当前的实现中发生的情况一样,以及您在致命错误之前看到的间歇性错误)。到那时,它们现在可能永久无法使用,这意味着它们应该被替换/恢复或以其他方式处理(或者,只是强制应用程序重新启动并结束它,就像我一样:-))。