Rocketmq源码分析07:broker 消息投递流程
注:本系列源码分析基于RocketMq 4.8.0,gitee仓库链接:https://gitee.com/funcy/rocketmq.git.
RocketMq
消息处理整个流程如下:
消息接收:消息接收是指接收 producer
的消息,处理类是SendMessageProcessor
,将消息写入到commigLog
文件后,接收流程处理完毕;消息分发: broker
处理消息分发的类是ReputMessageService
,它会启动一个线程,不断地将commitLong
分到到对应的consumerQueue
,这一步操作会写两个文件:consumerQueue
与indexFile
,写入后,消息分发流程处理 完毕;消息投递:消息投递是指将消息发往 consumer
的流程,consumer
会发起获取消息的请求,broker
收到请求后,调用PullMessageProcessor
类处理,从consumerQueue
文件获取消息,返回给consumer
后,投递流程处理完毕。
以上就是rocketMq
处理消息的流程了,接下来我们就从源码来分析消息投递的实现。
1. 处理PULL_MESSAGE
请求
与producer
不同,consumer
从broker
拉取消息时,发送的请求code
为PULL_MESSAGE
,processor
为PullMessageProcessor
,我们直接进入它的processRequest
方法:
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
// 调用方法
return this.processRequest(ctx.channel(), request, true);
}
这个方法就只是调用了一个重载方法,多出来的参数true
表示允许broker
挂起请求,我们继续,
/**
* 继续处理
*/
private RemotingCommand processRequest(final Channel channel, RemotingCommand request,
boolean brokerAllowSuspend)throws RemotingCommandException {
RemotingCommand response = RemotingCommand
.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader
= (PullMessageResponseHeader) response.readCustomHeader();
final PullMessageRequestHeader requestHeader = (PullMessageRequestHeader)
request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
response.setOpaque(request.getOpaque());
// 省略权限校验流程
// 1. rocketMq 可以设置校验信息,以阻挡非法客户端的连接
// 2. 同时,对topic可以设置DENY(拒绝)、ANY(PUB 或者 SUB 权限)、PUB(发送权限)、SUB(订阅权限)等权限,
// 可以细粒度控制客户端对topic的操作内容
...
// 获取订阅组
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager()
.findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
...
// 获取订阅主题
TopicConfig topicConfig = this.brokerController.getTopicConfigManager()
.selectTopicConfig(requestHeader.getTopic());
...
// 处理filter
// consumer在订阅消息时,可以对订阅的消息进行过滤,过滤方法有两种:tag与sql92
// 这里我们重点关注拉取消息的流程,具体的过滤细节后面再分析
...
// 获取消息
// 1. 根据 topic 与 queueId 获取 ConsumerQueue 文件
// 2. 根据 ConsumerQueue 文件的信息,从 CommitLog 中获取消息内容
final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
if (getMessageResult != null) {
// 省略一大堆的校验过程
...
switch (response.getCode()) {
// 表示消息可以处理,这里会把消息内容写入到 response 中
case ResponseCode.SUCCESS:
...
// 处理消息消息内容,就是把消息从 getMessageResult 读出来,放到 response 中
if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
final long beginTimeMills = this.brokerController.getMessageStore().now();
// 将消息内容转为byte数组
final byte[] r = this.readGetMessageResult(getMessageResult,
requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId());
...
response.setBody(r);
} else {
try {
// 消息转换
FileRegion fileRegion = new ManyMessageTransfer(response.encodeHeader(
getMessageResult.getBufferTotalSize()), getMessageResult);
channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
...
});
} catch (Throwable e) {
...
}
response = null;
}
break;
// 未找到满足条件的消息
case ResponseCode.PULL_NOT_FOUND:
// 如果支持挂起,就挂起当前请求
if (brokerAllowSuspend && hasSuspendFlag) {
...
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData,
messageFilter);
// 没有找到相关的消息,挂起操作
this.brokerController.getPullRequestHoldService()
.suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
// 省略其他类型的处理
...
break;
default:
assert false;
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store getMessage return null");
}
...
return response;
}
在源码中,这个方法也是非常长,这里我抹去了各种细枝末节,仅留下了一些重要的流程,整个处理流程如下:
权限校验: rocketMq
可以设置校验信息,以阻挡非法客户端的连接,同时也可以设置客户端的发布、订阅权限,细节度控制访问权限;获取订阅组、订阅主题等,这块主要是通过请求消息里的内容获取 broker
中对应的记录创建过滤组件: consumer
在订阅消息时,可以对订阅的消息进行过滤,过滤方法有两种:tag
与sql92
获取消息:先是根据 topic
与queueId
获取ConsumerQueue
文件,根据ConsumerQueue
文件的信息,从CommitLog
中获取消息内容,消息的过滤操作也是发生在这一步转换消息:如果获得了消息,就是把具体的消息内容,复制到 reponse
中挂起请求:如果没获得消息,而当前请求又支持挂起,就挂起当前请求
以上代码还是比较清晰的,相关流程代码中都作了注释。
以上流程就是整个消息的获取流程了,在本文中,我们仅关注与获取消息相关的步骤,重点关注以下两个操作:
获取消息 挂起请求
2. 获取消息
获取消息的方法为DefaultMessageStore#getMessage
,代码如下:
public GetMessageResult getMessage(final String group, final String topic, final int queueId,
final long offset, final int maxMsgNums, final MessageFilter messageFilter) {
// 省略一些判断
...
// 根据topic与queueId一个ConsumeQueue,consumeQueue记录的是消息在commitLog的位置
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();
if (...) {
// 判断 offset 是否符合要求
...
} else {
// 从 consumerQueue 文件中获取消息
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
...
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount;
i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
// 省略一大堆的消息过滤操作
...
// 从 commitLong 获取消息
SelectMappedBufferResult selectResult
= this.commitLog.getMessage(offsetPy, sizePy);
if (null == selectResult) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
continue;
}
// 省略一大堆的消息过滤操作
...
}
}
} else {
status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
}
if (GetMessageStatus.FOUND == status) {
this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
} else {
this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
}
long elapsedTime = this.getSystemClock().now() - beginTime;
this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);
getResult.setStatus(status);
// 又是处理 offset
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
return getResult;
}
这个方法不是比较长的,这里仅保留了关键流程,获取消息的关键流程如下:
根据 topic
与queueId
找到ConsumerQueue
从 ConsumerQueue
对应的文件中获取消息信息,如tag
的hashCode
、消息在commitLog
中的位置信息根据位置信息,从 commitLog
中获取完整的消息
经过以上步骤,消息就能获取到了,不过在获取消息的前后,会进行消息过滤操作,即根据tag
或sql
语法来过滤消息,关于消息过滤的一些细节,我们留到后面消息过滤相关章节作进一步分析。
3. 挂起请求:PullRequestHoldService#suspendPullRequest
当broker
无新消息时,consumer
拉取消息的请求就会挂起,方法为PullRequestHoldService#suspendPullRequest
:
public class PullRequestHoldService extends ServiceThread {
private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
new ConcurrentHashMap<String, ManyPullRequest>(1024);
public void suspendPullRequest(final String topic, final int queueId,
final PullRequest pullRequest) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (null == mpr) {
mpr = new ManyPullRequest();
ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
if (prev != null) {
mpr = prev;
}
}
mpr.addPullRequest(pullRequest);
}
...
}
在suspendPullRequest
方法中,所做的工作仅是把当前请求放入pullRequestTable
中了。从代码中可以看到,pullRequestTable
是一个ConcurrentMap
,key
是 topic@queueId
,value
就是挂起的请求了。
请求挂起后,何时处理呢?这就是PullRequestHoldService
线程的工作了。
3.1 处理挂起请求的线程:PullRequestHoldService
看完PullRequestHoldService#suspendPullRequest
方法后,我们再来看看PullRequestHoldService
。
PullRequestHoldService
是ServiceThread
的子类(上一次看到ServiceThread
的子类还是ReputMessageService
),它也会启动一个新线程来处理挂起操作。
我们先来看看它是在哪里启动PullRequestHoldService
的线程的,在BrokerController
的启动方法start()
中有这么一行:
BrokerController#start
public void start() throws Exception {
...
if (this.pullRequestHoldService != null) {
this.pullRequestHoldService.start();
}
...
}
这里就是启动pullRequestHoldService
的线程操作了。
为了探究这个线程做了什么,我们进入PullRequestHoldService#run
方法:
@Override
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
// 等待中
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000);
} else {
this.waitForRunning(
this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
// 检查操作
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 * 1000) {
log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
}
} catch (Throwable e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info("{} service end", this.getServiceName());
}
从代码来看,这个线程先是进行等待,然后调用PullRequestHoldService#checkHoldRequest
方法,看来关注就是这个方法了,它的代码如下:
private void checkHoldRequest() {
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
final long offset = this.brokerController.getMessageStore()
.getMaxOffsetInQueue(topic, queueId);
try {
// 调用notifyMessageArriving方法操作
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
log.error(...);
}
}
}
}
这个方法调用了PullRequestHoldService#notifyMessageArriving(...)
,我们继续进入:
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
// 继续调用
notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
}
/**
* 这个方法就是最终调用的了
*/
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset,
final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
List<PullRequest> requestList = mpr.cloneListAndClear();
if (requestList != null) {
List<PullRequest> replayList = new ArrayList<PullRequest>();
for (PullRequest request : requestList) {
// 判断是否有新消息到达,要根据 comsumerQueue 的偏移量与request的偏移量判断
long newestOffset = maxOffset;
if (newestOffset <= request.getPullFromThisOffset()) {
newestOffset = this.brokerController.getMessageStore()
.getMaxOffsetInQueue(topic, queueId);
}
if (newestOffset > request.getPullFromThisOffset()) {
boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
if (match && properties != null) {
match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
}
if (match) {
try {
// 唤醒操作
this.brokerController.getPullMessageProcessor()
.executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
}
// 超时时间到了
if (System.currentTimeMillis() >=
(request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
// 唤醒操作
this.brokerController.getPullMessageProcessor()
.executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
replayList.add(request);
}
if (!replayList.isEmpty()) {
mpr.addPullRequest(replayList);
}
}
}
}
这个方法就是用来检查是否有新消息送达的操作了,方法虽然有点长,但可以用一句话来总结:如果有新消息送达,或者pullRquest
hold
住的时间到了,就唤醒pullRquest
(即调用PullMessageProcessor#executeRequestWhenWakeup
方法)。
在判断是否有新消息送达时,会获取 comsumerQueue
文件中的最大偏移量,与当前pullRquest
中的偏移量进行比较,如果前者大,就表示有新消息送达了,需要唤醒pullRquest
前面说过,当 consumer
请求没获取到消息时,broker
会hold
这个请求一段时间(30s),当这个时间到了,也会唤醒pullRquest
,之后就不会再hold
住它了
3.2 唤醒请求:PullMessageProcessor#executeRequestWhenWakeup
我们再来看看 PullMessageProcessor#executeRequestWhenWakeup
方法:
public void executeRequestWhenWakeup(final Channel channel,
final RemotingCommand request) throws RemotingCommandException {
// 关注 Runnable#run() 方法即可
Runnable run = new Runnable() {
@Override
public void run() {
try {
// 再一次调用 PullMessageProcessor#processRequest(...) 方法
final RemotingCommand response = PullMessageProcessor.this
.processRequest(channel, request, false);
...
} catch (RemotingCommandException e1) {
log.error("excuteRequestWhenWakeup run", e1);
}
}
};
// 提交任务
this.brokerController.getPullMessageExecutor()
.submit(new RequestTask(run, channel, request));
}
这个方法准备了一个任务,然后将其提交到线程池中执行,任务内容很简单,仅是调用了PullMessageProcessor#processRequest(...)
方法,这个方法就是本节一始提到的处理consumer
拉取消息的方法了。
3.3 消息分发中唤醒consumer
请求
在分析消息分发流程时,DefaultMessageStore.ReputMessageService#doReput
方法中有这么一段:
private void doReput() {
...
// 分发消息
DefaultMessageStore.this.doDispatch(dispatchRequest);
// 长轮询:如果有消息到了主节点,并且开启了长轮询
if (BrokerRole.SLAVE != DefaultMessageStore.this
.getMessageStoreConfig().getBrokerRole()
&&DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){
// 调用NotifyMessageArrivingListener的arriving方法
DefaultMessageStore.this.messageArrivingListener.arriving(
dispatchRequest.getTopic(),
dispatchRequest.getQueueId(),
dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(),
dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(),
dispatchRequest.getPropertiesMap());
}
...
}
这段就是用来主动唤醒hold
住的consumer
请求的,我们进入NotifyMessageArrivingListener#arriving
方法:
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
msgStoreTime, filterBitMap, properties);
}
最终它也是调用了 PullRequestHoldService#notifyMessageArriving(...)
方法。
4. 总结
本文主要分析了broker
处理PULL_MESSAGE
请求的流程,总结如下:
broker
处理PULL_MESSAGE
的processor
为PullMessageProcessor
,PullMessageProcessor
的processRequest(...)
就是整个消息获取流程了broker
在获取消息时,先根据请求的topic
与queueId
找到consumerQueue
,然后根据请求中的offset
参数从consumerQueue
文件中找到消息在commitLog
的位置信息,最后根据位置信息从commitLog
中获取消息内容如果 broker
中没有当前consumerQueue
的消息,broker
会挂起当前线程,直到超时(默认30s)或收到新的消息时再唤醒
限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。
本文首发于微信公众号 Java技术探秘,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!
- END -