Kafka回调函数

Controller

    /**
     * 回调方法中监控消息是否发送成功 或 失败时做补偿处理
     */
    @GetMapping("/callback/{message}")
    public String sendMessageCallback(@PathVariable("message") String callbackMessage) {
        kafkaProducer.sendMessageCallback(callbackMessage);
        return "niu bi a";
    }

Service :kafka producer

    /**
     * callback
     */
    public void sendMessageCallback(String callbackMessage) {
        kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送消息失败:" + ex.getMessage());
            }
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                        + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
            }
        });
    }
}

消费者:

@Service
public class KafkaConsumer3 {
    @KafkaListener(topics = "topic1", groupId = KafkaConstants.KAFKA_GROUP_ID_THREE)
    public void receivetopicmessage(String message) {
        System.out.println("KafkaConsumer3 ---> receivetopicmessage:接收消息,内容为:" + message);
    }
}

结果:

image

=====

image

相关文章

# 前言 现有主流消息中间件都是生产者-消费者模型,主要角色...
错误的根源是:kafka版本过高所致,2.2+=的版本,已经不需要...
DWS层主要是存放大宽表数据,此业务中主要是针对Kafka topic...
不多BB讲原理,只教你怎么用,看了全网没有比我更详细的了,...
终于写完了,其实最开始学kafka的时候是今年2月份,那时候还...
使用GPKafka实现Kafka数据导入Greenplum数据库踩坑问题记录(...