问题描述
@KafkaListener(containerFactory = "sampleListenerContainerFactory",topics = "${sample.topic.name}")
public void receive(SampleKafkaMessage sampleKafkaMessage,Acknowledgment acknowledgment) throws Exception {
try {
String logId = LoggerUtil.getLogId();
executor.execute(new Runnable() {
@Override
public void run() {
try {
if(sampleKafkaMessage != null) {
sampleService.generateInvoice(sampleKafkaMessage);
acknowledgment.acknowledge();
}
}catch (Exception e){
logger.error(String.format("Error fetching invoice with Exception = %s",e.getMessage()));
acknowledgment.nack(10000);
}
}
});
}
catch (Exception e) {
logger.error(String.format("[sampleKafkaConsumer] [receive] Exception payload = %s,e.message = %s",sampleKafkaMessage,e.getMessage()));
acknowledgment.nack(10000);
}
我有这个代码。当我尝试使用kafka worker线程的后台时,出现错误。我的问题是在kafka worker线程上进行确认是否正确。 请给我解释一下。我是kafka的新手。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)