问题描述
就像我在标题中说的那样,当生产者停止发送经期信息时,我想接收最后的windowBy消息。目前,我正在手动执行此操作,但首先要进行一些说明。
我有一个Kafka生产者,它正在从文件中读取行(每行都是不同的jSon),每个读取行都以500毫秒的时间间隔发送到Kafka。我只有120行(或jSons)。
我有一个消费者,它消费了生产者发送的所有jSons。代码:
final KStream<String,Aggregate> transactions = builder.stream(kafkaProperties.getTopic(),Consumed.with(Serdes.String(),aggregateSerde));
// Topology
transactions
.groupBy(this::groupedByTimeStampAndProtocolName)
.windowedBy( TimeWindows
.of( Duration.ofSeconds( 10 ))
.grace( Duration.ofMillis( 0 )))
.aggregate(
tool::emptyAggregate,this::processNewRecord,//new TransactionAggregator(),Materialized.<String,Aggregate,WindowStore<Bytes,byte[]>>as(TRANSACTION_AGGREGATE)
.withKeySerde(Serdes.String())
.withValueSerde(aggregateSerde)
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.foreach(sendAggregatesToCassandra);
我具有预期的功能,我的意思是,它可以接收所有记录,但是要接收我必须手动发送的记录中最后显示的消息。
与此有关的两个问题:
- 有什么方法可以自动处理最后一个窗口?当生产者发送最后一条记录(第120个jSon)时,生产者将不再发送更多记录。我应该等待时间还是什么都没关系。
- 我已经看到我必须发送3条记录才能处理最后一个窗口。我不清楚为什么我必须发送3条记录(如果我发送
我在JDK 11中使用Kafka Streams(带有Spring),并且正在使用docker化的Kafka:
- confluentinc / cp-kafka:5.5.1
- zookeeper:3.4.14
- 卡夫卡:
<version.kafka>2.5.0</version.kafka>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${version.kafka}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${version.kafka}</version>
</dependency>
Kafka中使用的属性为:
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,127.0.0.1:9092);
props.put(StreamsConfig.APPLICATION_ID_CONFIG,kafkaProperties.getAppId()+Constants.APP_ID);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Bytes().getClass());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,WallclockTimestampExtractor.class);
在生产方:
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,127.0.0.1:9092);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.ACKS_CONFIG,"all");
请,你能帮我吗?
解决方法
当您使用 suppress()
(带有 untilWindowCloses
配置)时,如果“stream-time”提前,操作符只会发出最终结果。 “流时间”作为记录时间戳的函数计算,因此,如果您没有处理任何记录,“流时间”会提前,suppress()
将永远不会发出任何内容。因此,发送更多记录是提高“流时间”的唯一途径。
注意:对于流式用例,假设数据永远不会停止,因此对于实际部署来说这不是问题——像您一样从文件中读取,不是真正的流处理用例:我假设您从文件中读取以进行测试,对于这种情况,您的输入文件应包含更多记录以相应地提前流时间。
有关详细信息,请查看此博文:https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/
我也在 Kafka 峰会上讨论过这个话题:https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/