java.util在偏移量240处将分区ListnerContainer-0的键/值反序列化的可选错误

问题描述

Kafka监听器

 @KafkaListener(id = ProductTopicConstants.GET_PRODUCT,topics = ProductTopicConstants.GET_PRODUCT,containerFactory = "getDeleteProductContainerFactory")
@SendTo
public Optional<Product> GetProduct(String id) {
    return _productRepository.findByid(id);
}

容器工厂

 @Bean
    public KafkaListenerContainerFactory<?> getDeleteProductContainerFactory(ProducerFactory<String,Object> pf) {
        ConcurrentKafkaListenerContainerFactory<String,String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryGetDeleteProduct());
        factory.setReplyTemplate(kafkaTemplate(pf));
        return factory;
    }

卡夫卡制片人

public record ProductProducer(ReplyingKafkaTemplate<String,Object,Object> _replyTemplate,ObjectMapper mapper) implements IProductProducer {
    private static final Logger LOG = LoggerFactory.getLogger(ProductProducer.class);
    @Override
    public ProductViewModel findById(String id) throws InterruptedException,ExecutionException,TimeoutException {
        RequestReplyFuture<String,Object> future =
                this._replyTemplate.sendAndReceive(new ProducerRecord<>(ProductTopicConstants.GET_PRODUCT,null,id));
        LOG.info(future.getSendFuture().get(10,TimeUnit.SECONDS).getRecordMetadata().toString());
        var products = (Optional<Product>)future.get(10,TimeUnit.SECONDS).value();
        var mappedProducts = mapper.convertValue(products,new TypeReference<Product>() { });
        return new ProductViewModel(mappedProducts.getId(),mappedProducts.getName(),mappedProducts.getPrice(),mappedProducts.getDescription(),mappedProducts.getVersion());
    }}

错误

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ListnerContainer-0 at offset 240. If needed,please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123,34,118,101,114,115,105,111,110,58,48,44,100,53,102,52,99,57,56,49,50,54,97,109,80,117,116,32,112,46,125]] from topic [ListnerContainer]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `java.util.Optional` (no Creators,like default constructor,exist): cannot deserialize from Object value (no delegate- or property-based Creator)
 at [Source: (byte[])"{"version":0,"id":"5f4405c9e848c80d1042cd62","name":"Product 1","price":100.0,"description":"Product 1 used in summer"}"; line: 1,column: 2]
    at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67) ~[jackson-databind-2.11.0.jar:2.11.0]
    at com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1611) ~[jackson-databind-2.11.0.jar:2.11.0]
    at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:400) ~[jackson-databind-2.11.0.jar:2.11.0]
    at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1077) ~[jackson-databind-2.11.0.jar:2.11.0]
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1320) ~[jackson-databind-2.11.0.jar:2.11.0]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:331) ~[jackson-databind-2.11.0.jar:2.11.0]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:164) ~[jackson-databind-2.11.0.jar:2.11.0]
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2057) ~[jackson-databind-2.11.0.jar:2.11.0]
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1533) ~[jackson-databind-2.11.0.jar:2.11.0]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:461) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1290) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.5.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1091) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1047) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:972) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

如果使用 java.util.Optional 类,则会发生错误。如果我删除此类,则序列化和解串器将正常工作。

我发现了Jackson和java.util.Optional类的一些问题。春季卡夫卡能否解决?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...