问题描述
我正在尝试加入 2 个主题并使用 BiFunction 将输出生成到第 3 个主题。我在解析传入消息的类型时遇到问题。我的左侧消息已成功反序列化,但右侧却抛出“标题中没有类型信息,也没有提供默认类型”。
当我逐步执行代码时,我可以看到它在 org.springframework.kafka.support.serializer.JsonDeserializer 行中失败
Assert.state(localReader != null,"No headers available and no default type provided");
消息是由带有 Kafka 活页夹的 spring boot 生成的。它具有以下属性。
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
##
spring.kafka.producer.properties.spring.json.type.mapping=type1:com.demo.domain.type2,type1:com.demo.domain.type2
spring.kafka.producer.properties.spring.json.trusted.packages=com.demo.domain
spring.kafka.producer.properties.spring.json.add.type.headers=true
在 Kafka Stream binder 消费者端
# kafka stream setting
spring.cloud.stream.bindings.joinProcess-in-0.destination=local-stream-process-type1
spring.cloud.stream.bindings.joinProcess-in-1.destination=local-stream-process-type2
spring.cloud.stream.bindings.joinProcess-out-0.destination=local-stream-process-type3
spring.cloud.stream.kafka.streams.binder.functions.joinProcess.applicationId=local-stream-process
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.kafka.streams.properties.spring.json.trusted.packages=*
spring.kafka.properties.spring.json.type.mapping=type1:com.demo.domain.type2,type1:com.demo.domain.type2
spring.kafka.streams.properties.spring.json.use.type.headers=true
我的双功能看起来像
@Configuration
public class StreamsConfig {
@Bean
public RecordMessageConverter converter() {
return new StringJsonMessageConverter();
}
@Bean
public BiFunction<KStream<String,type1>,KStream<String,type2>,type3>> joinProcess() {
return (type1,type2) ->
type1.join(type2,joiner(),JoinWindows.of(Duration.ofDays(1)));
}
private ValueJoiner<type1,type2,type3> joiner() {
return (type1,type2) -> { new type3("test");
};
}
}
我几乎已经完成了之前的所有问题,但没有一个是 Bifunction。我还没有尝试过的一件事是设置 VALUE_TYPE_METHOD。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)