Kakfa使用者甚至在提交相同主题的第一条消息之前正在处理下一条消息

问题描述

我创建了一个连接器,将集合的所有插入/更新事件推送到同一主题(1个分区)。用于处理插入事件消息的使用者代码将比更新事件花费一些时间。

这里的问题是消息不是根据消耗的顺序提交的。以下是Kafka用户的步骤和行为。

  1. 在集合上插入记录,并在一秒钟内更新字段。
  2. 主题现在有2条记录。一个用于插入事件(消息1),另一个用于更新事件(消息2)。
  3. 消息1被消耗
  4. 消息2被消耗
  5. 消息2已提交
  6. 消息1已提交

有什么方法可以等到消息1提交后再使用消息2。

消费者代码

    const startConsumer = async () => {
    
        // Creating a kafka consumer group.
        const kafkaConsumerGroup = new kafkaNode.ConsumerGroup(consumer_configs,topic_name);
    
        kafkaConsumerGroup.on("connect",() => {
            console.log("Consumer Group Connected Successfully");
        });
    
        // Listening for messages from kafka.
        kafkaConsumerGroup.on("message",async (message) => {
    
                //processor code
                const data = JSON.parse(message.value);
                response = await processData(JSON.parse(data.payload));
    
                //Commit
                kafkaConsumerGroup.commit((error,data) => {
                    if (error) {
                        console.log(error);
                    } else {
                        console.log('Commit success ');
                    }
                });
        });
    
        kafkaConsumerGroup.on("error",(error) => {
            console.log(error);
        });
    };

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)