Rocketmq源码分析12:consumer 负载均衡

java技术探秘

共 25136字,需浏览 51分钟

 · 2021-04-28

注:本系列源码分析基于RocketMq 4.8.0,gitee仓库链接:https://gitee.com/funcy/rocketmq.git.

接上文,继续分析consumer消费流程。

5. 如何选择消息队列:RebalanceService

让我们回到PullMessageService#run()方法:

public class PullMessageService extends ServiceThread {

    ...

    private final LinkedBlockingQueue<PullRequest> pullRequestQueue 
        = new LinkedBlockingQueue<PullRequest>();

    /**
     * 将 pullRequest 放入 pullRequestQueue 中
     */

    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                // 从 pullRequestQueue 获取一个 pullRequest,阻塞的方式
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

    ...
}

PullMessageService线程获得了pullRequest后,然后就开始了一次又一次的拉起消息的操作,那这个pullRequest最初是在哪里添加进来的呢?这就是本节要分析的「负载均衡」功能了。

处理负载均衡的线程为RebalanceService,它是在MQClientInstance#start方法中启动的,我们直接进入其run()方法:

public class RebalanceService extends ServiceThread {

    // 省略其他
    ...

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }
}

在它的run()方法中,仅是调用了MQClientInstance#doRebalance方法,我们继续进入:

public void doRebalance() {
    // consumerTable 存放的就是当前 consumer
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }
}

MQClientInstance#doRebalance方法中,会遍历所有的consumer,然后调用DefaultMQPushConsumerImpl#doRebalance方法作进一步的处理,consumerTable就是用来保存DefaultMQPushConsumerImpl实例的,继续进入DefaultMQPushConsumerImpl#doRebalance方法:

@Override
public void doRebalance() {
    if (!this.pause) {
        this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
    }
}

继续跟进,来到RebalanceImpl#doRebalance方法:

public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                // 客户端负载均衡:根据主题来处理负载均衡
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }

    this.truncateMessageQueueNotMyTopic();
}

/**
 * 这就是最张处理负载均衡的地方了
 */

private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        // 广播模式:不需要处理负载均衡,每个消费者都要消费,只需要更新负载信息
        case BROADCASTING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
                // 更新负载均衡信息,这里传入的参数是mqSet,即所有队列
                boolean changed = this
                    .updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                if (changed) {
                    this.messageQueueChanged(topic, mqSet, mqSet);
                    log.info(...);
                }
            } else {
                log.warn(...);
            }
            break;
        }
        // 集群模式
        case CLUSTERING: {
            // 根据订阅的主题获取消息队列
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            // 客户端id,根据 topic 与 consumerGroup 获取所有的 consumerId
            List<String> cidAll = this.mQClientFactory
                .findConsumerIdList(topic, consumerGroup);
            if (null == mqSet) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn(...);
                }
            }

            if (null == cidAll) {
                log.warn(...);
            }

            if (mqSet != null && cidAll != null) {
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);
                // 排序后才能保证消费者负载策略相对稳定
                Collections.sort(mqAll);
                Collections.sort(cidAll);
                // MessageQueue 的负载策略
                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                List<MessageQueue> allocateResult = null;
                try {
                    // 按负载策略进行分配,返回当前消费者实际订阅的messageQueue集合
                    allocateResult = strategy.allocate(
                        this.consumerGroup,
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                } catch (Throwable e) {
                    log.error(...);
                    return;
                }

                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }

                // 更新负载均衡信息,传入参数是 allocateResultSet,即当前consumer分配到的队列
                boolean changed = this
                    .updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                if (changed) {
                    log.info(...);
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
                }
            }
            break;
        }
        default:
            break;
    }
}

RebalanceImpl#rebalanceByTopic方法就是最终处理负载均衡的方法了,在这个方法里会区分广播模式与集群模式的处理。

在广播模式下,一条消息会被同一个消费组中的所有consumer消费,而集群模式下,一条消息只会被同一个消费组下的一个consumer消费。

正是因为如此,广播模式下并没有负载均衡可言,直接把所有的队列都分配给当前consumer处理,然后更新QueueTable的负载均衡信息;而集群模式会先分配当前consumer消费的消息队列,再更新QueueTable的负载均衡信息。

这里我们来看看集群模式,看看它的操作:

  1. strategy.allocate(...):按负载均衡策略为当前consumer分配队列
  2. updateProcessQueueTableInRebalance(...):更新负载均衡信息。

rocketMq中,提供了这些负载均衡策略:

  • AllocateMessageQueueAveragely:平均负载策略,rocketMq默认使用的策略
  • AllocateMessageQueueAveragelyByCircle:环形平均分配,这个和平均分配唯一的区别就是,再分队列的时候,平均队列是将属于自己的MessageQueue全部拿走,而环形平均则是,一人拿一个,拿到的Queue不是连续的。
  • AllocateMessageQueueByConfig:用户自定义配置
  • AllocateMessageQueueByMachineRoom:同机房负载策略,这个策略就是当前Consumer只负载处在指定的机房内的MessageQueuebrokerName的命名必须要按要求的格式来设置:机房名@brokerName
  • AllocateMachineRoomNearby:就近机房负载策略,在AllocateMessageQueueByMachineRoom策略中,如果同一机房中只有MessageQueue而没有consumer,那这个MessageQueue上的消息该如何消费呢?AllocateMachineRoomNearby就是扩充了该功能的处理
  • AllocateMessageQueueConsistentHash:一致性哈希策略

这里我们重点来分析平均负载策略AllocateMessageQueueAveragely

public List<MessageQueue> allocate(String consumerGroup, String currentCID, 
        List<MessageQueue> mqAll, List<String> cidAll)
 
{
    // 返回值        
    List<MessageQueue> result = new ArrayList<MessageQueue>();

    // 省略一些判断操作
    ...


    int index = cidAll.indexOf(currentCID);
    int mod = mqAll.size() % cidAll.size();
    // 1. 消费者数量大于队列数量:averageSize = 1
    // 2. 消费者数量小于等于队列数量:averageSize = 队列数量 / 消费者数量,还要处理个+1的操作
    int averageSize = mqAll.size() <= cidAll.size() 
        ? 1 : (mod > 0 && index < mod 
            ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    int range = Math.min(averageSize, mqAll.size() - startIndex);
    for (int i = 0; i < range; i++) {
        result.add(mqAll.get((startIndex + i) % mqAll.size()));
    }
    return result;
}

这个方法中,关键的分配方法就在后面几行,如果只看代码,会感觉有点晕,这里我举一个例子来简单解释下:

假设:messageQueue一共有6个,consumer有4个,当前consumerindex为1,有了这些前提后,接下来我们就来看它的分配过程了。

  1. 计算取余操作:6 % 4 = 2,这表明messageQueue不能平均分配给每个consumer,接下来就来看看这个余数2是如何处理的

  2. 计算每个consumer平均处理的messageQueue数量

    消费者索引0123
    处理数量2211
    • 这里需要注意,如果consumer数量大于messageQueue数量,那每个consumer最多只会分配到一个messageQueue,这种情况下,余数2不会进行处理,并且有的consumer处理的messageQueue数量为0,同一个messageQueue不会同时被两个及以上的consumer消费掉
    • 这里的messageQueue数量为6,consumer为4,计算得到每个consumer处理的队列数最少为1,除此之外,为了实现“平均”,有2个consumer会需要多处理1个messageQueue,按“平均”的分配原则,如果index小于mod,则会分配多1个messageQueue,这里的mod为2,结果如下:
  3. 分配完每个consumer处理的messageQueue数量后,这些messageQueue该如何分配呢?从代码来看,分配时会先分配完一个consumer,再分配下一个consumer,最终结果就是这样:

    队列Q0Q1Q2Q3Q4Q5
    消费者C1C1C2C2C4C5

从图中可以看到,在6个messageQueue、4个consumer、当前consumerindex为1的情况下,当前consumer会分到2个队列,分别为Q2/Q3.

messageQueue分配完成后,接下来就是更新负载信息了,方法为RebalanceImpl#updateProcessQueueTableInRebalance

private boolean updateProcessQueueTableInRebalance(final String topic, 
        final Set<MessageQueue> mqSet, final boolean isOrder)
 
{

    ...

    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            if (isOrder && !this.lock(mq)) {
                log.warn(...);
                continue;
            }

            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } 
                // pullRequest 最初产生的地方:mq 不存在,就添加
                else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    // 添加 pullRequest
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }

    // 发布
    this.dispatchPullRequest(pullRequestList);

    return changed;
}

这个方法中最最关键的就是pullRequestList的添加操作了:先遍历传入的MessageQueue,如果当前consumer没有消费过该messageQueue,则添加一个新的pullRequestpullRequestList,之后就是发布pullRequestList了。

看到这里,我们就应该能明白,最初的pullRequest就是在这里产生的,而发布pullRequestList的操作,就是将pullRequest丢给pullMessageService线程处理了:

/**
 * RebalancePushImpl#dispatchPullRequest:发布pullRequest的操作
 */

public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    for (PullRequest pullRequest : pullRequestList) {
        // 在这里执行pullRequest,其实就是把 pullRequest 添加到
        // PullMessageService#pullRequestQueue 中
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
        log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
    }
}

限于篇幅,本文就先到这里了,下篇继续。


限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。

本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!


浏览 7
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报