问题描述
我创建了一个连接器,将集合的所有插入/更新事件推送到同一主题(1个分区)。用于处理插入事件消息的使用者代码将比更新事件花费一些时间。
这里的问题是消息不是根据消耗的顺序提交的。以下是Kafka用户的步骤和行为。
有什么方法可以等到消息1提交后再使用消息2。
消费者代码:
const startConsumer = async () => {
// Creating a kafka consumer group.
const kafkaConsumerGroup = new kafkaNode.ConsumerGroup(consumer_configs,topic_name);
kafkaConsumerGroup.on("connect",() => {
console.log("Consumer Group Connected Successfully");
});
// Listening for messages from kafka.
kafkaConsumerGroup.on("message",async (message) => {
//processor code
const data = JSON.parse(message.value);
response = await processData(JSON.parse(data.payload));
//Commit
kafkaConsumerGroup.commit((error,data) => {
if (error) {
console.log(error);
} else {
console.log('Commit success ');
}
});
});
kafkaConsumerGroup.on("error",(error) => {
console.log(error);
});
};
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)