我们可以承认还是不喜欢kafka worker线程

问题描述

@KafkaListener(containerFactory = "sampleListenerContainerFactory",topics = "${sample.topic.name}")
public void receive(SampleKafkaMessage sampleKafkaMessage,Acknowledgment acknowledgment) throws Exception {
        try {
            String logId = LoggerUtil.getLogId();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        if(sampleKafkaMessage != null) {
                            sampleService.generateInvoice(sampleKafkaMessage);
                            acknowledgment.acknowledge();
                        }
                    }catch (Exception e){
                        logger.error(String.format("Error fetching invoice with Exception = %s",e.getMessage()));
                        acknowledgment.nack(10000);
                    }
                }
            });
        }
        catch (Exception e) {
            logger.error(String.format("[sampleKafkaConsumer] [receive] Exception payload = %s,e.message = %s",sampleKafkaMessage,e.getMessage()));
            acknowledgment.nack(10000);
        }

我有这个代码。当我尝试使用kafka worker线程的后台时,出现错误。我的问题是在kafka worker线程上进行确认是否正确。 请给我解释一下。我是kafka的新手。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...