问题描述
我对整个微服务体系结构还很陌生,所以我一直在使用Kafka来检索数据以及向微服务发送数据。我知道如何通过常规的消费者和生产者很好地发送和检索数据(尽管我不是专家),但是最近我了解了Kafka Streams,并希望使用它来简化我正在使用的数据。我能够从另一个微服务中收集数据,但是我注意到,如果我终止该过程并再次运行它,我将获得数据,并在其下再添加一个相同数据的副本。而且,如果我要终止该过程并再次运行它,那么我将在最后一个重复项的下面有另一个重复的数据!即使我停止运行提供数据的其他微服务,我也能够收集数据,因此我假设数据已保存在某处。如果我终止一次并再次运行它,它将是这样的样子。
TOPIC: requestAllUserData
[kafka-producer -> requestAllUserData]: broker update success
[
{
id: 1,first_name: 'John',last_name: 'Doe',city: 'Northridge',age: 25,gender: 'Male',profession: 'Teacher',email: '[email protected]',username: 'JohnTho213',created_at: '06-05-2019',deleted_at: '09-29-2020'
},{
id: 2,first_name: 'Mike',last_name: 'Brown',city: 'Topanga',age: 19,profession: 'Senator',email: '[email protected]',username: 'MBrownYe',created_at: '07-04-18',deleted_at: null
}
]
[
{
id: 1,deleted_at: null
}
]
如您所见,我收到了两次有效载荷,而我只想看到一次。有谁知道这种意外行为的可能原因?我在这里关注了文档-> https://nodefluent.github.io/kafka-streams/docs/
我包含的代码与下面的文档中的代码几乎没有什么
"use strict";
const { KafkaStreams } = require("kafka-streams");
const { nativeConfig: config } = require("./KSConfig.js");
const kafkaStreams = new KafkaStreams(config);
const stream = kafkaStreams.getKStream();
stream
.from("AllUserDataResponse")
.forEach(message => console.log(JSON.parse(message.value)));
function streamTest(){
stream.start().then(() => {
console.log("stream started,as kafka consumer is ready.");
},error => {
console.log("streamed failed to start: " + error);
});
}
exports.streamTest = streamTest;
我正在Server.js文件中运行此文件,尽管我认为这些信息并没有真正的帮助。另外,我一直在尝试收集数据并将其存储在列表或数组中,但是这样做还没有任何运气,因此,如果有人能帮助我做到这一点,也将不胜感激。哦,这是我的KSConfig文件,如果有帮助的话。
"use strict";
const batchOptions = {
batchSize: 5,commitEveryNBatch: 1,concurrency: 1,commitSync: false,noBatchCommits: false
};
const nativeConfig = {
noptions: {
"metadata.broker.list": "localhost:9092",//native client requires broker hosts to connect to
"group.id": "kafka-streams-test-native","client.id": "kafka-streams-test-name-native","event_cb": true,"compression.codec": "snappy","api.version.request": true,"socket.keepalive.enable": true,"socket.blocking.max.ms": 100,"enable.auto.commit": false,"auto.commit.interval.ms": 100,"heartbeat.interval.ms": 250,"retry.backoff.ms": 250,"fetch.min.bytes": 100,"fetch.message.max.bytes": 2 * 1024 * 1024,"queued.min.messages": 100,"fetch.error.backoff.ms": 100,"queued.max.messages.kbytes": 50,"fetch.wait.max.ms": 1000,"queue.buffering.max.ms": 1000,"batch.num.messages": 10000
},tconf: {
"auto.offset.reset": "earliest","request.required.acks": 1
},batchOptions
};
module.exports = {
nativeConfig
};
如果您有任何其他问题,我会再答复。任何帮助或建议,将不胜感激。谢谢!
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)