五张图带你理解 RocketMQ 延时消息机制

今天来聊一聊 RocketMQ 的延时消息是怎么实现的。

延时消息是指发送到 RocketMQ 后不会马上被消费者拉取到,而是等待固定的时间,才能被消费者拉取到。

延时消息的使用场景很多,比如电商场景下关闭超时未支付的订单,某些场景下需要在固定时间后发送提示消息。

1.生产者

首先看一个生产者发送延时消息的官方示例代码

public static void main(String[] args) throws Exception { // Instantiate a producer to send scheduled messages DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); // Launch producer producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); // This message will be delivered to consumer 10 seconds later. message.setDelayTimeLevel(3); // Send the message producer.send(message); } // Shutdown producer after use. producer.shutdown(); }

从上面的代码可以看到,跟普通消息不一样的是,消息设置 setDelayTimeLevel 属性值,这里设置为 3,这里最终将 3 这个延时级别复制给了 DELAY 属性

关于延时级别,可以看下面这个定义:

//MessageStoreConfig类 private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

这里延时级别有 18 个,上面的示例代码中延迟级别是 3,消息会延迟 10s 后消费者才能拉取。

2.broker 处理

2.1 写入消息

broker 收到消息后,会将消息写入 CommitLog。在写入时,会判断消息 DELAY 属性是否大于 0。代码如下:

//CommitLog 类 if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesstring(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); }

从上面的代码可以看到,CommitLog 写入时并没有直接写入,而是把 Topic 改为 SCHEDULE_TOPIC_XXXX,把 queueId 改为延时级别减 1。因为延时级别有 18 个,所以这里有 18 个队列。如下图:

2.2 调度消息

延时消息写入后,会有一个调度任务不停地拉取这些延时消息,这个逻辑在类 ScheduleMessageService。这个类的初始化代码如下:

public void start() { if (started.compareAndSet(false, true)) { this.load(); this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_")); //省略部分逻辑 for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { //省略部分逻辑 this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FirsT_DELAY_TIME, TimeUnit.MILLISECONDS); } } //省略持久化的逻辑 } }

上面的 load() 方法会加载一个 delayLevelTable(ConcurrentHashMap类型),key 保存延时级别(从 1 开始),value 保存延时时间(单位是 ms)。

load() 方法结束后,创建了一个有 18 个核心线程的定时线程池,然后遍历 delayLevelTable,创建 18 个任务( DeliverDelayedMessageTimerTask)进行每个延时级别的任务调度。任务调度的代码逻辑如下:

public void executeOnTimeup() { ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); if (cq == null) { this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE); return; } SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ == null) { //省略部分逻辑 this.scheduleNextTimerTask(resetoffset, DELAY_FOR_A_WHILE); return; } long nextOffset = this.offset; try { int i = 0; ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferCQ.getSize() && isstarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferCQ.getByteBuffer().getLong(); int sizePy = bufferCQ.getByteBuffer().getInt(); long tagsCode = bufferCQ.getByteBuffer().getLong(); //省略部分逻辑 long Now = System.currentTimeMillis(); long deliverTimestamp = this.correctDeliverTimestamp(Now, tagsCode); nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long countdown = deliverTimestamp - Now; if (countdown > 0) { this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); return; } MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy); if (msgExt == null) { continue; } MessageExtbrokerInner msginner = ScheduleMessageService.this.messageTimeup(msgExt); //事务消息判断省略 boolean deliverSuc; //只保留同步 deliverSuc = this.syncDeliver(msginner, msgExt.getMsgid(), nextOffset, offsetPy, sizePy); if (!deliverSuc) { this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); return; } } nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); } catch (Exception e) { log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e); } finally { bufferCQ.release(); } this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); }

这段代码可以参考下面的流程图来进行理解:

上面有一个修正投递时间的函数,这个函数的意义是如果已经过了投递时间,那么立即投递。代码如下:

private long correctDeliverTimestamp(final long Now, final long deliverTimestamp) { long result = deliverTimestamp; long maxTimestamp = Now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel); if (deliverTimestamp > maxTimestamp) { result = Now; } return result; }

注意:消息从 CommitLog 转发到 ConsumeQueue 时,会判断是否是延时消息(Topic = SCHEDULE_TOPIC_XXXX 并且延时级别大于 0),如果是延时消息,就会修改 tagsCode 值为消息投递的时间戳,而 tagsCode 原值是 tag 的 HashCode。代码如下:

//CommitLog类checkMessageAndReturnSize方法 if (delayLevel > 0) { tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, storeTimestamp); }

如下图:

​而 ScheduleMessageService 调度线程将消息从 ConsumeQueue 重新投递到原始队列中时,会把 tagsCode 再次修改为 tag 的 HashCode,代码如下:

//类MessageExtbrokerInner,这个方法被 messageTimeup 方法调用。 public static long tagsstring2tagsCode(final TopicFilterType filter, final String tags) { if (null == tags || tags.length() == 0) { return 0; } return tags.hashCode(); }

如下图:

2.3 一个问题

如果有一个业务场景,要求延时消息 3 小时才能消费,而 RocketMQ 的延时消息最大延时级别只支持延时 2 小时,怎么处理?

这里提供两个思路供大家参考:

broker 上修改 messageDelayLevel 的认配置;

在客户端缓存 msgid,先设置延时级别是 18(2h),当客户端拉取到消息后首先判断有没有缓存,如果有缓存则再次发送延时消息,这次延时级别是 17(1h),如果没有缓存则进行消费。

3 总结

经过上面的讲解,延时消息的处理流程如下:

最后,延时消息的延时时间并不精确,这个时间是 broker 调度线程把消息重新投递到原始的 MessageQueue 的时间,如果发生消息积压或者 RocketMQ 客户端发生流量管控,客户端拉取到消息后进行处理的时间可能会超出预设的延时时间

相关文章

显卡天梯图2024最新版,显卡是电脑进行图形处理的重要设备,...
初始化电脑时出现问题怎么办,可以使用win系统的安装介质,连...
todesk远程开机怎么设置,两台电脑要在同一局域网内,然后需...
油猴谷歌插件怎么安装,可以通过谷歌应用商店进行安装,需要...
虚拟内存这个名词想必很多人都听说过,我们在使用电脑的时候...