Rocketmq源码分析08:producer 启动流程

共 27614字,需浏览 56分钟

 ·

2021-04-22 21:33

注:本系列源码分析基于RocketMq 4.8.0,gitee仓库链接:https://gitee.com/funcy/rocketmq.git.

本文我们来分析rocketMq producer 发送消息的流程.

producer发送消息的示例在org.apache.rocketmq.example.simple.Producer类中,代码如下:

public class Producer {
    public static void main(String[] args) 
            throws MQClientException, InterruptedException 
{
        String nameServer = "localhost:9876";
        // 1. 创建 DefaultMQProducer 对象
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr(nameServer);
        // 2. 启动 producer
        producer.start();
        for (int i = 0; i < 1; i++)
            try {
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 3. 发送消息    
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        producer.shutdown();
    }
}

以上代码分三步走:

  1. 创建 DefaultMQProducer 对象
  2. 启动 producer
  3. 发送消息

接下来我们的分析也按这三步进行。

1. DefaultMQProducer构造方法

DefaultMQProducer构造方法代码如下:

public DefaultMQProducer(final String producerGroup) {
    // 继续调用
    this(null, producerGroup, null);
}


/**
 * 最终调用的构造方法
 */

public DefaultMQProducer(final String namespace, 
        final String producerGroup, RPCHook rpcHook)
 
{
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}

这个方法就是简单地赋了值,然后创建了DefaultMQProducerImpl实例,我们继续看DefaultMQProducerImpl的构造方法:

public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
    this.defaultMQProducer = defaultMQProducer;
    this.rpcHook = rpcHook;
    // 异步发送的队列
    this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
    // 处理异步发送的线程池
    this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.asyncSenderThreadPoolQueue,
        new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
            }
        });
}

这个构造方法依然还是处理赋值操作,并没做什么实质性内容,就不继续深究了。

2. DefaultMQProducer#start:启动producer

接着我们来看看producer的启动流程,进入DefaultMQProducer#start方法:

public void start() throws MQClientException {
    this.setProducerGroup(withNamespace(this.producerGroup));
    // 调用 defaultMQProducerImpl 的 start() 方法
    this.defaultMQProducerImpl.start();
    // 消息轨迹相关,我们不关注
    if (null != traceDispatcher) {
        ...
    }
}

这个方法先是调用了defaultMQProducerImpl#start方法,然后处理消息轨迹相关操作,关于rocketMq消息轨迹相关内容,本文就不过多探讨了,我们将目光聚集于DefaultMQProducerImpl#start(boolean)方法:

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            // 检查一些配置信息
            this.checkConfig();
            // 修改当前的 instanceName 为当前进程id
            if (!this.defaultMQProducer.getProducerGroup()
                    .equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // 获取mq实例
            this.mQClientFactory = MQClientManager.getInstance()
                .getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // 注册 mqClient 实例
            boolean registerOK = mQClientFactory.registerProducer(
                this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException(...);
            }

            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), 
                    new TopicPublishInfo());
            // 启动实例
            if (startFactory) {
                mQClientFactory.start();
            }

            log.info(...);
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException(...);
        default:
            break;
    }

    // 发送心跳到所有的broker
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // 定时扫描异步请求的返回结果
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                RequestFutureTable.scanExpiredRequest();
            } catch (Throwable e) {
                log.error("scan RequestFutureTable exception", e);
            }
        }
    }, 1000 * 31000);
}

这个方法并不复杂相关内容都已经作了注释,这里重点提出3个方法:

  1. mQClientFactory.start():执行方法为MQClientInstance#start,这个方法里会启动一些组件,我们稍后会分析。
  2. mQClientFactory.sendHeartbeatToAllBrokerWithLock():发送心跳到所有的broker,最终执行的方法为MQClientAPIImpl#sendHearbeat
    public int sendHearbeat(
        final String addr,
        final HeartbeatData heartbeatData,
        final long timeoutMillis
    )
     throws RemotingException, MQBrokerException, InterruptedException 
    {
        // request 的 code 为 HEART_BEAT
        RemotingCommand request = RemotingCommand
            .createRequestCommand(RequestCode.HEART_BEAT, null);
        request.setLanguage(clientConfig.getLanguage());
        request.setBody(heartbeatData.encode());
        // 异步调用
        RemotingCommand response = this.remotingClient
            .invokeSync(addr, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return response.getVersion();
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }
    这里是与broker通信,requestcodeHEART_BEAT,后面的分析中我们会看到,producer也会同nameServer通信。
  3. 定时扫描异步请求的返回结果:最终执行的方法为RequestFutureTable.scanExpiredRequest(),关于该方法的内容,我们在分析producer发送异步消息时再分析。

2.1 MQClientInstance#start:启动MQClientInstance

接下来我们来看看MQClientInstance的启动,方法为MQClientInstance#start,代码如下:

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // 获取 nameServer 的地址
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // 启动远程服务,这个方法只是装配了netty客户端相关配置
                // 注意:1. 这里是netty客户端,2. 这里并没有创建连接
                this.mQClientAPIImpl.start();
                // 启动定时任务
                this.startScheduledTask();
                // pull服务,仅对consumer启作用
                this.pullMessageService.start();
                // 启动负载均衡服务,仅对consumer启作用
                this.rebalanceService.start();
                // 启用内部的 producer
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK"this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException(...);
            default:
                break;
        }
    }
}

这个方法进行的操作在注释中已经说明得很清楚了,接下来我们对以上的部分操作做进一步分析。

1. mQClientAPIImpl.start():配置netty客户端

这里调用的是NettyRemotingClient#start方法,代码如下:

 @Override
public void start() {
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyClientConfig.getClientWorkerThreads(),
        new ThreadFactory() {
            ...
        });
    // 这里使用的是Bootstrap而非ServerBootstrap,表示这是netty客户端
    Bootstrap handler = this.bootstrap
        .group(this.eventLoopGroupWorker)
        .channel(NioSocketChannel.class)
        .option(...)
        // 省略各种option
        .handler(new ChannelInitializer<SocketChannel>() 
{
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // 省略pipeline的装配
                ...
            }
        });

    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            ...
        }
    }, 1000 * 31000);

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
}

对于这个方法,说明有两个点:

  1. 方法里使用的是Bootstrap而非ServerBootstrap,表示这是netty客户端
  2. 整个方法中并没有创建连接

2. startScheduledTask():启动定时任务

启动定时任务的方法为MQClientInstance#startScheduledTask,代码如下:

private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        // 定时获取 nameServer 的地址
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 101000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    // 定时更新topic的路由信息
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    // 定时发送心跳信息
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    // 持久化消息者的消费偏移量,可以放在本地文件,也可以推送到 broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    // 调整线程池的线程数量,并没有用上
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 11, TimeUnit.MINUTES);
}

这里共有5个定时任务:

  1. 定时获取 nameServer 的地址,MQClientInstance#start一开始会调用MQClientAPIImpl#fetchNameServerAddr获取nameServer,这里也调用了这个方法
  2. 定时更新topic的路由信息,这里会去nameServer获取路由信息,之后再分析
  3. 定时发送心跳信息到nameServer,在DefaultMQProducerImpl#start(boolean)中,我们也提到了向nameServer发送心跳信息,两处调用的是同一个方法
  4. 持久化消费者的消费偏移量,这个仅对消费者consumer有效,后面分析消费者时再作分析
  5. 调整线程池的线程数量,不过追踪到最后,发现这个并没有生效,就不多说了

这里我们重点来看topic路由信息的获取,我们经过对MQClientInstance#updateTopicRouteInfoFromNameServer()的一路追踪,我们来到了MQClientAPIImpl#getTopicRouteInfoFromNameServer(...)

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
        boolean allowTopicNotExist)
 throws MQClientException, InterruptedException, 
        RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException 
{
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);
    // 发送请求的 code 为 GET_ROUTEINFO_BY_TOPIC
    RemotingCommand request = RemotingCommand
        .createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        ...
        case ResponseCode.SUCCESS: {
            byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        ...
    }
    ...
}

这里发送向NameServer发送消息的codeGET_ROUTEINFO_BY_TOPIC,这点在前面分析nameServer的消息处理时也分析过了,并且还分析了当消息送达nameServer后,nameServer是如何返回topic数据的,遗忘的小伙伴可以看下之前分析nameServer的文章。

限于篇幅,本文就先到这里了,本文主要是分析producer启动流程,下一篇文章将分析消息发送流程。


限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。

本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!


浏览 27
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报