问题描述
就像我在标题中说的那样,当生产者停止发送经期信息时,我想接收最后的windowBy消息。目前,我正在手动执行此操作,但首先要进行一些说明。
我有一个Kafka生产者,它正在从文件中读取行(每行都是不同的jSon),每个读取行都以500毫秒的时间间隔发送到Kafka。我只有120行(或jSons)。
我有一个消费者,它消费了生产者发送的所有jSons。代码:
final KStream<String,Aggregate> transactions = builder.stream(kafkaProperties.getTopic(),Consumed.with(Serdes.String(),aggregateSerde));
// Topology
transactions
.groupBy(this::groupedByTimeStampAndProtocolName)
.windowedBy( TimeWindows
.of( Duration.ofSeconds( 10 ))
.grace( Duration.ofMillis( 0 )))
.aggregate(
tool::emptyAggregate,this::processNewRecord,//new TransactionAggregator(),Materialized.<String,Aggregate,WindowStore<Bytes,byte[]>>as(TRANSACTION_AGGREGATE)
.withKeySerde(Serdes.String())
.withValueSerde(aggregateSerde)
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.foreach(sendAggregatesToCassandra);
我具有预期的功能,我的意思是,它可以接收所有记录,但是要接收我必须手动发送的记录中最后显示的消息。
与此有关的两个问题:
- 有什么方法可以自动处理最后一个窗口?当生产者发送最后一条记录(第120个jSon)时,生产者将不再发送更多记录。我应该等待时间还是什么都没关系。
- 我已经看到我必须发送3条记录才能处理最后一个窗口。我不清楚为什么我必须发送3条记录(如果我发送
我在JDK 11中使用Kafka Streams(带有Spring),并且正在使用docker化的Kafka:
- confluentinc / cp-kafka:5.5.1
- zookeeper:3.4.14
- 卡夫卡:
<version.kafka>2.5.0</version.kafka>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${version.kafka}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${version.kafka}</version>
</dependency>
Kafka中使用的属性为:
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,127.0.0.1:9092);
props.put(StreamsConfig.APPLICATION_ID_CONFIG,kafkaProperties.getAppId()+Constants.APP_ID);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Bytes().getClass());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,WallclockTimestampExtractor.class);
在生产方:
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,127.0.0.1:9092);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.ACKS_CONFIG,"all");
请,你能帮我吗?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)