Rocketmq源码分析06:broker 消息分发流程

共 19366字,需浏览 39分钟

 ·

2021-04-22 21:34

注:本系列源码分析基于RocketMq 4.8.0,gitee仓库链接:https://gitee.com/funcy/rocketmq.git.

RocketMq消息处理整个流程如下:

  1. 消息接收:消息接收是指接收producer的消息,处理类是SendMessageProcessor,将消息写入到commigLog文件后,接收流程处理完毕;
  2. 消息分发:broker处理消息分发的类是ReputMessageService,它会启动一个线程,不断地将commitLong分到到对应的consumerQueue,这一步操作会写两个文件:consumerQueueindexFile,写入后,消息分发流程处理 完毕;
  3. 消息投递:消息投递是指将消息发往consumer的流程,consumer会发起获取消息的请求,broker收到请求后,调用PullMessageProcessor类处理,从consumerQueue文件获取消息,返回给consumer后,投递流程处理完毕。

以上就是rocketMq处理消息的流程了,接下来我们就从源码来分析消息分发的实现。

1. 分发线程的启动

消息写入到commitlog后,接着broker会对这些消息进行分发操作,这里的分发,是指broker将消息写入到consumerQueue文件中。

broker消息分发的操作是在一个单独的线程中进行的,这里我们来回忆下BrokerController的启动流程,进入BrokerController#start方法:

public void start() throws Exception {
    // 启动各组件
    if (this.messageStore != null) {
        this.messageStore.start();
    }
    ...
}

继续进入DefaultMessageStore#start方法:

public void start() throws Exception {
    ...
    // 处理 maxPhysicalPosInLogicQueue 的值
    long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
    for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
            }
        }
    }
    if (maxPhysicalPosInLogicQueue < 0) {
        maxPhysicalPosInLogicQueue = 0;
    }
    if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
        maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
    }
    this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
    // 消息分发操作,启动新线程来处理
    this.reputMessageService.start();
    ...
}

BrokerController启动时,会处理maxPhysicalPosInLogicQueue的值,这个值就是分发commitlog消息的偏移量,之后就启动ReputMessageService服务来处理。ReputMessageServiceDefaultMessageStore的内部类,它是ServiceThread的子类,start()方法如下:

public abstract class ServiceThread implements Runnable {
    public void start() {
        if (!started.compareAndSet(falsetrue)) {
            return;
        }
        stopped = false;
        this.thread = new Thread(this, getServiceName());
        this.thread.setDaemon(isDaemon);
        this.thread.start();
    }
    ...
}

这个方法仅仅是处理线程的启动,我们继续看ServiceThreadServiceThreadRunnable的子类,它的run()方法如下:

class ReputMessageService extends ServiceThread {
    @Override
    public void run() {
        DefaultMessageStore.log.info(...);

        while (!this.isStopped()) {
            try {
                Thread.sleep(1);
                // 调用的是 doReput() 方法
                this.doReput();
            } catch (Exception e) {
                DefaultMessageStore.log.warn(...);
            }
        }

        DefaultMessageStore.log.info(...);
    }
}

ReputMessageService#run()方法来看,该线程会休眠1ms,然后调用doReput()方法处理,看来doReput()方法就是关键了!

2. 消息分发:DefaultMessageStore.ReputMessageService#doReput

我们进入DefaultMessageStore.ReputMessageService#doReput方法:

private void doReput() {
    // 处理 reputFromOffset
    if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
        log.warn(...);
        this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    }
    for (boolean doNext = truethis.isCommitLogAvailable() && doNext; ) {

        if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
            break;
        }

        // 从CommitLog中获取需要进行转发的消息
        SelectMappedBufferResult result 
            = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
        if (result != null) {
            try {
                this.reputFromOffset = result.getStartOffset();

                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    // 检验数据
                    DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog
                        .checkMessageAndReturnSize(result.getByteBuffer(), falsefalse);
                    int size = dispatchRequest.getBufferSize() == -1 
                        ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            // 分发消息
                            DefaultMessageStore.this.doDispatch(dispatchRequest);
                            // 长轮询:如果有消息到了主节点,并且开启了长轮询
                            if (BrokerRole.SLAVE != DefaultMessageStore.this
                                    .getMessageStoreConfig().getBrokerRole()
                                    &&DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){
                                // 调用NotifyMessageArrivingListener的arriving方法
                                DefaultMessageStore.this.messageArrivingListener.arriving(
                                    dispatchRequest.getTopic(),
                                    dispatchRequest.getQueueId(), 
                                    dispatchRequest.getConsumeQueueOffset() + 1,
                                    dispatchRequest.getTagsCode(), 
                                    dispatchRequest.getStoreTimestamp(),
                                    dispatchRequest.getBitMap(), 
                                    dispatchRequest.getPropertiesMap());
                            }

                            ...
                        } else if (size == 0) {
                            ...
                        }
                    } else if (!dispatchRequest.isSuccess()) {
                        ...
                    }
                }
            } finally {
                result.release();
            }
        } else {
            doNext = false;
        }
    }
}

该方法依旧很长,我们重点关注与分发相关的流程:

  1. commitLog.getData(...):从CommitLog中获取DispatchRequest需要分发的消息,参数reputFromOffset就是消息在文件中的偏移量
  2. this.doDispatch(...):分发操作,就是把消息的相关写入ConsumeQueueIndexFile两个文件中
  3. 如果当前节点为主节点,且启用了长轮询,则调用NotifyMessageArrivingListenerarriving方法,在这里会把消息主动投递到consumer

总的来说,当消息写入到commitLog后,ReputMessage会根据上一次分发消息的偏移量依次从commitLog文件中读取消息信息,写入到ConsumeQueueIndexFile两个文件中,当然了,这里写入的只是消息的发送时间、在commitLog中的位置信息,完整的消息只有commitLog文件才存在。

写完这两个文件后,接下来就等待consumer来拉取消息了。当然,consumer主动来拉取可能会导致消息无法实时送达,为解决这个问题,rocketMq给出的解决方案是长轮询,具体为:如果当前没有消息,就holdconsumer的请求30s,这30s内一旦有消息过来,就及时唤醒consumer的请求,实际将消息发送出去,就也是NotifyMessageArrivingListener#arriving方法所做的工作,关于这点我们在分析consumer拉取消息时再详细分析。

我们再来看看消息分发消息,进入DefaultMessageStore#doDispatch

public class DefaultMessageStore implements MessageStore {

    private final LinkedList<CommitLogDispatcher> dispatcherList;

    /**
     * DefaultMessageStore 构造方法
     */

    public DefaultMessageStore(...) throws IOException {
        ...
        // 消息分发处理
        this.dispatcherList = new LinkedList<>();
        // 写入 ConsumeQueue 文件
        this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
        // 写入 Index 文件
        this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
        ...
    }

    /**
     * 分发操作
     */

    public void doDispatch(DispatchRequest req) {
        // 进行分发操作,dispatcherList 包含两个对象:
        // 1. CommitLogDispatcherBuildConsumeQueue:写入 ConsumeQueue 文件
        // 2. CommitLogDispatcherBuildIndex:写入 Index 文件
        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
            dispatcher.dispatch(req);
        }
    }
}

从整个方法的运行来看,DefaultMessageStore在创建时,会准备两个CommitLogDispatcher

  • CommitLogDispatcherBuildConsumeQueue:处理ConsumeQueue文件的写入
  • CommitLogDispatcherBuildIndex:处理IndexFile文件的写入

DefaultMessageStore#doDispatch方法中,就是对这两个文件的写入操作了:

/**
 * consumerQueue 文件分发的构建器
 */

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                // 将消息在commitLog文件的位置、tags等信息写入ConsumerQueue文件
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}

/**
 * indexFile 文件分发的构建器
 */

class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            DefaultMessageStore.this.indexService.buildIndex(request);
        }
    }
}

需要注意的是,在这两个文件中,写入的仅是消息的位置信息,完整的消息内容仅在commitLog中保存。

3. 总结

本文主要分析了broker消息分发分发,这里说的分发流程,是指broker将消息写入到consumerQueue文件的流程。

broker启动时,会启动一个专门的线程:ReputMessageService,该线程会不停地从comsumer获取消息,然后将其写入到consumerQueue文件与IndexFile文件中。

当消息分发到consumerQueue文件后,接着consumer就可以很方便地从各队列中获取消息了,下一篇我们来分析broker是如何响应consumer获取消息请求的。


限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。

本文首发于微信公众号 Java技术探秘,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!

- END -


浏览 48
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报