问题描述
当向我的 SpringBoot Kafka 应用程序发送任何消息时,我面临序列化异常,这是日志。
2021-02-24 01:28:21.280 INFO 19249 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0
2021-02-24 01:28:21.281 INFO 19249 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651
2021-02-24 01:28:21.281 INFO 19249 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1614110301280
2021-02-24 01:28:21.294 INFO 19249 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: paGjVV-5RVyOatWyXOzBrQ
2021-02-24 01:28:21.365 INFO 19249 --- [ntainer#0-0-C-1] c.h.a.m.service.KafkaConsumer : #### -> Consumed message -> "22:26 Hello World!!"
2021-02-24 01:28:21.400 ERROR 19249 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) ~[spring-kafka-2.6.6.jar:2.6.6]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.6.6.jar:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427) ~[spring-kafka-2.6.6.jar:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.6.jar:2.6.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition myTopic-0 at offset 0. If needed,please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[34,50,58,54,32,72,101,108,111,87,114,100,33,74,97,105,83,104,82,109,66,98,34]] from topic [myTopic]
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.hcl.anusheel.messagestream.request.dto.EntryObject` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('22:26 Hello World!!')
at [Source: (byte[])""22:26 Hello World!!""; line: 1,column: 1]
at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1455) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1081) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.deser.ValueInstantiator._createFromStringFallbacks(ValueInstantiator.java:371) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromString(StdValueInstantiator.java:323) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializefromString(BeanDeserializerBase.java:1408) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:176) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:166) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2079) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1555) ~[jackson-databind-2.11.4.jar:2.11.4]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:517) ~[spring-kafka-2.6.6.jar:2.6.6]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1308) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1271) ~[spring-kafka-2.6.6.jar:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1162) ~[spring-kafka-2.6.6.jar:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.6.jar:2.6.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2021-02-24 01:28:21.412 ERROR 19249 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
如何解决这个问题?
这是 EntryObject.java 类,这是传入请求对象的一部分,该对象将被序列化并提供给 Kafka 主题,并且应该从 Kafka 消费者那里检索该对象以进行进一步处理。
public class EntryObject {
@NonNull
private String TradeId;
@NonNull
private int version;
@NonNull
private String counterPartyId;
@NonNull
private String bookId;
@JsonFormat(pattern = "dd/MM/yyyy")
@DateTimeFormat(pattern = "dd/MM/yyyy")
private LocalDate maturityDate;
@JsonFormat(pattern = "dd/MM/yyyy")
@DateTimeFormat(pattern = "dd/MM/yyyy")
private LocalDate createdDate;
private char expired;
public String getTradeId() {
return TradeId;
}
public void setTradeId(String TradeId) {
this.TradeId = TradeId;
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public String getCounterPartyId() {
return counterPartyId;
}
public void setCounterPartyId(String counterPartyId) {
this.counterPartyId = counterPartyId;
}
public String getBookId() {
return bookId;
}
public void setBookId(String bookId) {
this.bookId = bookId;
}
public LocalDate getmaturityDate() {
return maturityDate;
}
public void setmaturityDate(LocalDate maturityDate) {
this.maturityDate = maturityDate;
}
public LocalDate getCreatedDate() {
return createdDate;
}
public void setCreatedDate(LocalDate createdDate) {
this.createdDate = createdDate;
}
public char getExpired() {
return expired;
}
public void setExpired(char expired) {
this.expired = expired;
}
@Override
public String toString() {
return "EntryObject [TradeId=" + TradeId + ",version=" + version + ",counterPartyId=" + counterPartyId
+ ",bookId=" + bookId + ",maturityDate=" + maturityDate + ",createdDate=" + createdDate
+ ",expired=" + expired + "]";
}
}
这是我的 KafkaConsumerConfig.java 类。
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
public ConsumerFactory<String,String> consumerFactory(String groupId) {
Map<String,Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONfig,groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig,StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory(String groupId) {
ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(groupId));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> fooKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("foo");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> barKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("bar");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> headersKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("headers");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> partitionsKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("partitions");
}
public ConsumerFactory<String,EntryObject> entryObjectConsumerFactory() {
Map<String,"entryObject");
return new DefaultKafkaConsumerFactory<>(props,new StringDeserializer(),new JsonDeserializer<>(EntryObject.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,EntryObject> entryObjectKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String,EntryObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(entryObjectConsumerFactory());
return factory;
}
}
这是我的 KafkaConsumer.java 类
@Service
public class KafkaConsumer {
private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = "myTopic",groupId = "foo",containerFactory = "fooKafkaListenerContainerFactory")
public void receive(String message) throws IOException {
logger.info(String.format("#### -> Consumed message -> %s",message));
}
@KafkaListener(topics = "myTopic",containerFactory = "entryObjectKafkaListenerContainerFactory")
public void receive(EntryObject entryObject) throws IOException {
logger.info("received entryObject = '{}'",entryObject.toString());
}
}
解决方法
引起:com.fasterxml.jackson.databind.exc.MismatchedInputException:无法构造com.hcl.anusheel.messagestream.request.dto.EntryObject
的实例(尽管至少存在一个Creator):没有从字符串值反序列化的字符串参数构造函数/工厂方法( '22:26 Hello World!!')
Json 是一个 JSON 编码的字符串 "..."
。
Jackson 正在尝试为入口对象寻找一个构造函数,看起来像这样......
public EntryObject(String data) { ... }
并且没有这样的 CTOR。