Rocketmq源码分析06:broker 消息分发流程
注:本系列源码分析基于RocketMq 4.8.0,gitee仓库链接:https://gitee.com/funcy/rocketmq.git.
RocketMq
消息处理整个流程如下:
消息接收:消息接收是指接收 producer
的消息,处理类是SendMessageProcessor
,将消息写入到commigLog
文件后,接收流程处理完毕;消息分发: broker
处理消息分发的类是ReputMessageService
,它会启动一个线程,不断地将commitLong
分到到对应的consumerQueue
,这一步操作会写两个文件:consumerQueue
与indexFile
,写入后,消息分发流程处理 完毕;消息投递:消息投递是指将消息发往 consumer
的流程,consumer
会发起获取消息的请求,broker
收到请求后,调用PullMessageProcessor
类处理,从consumerQueue
文件获取消息,返回给consumer
后,投递流程处理完毕。
以上就是rocketMq
处理消息的流程了,接下来我们就从源码来分析消息分发的实现。
1. 分发线程的启动
消息写入到commitlog
后,接着broker
会对这些消息进行分发操作,这里的分发,是指broker
将消息写入到consumerQueue
文件中。
broker
消息分发的操作是在一个单独的线程中进行的,这里我们来回忆下BrokerController
的启动流程,进入BrokerController#start
方法:
public void start() throws Exception {
// 启动各组件
if (this.messageStore != null) {
this.messageStore.start();
}
...
}
继续进入DefaultMessageStore#start
方法:
public void start() throws Exception {
...
// 处理 maxPhysicalPosInLogicQueue 的值
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
}
}
}
if (maxPhysicalPosInLogicQueue < 0) {
maxPhysicalPosInLogicQueue = 0;
}
if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
}
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
// 消息分发操作,启动新线程来处理
this.reputMessageService.start();
...
}
在BrokerController
启动时,会处理maxPhysicalPosInLogicQueue
的值,这个值就是分发commitlog
消息的偏移量,之后就启动ReputMessageService
服务来处理。ReputMessageService
是DefaultMessageStore
的内部类,它是ServiceThread
的子类,start()
方法如下:
public abstract class ServiceThread implements Runnable {
public void start() {
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
}
...
}
这个方法仅仅是处理线程的启动,我们继续看ServiceThread
。ServiceThread
是Runnable
的子类,它的run()
方法如下:
class ReputMessageService extends ServiceThread {
@Override
public void run() {
DefaultMessageStore.log.info(...);
while (!this.isStopped()) {
try {
Thread.sleep(1);
// 调用的是 doReput() 方法
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(...);
}
}
DefaultMessageStore.log.info(...);
}
}
从ReputMessageService#run()
方法来看,该线程会休眠1ms,然后调用doReput()
方法处理,看来doReput()
方法就是关键了!
2. 消息分发:DefaultMessageStore.ReputMessageService#doReput
我们进入DefaultMessageStore.ReputMessageService#doReput
方法:
private void doReput() {
// 处理 reputFromOffset
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
log.warn(...);
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
// 从CommitLog中获取需要进行转发的消息
SelectMappedBufferResult result
= DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// 检验数据
DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog
.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getBufferSize() == -1
? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
// 分发消息
DefaultMessageStore.this.doDispatch(dispatchRequest);
// 长轮询:如果有消息到了主节点,并且开启了长轮询
if (BrokerRole.SLAVE != DefaultMessageStore.this
.getMessageStoreConfig().getBrokerRole()
&&DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){
// 调用NotifyMessageArrivingListener的arriving方法
DefaultMessageStore.this.messageArrivingListener.arriving(
dispatchRequest.getTopic(),
dispatchRequest.getQueueId(),
dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(),
dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(),
dispatchRequest.getPropertiesMap());
}
...
} else if (size == 0) {
...
}
} else if (!dispatchRequest.isSuccess()) {
...
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}
该方法依旧很长,我们重点关注与分发相关的流程:
commitLog.getData(...)
:从CommitLog
中获取DispatchRequest
需要分发的消息,参数reputFromOffset
就是消息在文件中的偏移量this.doDispatch(...)
:分发操作,就是把消息的相关写入ConsumeQueue
与IndexFile
两个文件中如果当前节点为主节点,且启用了长轮询,则调用 NotifyMessageArrivingListener
的arriving
方法,在这里会把消息主动投递到consumer
总的来说,当消息写入到commitLog
后,ReputMessage
会根据上一次分发消息的偏移量依次从commitLog
文件中读取消息信息,写入到ConsumeQueue
与IndexFile
两个文件中,当然了,这里写入的只是消息的发送时间、在commitLog
中的位置信息,完整的消息只有commitLog
文件才存在。
写完这两个文件后,接下来就等待consumer
来拉取消息了。当然,consumer
主动来拉取可能会导致消息无法实时送达,为解决这个问题,rocketMq
给出的解决方案是长轮询,具体为:如果当前没有消息,就hold
住consumer
的请求30s,这30s内一旦有消息过来,就及时唤醒consumer
的请求,实际将消息发送出去,就也是NotifyMessageArrivingListener#arriving
方法所做的工作,关于这点我们在分析consumer
拉取消息时再详细分析。
我们再来看看消息分发消息,进入DefaultMessageStore#doDispatch
:
public class DefaultMessageStore implements MessageStore {
private final LinkedList<CommitLogDispatcher> dispatcherList;
/**
* DefaultMessageStore 构造方法
*/
public DefaultMessageStore(...) throws IOException {
...
// 消息分发处理
this.dispatcherList = new LinkedList<>();
// 写入 ConsumeQueue 文件
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
// 写入 Index 文件
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
...
}
/**
* 分发操作
*/
public void doDispatch(DispatchRequest req) {
// 进行分发操作,dispatcherList 包含两个对象:
// 1. CommitLogDispatcherBuildConsumeQueue:写入 ConsumeQueue 文件
// 2. CommitLogDispatcherBuildIndex:写入 Index 文件
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
dispatcher.dispatch(req);
}
}
}
从整个方法的运行来看,DefaultMessageStore
在创建时,会准备两个CommitLogDispatcher
:
CommitLogDispatcherBuildConsumeQueue
:处理ConsumeQueue
文件的写入CommitLogDispatcherBuildIndex
:处理IndexFile
文件的写入
在DefaultMessageStore#doDispatch
方法中,就是对这两个文件的写入操作了:
/**
* consumerQueue 文件分发的构建器
*/
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// 将消息在commitLog文件的位置、tags等信息写入ConsumerQueue文件
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
/**
* indexFile 文件分发的构建器
*/
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.buildIndex(request);
}
}
}
需要注意的是,在这两个文件中,写入的仅是消息的位置信息,完整的消息内容仅在commitLog
中保存。
3. 总结
本文主要分析了broker
消息分发分发,这里说的分发流程,是指broker
将消息写入到consumerQueue
文件的流程。
在broker
启动时,会启动一个专门的线程:ReputMessageService
,该线程会不停地从comsumer
获取消息,然后将其写入到consumerQueue
文件与IndexFile
文件中。
当消息分发到consumerQueue
文件后,接着consumer
就可以很方便地从各队列中获取消息了,下一篇我们来分析broker
是如何响应consumer
获取消息请求的。
限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。
本文首发于微信公众号 Java技术探秘,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!
- END -