Rocketmq源码分析08:producer 启动流程
注:本系列源码分析基于RocketMq 4.8.0,gitee仓库链接:https://gitee.com/funcy/rocketmq.git.
本文我们来分析rocketMq
producer
发送消息的流程.
producer
发送消息的示例在org.apache.rocketmq.example.simple.Producer
类中,代码如下:
public class Producer {
public static void main(String[] args)
throws MQClientException, InterruptedException {
String nameServer = "localhost:9876";
// 1. 创建 DefaultMQProducer 对象
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr(nameServer);
// 2. 启动 producer
producer.start();
for (int i = 0; i < 1; i++)
try {
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 3. 发送消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
以上代码分三步走:
创建 DefaultMQProducer
对象启动 producer
发送消息
接下来我们的分析也按这三步进行。
1. DefaultMQProducer
构造方法
DefaultMQProducer
构造方法代码如下:
public DefaultMQProducer(final String producerGroup) {
// 继续调用
this(null, producerGroup, null);
}
/**
* 最终调用的构造方法
*/
public DefaultMQProducer(final String namespace,
final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
这个方法就是简单地赋了值,然后创建了DefaultMQProducerImpl
实例,我们继续看DefaultMQProducerImpl
的构造方法:
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
// 异步发送的队列
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
// 处理异步发送的线程池
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
}
这个构造方法依然还是处理赋值操作,并没做什么实质性内容,就不继续深究了。
2. DefaultMQProducer#start
:启动producer
接着我们来看看producer
的启动流程,进入DefaultMQProducer#start
方法:
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
// 调用 defaultMQProducerImpl 的 start() 方法
this.defaultMQProducerImpl.start();
// 消息轨迹相关,我们不关注
if (null != traceDispatcher) {
...
}
}
这个方法先是调用了defaultMQProducerImpl#start
方法,然后处理消息轨迹相关操作,关于rocketMq
消息轨迹相关内容,本文就不过多探讨了,我们将目光聚集于DefaultMQProducerImpl#start(boolean)
方法:
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 检查一些配置信息
this.checkConfig();
// 修改当前的 instanceName 为当前进程id
if (!this.defaultMQProducer.getProducerGroup()
.equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 获取mq实例
this.mQClientFactory = MQClientManager.getInstance()
.getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 注册 mqClient 实例
boolean registerOK = mQClientFactory.registerProducer(
this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException(...);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(),
new TopicPublishInfo());
// 启动实例
if (startFactory) {
mQClientFactory.start();
}
log.info(...);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException(...);
default:
break;
}
// 发送心跳到所有的broker
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 定时扫描异步请求的返回结果
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
}
}, 1000 * 3, 1000);
}
这个方法并不复杂相关内容都已经作了注释,这里重点提出3个方法:
mQClientFactory.start()
:执行方法为MQClientInstance#start
,这个方法里会启动一些组件,我们稍后会分析。mQClientFactory.sendHeartbeatToAllBrokerWithLock()
:发送心跳到所有的broker
,最终执行的方法为MQClientAPIImpl#sendHearbeat
:
这里是与public int sendHearbeat(
final String addr,
final HeartbeatData heartbeatData,
final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
// request 的 code 为 HEART_BEAT
RemotingCommand request = RemotingCommand
.createRequestCommand(RequestCode.HEART_BEAT, null);
request.setLanguage(clientConfig.getLanguage());
request.setBody(heartbeatData.encode());
// 异步调用
RemotingCommand response = this.remotingClient
.invokeSync(addr, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return response.getVersion();
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}broker
通信,request
的code
为HEART_BEAT
,后面的分析中我们会看到,producer
也会同nameServer
通信。定时扫描异步请求的返回结果:最终执行的方法为 RequestFutureTable.scanExpiredRequest()
,关于该方法的内容,我们在分析producer
发送异步消息时再分析。
2.1 MQClientInstance#start
:启动MQClientInstance
接下来我们来看看MQClientInstance
的启动,方法为MQClientInstance#start
,代码如下:
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 获取 nameServer 的地址
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// 启动远程服务,这个方法只是装配了netty客户端相关配置
// 注意:1. 这里是netty客户端,2. 这里并没有创建连接
this.mQClientAPIImpl.start();
// 启动定时任务
this.startScheduledTask();
// pull服务,仅对consumer启作用
this.pullMessageService.start();
// 启动负载均衡服务,仅对consumer启作用
this.rebalanceService.start();
// 启用内部的 producer
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException(...);
default:
break;
}
}
}
这个方法进行的操作在注释中已经说明得很清楚了,接下来我们对以上的部分操作做进一步分析。
1. mQClientAPIImpl.start()
:配置netty客户端
这里调用的是NettyRemotingClient#start
方法,代码如下:
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
...
});
// 这里使用的是Bootstrap而非ServerBootstrap,表示这是netty客户端
Bootstrap handler = this.bootstrap
.group(this.eventLoopGroupWorker)
.channel(NioSocketChannel.class)
.option(...)
// 省略各种option
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 省略pipeline的装配
...
}
});
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
...
}
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
对于这个方法,说明有两个点:
方法里使用的是 Bootstrap
而非ServerBootstrap
,表示这是netty客户端整个方法中并没有创建连接
2. startScheduledTask()
:启动定时任务
启动定时任务的方法为MQClientInstance#startScheduledTask
,代码如下:
private void startScheduledTask() {
if (null == this.clientConfig.getNamesrvAddr()) {
// 定时获取 nameServer 的地址
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// 定时更新topic的路由信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
// 定时发送心跳信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
// 持久化消息者的消费偏移量,可以放在本地文件,也可以推送到 broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 调整线程池的线程数量,并没有用上
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
这里共有5个定时任务:
定时获取 nameServer
的地址,MQClientInstance#start
一开始会调用MQClientAPIImpl#fetchNameServerAddr
获取nameServer
,这里也调用了这个方法定时更新 topic
的路由信息,这里会去nameServer
获取路由信息,之后再分析定时发送心跳信息到 nameServer
,在DefaultMQProducerImpl#start(boolean)
中,我们也提到了向nameServer
发送心跳信息,两处调用的是同一个方法持久化消费者的消费偏移量,这个仅对消费者 consumer
有效,后面分析消费者时再作分析调整线程池的线程数量,不过追踪到最后,发现这个并没有生效,就不多说了
这里我们重点来看topic
路由信息的获取,我们经过对MQClientInstance#updateTopicRouteInfoFromNameServer()
的一路追踪,我们来到了MQClientAPIImpl#getTopicRouteInfoFromNameServer(...)
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
boolean allowTopicNotExist) throws MQClientException, InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
// 发送请求的 code 为 GET_ROUTEINFO_BY_TOPIC
RemotingCommand request = RemotingCommand
.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
...
case ResponseCode.SUCCESS: {
byte[] body = response.getBody();
if (body != null) {
return TopicRouteData.decode(body, TopicRouteData.class);
}
}
...
}
...
}
这里发送向NameServer
发送消息的code
是GET_ROUTEINFO_BY_TOPIC
,这点在前面分析nameServer
的消息处理时也分析过了,并且还分析了当消息送达nameServer
后,nameServer
是如何返回topic
数据的,遗忘的小伙伴可以看下之前分析nameServer
的文章。
限于篇幅,本文就先到这里了,本文主要是分析producer
启动流程,下一篇文章将分析消息发送流程。
限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。
本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!