使用Spring Cloud Stream Kafka Stream的生产Spring Boot应用程序停止后,Kafka Avro Topic上的数据丢失

问题描述

用例是从2个输入Avro主题中读取,然后生成1个输出Avro主题。该主题有3个分区,复制因子1,保留期为7天。主题在融合的kafka平台上。生产的应用程序是一个Spring Boot应用程序(在Java 1.8上运行),该应用程序使用Spring Cloud Stream Kafka Streams与汇合的kafka进行交互。

问题-停止生产应用程序时,所有数据都会丢失。起始偏移量等于结束偏移量,每个分区的数据大小变为0B。

我的application.yml文件看起来像这样-

spring.cloud.stream:
    function:
        deFinition: f1;f2
    bindings:
        f1-in-0:
            destination: inputTopic1
        f2-in-0:
            destination: inputTopic2
        f1-out-0:
            destination: outputTopic1
        f2-out-0:
            destination: outputTopic2

spring.cloud.stream.kafka.streams:
    binder:
        brokers: broker
        configuration:
            schema.registry.url: schemaRegUrl
            consumer.interceptor.classes: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
            producer.interceptor.classes: io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
            default:
                key.serde: org.apache.kafka.common.serialization.Serdes$SringSerde
                value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    bindings:
        f1-in-0:
            consumer:
                applicationId: f1-id
        f2-in-0:
            consumer:
                applicationId: f2-id

我的pom.xml看起来像-


<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    <version>Hoxton.SR5</version>
</dependency>

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.9.1</version>
</dependency>

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-streams-avro-serde</artifactId>
    <version>5.3.0</version>
</dependency>

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>5.3.0</version>
</dependency>

<dependency>
    <groupId>org.modelmapper</groupId>
    <artifactId>modelmapper</artifactId>
    <version>2.3.8</version>
</dependency>

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>monitoring-interceptors</artifactId>
    <version>5.3.1</version>
</dependency>

我的应用程序的处理器类具有2个功能-

@Bean
public Function<KStream<String,InputTopicEvent>,KStream<String,OutputTopicEvent>> f1() {
    return events -> event;
}

@Bean
public Function<KStream<String,OutputTopicEvent>> f2() {
    return events -> event;
}

整个代码工作正常,但是一旦应用程序停止运行,输出主题上的数据就会丢失。 有人可以帮助解决这个问题吗?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)