当生产者停止使用Java / Spring发送消息时,如何在windowedBy + aggregate中接收最后一个窗口化的Kafka消息?

问题描述

就像我在标题中说的那样,当生产者停止发送经期信息时,我想接收最后的windowBy消息。目前,我正在手动执行此操作,但首先要进行一些说明。

我有一个Kafka生产者,它正在从文件中读取行(每行都是不同的jSon),每个读取行都以500毫秒的时间间隔发送到Kafka。我只有120行(或jSons)。

我有一个消费者,它消费了生产者发送的所有jSons。代码:

  final KStream<String,Aggregate> transactions = builder.stream(kafkaProperties.getTopic(),Consumed.with(Serdes.String(),aggregateSerde));

  // Topology
  transactions
        .groupBy(this::groupedByTimeStampAndProtocolName)
        .windowedBy( TimeWindows
                .of( Duration.ofSeconds( 10 ))
                .grace( Duration.ofMillis( 0 )))
        .aggregate(
                tool::emptyAggregate,this::processNewRecord,//new TransactionAggregator(),Materialized.<String,Aggregate,WindowStore<Bytes,byte[]>>as(TRANSACTION_AGGREGATE)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(aggregateSerde)
        )
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .foreach(sendAggregatesToCassandra);

我具有预期的功能,我的意思是,它可以接收所有记录,但是要接收我必须手动发送的记录中最后显示的消息。

与此有关的两个问题:

  1. 有什么方法可以自动处理最后一个窗口?当生产者发送最后一条记录(第120个jSon)时,生产者将不再发送更多记录。我应该等待时间还是什么都没关系。
  2. 我已经看到我必须发送3条记录才能处理最后一个窗口。我不清楚为什么我必须发送3条记录(如果我发送

我在JDK 11中使用Kafka Streams(带有Spring),并且正在使用docker化的Kafka:

  • confluentinc / cp-kafka:5.5.1
  • zookeeper:3.4.14
  • 卡夫卡:
            <version.kafka>2.5.0</version.kafka>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
                <version>${version.kafka}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>${version.kafka}</version>
            </dependency>

Kafka中使用的属性为:

  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,127.0.0.1:9092);
  props.put(StreamsConfig.APPLICATION_ID_CONFIG,kafkaProperties.getAppId()+Constants.APP_ID);
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Bytes().getClass());
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,WallclockTimestampExtractor.class);

在生产方:

  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,127.0.0.1:9092);
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  properties.put(ProducerConfig.ACKS_CONFIG,"all");

请,你能帮我吗?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)