org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out

Problem description

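I am performing a Spring REST API DELETE operation through Kafka, but I get a KafkaReplyTimeoutException when the listener returns null.

If the listener returns a populated Product POJO, the operation works fine, but if it returns null I get the error.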

Listener

    @KafkaListener(id = ProductTopicConstants.DELETE_PRODUCT, topics = ProductTopicConstants.DELETE_PRODUCT, containerFactory = "getDeleteProductContainerFactory")
    @SendTo
    public Product DeleteProduct(String id) {
        // The {} placeholder is needed for the id to appear in the log message.
        logger.info("Listening to delete product {}", id);
        Product product = productRepository.findByid(id);
        if (product == null)
            return null;
        productRepository.delete(product);
        return product;
    }

Producer

public record ProductProducer(ReplyingKafkaTemplate<String,Object,Object> _replyTemplate) implements IProductProducer {

    @Override
    public ProductViewModel Delete(String id) throws InterruptedException,ExecutionException,TimeoutException {
        RequestReplyFuture<String,Object> future =
                this._replyTemplate.sendAndReceive(new ProducerRecord<>(ProductTopicConstants.DELETE_PRODUCT,null,id));
        LOG.info(future.getSendFuture().get(10,TimeUnit.SECONDS).getRecordMetadata().toString());
        Product deletedProduct = (Product) future.get(10,TimeUnit.SECONDS).value();
        if (deletedProduct == null)
            return null;
        Product mappedProducts = mapper.convertValue(deletedProduct,new TypeReference<Product>() {
        });
        return new ProductViewModel(mappedProducts.getId(),mappedProducts.getName(),mappedProducts.getPrice(),mappedProducts.getDescription(),mappedProducts.getVersion());
    }
}

Container factory

    @Bean
    public KafkaTemplate<String,Object> kafkaTemplate(ProducerFactory<String,Object> pf) {
        return new KafkaTemplate<>(pf);
    }

    @Bean
    public ConsumerFactory<String,String> consumerFactoryGetDeleteProduct() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),new StringDeserializer(),new JsonDeserializer<>(String.class));
    }

    @Bean
    public KafkaListenerContainerFactory<?> getDeleteProductContainerFactory(ProducerFactory<String,Object> pf) {
        ConcurrentKafkaListenerContainerFactory<String,String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryGetDeleteProduct());
        factory.setReplyTemplate(kafkaTemplate(pf));
        return factory;
    }

Error

org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out
    at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.lambda$scheduleTimeout$3(ReplyingKafkaTemplate.java:339) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

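Solution

Why would you expect anything different? If you return null, there is no reply to send, so the request times out on the client side.

To return a null value, return KafkaNull.INSTANCE; the ConsumerRecord received by the client will then contain null as its value.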

@KafkaListener(id = "so63583664",topics = "topic1")
@SendTo
public Object listen(String in) {
    System.out.println(in);
    return in.equals("foo") ? in.toUpperCase() : KafkaNull.INSTANCE;
}

@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String,String,String> replyer) {
    return args -> {
        ProducerRecord<String,String> pr = new ProducerRecord<>("topic1","foo","foo");
        RequestReplyFuture<String,String> future = replyer.sendAndReceive(pr);
        System.out.println(future.get(10,TimeUnit.SECONDS).value());
        pr = new ProducerRecord<>("topic1","bar");
        future = replyer.sendAndReceive(pr);
        System.out.println(future.get(10,TimeUnit.SECONDS).value());
    };
}

Output:

foo
FOO
bar
null
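
To make the difference between "no reply" and "a reply whose value is null" concrete, here is a small sketch that mimics the behaviour with plain JDK classes. It is a simplified model under stated assumptions, not how ReplyingKafkaTemplate is implemented: NullReplyDemo, request and NULL_MARKER are hypothetical names, with NULL_MARKER standing in for KafkaNull.INSTANCE, and the 200 ms wait standing in for the reply timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

/** Plain-JDK simulation of a request/reply round trip; no Spring or Kafka involved. */
class NullReplyDemo {

    /** Hypothetical stand-in for KafkaNull.INSTANCE: a marker meaning "reply with null". */
    static final Object NULL_MARKER = new Object();

    /**
     * Calls the listener and waits briefly for its reply. The reply future is
     * completed only when the listener returns something non-null, which models
     * "a null return means there is nothing to send".
     */
    static String request(Function<String, Object> listener, String payload)
            throws InterruptedException, ExecutionException {
        CompletableFuture<Object> reply = new CompletableFuture<>();
        Object result = listener.apply(payload);
        if (result != null) {
            // The marker is delivered as a real reply whose value is null.
            reply.complete(result == NULL_MARKER ? null : result);
        }
        try {
            Object value = reply.get(200, TimeUnit.MILLISECONDS);
            return "reply: " + value;
        } catch (TimeoutException e) {
            // Nothing was ever sent, so the caller can only give up.
            return "timed out";
        }
    }

    public static void main(String[] args) throws Exception {
        // Returns null when nothing matches: no reply is sent.
        Function<String, Object> returnsNull = id -> id.equals("foo") ? "FOO" : null;
        // Returns the marker when nothing matches: a null-valued reply is sent.
        Function<String, Object> returnsMarker = id -> id.equals("foo") ? "FOO" : NULL_MARKER;

        System.out.println(request(returnsNull, "foo"));
        System.out.println(request(returnsNull, "bar"));
        System.out.println(request(returnsMarker, "bar"));
    }
}
```

In this model only the listener that returns null for "bar" ends in a timeout. Applied to the question, that suggests declaring the listener's return type as Object and returning KafkaNull.INSTANCE when no product is found, so the existing null check in the producer can handle the reply.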
