Serialization exception in a Spring-kafka consumer

Problem description

Whenever I send a message to my Spring Boot Kafka application, I get a serialization exception. Here is the log:

2021-02-24 01:28:21.280  INFO 19249 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-02-24 01:28:21.281  INFO 19249 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-02-24 01:28:21.281  INFO 19249 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1614110301280
2021-02-24 01:28:21.294  INFO 19249 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: paGjVV-5RVyOatWyXOzBrQ
2021-02-24 01:28:21.365  INFO 19249 --- [ntainer#0-0-C-1] c.h.a.m.service.KafkaConsumer            : #### -> Consumed message -> "22:26 Hello World!!"
2021-02-24 01:28:21.400 ERROR 19249 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) ~[spring-kafka-2.6.6.jar:2.6.6]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.6.6.jar:2.6.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427) ~[spring-kafka-2.6.6.jar:2.6.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.6.jar:2.6.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition myTopic-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[34,50,58,54,32,72,101,108,111,87,114,100,33,74,97,105,83,104,82,109,66,98,34]] from topic [myTopic]
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.hcl.anusheel.messagestream.request.dto.EntryObject` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('22:26 Hello World!!')
 at [Source: (byte[])""22:26 Hello World!!""; line: 1, column: 1]
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63) ~[jackson-databind-2.11.4.jar:2.11.4]
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1455) ~[jackson-databind-2.11.4.jar:2.11.4]
    at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1081) ~[jackson-databind-2.11.4.jar:2.11.4]
    at com.fasterxml.jackson.databind.deser.ValueInstantiator._createFromStringFallbacks(ValueInstantiator.java:371) ~[jackson-databind-2.11.4.jar:2.11.4]
    at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromString(StdValueInstantiator.java:323) ~[jackson-databind-2.11.4.jar:2.11.4]
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1408) ~[jackson-databind-2.11.4.jar:2.11.4]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:176) ~[jackson-databind-2.11.4.jar:2.11.4]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:166) ~[jackson-databind-2.11.4.jar:2.11.4]
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2079) ~[jackson-databind-2.11.4.jar:2.11.4]
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1555) ~[jackson-databind-2.11.4.jar:2.11.4]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:517) ~[spring-kafka-2.6.6.jar:2.6.6]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1308) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1271) ~[spring-kafka-2.6.6.jar:2.6.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1162) ~[spring-kafka-2.6.6.jar:2.6.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.6.jar:2.6.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

2021-02-24 01:28:21.412 ERROR 19249 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

How can I fix this?

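Here is the EntryObject.java class. It is part of the incoming request object, which is serialized and published to the Kafka topic; the Kafka consumer should then retrieve it for further processing.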

public class EntryObject {
    @NonNull
    private String TradeId;
    @NonNull
    private int version;
    @NonNull
    private String counterPartyId;
    @NonNull
    private String bookId;
    @JsonFormat(pattern = "dd/MM/yyyy") 
    @DateTimeFormat(pattern = "dd/MM/yyyy")
    private LocalDate maturityDate;
    @JsonFormat(pattern = "dd/MM/yyyy") 
    @DateTimeFormat(pattern = "dd/MM/yyyy")
    private LocalDate createdDate;
    private char expired;
    
    public String getTradeId() {
        return TradeId;
    }
    public void setTradeId(String TradeId) {
        this.TradeId = TradeId;
    }
    public int getVersion() {
        return version;
    }
    public void setVersion(int version) {
        this.version = version;
    }
    public String getCounterPartyId() {
        return counterPartyId;
    }
    public void setCounterPartyId(String counterPartyId) {
        this.counterPartyId = counterPartyId;
    }
    public String getBookId() {
        return bookId;
    }
    public void setBookId(String bookId) {
        this.bookId = bookId;
    }
    public LocalDate getMaturityDate() {
        return maturityDate;
    }
    public void setMaturityDate(LocalDate maturityDate) {
        this.maturityDate = maturityDate;
    }
    public LocalDate getCreatedDate() {
        return createdDate;
    }
    public void setCreatedDate(LocalDate createdDate) {
        this.createdDate = createdDate;
    }
    public char getExpired() {
        return expired;
    }
    public void setExpired(char expired) {
        this.expired = expired;
    }
    
    @Override
    public String toString() {
        return "EntryObject [TradeId=" + TradeId + ",version=" + version + ",counterPartyId=" + counterPartyId
                + ",bookId=" + bookId + ",maturityDate=" + maturityDate + ",createdDate=" + createdDate
                + ",expired=" + expired + "]";
    }
}

Here is my KafkaConsumerConfig.java class.

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    public ConsumerFactory<String,String> consumerFactory(String groupId) {
        Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory(String groupId) {
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(groupId));
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,String> fooKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("foo");
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,String> barKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("bar");
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,String> headersKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("headers");
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,String> partitionsKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("partitions");
    }

    
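    public ConsumerFactory<String,EntryObject> entryObjectConsumerFactory() {
        Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"entryObject");
        return new DefaultKafkaConsumerFactory<>(props,new StringDeserializer(),new JsonDeserializer<>(EntryObject.class));
    }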

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,EntryObject> entryObjectKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String,EntryObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(entryObjectConsumerFactory());
        return factory;
    }
}

Here is my KafkaConsumer.java class.

@Service
public class KafkaConsumer {
    
    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    
    @KafkaListener(topics = "myTopic",groupId = "foo",containerFactory = "fooKafkaListenerContainerFactory")
    public void receive(String message) throws IOException {
        logger.info(String.format("#### -> Consumed message -> %s",message));
    }

    @KafkaListener(topics = "myTopic",containerFactory = "entryObjectKafkaListenerContainerFactory")
    public void receive(EntryObject entryObject) throws IOException {
        logger.info("received entryObject = '{}'",entryObject.toString());
    }
}

How can I resolve this exception and get the application running smoothly?

Solution

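Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.hcl.anusheel.messagestream.request.dto.EntryObject` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('22:26 Hello World!!')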

The payload is a JSON-encoded string ("..."), not a JSON object.
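To make the distinction concrete, here is a minimal JDK-only sketch that labels the first JSON token of a record value. The class name PayloadKind and its describe method are hypothetical helpers for illustration and are not part of Spring Kafka or Jackson; the first sample value is the one shown in the log, while the second is an invented object payload whose field names are assumptions.

```java
import java.nio.charset.StandardCharsets;

public class PayloadKind {

    /** Returns a rough label for the first JSON token found in a record value. */
    public static String describe(byte[] value) {
        String text = new String(value, StandardCharsets.UTF_8).trim();
        if (text.isEmpty()) {
            return "empty";
        }
        switch (text.charAt(0)) {
            case '{': return "object";
            case '[': return "array";
            case '"': return "string";
            default:  return "other";
        }
    }

    public static void main(String[] args) {
        // Value reported in the log: a JSON string, surrounding quotes included.
        byte[] fromLog = "\"22:26 Hello World!!\"".getBytes(StandardCharsets.UTF_8);
        // Hypothetical object payload; the field names are illustrative only.
        byte[] hypothetical = "{\"tradeId\":\"T1\",\"version\":1}".getBytes(StandardCharsets.UTF_8);

        System.out.println(describe(fromLog));      // prints "string"
        System.out.println(describe(hypothetical)); // prints "object"
    }
}
```

A value that starts with a double quote is a JSON string token, so a deserializer targeting a bean type has to look for a way to build that bean from a single String.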

Jackson is trying to find a constructor on EntryObject that looks like this...

public EntryObject(String data) { ... }

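...and there is no such constructor. The record was evidently published as a plain string rather than as a serialized EntryObject, which is also why the String listener on the same topic consumed it without error.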
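Separately, the IllegalStateException in the log suggests configuring an ErrorHandlingDeserializer. The following is a sketch of what that might look like for the EntryObject consumer factory, assuming spring-kafka 2.6.x as shown in the stack trace. The class name EntryObjectConsumerFactorySketch and the choice to pass bootstrapAddress as a parameter are assumptions made to keep the fragment self-contained. It does not change the fact that a plain string cannot be turned into an EntryObject; it only lets the container's error handler deal with the bad record instead of failing inside poll().

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.hcl.anusheel.messagestream.request.dto.EntryObject;

public class EntryObjectConsumerFactorySketch {

    // Hypothetical helper: bootstrapAddress is passed in here rather than injected with @Value.
    public static ConsumerFactory<String, EntryObject> entryObjectConsumerFactory(String bootstrapAddress) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "entryObject");

        // Wrap the JSON deserializer so that a deserialization failure is captured
        // and handed to the listener container instead of being thrown from poll().
        ErrorHandlingDeserializer<EntryObject> valueDeserializer =
                new ErrorHandlingDeserializer<>(new JsonDeserializer<>(EntryObject.class));

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
    }
}
```

With this in place the underlying mismatch still has to be resolved, for example by making sure that whatever publishes to myTopic for this listener sends EntryObject JSON rather than a bare string.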