Rocketmq源码分析11:consumer 消费流程

共 45927字,需浏览 92分钟

 ·

2021-04-28 00:28

注:本系列源码分析基于RocketMq 4.8.0,gitee仓库链接:https://gitee.com/funcy/rocketmq.git.

接上文,继续分析consumer消费流程。

4. 拉取消息:PullMessageService

MQClientInstance#start方法中,会启动消息拉取的服务:PullMessageServicePullMessageServiceServiceThread的子类,启动该服务时会创建一个新的线程,我们直接来看PullMessageService#run()方法,

public class PullMessageService extends ServiceThread {

    ...

    private final LinkedBlockingQueue<PullRequest> pullRequestQueue 
        = new LinkedBlockingQueue<PullRequest>();

    /**
     * 将 pullRequest 放入 pullRequestQueue 中
     */

    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                // 从 pullRequestQueue 获取一个 pullRequest,阻塞的方式
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

    ...
}

PullMessageService#run()方法中,该方法会从pullRequestQueue中获取一个pullRequest的操作,然后调用this.pullMessage(pullRequest)进行拉取操作,注意到pullRequest的类型为LinkedBlockingQueue,并且使用的是阻塞方法take(),因此如果LinkedBlockingQueue中没有内容,那take()方法就会一直在这里阻塞。

关于pullRequestQueue中的内容是在哪里放放的,可以看到PullMessageService#executePullRequestImmediately方法中,会调用pullRequestQueue.put(pullRequest)方法放入元素。谁会调用PullMessageService#executePullRequestImmediately(...)方法呢?关于这点,我们先留个疑问,后面分析负载均衡服务时再揭晓。

我们回到PullMessageService#run()方法,该方法调用了this.pullMessage(pullRequest)方法对pullRequest做了进一步处理,我们跟进去:

private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory
            .selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        // 继续处理
        impl.pullMessage(pullRequest);
    } else {
        log.warn(...);
    }
}

在这个方法里,调用的是DefaultMQPushConsumerImpl#pullMessage来进一步处理pullRequest

/**
 * 拉取消息的核心流程
 * @param pullRequest
 */

public void pullMessage(final PullRequest pullRequest) {
    // 这里省略非常多的代码
    ...

    // pullCallback 是在这里生成的,这里我们并不打算讨论
    PullCallback pullCallback = new PullCallback() {
        ...
    }

    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable, // commitOffset
        true// suspend
        subExpression != null// subscription
        classFilter // class filter
    );
    try {
        // 拉取消息
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
}

可以看到,这个方法里,最核心就是拉取消息的操作了,方法为PullAPIWrapper#pullKernelImpl

public PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final String expressionType,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
)
 throws MQClientException, RemotingException, MQBrokerException, InterruptedException 
{
    // 找到broker
    FindBrokerResult findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
    if (null == findBrokerResult) {
        // broker 为 null,更新 topic 信息后,再获取一次
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
    }

    if (findBrokerResult != null) {
        {
            // check version
            ...
        }
        int sysFlagInner = sysFlag;

        if (findBrokerResult.isSlave()) {
            sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
        }
        // 构建请求
        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
        //这里省略了好多的 requestHeader.setXxx 操作
        ...

        String brokerAddr = findBrokerResult.getBrokerAddr();
        if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
            brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
        }
        // 从broker拉取消息
        PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
            brokerAddr,
            requestHeader,
            timeoutMillis,
            communicationMode,
            pullCallback);

        return pullResult;
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist"null);
}

这个方法主要是组装拉取消息的请求,组装好之后接着就调用了MQClientAPIImpl#pullMessage方法,我们再进去一探究竟:

public PullResult pullMessage(
    final String addr,
    final PullMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
)
 throws RemotingException, MQBrokerException, InterruptedException 
{
    // 请求code为PULL_MESSAGE
    RemotingCommand request = RemotingCommand
        .createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
    // 拉取数据的几种方式
    switch (communicationMode) {
        case ONEWAY:
            assert false;
            return null;
        case ASYNC:
            // 异步调用的是 pullCallback 处理
            this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
            return null;
        case SYNC:
            return this.pullMessageSync(addr, request, timeoutMillis);
        default:
            assert false;
            break;
    }

    return null;
}

与发送消息一样,rocketMq拉取消息的模式也有三种:

  • ONEWAY:什么也不做,直接返回null
  • ASYNC:异步方式,拉取成功或失败后,会在pullCallback对象中处理回调信息
  • SYNC:同步方式,拉取的消息同步返回

由于进入的方法是异步方式,因此这里我们主要看异步方式的实现,进入MQClientAPIImpl#pullMessageAsync方法:

private void pullMessageAsync(
    final String addr,
    final RemotingCommand request,
    final long timeoutMillis,
    final PullCallback pullCallback
)
 throws RemotingException, InterruptedException 
{
    // 异步拉取
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            // 处理拉取消息的结果
            RemotingCommand response = responseFuture.getResponseCommand();
            // 有响应
            if (response != null) {
                try {
                    PullResult pullResult = MQClientAPIImpl.this
                        .processPullResponse(response, addr);
                    assert pullResult != null;
                    // 调用 pullCallback 的 onSuccess(...) 方法
                    pullCallback.onSuccess(pullResult);
                } catch (Exception e) {
                    // 调用 pullCallback 的 onException(...) 方法
                    pullCallback.onException(e);
                }
            } else {
                ...
            }
        }
    });
}

这块的操作与producer发送异步消息的套路一模一样,调用的同样是remotingClient.invokeAsync(...)方法,结果处理同样的是在InvokeCallback对象中。在InvokeCallback#operationComplete方法中,成功时会调用调用 pullCallbackonSuccess(...) 方法,失败时则调用 pullCallbackonException(...) 方法,接下来我们来看看pullCallback的内容。

pullCallback对象是在DefaultMQPushConsumerImpl#pullMessage方法中创建并传入的,它的内容如下:

/**
 * 拉取消息的核心流程
 * @param pullRequest
 */

public void pullMessage(final PullRequest pullRequest) {
    // 省略其他代码,重点关注 pullCallback
    ...

    // 消息拉取的回调函数,在拉取到消息后会进入这个方法处理
    PullCallback pullCallback = new PullCallback() {

        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                // 处理消息,将二制消息解码为java对象,也会对消息进行tag过滤
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                    pullRequest.getMessageQueue(), pullResult, subscriptionData);

                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        ...

                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null 
                                || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this
                                .executePullRequestImmediately(pullRequest);
                        } else {
                            ...

                            // 处理消息,处理顺序与并发消息
                            DefaultMQPushConsumerImpl.this.consumeMessageService
                                .submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
                            // 准备下一次的运行
                            if (DefaultMQPushConsumerImpl.this
                                    .defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this
                                        .defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this
                                    .executePullRequestImmediately(pullRequest);
                            }
                        }

                        ...

                        break;

                    // 省略其他状态的处理
                    ...
                }
            }
        }

        @Override
        public void onException(Throwable e) {
            if (!pullRequest.getMessageQueue().getTopic()
                    .startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }
            // 这个方法会把 pullRequest 丢到 pullRequestQueue 中
            DefaultMQPushConsumerImpl.this
                .executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    };

    // 省略其他代码,重点关注 pullCallback
    ...
}

PullCallback主要有两个方法:

  1. onSuccess(...):拉取消息成功时调用,在这个方法里会解码消息,消费消息,然后准备下一次的pullQequest请求
  2. onException(...):拉取消息异常时调用,在这个方法里主要是将出现异常的pullQequest丢到pullRequestQueue,等待下一次再调用

接下来,我们这两个方法进行具体分析。

4.1 消息解码操作

处理消息解码操作的方法为PullAPIWrapper#processPullResult,还过这个方法并不只是处理解码,还处理了其他操作,代码如下:

public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData)
 
{
    PullResultExt pullResultExt = (PullResultExt) pullResult;

    this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
    if (PullStatus.FOUND == pullResult.getPullStatus()) {
        // 将二进制数据解码为对象
        ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
        List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
        List<MessageExt> msgListFilterAgain = msgList;

        // 按 tag 过滤
        if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
            msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
            for (MessageExt msg : msgList) {
                if (msg.getTags() != null) {
                    // 真正的过滤操作
                    if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                        msgListFilterAgain.add(msg);
                    }
                }
            }
        }
        if (this.hasHook()) {
            FilterMessageContext filterMessageContext = new FilterMessageContext();
            filterMessageContext.setUnitMode(unitMode);
            filterMessageContext.setMsgList(msgListFilterAgain);
            this.executeHook(filterMessageContext);
        }
        // 进一步处理过后滤的消息
        for (MessageExt msg : msgListFilterAgain) {
            // 事务消息的标识
            String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (Boolean.parseBoolean(traFlag)) {
                msg.setTransactionId(msg.getProperty(
                        MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
            }
            // 偏移量
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                Long.toString(pullResult.getMinOffset()));
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                Long.toString(pullResult.getMaxOffset()));
            msg.setBrokerName(mq.getBrokerName());
        }
        pullResultExt.setMsgFoundList(msgListFilterAgain);
    }
    pullResultExt.setMessageBinary(null);
    return pullResult;
}

这个方法所做的工作有3件:

  1. 将二进制数据解码为对象,即将byte[]解码为List<MessageExt>
  2. 如果consumer指定了tag,则按tag进行过滤,其实就是调用Set#contains()判断tag是否符合条件
  3. 设置消息的属性,如TransactionIdBrokerName

4.2 消费消息

消费消息的相关代码为

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

消费消息的模式有两种:

  • ConsumeMessageConcurrentlyService:并发消费消息
  • ConsumeMessageOrderlyService:顺序消费消息

关于这两点的差别我们之后再分析,这里我们使用的消费模式是并发消费消息,进入ConsumeMessageConcurrentlyService#submitConsumeRequest方法:

public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume)
 
{
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    // 一次只拉取32条数据,不足32条直接处理
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest 
            = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            // 添加任务
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        // 超过32条就进行分页处理,每页都使用一个线程处理
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest 
                = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}

这个方法就是将获得到的消息封装为ConsumeRequest,然后提交到线程池中处理。在处理时,会判断消息的多少,如消息超过32条,就会对消息进行分页,每页都使用一个线程处理。

ConsumeRequest最终在线程池中执行了,根据线程的执行规律,我们直接进入它的run方法看看做了什么:

class ConsumeRequest implements Runnable {
    ...

    @Override
    public void run() {
        ...
        // 取出消息监听器
        MessageListenerConcurrently listener 
            = ConsumeMessageConcurrentlyService.this.messageListener;
        ...
        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
        try {
            if (msgs != null && !msgs.isEmpty()) {
                for (MessageExt msg : msgs) {
                    MessageAccessor.setConsumeStartTimeStamp(
                        msg, String.valueOf(System.currentTimeMillis()));
                }
            }
            // 交由listener实际处理消息
            status = listener.consumeMessage(
                Collections.unmodifiableList(msgs), context);
        } catch (Throwable e) {
            ...
        }
        
        ...

        // 处理结果
        if (!processQueue.isDropped()) {
            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
        } else {
            log.warn(...);
        }
    }
}

以上代码做了大量的删减,我们仅保留了重要部分,重要部分主要包含三个操作:

  1. 取出当前consumer的消息监听器
  2. 执行消息监听器的consumeMessage()方法
  3. 处理consumeMessage()方法的返回值

这个consumer的消息监听器是个啥呢?我们在org.apache.rocketmq.example.simple.PushConsumer中是这样注册listener的:

public class PushConsumer {

    public static void main(String[] args) 
            throws InterruptedException, MQClientException 
{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        ...
        // 注册监听器,监听消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context)
 
{
                // 这里获得了消息
                System.out.printf("%s Receive New Messages: %s %n"
                    Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动
        consumer.start();
    }
}

这里指定的MessageListenerConcurrently#consumeMessage(...)方法就是在ConsumeRequest#run()中调用的。

执行完MessageListenerConcurrently#consumeMessage(...)方法后,接下来会处理这个方法的返回值,方法为ConsumeMessageConcurrentlyService#processConsumeResult,我们直接看关键代码:

public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
)
 
{       

    // 省略重试的操作,后面分析重试机制时再详细展开
    ...

    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        // 更新偏移量
        this.defaultMQPushConsumerImpl.getOffsetStore()
            .updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}

在这个方法会处理两个操作:

  1. 根据返回结果确认是否需要重试,关于重试机制这里就不展开讨论了,后面分析时再详细展开
  2. 更新消费位置的偏移量,更新时,会根据广播模式与集群模式从而执行不同的更新策略,这点我们一会再分析

4.3 准备下一次的pullRequest请求

让我们回到DefaultMQPushConsumerImpl#pullMessage方法,准备下一次运行的代码如下:

// 准备下一次的运行
if (DefaultMQPushConsumerImpl.this
        .defaultMQPushConsumer.getPullInterval() > 0) {
    // 延迟 xxx 秒后进行一次 pullRequest
    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
        DefaultMQPushConsumerImpl.this
            .defaultMQPushConsumer.getPullInterval());
else {
    // 立即进行一次 pullRequest
    DefaultMQPushConsumerImpl.this
        .executePullRequestImmediately(pullRequest);
}

这两个方法非常相似,区别在于,一个是延迟 xxx 秒后进行一次 pullRequest,另一个是立即进行一次 pullRequest,我们来看看它的操作:

private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
    // 继续调用
    this.mQClientFactory.getPullMessageService()
        .executePullRequestLater(pullRequest, timeDelay);
}

public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
    if (!isStopped()) {
        // 只执行一次,延迟执行
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                // 调用的是 executePullRequestImmediately(...)
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    } else {
        log.warn("PullMessageServiceScheduledThread has shutdown");
    }
}

延迟获取的操作,最终使用scheduledExecutorService来调用executePullRequestImmediately(...),需要注意的是,这个scheduledExecutorService只会执行一次,首次执行时间为指定的timeDelay后,也就是defaultMQPushConsumer.getPullInterval()的值。

最终,无论是延迟执行还是立即执行,都会调用PullMessageService#executePullRequestImmediately方法,内容如下:

public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}

可以看到,这里仅是pullRequest放入pullRequestQueue中,之后PullMessageService线程就会从其中获取到这个pullRequest,从而又一次发起获取消息的请求了。

限于篇幅,本文就先到这里了,下篇继续。


限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。

本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!


浏览 17
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报