当生产者停止使用Java / Spring发送消息时,如何在windowedBy + aggregate中接收最后一个窗口化的Kafka消息?

问题描述

就像我在标题中说的那样,当生产者停止发送经期信息时,我想接收最后的windowBy消息。目前,我正在手动执行此操作,但首先要进行一些说明。

我有一个Kafka生产者,它正在从文件中读取行(每行都是不同的jSon),每个读取行都以500毫秒的时间间隔发送到Kafka。我只有120行(或jSons)。

我有一个消费者,它消费了生产者发送的所有jSons。代码:

  final KStream<String,Aggregate> transactions = builder.stream(kafkaProperties.getTopic(),Consumed.with(Serdes.String(),aggregateSerde));

  // Topology
  transactions
        .groupBy(this::groupedByTimeStampAndProtocolName)
        .windowedBy( TimeWindows
                .of( Duration.ofSeconds( 10 ))
                .grace( Duration.ofMillis( 0 )))
        .aggregate(
                tool::emptyAggregate,this::processNewRecord,//new TransactionAggregator(),Materialized.<String,Aggregate,WindowStore<Bytes,byte[]>>as(TRANSACTION_AGGREGATE)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(aggregateSerde)
        )
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .foreach(sendAggregatesToCassandra);

我具有预期的功能,我的意思是,它可以接收所有记录,但是要接收我必须手动发送的记录中最后显示的消息。

与此有关的两个问题:

  1. 有什么方法可以自动处理最后一个窗口?当生产者发送最后一条记录(第120个jSon)时,生产者将不再发送更多记录。我应该等待时间还是什么都没关系。
  2. 我已经看到我必须发送3条记录才能处理最后一个窗口。我不清楚为什么我必须发送3条记录(如果我发送

我在JDK 11中使用Kafka Streams(带有Spring),并且正在使用docker化的Kafka:

  • confluentinc / cp-kafka:5.5.1
  • zookeeper:3.4.14
  • 卡夫卡:
            <version.kafka>2.5.0</version.kafka>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
                <version>${version.kafka}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>${version.kafka}</version>
            </dependency>

Kafka中使用的属性为:

  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,127.0.0.1:9092);
  props.put(StreamsConfig.APPLICATION_ID_CONFIG,kafkaProperties.getAppId()+Constants.APP_ID);
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Bytes().getClass());
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,WallclockTimestampExtractor.class);

在生产方:

  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,127.0.0.1:9092);
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  properties.put(ProducerConfig.ACKS_CONFIG,"all");

请,你能帮我吗?

解决方法

当您使用 suppress()(带有 untilWindowCloses 配置)时,如果“stream-time”提前,操作符只会发出最终结果。 “流时间”作为记录时间戳的函数计算,因此,如果您没有处理任何记录,“流时间”会提前,suppress() 将永远不会发出任何内容。因此,发送更多记录是提高“流时间”的唯一途径。

注意:对于流式用例,假设数据永远不会停止,因此对于实际部署来说这不是问题——像您一样从文件中读取,不是真正的流处理用例:我假设您从文件中读取以进行测试,对于这种情况,您的输入文件应包含更多记录以相应地提前流时间。

有关详细信息,请查看此博文:https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/

我也在 Kafka 峰会上讨论过这个话题:https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...