为什么终止进程并再次运行后,Node中的Kafka Streams向我发送重复的有效载荷?

问题描述

我对整个微服务体系结构还很陌生,所以我一直在使用Kafka来检索数据以及向微服务发送数据。我知道如何通过常规的消费者和生产者很好地发送和检索数据(尽管我不是专家),但是最近我了解了Kafka Streams,并希望使用它来简化我正在使用的数据。我能够从另一个微服务中收集数据,但是我注意到,如果我终止该过程并再次运行它,我将获得数据,并在其下再添加一个相同数据的副本。而且,如果我要终止该过程并再次运行它,那么我将在最后一个重复项的下面有另一个重复的数据!即使我停止运行提供数据的其他微服务,我也能够收集数据,因此我假设数据已保存在某处。如果我终止一次并再次运行它,它将是这样的样子。

TOPIC: requestAllUserData
[kafka-producer -> requestAllUserData]: broker update success
[
  {
    id: 1,first_name: 'John',last_name: 'Doe',city: 'Northridge',age: 25,gender: 'Male',profession: 'Teacher',email: 'johntho213@gmail.com',username: 'JohnTho213',created_at: '06-05-2019',deleted_at: '09-29-2020'
  },{
    id: 2,first_name: 'Mike',last_name: 'Brown',city: 'Topanga',age: 19,profession: 'Senator',email: 'mikebrown@gmail.com',username: 'MBrownYe',created_at: '07-04-18',deleted_at: null
  }
]
[
  {
    id: 1,deleted_at: null
  }
]

如您所见,我收到了两次有效载荷,而我只想看到一次。有谁知道这种意外行为的可能原因?我在这里关注了文档-> https://nodefluent.github.io/kafka-streams/docs/

我包含的代码与下面的文档中的代码几乎没有什么

"use strict";
const { KafkaStreams } = require("kafka-streams");
const { nativeConfig: config } = require("./KSConfig.js");
const kafkaStreams = new KafkaStreams(config);
const stream = kafkaStreams.getKStream();

stream
  .from("AllUserDataResponse")
  .forEach(message => console.log(JSON.parse(message.value)));

function streamTest(){
  stream.start().then(() => {
      console.log("stream started,as kafka consumer is ready.");
  },error => {
      console.log("streamed failed to start: " + error);
  });
}

exports.streamTest = streamTest;

我正在Server.js文件中运行此文件,尽管我认为这些信息并没有真正的帮助。另外,我一直在尝试收集数据并将其存储在列表或数组中,但是这样做还没有任何运气,因此,如果有人能帮助我做到这一点,也将不胜感激。哦,这是我的KSConfig文件,如果有帮助的话。

"use strict";


const batchOptions = {
    batchSize: 5,commitEveryNBatch: 1,concurrency: 1,commitSync: false,noBatchCommits: false
};

const nativeConfig = {
    noptions: {
        "metadata.broker.list": "localhost:9092",//native client requires broker hosts to connect to
        "group.id": "kafka-streams-test-native","client.id": "kafka-streams-test-name-native","event_cb": true,"compression.codec": "snappy","api.version.request": true,"socket.keepalive.enable": true,"socket.blocking.max.ms": 100,"enable.auto.commit": false,"auto.commit.interval.ms": 100,"heartbeat.interval.ms": 250,"retry.backoff.ms": 250,"fetch.min.bytes": 100,"fetch.message.max.bytes": 2 * 1024 * 1024,"queued.min.messages": 100,"fetch.error.backoff.ms": 100,"queued.max.messages.kbytes": 50,"fetch.wait.max.ms": 1000,"queue.buffering.max.ms": 1000,"batch.num.messages": 10000
    },tconf: {
        "auto.offset.reset": "earliest","request.required.acks": 1
    },batchOptions
};

module.exports = {
    nativeConfig
};

如果您有任何其他问题,我会再答复。任何帮助或建议,将不胜感激。谢谢!

解决方法

首先,您需要知道Kafka不能保证一次交货(我们确实有幂等的生产者和交易,但是为了简单起见,在此讨论中我会假装它们不存在 em>)。可能有很多情况会导致重复。

在您的用例中,说消费者读取了一个消息批(m1-m5)。它已处理(并作为日志的一部分打印),但是在将其提交给代理之前,该过程已终止。在这种情况下,再次启动该进程时,它将重新读取同一消息(m1-m5),因为它从未提交给代理。

话虽如此,复制品也可以来自生产商。假设kafka生产者发送了一条消息,而代理收到了该消息,但是由于某些网络故障,错过了来自代理的确认。在这种情况下,生产者可以重新发送导致重复的消息。因此,总而言之,您应该期望在kafka管道中存在重复项,并因此使其具有幂等性。

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...