Rocketmq源码分析10:consumer 启动流程

共 21796字,需浏览 44分钟

 ·

2021-04-28 00:28

注:本系列源码分析基于RocketMq 4.8.0,gitee仓库链接:https://gitee.com/funcy/rocketmq.git.

前面分析了producer发送消息的流程,本文我们来分析consumer消费消息的流程。

consumer消费消息的demoorg.apache.rocketmq.example.simple.PushConsumer,代码如下:

public class PushConsumer {

    public static void main(String[] args) 
            throws InterruptedException, MQClientException 
{
        String nameServer = "localhost:9876";
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.setNamesrvAddr(nameServer);
        consumer.subscribe("TopicTest""*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //wrong time format 2017_0422_221800
        consumer.setConsumeTimestamp("20181109221800");
        // 注册监听器,监听消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context)
 
{
                // 这里获得了消息
                System.out.printf("%s Receive New Messages: %s %n"
                    Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

consumer使用起来还是挺简单的,先是创建了一个DefaultMQPushConsumer对象,然后配置了一些属性,比较关键的就是注册消息监听器(在这个监听器里会获取消息),之后就调用start()方法启动consumer.

接下来我们就来分析这块的消费过程。

1. 构造方法:DefaultMQPushConsumer

consumer的处理类为DefaultMQPushConsumer,我们先来看看DefaultMQPushConsumer的构造方法:

public DefaultMQPushConsumer(final String consumerGroup) {
    // 这里指定了队列分配策略
    this(null, consumerGroup, nullnew AllocateMessageQueueAveragely());
}

public DefaultMQPushConsumer(final String namespace, final String consumerGroup, 
        RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy)
 
{
    this.consumerGroup = consumerGroup;
    this.namespace = namespace;
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}

在构造方法中,就只是做了一些成员变量的赋值操作,比较关键的是分配消息队列的策略:allocateMessageQueueStrategy,如果指定,默认就使用AllocateMessageQueueAveragely,即从各队列平均获取消息。

2. 启动consumerDefaultMQPushConsumer#start

consumer的启动方法为DefaultMQPushConsumer#start,代码如下:

public void start() throws MQClientException {
    setConsumerGroup(
        NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    // 启动
    this.defaultMQPushConsumerImpl.start();
    // 消息轨迹相关内容,我们不关注
    if (null != traceDispatcher) {
        ...
    }
}

继续进入DefaultMQPushConsumerImpl#start

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info(...);
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            this.copySubscription();

            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }
            // 客户端
            this.mQClientFactory = MQClientManager.getInstance()
                .getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
            // 设置负载均衡相关属性
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(
                this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                // 消息模式:广播模式存在本地,集群模式存在远程(broker)
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, 
                            this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, 
                            this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            // 加载消费信息的偏移量
            this.offsetStore.load();
            // 根据客户端实例化不同的consumeMessageService:顺序消息与并发消息
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService = new ConsumeMessageOrderlyService(this
                    (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService = new ConsumeMessageConcurrentlyService(this
                    (MessageListenerConcurrently) this.getMessageListenerInner());
            }

            this.consumeMessageService.start();
            // 注册消费组
            boolean registerOK = mQClientFactory.registerConsumer(
                this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown(
                    defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                throw new MQClientException(...);
            }
            // 启动
            mQClientFactory.start();
            log.info(...);
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException(...);
        default:
            break;
    }

    // 更新 topic 的信息,从nameServer获取数据
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    // 发送心跳,发送到所有的broker
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // 负载均衡
    this.mQClientFactory.rebalanceImmediately();
}

这个方法比较长,整个consumer的启动流程都在这里了,咱们挑重点说,来总结下这个方法做了什么。

  1. 获取客户端mQClientFactory,类型为org.apache.rocketmq.client.impl.factory.MQClientInstance,如果对producer还有印象的话,我们就会发现,producer中的mQClientFactory的类型也是它
  2. 区分广播模式与集群模式的offsetStore,所谓的offsetStore,就是一存储器,用来存储当前消费者消费信息的偏移量。在广播模式中,该偏移量保存在本地文件中,而在集群模式中,该偏移量保存在远程broker中,广播模式与集群模式,我们后面再详细分析
  3. 根据客户端实例化不同的consumeMessageService,这里用来区分顺序消息与并发消息,依然是后面再分析
  4. 启动mQClientFactory,也就是启动客户端
  5. 更新topic信息、发送心跳信息到broker、处理负载均衡功能

以上就是DefaultMQPushConsumerImpl#start方法所做的的主要工作了。实际上,上面的123点都是一些配置工作,这些配置对应的服务是在mQClientFactory.start()方法中启动的,我们继续。

3. 启动mQClientFactoryMQClientInstance#start

我们来看看mQClientFactory的启动流程,进入MQClientInstance#start

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // 获取 nameServer 的地址
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // 启动客户端的远程服务,这个方法会配置netty客户端
                this.mQClientAPIImpl.start();
                // 启动定时任务
                this.startScheduledTask();
                // pull服务,仅对consumer启作用
                this.pullMessageService.start();
                // 启动负载均衡服务,仅对consumer启作用
                this.rebalanceService.start();
                // 启用内部的 producer
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK"this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException(...);
            default:
                break;
        }
    }
}

producer的启动过程中,也会调用这个方法,前面我们已经分析过了一波了,这次我们在consumer的角度再来分析这个方法。

该方法所做的工作如下:

  1. 获取 nameServer 的地址
  2. 启动客户端的远程服务,这个方法会配置netty客户端
  3. 启动定时任务
  4. 启动拉取消息服务
  5. 启动负载均衡服务

上面的12producer的流程并无区别,就不再分析了,我们来看看定时任务的启动,进入方法MQClientInstance#startScheduledTask

private void startScheduledTask() {
    ...

    // 持久化消费者的消费偏移量,每5秒一次
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    // 省略其他定时任务
    ...
}

这个方法中还启动了其他一些定时任务,这里我们重点关注执行MQClientInstance#persistAllConsumerOffset()方法的定时任务,该定时任务会持久化当前消费者消费消息的偏移量,在本节我们先对这个定时任务有个印象,在分析偏移量持久化一节再详细分析持久化流程。

我们再回到MQClientInstance#start的流程,第4与第5步,主要是启动了两个服务:pullMessageServicerebalanceService,这个类的信息如下:

/**
 * PullMessageService
 */

public class PullMessageService extends ServiceThread {
    ...
}

/**
 * RebalanceService
 */

public class RebalanceService extends ServiceThread {
    ...
}

这两个类都是ServiceThread的子类,这两个类的start()方法也都是来自于ServiceThread

public abstract class ServiceThread implements Runnable {

    // 省略其他代码
    ...

    /**
     * start() 方法
     */

    public void start() {
        log.info(...);
        if (!started.compareAndSet(falsetrue)) {
            return;
        }
        stopped = false;
        this.thread = new Thread(this, getServiceName());
        this.thread.setDaemon(isDaemon);
        this.thread.start();
    }
}

从代码来看,ServiceThread实现了Runnable接口,在其start()方法中,启动了一个线程,线程的执行逻辑正是来自于其子类的run()方法,因此我们要看pullMessageServicerebalanceServicestart()方法执行逻辑,只需要看对应类的run()方法即可。

到此为止,consumer的启动就已经完成了,各项服务也启动起来了,而consumer拉取消息也正是由这些服务的配合处理的,接下来我们就来分析这些服务做了什么。

限于篇幅,本文就先到这里了,下篇继续。


限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。

本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!


浏览 16
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报