基于kafka实现异步消息请求响应

我们知道单体架构中的HTTP是同步请求响应,微服务架构中的消息时异步请求,无响应。

但如果实际需求中,我们需要获得这个消息的请求结果怎么办?

理论上也是可以实现的!

首先,需要对请求的消息体进行升级增加一个msgiD,用于在接收返回消息时进行识别。

第二,如果发送和接收消息的双方未约定请求通道和响应通道,发送消息时,消息体还需要携带响应通道信息。

为了简化需求,我们假定请求通道和响应通道双方已经约定好。

因为是异步请求,所以发送完消息后,需要返回一个future对象。而且这个future还是可以set的。

我们想到了guava中的SettableFuture对象。

好,下面进入实战环节,重新定义kafka发送消息的方法。新增一个异步send,跟普通send方法不同,发送完消息之后,我们有构造了一个future对象。

 //发送异步响应消息,需要写的唯一msgiD,初步考虑使用UUID实现
    public ResponseFuture sendAsync(String msgid,String msg) {
        send(TASK_TOPIC, msg);
        SettableFuture<String> future = SettableFuture.create();
        ResponseFuture responseFuture = new ResponseFuture(future);
        ResponseFuture.put(msgid, responseFuture);
        return responseFuture;
    }

 

核心的实现在ResponseFuture中。它相当于一个缓冲池,用于存放请求和未返回的响应。

@Data
public class ResponseFuture {
    public static final Map<String, ResponseFuture> map = new ConcurrentHashMap<>(256);


    public static void remove(String msgid) {
        map.remove(msgid);
    }



    public static void put(String msgid, ResponseFuture future) {
        map.put(msgid, future);
    }



    public static void setResponse(String msgid, ResponseBody result) {
        ResponseFuture response = map.get(msgid);
        response.setCurrentTime(System.currentTimeMillis());
        response.getFuture().set(result);
        map.put(msgid, response);
    }

    private Long currentTime;
    private SettableFuture<ResponseBody> future;

    public ResponseFuture( SettableFuture<ResponseBody> future) {
        this.currentTime = System.currentTimeMillis();
        this.future = future;
    }

    //todo:周期性调度删除过期数据
    public static void clear() {
        for (Map.Entry<String, ResponseFuture> entry : map.entrySet()) {
            ResponseFuture value = entry.getValue();
            Long currentTime = value.getCurrentTime();
            if (System.currentTimeMillis() - currentTime > 5000) {
                map.remove(entry.getKey());
            }
        }
    }


}

kafka接收消息

/**
     * 监听回复消息
     * @param record
     */
    public void listenResponse(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info(message.toString());
            String msgid = message.toString().substring(1);
            ResponseBody responseBody = JSON.parSEObject(message.toString(), ResponseBody.class);
            ResponseFuture.setResponse(msgid,responseBody);
        }
    }

发送和接收的主函数

public String testAsync() throws ExecutionException, InterruptedException {
        UUID uuid = UUID.randomUUID();
        String msgid = uuid.toString();
        String msg = "";
        ResponseFuture responseFuture = kafkaSender.sendAsync(msgid, msg);
        SettableFuture<ResponseBody> future = responseFuture.getFuture();
        try {
            ResponseBody responseBody = future.get(1, TimeUnit.SECONDS);
            return responseBody.getMsg();
        } catch (TimeoutException e) {
            e.printstacktrace();
        }
        ResponseFuture.remove(msgid);
        return null;

    }

 

相关文章

# 前言 现有主流消息中间件都是生产者-消费者模型,主要角色...
错误的根源是:kafka版本过高所致,2.2+=的版本,已经不需要...
DWS层主要是存放大宽表数据,此业务中主要是针对Kafka topic...
不多BB讲原理,只教你怎么用,看了全网没有比我更详细的了,...
终于写完了,其实最开始学kafka的时候是今年2月份,那时候还...
使用GPKafka实现Kafka数据导入Greenplum数据库踩坑问题记录(...