问题描述
用例是从2个输入Avro主题中读取,然后生成1个输出Avro主题。该主题有3个分区,复制因子1,保留期为7天。主题在融合的kafka平台上。生产的应用程序是一个Spring Boot应用程序(在Java 1.8上运行),该应用程序使用Spring Cloud Stream Kafka Streams与汇合的kafka进行交互。
问题-停止生产应用程序时,所有数据都会丢失。起始偏移量等于结束偏移量,每个分区的数据大小变为0B。
我的application.yml文件看起来像这样-
spring.cloud.stream:
function:
deFinition: f1;f2
bindings:
f1-in-0:
destination: inputTopic1
f2-in-0:
destination: inputTopic2
f1-out-0:
destination: outputTopic1
f2-out-0:
destination: outputTopic2
spring.cloud.stream.kafka.streams:
binder:
brokers: broker
configuration:
schema.registry.url: schemaRegUrl
consumer.interceptor.classes: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
producer.interceptor.classes: io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
default:
key.serde: org.apache.kafka.common.serialization.Serdes$SringSerde
value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
f1-in-0:
consumer:
applicationId: f1-id
f2-in-0:
consumer:
applicationId: f2-id
我的pom.xml看起来像-
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
<version>Hoxton.SR5</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.modelmapper</groupId>
<artifactId>modelmapper</artifactId>
<version>2.3.8</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>monitoring-interceptors</artifactId>
<version>5.3.1</version>
</dependency>
我的应用程序的处理器类具有2个功能-
@Bean
public Function<KStream<String,InputTopicEvent>,KStream<String,OutputTopicEvent>> f1() {
return events -> event;
}
@Bean
public Function<KStream<String,OutputTopicEvent>> f2() {
return events -> event;
}
整个代码工作正常,但是一旦应用程序停止运行,输出主题上的数据就会丢失。 有人可以帮助解决这个问题吗?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)