将 Java 对象返回给适当的 Kafka 生产者

问题描述

我想参照这个 example,实现通过 Kafka 主题发送和接收序列化的 Java 对象。

我试过了:

生产者配置:

    /**
     * Spring configuration for the requesting side of the request/reply flow.
     *
     * <p>Declares the producer used to publish requests, the consumer used to read
     * replies from {@code tp-sale.reply}, and the {@link ReplyingKafkaTemplate} that
     * ties the two together.
     */
    @Configuration
public class KafkaProducerConfig {

    /** Kafka bootstrap servers, injected from the {@code kafka.bootstrapAddress} property. */
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    /**
     * Builds the producer factory for outgoing requests.
     *
     * @return a factory using {@code StringSerializer} for keys and
     *         {@code ObjectFactorySerializer} for values
     */
    @Bean
    public ProducerFactory<String, Object> requestFactoryProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * Plain (fire-and-forget) template backed by {@link #requestFactoryProducerFactory()}.
     *
     * @return the template
     */
    @Bean
    public KafkaTemplate<String, Object> requestFactoryKafkaTemplate() {
        return new KafkaTemplate<>(requestFactoryProducerFactory());
    }

    /**
     * Builds the consumer factory used to read replies.
     *
     * @return a factory in consumer group {@code tp-sale.reply} using
     *         {@code StringDeserializer} for keys and {@code ObjectFactoryDeserializer} for values
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tp-sale.reply");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectFactoryDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Listener container factory wired to {@link #consumerFactory()}.
     *
     * @return the container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /**
     * Request/reply template: sends with the given producer factory and receives
     * replies through a container subscribed to {@code tp-sale.reply}.
     *
     * @param producerFactory producer factory used for sending requests
     * @param factory container factory used to create the reply container
     * @return the configured replying template
     */
    @Bean
    public ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate(
            ProducerFactory<String, Object> producerFactory,
            ConcurrentKafkaListenerContainerFactory<String, Object> factory) {
        ConcurrentMessageListenerContainer<String, Object> replyContainer =
                factory.createContainer("tp-sale.reply");
        // ReplyingKafkaTemplate takes three type arguments (key, request value, reply value);
        // the local must match the bean's declared return type.
        ReplyingKafkaTemplate<String, Object, Object> requestReplyKafkaTemplate =
                new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
        // NOTE(review): the default send topic is set to the reply topic; confirm this is
        // intended, since a send without an explicit topic would publish to tp-sale.reply.
        requestReplyKafkaTemplate.setDefaultTopic("tp-sale.reply");
        return requestReplyKafkaTemplate;
    }
}

生产者:

@RestController
@RequestMapping("/checkout")
public class CheckoutController {

    private static final Logger LOG = LoggerFactory.getLogger(CheckoutController.class);

    private KafkaTemplate<String,Object> requestFactoryKafkaTemplate;
    private ReplyingKafkaTemplate<String,Object> requestReplyKafkaTemplate;

    @Autowired
    public CheckoutController(KafkaTemplate<String,Object> requestFactoryKafkaTemplate,ReplyingKafkaTemplate<String,Object> requestReplyKafkaTemplate){
        this.requestFactoryKafkaTemplate = requestFactoryKafkaTemplate;
        this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
    }

    @PostMapping("sale_test")
    public void performSaletest() throws ExecutionException,InterruptedException,TimeoutException {

        SaleRequestFactory obj = new SaleRequestFactory();
        obj.setId(100);

        ProducerRecord<String,Object> record = new ProducerRecord<>("tp-sale.request",obj);
        RequestReplyFuture<String,Object> replyFuture = requestReplyKafkaTemplate.sendAndReceive(record);
        SendResult<String,Object> sendResult = replyFuture.getSendFuture().get(10,TimeUnit.SECONDS);
        ConsumerRecord<String,Object> consumerRecord = replyFuture.get(10,TimeUnit.SECONDS);


        SaleResponseFactory value = (SaleResponseFactory) consumerRecord.value();
        System.out.println("!!!!!!!!!!!! " + value.getUnique_id());


    }

    @PostMapping("authorize_test")
    public void performAuthtest() throws ExecutionException,TimeoutException {

        AuthRequestFactory obj = new AuthRequestFactory();
        obj.setId(140);

        ProducerRecord<String,TimeUnit.SECONDS);


        AuthResponseFactory value = (AuthResponseFactory) consumerRecord.value();
        System.out.println("!!!!!!!!!!!! " + value.getUnique_id());


    }
}

ObjectFactoryDeserializer

    /**
     * Kafka value deserializer that reads a payload with Java native serialization
     * ({@link ObjectInputStream}).
     *
     * <p>NOTE(security): native deserialization executes class-controlled logic while
     * reading. Only use this on topics whose producers are trusted; otherwise install an
     * {@code ObjectInputFilter} or switch to a data format such as JSON.
     */
    public class ObjectFactoryDeserializer implements Deserializer<Object> {

    /**
     * Deserializes {@code data} into the object it encodes.
     *
     * @param topic topic the record came from (used only in error messages)
     * @param data serialized bytes; may be {@code null}
     * @return the decoded object, or {@code null} when {@code data} is {@code null}
     * @throws UncheckedIOException if the bytes are not a readable object stream
     * @throws IllegalStateException if the encoded class is not on the classpath
     */
    @Override
    public Object deserialize(String topic, byte[] data) {
        if (data == null) {
            // A null payload carries no object; ByteArrayInputStream would throw an NPE on it.
            return null;
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to deserialize record from topic " + topic, e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unknown class in record from topic " + topic, e);
        }
    }

    /**
     * Header-aware variant; headers are not consulted, so this delegates to
     * {@link #deserialize(String, byte[])}.
     */
    @Override
    public Object deserialize(String topic, Headers headers, byte[] data) {
        return deserialize(topic, data);
    }

}

ObjectFactorySerializer

/**
 * Kafka value serializer that writes a payload with Java native serialization
 * ({@link ObjectOutputStream}). The payload's class must be serializable by
 * {@code ObjectOutputStream}.
 */
public class ObjectFactorySerializer implements Serializer<Object> {

    /**
     * Serializes {@code data} into a byte array.
     *
     * @param topic topic the record is being sent to (used only in error messages)
     * @param data object to encode; may be {@code null}
     * @return the encoded bytes, or {@code null} when {@code data} is {@code null}
     * @throws UncheckedIOException if the object cannot be written
     */
    @Override
    public byte[] serialize(String topic, Object data) {
        if (data == null) {
            // Pass a null payload through as null bytes rather than an encoded null object.
            return null;
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(data);
            // Flush buffered object data into baos before taking the snapshot.
            oos.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to serialize record for topic " + topic, e);
        }
    }

    /**
     * Header-aware variant; headers are not consulted, so this delegates to
     * {@link #serialize(String, Object)}.
     */
    @Override
    public byte[] serialize(String topic, Headers headers, Object data) {
        return serialize(topic, data);
    }

}

消费者配置:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ConsumerFactory<String,"tp-sale.request");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig,ObjectFactoryDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ProducerFactory<String,Object> saleResponseFactoryProducerFactory() {
        Map<String,ObjectFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(saleResponseFactoryKafkaTemplate());
        return factory;
    }

    @Bean
    public KafkaTemplate<String,Object> saleResponseFactoryKafkaTemplate() {
        return new KafkaTemplate<>(saleResponseFactoryProducerFactory());
    }

}

消费者

/**
 * Class-level listener on {@code tp-sale.request}. Each {@code @KafkaHandler}
 * method accepts one request type and returns a response that is sent to
 * {@code tp-sale.reply}.
 */
@Component
@KafkaListener(id = "tp-sale.request", topics = "tp-sale.request")
public class ConsumerListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerListener.class);

    /**
     * Handles an authorization request.
     *
     * @param authRequestFactory the incoming request
     * @return a response whose unique id is set to {@code "123123"}
     */
    @KafkaHandler
    @SendTo("tp-sale.reply")
    public AuthResponseFactory fooListener(AuthRequestFactory authRequestFactory) {
        System.out.println("In AuthRequestFactoryListener: " + authRequestFactory);

        final AuthResponseFactory reply = new AuthResponseFactory();
        reply.setUnique_id("123123");
        return reply;
    }

    /**
     * Handles a sale request.
     *
     * @param saleRequestFactory the incoming request
     * @return a response whose unique id is set to {@code "123123"}
     */
    @KafkaHandler
    @SendTo("tp-sale.reply")
    public SaleResponseFactory barListener(SaleRequestFactory saleRequestFactory) {
        System.out.println("In SaleRequestFactoryListener: " + saleRequestFactory);

        final SaleResponseFactory reply = new SaleResponseFactory();
        reply.setUnique_id("123123");
        return reply;
    }
}

完整的最小可运行 example

当我请求端点 authorize_test 时,代码运行良好。

当我请求端点 sale_test 时,我收到此异常:

生产者异常:

14:06:48.706 [consumer-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
14:06:48.706 [consumer-0-C-1] DEBUG RecoveringBatchErrorHandler[debug:200] - Expected a BatchListenerFailedException; re-seeking batch
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed,please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:835)
14:06:48.707 [consumer-0-C-1] ERROR KafkaMessageListenerContainer$ListenerConsumer[error:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed,please seek past the record to continue consumption.
        at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
        at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1372)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1070)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed,please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:835)
14:06:48.970 [http-nio-8090-exec-3] ERROR HandlerExecutionChain[triggerAfterCompletion:192] - HandlerInterceptor.afterCompletion threw exception
java.lang.NullPointerException: null
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
        at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)
14:06:49.142 [http-nio-8090-exec-3] DEBUG dispatcherServlet[logResult:1101] - Failed to complete request: java.lang.InterruptedException
14:06:49.143 [http-nio-8090-exec-3] DEBUG HstsHeaderWriter[writeHeaders:169] - Not injecting HSTS header since it did not match the requestMatcher
14:06:49.149 [consumer-0-C-1] DEBUG RecoveringBatchErrorHandler[debug:200] - Expected a BatchListenerFailedException; re-seeking batch
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed,please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
14:06:49.149 [consumer-0-C-1] ERROR KafkaMessageListenerContainer$ListenerConsumer[error:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed,please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:835)
14:06:49.150 [consumer-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
14:06:49.151 [consumer-0-C-1] DEBUG RecoveringBatchErrorHandler[debug:200] - Expected a BatchListenerFailedException; re-seeking batch
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed,please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:835)
14:06:49.152 [consumer-0-C-1] ERROR KafkaMessageListenerContainer$ListenerConsumer[error:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed,please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
        at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:835)
14:06:49.157 [consumer-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}

 

完整日志 https://pastebin.com/Z5XJCNhA

你知道我错在哪里吗?我找不到我的错误。看起来 requestReplyKafkaTemplate 没有正确配置。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)