Spring Kafka Stream - 标题中没有类型信息,使用 BiFunction 时没有提供默认类型

问题描述

我正在尝试加入 2 个主题并使用 BiFunction 将输出生成到第 3 个主题。我在解析传入消息的类型时遇到问题。我的左侧消息已成功反序列化,但右侧却抛出“标题中没有类型信息,也没有提供认类型”。

当我逐步执行代码时,我可以看到它在 org.springframework.kafka.support.serializer.JsonDeserializer 行中失败

Assert.state(localReader != null,"No headers available and no default type provided");

消息是由带有 Kafka 活页夹的 spring boot 生成的。它具有以下属性

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
##
spring.kafka.producer.properties.spring.json.type.mapping=type1:com.demo.domain.type2,type1:com.demo.domain.type2
spring.kafka.producer.properties.spring.json.trusted.packages=com.demo.domain
spring.kafka.producer.properties.spring.json.add.type.headers=true

在 Kafka Stream binder 消费者端

# kafka stream setting
spring.cloud.stream.bindings.joinProcess-in-0.destination=local-stream-process-type1
spring.cloud.stream.bindings.joinProcess-in-1.destination=local-stream-process-type2
spring.cloud.stream.bindings.joinProcess-out-0.destination=local-stream-process-type3

spring.cloud.stream.kafka.streams.binder.functions.joinProcess.applicationId=local-stream-process

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde

spring.kafka.streams.properties.spring.json.trusted.packages=*
spring.kafka.properties.spring.json.type.mapping=type1:com.demo.domain.type2,type1:com.demo.domain.type2
spring.kafka.streams.properties.spring.json.use.type.headers=true

我的双功能看起来像


@Configuration
public class StreamsConfig {

    @Bean
    public RecordMessageConverter converter() {
        return new StringJsonMessageConverter();
    }


    @Bean
    public BiFunction<KStream<String,type1>,KStream<String,type2>,type3>> joinProcess() {
        return (type1,type2) ->
                type1.join(type2,joiner(),JoinWindows.of(Duration.ofDays(1)));
    }

    private ValueJoiner<type1,type2,type3> joiner() {
         return (type1,type2) -> { new type3("test"); 
     };    
    }
}

我几乎已经完成了之前的所有问题,但没有一个是 Bifunction。我还没有尝试过的一件事是设置 VALUE_TYPE_METHOD。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)