Rocketmq源码分析08:producer 启动流程
注:本系列源码分析基于RocketMq 4.8.0,gitee仓库链接:https://gitee.com/funcy/rocketmq.git.
本文我们来分析rocketMq producer 发送消息的流程.
producer发送消息的示例在org.apache.rocketmq.example.simple.Producer类中,代码如下:
public class Producer {
    public static void main(String[] args) 
            throws MQClientException, InterruptedException {
        String nameServer = "localhost:9876";
        // 1. 创建 DefaultMQProducer 对象
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr(nameServer);
        // 2. 启动 producer
        producer.start();
        for (int i = 0; i < 1; i++)
            try {
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 3. 发送消息    
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        producer.shutdown();
    }
}
以上代码分三步走:
创建 DefaultMQProducer对象启动 producer发送消息 
接下来我们的分析也按这三步进行。
1. DefaultMQProducer构造方法
DefaultMQProducer构造方法代码如下:
public DefaultMQProducer(final String producerGroup) {
    // 继续调用
    this(null, producerGroup, null);
}
/**
 * 最终调用的构造方法
 */
public DefaultMQProducer(final String namespace, 
        final String producerGroup, RPCHook rpcHook) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
这个方法就是简单地赋了值,然后创建了DefaultMQProducerImpl实例,我们继续看DefaultMQProducerImpl的构造方法:
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
    this.defaultMQProducer = defaultMQProducer;
    this.rpcHook = rpcHook;
    // 异步发送的队列
    this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
    // 处理异步发送的线程池
    this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.asyncSenderThreadPoolQueue,
        new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
            }
        });
}
这个构造方法依然还是处理赋值操作,并没做什么实质性内容,就不继续深究了。
2. DefaultMQProducer#start:启动producer
接着我们来看看producer的启动流程,进入DefaultMQProducer#start方法:
public void start() throws MQClientException {
    this.setProducerGroup(withNamespace(this.producerGroup));
    // 调用 defaultMQProducerImpl 的 start() 方法
    this.defaultMQProducerImpl.start();
    // 消息轨迹相关,我们不关注
    if (null != traceDispatcher) {
        ...
    }
}
这个方法先是调用了defaultMQProducerImpl#start方法,然后处理消息轨迹相关操作,关于rocketMq消息轨迹相关内容,本文就不过多探讨了,我们将目光聚集于DefaultMQProducerImpl#start(boolean)方法:
public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            // 检查一些配置信息
            this.checkConfig();
            // 修改当前的 instanceName 为当前进程id
            if (!this.defaultMQProducer.getProducerGroup()
                    .equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // 获取mq实例
            this.mQClientFactory = MQClientManager.getInstance()
                .getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // 注册 mqClient 实例
            boolean registerOK = mQClientFactory.registerProducer(
                this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException(...);
            }
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), 
                    new TopicPublishInfo());
            // 启动实例
            if (startFactory) {
                mQClientFactory.start();
            }
            log.info(...);
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException(...);
        default:
            break;
    }
    // 发送心跳到所有的broker
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // 定时扫描异步请求的返回结果
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                RequestFutureTable.scanExpiredRequest();
            } catch (Throwable e) {
                log.error("scan RequestFutureTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
这个方法并不复杂相关内容都已经作了注释,这里重点提出3个方法:
mQClientFactory.start():执行方法为MQClientInstance#start,这个方法里会启动一些组件,我们稍后会分析。mQClientFactory.sendHeartbeatToAllBrokerWithLock():发送心跳到所有的broker,最终执行的方法为MQClientAPIImpl#sendHearbeat:
这里是与public int sendHearbeat(
final String addr,
final HeartbeatData heartbeatData,
final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
// request 的 code 为 HEART_BEAT
RemotingCommand request = RemotingCommand
.createRequestCommand(RequestCode.HEART_BEAT, null);
request.setLanguage(clientConfig.getLanguage());
request.setBody(heartbeatData.encode());
// 异步调用
RemotingCommand response = this.remotingClient
.invokeSync(addr, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return response.getVersion();
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}broker通信,request的code为HEART_BEAT,后面的分析中我们会看到,producer也会同nameServer通信。定时扫描异步请求的返回结果:最终执行的方法为 RequestFutureTable.scanExpiredRequest(),关于该方法的内容,我们在分析producer发送异步消息时再分析。
2.1 MQClientInstance#start:启动MQClientInstance
接下来我们来看看MQClientInstance的启动,方法为MQClientInstance#start,代码如下:
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // 获取 nameServer 的地址
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // 启动远程服务,这个方法只是装配了netty客户端相关配置
                // 注意:1. 这里是netty客户端,2. 这里并没有创建连接
                this.mQClientAPIImpl.start();
                // 启动定时任务
                this.startScheduledTask();
                // pull服务,仅对consumer启作用
                this.pullMessageService.start();
                // 启动负载均衡服务,仅对consumer启作用
                this.rebalanceService.start();
                // 启用内部的 producer
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException(...);
            default:
                break;
        }
    }
}
这个方法进行的操作在注释中已经说明得很清楚了,接下来我们对以上的部分操作做进一步分析。
1. mQClientAPIImpl.start():配置netty客户端
这里调用的是NettyRemotingClient#start方法,代码如下:
 @Override
public void start() {
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyClientConfig.getClientWorkerThreads(),
        new ThreadFactory() {
            ...
        });
    // 这里使用的是Bootstrap而非ServerBootstrap,表示这是netty客户端
    Bootstrap handler = this.bootstrap
        .group(this.eventLoopGroupWorker)
        .channel(NioSocketChannel.class)
        .option(...)
        // 省略各种option
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // 省略pipeline的装配
                ...
            }
        });
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            ...
        }
    }, 1000 * 3, 1000);
    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
}
对于这个方法,说明有两个点:
方法里使用的是 Bootstrap而非ServerBootstrap,表示这是netty客户端整个方法中并没有创建连接 
2. startScheduledTask():启动定时任务
启动定时任务的方法为MQClientInstance#startScheduledTask,代码如下:
private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        // 定时获取 nameServer 的地址
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }
    // 定时更新topic的路由信息
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    // 定时发送心跳信息
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    // 持久化消息者的消费偏移量,可以放在本地文件,也可以推送到 broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    // 调整线程池的线程数量,并没有用上
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}
这里共有5个定时任务:
定时获取 nameServer的地址,MQClientInstance#start一开始会调用MQClientAPIImpl#fetchNameServerAddr获取nameServer,这里也调用了这个方法定时更新 topic的路由信息,这里会去nameServer获取路由信息,之后再分析定时发送心跳信息到 nameServer,在DefaultMQProducerImpl#start(boolean)中,我们也提到了向nameServer发送心跳信息,两处调用的是同一个方法持久化消费者的消费偏移量,这个仅对消费者 consumer有效,后面分析消费者时再作分析调整线程池的线程数量,不过追踪到最后,发现这个并没有生效,就不多说了 
这里我们重点来看topic路由信息的获取,我们经过对MQClientInstance#updateTopicRouteInfoFromNameServer()的一路追踪,我们来到了MQClientAPIImpl#getTopicRouteInfoFromNameServer(...)
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
        boolean allowTopicNotExist) throws MQClientException, InterruptedException, 
        RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);
    // 发送请求的 code 为 GET_ROUTEINFO_BY_TOPIC
    RemotingCommand request = RemotingCommand
        .createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        ...
        case ResponseCode.SUCCESS: {
            byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        ...
    }
    ...
}
这里发送向NameServer发送消息的code是GET_ROUTEINFO_BY_TOPIC,这点在前面分析nameServer的消息处理时也分析过了,并且还分析了当消息送达nameServer后,nameServer是如何返回topic数据的,遗忘的小伙伴可以看下之前分析nameServer的文章。
限于篇幅,本文就先到这里了,本文主要是分析producer启动流程,下一篇文章将分析消息发送流程。
限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。
本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!
