Rocketmq源码分析15:延迟消息

共 35925字,需浏览 72分钟

 ·

2021-05-02 15:10

注:本系列源码分析基于RocketMq 4.8.0,gitee仓库链接:https://gitee.com/funcy/rocketmq.git.

rocketmq支持延迟消息,本文我们将从源码角度分析延迟消息的实现原理。

1. demo 准备

延迟消息的demo在org.apache.rocketmq.example.delay包下,发送消息的producer如下:

public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        String nameServer = "localhost:9876";
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr(nameServer);
        producer.start();

        for (int i = 0; i < 1; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // delayLevel=1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18
                    // delayTime =1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                    // 设置延迟延迟级别
                    msg.setDelayTimeLevel(5);
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }
}

rocketmq在实现延迟消息时,会准备18个延迟级别,这些级别对应的延迟时间如下:

123456789101112131415161718
1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h

在发送延迟消息时,需要指定消息的延迟级别:

msg.setDelayTimeLevel(5);

这里指定的延迟级别为5,即延迟1分钟后发送。

2. 延迟消息的存储

延迟消息与普通消息的发送并无太多差别,不过在broker在存储延迟消息时,会做一些额外的处理,进入CommitLog#asyncPutMessage方法:

 public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // 消息的存储时间
    msg.setStoreTimestamp(System.currentTimeMillis());
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // 延迟消息
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore
                    .getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(
                    this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // 指定延迟消息对应的topic
            topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
            // 延迟级别对应的队列,即每个延迟级别都对应一条队列
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // 原始的topic与queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, 
                String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    // 省略消息写入 commitLog 的操作
    ...
}

在延迟消息写入前,会做一些特别处理,其实就是将消息的topicqueueId修改为延迟消息专用的topicqueueId

获取延迟队列的方法为ScheduleMessageService#delayLevel2QueueId,代码如下:

public static int delayLevel2QueueId(final int delayLevel) {
    return delayLevel - 1;
}

这里的delayLevel,就对应前面提到的18个延迟级别,这也就是说,每个延迟级别的消息都会有一个专门队列来存储。这样存储有何好处呢?最大的好处就是避免了排序,举个简单的例子:上午10:00broker收到了一条延迟消息1,延迟级别为5;然后在10:02又收到了一条延迟消息2,延迟级别也为5,由于延迟级别相同,他们会存储在同一条队列中.

由于队列天生有序,入队时间先按送达broker的时间先后进行排序,而同一队列上延迟时间也相同,因此延迟消息1一定会在延迟消息2前进行消消费,后面如果有消息再进入该队列中,也会按照先进先出的方式进行消费。

3. 延迟消息的投递

上一节分析了延迟消息的存储,本节我们来分析延迟消息的消费。

延迟消息存储到队列后,会有一个专门的线程定期扫描这些队列,找到满足消费时间的消息,然后将其投递到真正的topicqueueId中,这样这条消息就能被consumer消息了。

处理延迟队列扫描的线程为scheduleMessageService,它在DefaultMessageStore#start方法中启动:

public void start() throws Exception {
    ...
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        this.haService.start();
        // 这里处理延迟消息
        this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
    }
    ...
}

继续跟进,进入DefaultMessageStore#handleScheduleMessageService 方法:

@Override
public void handleScheduleMessageService(final BrokerRole brokerRole) {
    if (this.scheduleMessageService != null) {
        if (brokerRole == BrokerRole.SLAVE) {
            this.scheduleMessageService.shutdown();
        } else {
            // 启动
            this.scheduleMessageService.start();
        }
    }

}

继续跟进,进入ScheduleMessageService#start方法:

/**
 * 延迟消息服务的启动方式
 */

public void start() {
    // CAS 锁机制保证必须 shutdown 后才能再次start
    if (started.compareAndSet(falsetrue)) {
        this.timer = new Timer("ScheduleMessageTimerThread"true);
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                // 定时执行延迟消息处理任务
                this.timer.schedule(
                    new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
        // 每隔10s,将延迟消息的相关信息持久化到硬盘中
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    if (started.get()) ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}

在这个线程中,主要做了两件事:

  1. 遍历所有的延迟级别,为每个延迟级别在延迟FIRST_DELAY_TIME毫秒后就处理延迟消息的投递操作
  2. 开启执久化定时任务:定时将延迟消息的相关信息持久化到硬盘中

3.1 投递操作

处理延迟消息的投递任务为DeliverDelayedMessageTimerTask#run方法,代码如下:

public void run() {
    try {
        if (isStarted()) {
            this.executeOnTimeup();
        }
    } catch (Exception e) {
        // XXX: warn and notify me
        log.error("ScheduleMessageService, executeOnTimeup exception", e);
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
            this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
    }
}

在这个方法中,调用了executeOnTimeup()方法继续操作,我们再进入ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup方法:

public void executeOnTimeup() {
    // 获得一条队列
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore
            .findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                delayLevel2QueueId(delayLevel));

    long failScheduleOffset = offset;

    if (cq != null) {
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    long tagsCode = bufferCQ.getByteBuffer().getLong();

                    if (cq.isExtAddr(tagsCode)) {
                        if (cq.getExt(tagsCode, cqExtUnit)) {
                            tagsCode = cqExtUnit.getTagsCode();
                        } else {
                            log.error(...);
                            // 1. 消息的写入的时间
                            long msgStoreTime = defaultMessageStore.getCommitLog()
                                .pickupStoreTimestamp(offsetPy, sizePy);
                            // 2. 计算投递时间,投递时间 = 消息写入时间 + 延迟级别对应的时间
                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                        }
                    }

                    // 处理投递时间,保证投递时间必须小于(当前时间 + 延迟级别对应的时间)
                    long now = System.currentTimeMillis();
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                    long countdown = deliverTimestamp - now;

                    // 小于等于0,表示消费需要投递
                    if (countdown <= 0) {
                        MessageExt msgExt =
                            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                offsetPy, sizePy);

                        if (msgExt != null) {
                            try {
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC
                                        .equals(msgInner.getTopic())) {
                                    log.error(...);
                                    continue;
                                }
                                // 3. 投递操作
                                PutMessageResult putMessageResult =
                                    ScheduleMessageService.this.writeMessageStore
                                        .putMessage(msgInner);

                                if (putMessageResult != null && putMessageResult
                                        .getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    continue;
                                } else {
                                    // XXX: warn and notify me
                                    log.error(...);
                                    ScheduleMessageService.this.timer.schedule(
                                        new DeliverDelayedMessageTimerTask(this.delayLevel,
                                            nextOffset), DELAY_FOR_A_PERIOD);
                                    ScheduleMessageService.this.updateOffset(this.delayLevel,
                                        nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                                log.error(...);
                            }
                        }
                    } else {
                        // 4. 安排下一次执行,执行时间为 countdown 毫秒后
                        ScheduleMessageService.this.timer.schedule(
                            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                            countdown);
                        // 5. 更新偏移量
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                }
                // 之后再执行
                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                // 更新偏移量
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {

                bufferCQ.release();
            }
        }
        else {

            long cqMinOffset = cq.getMinOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
                log.error(...);
            }
        }
    }

    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
        failScheduleOffset), DELAY_FOR_A_WHILE);
}

这个方法虽然有点长,但逻辑很清晰,执行过程如下:

  1. 获取消息写入时间,就是写入到commitLog的时间
  2. 计算投递时间,投递时间 = 消息写入时间 + 延迟级别对应的时间,如果当前时间大于等于投递时间,就表示消息需要进行投递操作
  3. 如果消息满足投递时间,就进行投递操作,所谓的投递操作,就是将消息写入到真正的topicqueueId的队列中
  4. 如果当前消息不满足投递时间,就表明该队列上之后的消息也不会投递时间,就计算投递时间与当前时间的差值,这个差值就是下次执行executeOnTimeup()方法的时间
  5. 更新偏移量,就是记录当前队列的消费位置

我们来看看偏移量的更新操作,进入ScheduleMessageService#updateOffset方法:

public class ScheduleMessageService extends ConfigManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private static final long FIRST_DELAY_TIME = 1000L;
    private static final long DELAY_FOR_A_WHILE = 100L;
    private static final long DELAY_FOR_A_PERIOD = 10000L;

    /** 延迟级别对应的延迟时间,单位为毫秒 */
    private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
        new ConcurrentHashMap<Integer, Long>(32);

    /** 延迟级别对应的偏移量 */
    private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
        new ConcurrentHashMap<Integer, Long>(32);

    private final DefaultMessageStore defaultMessageStore;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private Timer timer;
    private MessageStore writeMessageStore;
    private int maxDelayLevel;

    ...

    /**
     * 更新偏移量的操作
     */

    private void updateOffset(int delayLevel, long offset) {
        this.offsetTable.put(delayLevel, offset);
    }

    ...
}

可以看到,这里的更新偏移量,就是将当前延迟级别消费位置的偏移量添加到offsetTable中进行保存。

3.2 持久化

让我们回到``ScheduleMessageService#start`方法,这个方法中开启了一个持久化任务:

this.timer.scheduleAtFixedRate(new TimerTask() {

    @Override
    public void run() {
        try {
            if (started.get()) ScheduleMessageService.this.persist();
        } catch (Throwable e) {
            log.error("scheduleAtFixedRate flush exception", e);
        }
    }
}, 10000this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());

该任务会定期执行ConfigManager#persist方法进行持久化操作:

public synchronized void persist() {
    String jsonString = this.encode(true);
    if (jsonString != null) {
        String fileName = this.configFilePath();
        try {
            MixAll.string2File(jsonString, fileName);
        } catch (IOException e) {
            log.error("persist file " + fileName + " exception", e);
        }
    }
}

这个方法主要进行了两个操作:

  1. 调用this.encode(true)得到json字符串
  2. json字符串写入到文件中

这个json字符串是个啥呢?我们进入ScheduleMessageService#encode(boolean)方法:

public String encode(final boolean prettyFormat) {
    DelayOffsetSerializeWrapper delayOffsetSerializeWrapper 
        = new DelayOffsetSerializeWrapper();
    // 这个 offsetTable 就是用来保存消费位置偏移量的
    delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable);
    return delayOffsetSerializeWrapper.toJson(prettyFormat);
}

从代码来看,这个方法就是将ScheduleMessageService#offsetTable序列化成json字符串的, 这个 offsetTable 就是用来保存消费位置偏移量的。由此不难得出这个定时任务的作用:定期将延迟队列的消费位置偏移量持久化到文件中。

4. 总结

  1. RocketMq支持了18种延迟级别,每个延迟级别对应不同的延迟时间
  2. 延迟消息对应着一个topic,每个延迟级别都对应着该topic下的一个队列
  3. broker收到延迟消息后,会将该消息放入到延迟级别对应的延迟消息中
  4. 消息投递由定时线程执行,当消息达到投递时间后,会从延迟队列中写入到真正需要投递的队列中

客观来说,开源版 RocketMq 的延迟消息比较简陋,仅支持18种延迟级别,而阿里云版可指定发送时间。


限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。

本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!


浏览 27
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报