Rocketmq源码分析15:延迟消息
注:本系列源码分析基于RocketMq 4.8.0,gitee仓库链接:https://gitee.com/funcy/rocketmq.git.
rocketmq
支持延迟消息,本文我们将从源码角度分析延迟消息的实现原理。
1. demo 准备
延迟消息的demo在org.apache.rocketmq.example.delay
包下,发送消息的producer
如下:
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
String nameServer = "localhost:9876";
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr(nameServer);
producer.start();
for (int i = 0; i < 1; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// delayLevel=1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// delayTime =1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
// 设置延迟延迟级别
msg.setDelayTimeLevel(5);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
rocketmq
在实现延迟消息时,会准备18个延迟级别,这些级别对应的延迟时间如下:
1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1s | 5s | 10s | 30s | 1m | 2m | 3m | 4m | 5m | 6m | 7m | 8m | 9m | 10m | 20m | 30m | 1h | 2h |
在发送延迟消息时,需要指定消息的延迟级别:
msg.setDelayTimeLevel(5);
这里指定的延迟级别为5,即延迟1分钟后发送。
2. 延迟消息的存储
延迟消息与普通消息的发送并无太多差别,不过在broker
在存储延迟消息时,会做一些额外的处理,进入CommitLog#asyncPutMessage
方法:
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// 消息的存储时间
msg.setStoreTimestamp(System.currentTimeMillis());
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 延迟消息
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore
.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(
this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 指定延迟消息对应的topic
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// 延迟级别对应的队列,即每个延迟级别都对应一条队列
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// 原始的topic与queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
// 省略消息写入 commitLog 的操作
...
}
在延迟消息写入前,会做一些特别处理,其实就是将消息的topic
与queueId
修改为延迟消息专用的topic
与queueId
。
获取延迟队列的方法为ScheduleMessageService#delayLevel2QueueId
,代码如下:
public static int delayLevel2QueueId(final int delayLevel) {
return delayLevel - 1;
}
这里的delayLevel
,就对应前面提到的18个延迟级别,这也就是说,每个延迟级别的消息都会有一个专门队列来存储。这样存储有何好处呢?最大的好处就是避免了排序,举个简单的例子:上午10:00broker
收到了一条延迟消息1
,延迟级别为5;然后在10:02又收到了一条延迟消息2
,延迟级别也为5,由于延迟级别相同,他们会存储在同一条队列中.
由于队列天生有序,入队时间先按送达broker
的时间先后进行排序,而同一队列上延迟时间也相同,因此延迟消息1一定会在延迟消息2前进行消消费,后面如果有消息再进入该队列中,也会按照先进先出
的方式进行消费。
3. 延迟消息的投递
上一节分析了延迟消息的存储,本节我们来分析延迟消息的消费。
延迟消息存储到队列后,会有一个专门的线程定期扫描这些队列,找到满足消费时间的消息,然后将其投递到真正的topic
与queueId
中,这样这条消息就能被consumer
消息了。
处理延迟队列扫描的线程为scheduleMessageService
,它在DefaultMessageStore#start
方法中启动:
public void start() throws Exception {
...
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService.start();
// 这里处理延迟消息
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
...
}
继续跟进,进入DefaultMessageStore#handleScheduleMessageService
方法:
@Override
public void handleScheduleMessageService(final BrokerRole brokerRole) {
if (this.scheduleMessageService != null) {
if (brokerRole == BrokerRole.SLAVE) {
this.scheduleMessageService.shutdown();
} else {
// 启动
this.scheduleMessageService.start();
}
}
}
继续跟进,进入ScheduleMessageService#start
方法:
/**
* 延迟消息服务的启动方式
*/
public void start() {
// CAS 锁机制保证必须 shutdown 后才能再次start
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
// 定时执行延迟消息处理任务
this.timer.schedule(
new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
// 每隔10s,将延迟消息的相关信息持久化到硬盘中
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
在这个线程中,主要做了两件事:
遍历所有的延迟级别,为每个延迟级别在延迟 FIRST_DELAY_TIME
毫秒后就处理延迟消息的投递操作开启执久化定时任务:定时将延迟消息的相关信息持久化到硬盘中
3.1 投递操作
处理延迟消息的投递任务为DeliverDelayedMessageTimerTask#run
方法,代码如下:
public void run() {
try {
if (isStarted()) {
this.executeOnTimeup();
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
}
}
在这个方法中,调用了executeOnTimeup()
方法继续操作,我们再进入ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup
方法:
public void executeOnTimeup() {
// 获得一条队列
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore
.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
log.error(...);
// 1. 消息的写入的时间
long msgStoreTime = defaultMessageStore.getCommitLog()
.pickupStoreTimestamp(offsetPy, sizePy);
// 2. 计算投递时间,投递时间 = 消息写入时间 + 延迟级别对应的时间
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
// 处理投递时间,保证投递时间必须小于(当前时间 + 延迟级别对应的时间)
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
// 小于等于0,表示消费需要投递
if (countdown <= 0) {
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC
.equals(msgInner.getTopic())) {
log.error(...);
continue;
}
// 3. 投递操作
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
if (putMessageResult != null && putMessageResult
.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// XXX: warn and notify me
log.error(...);
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
log.error(...);
}
}
} else {
// 4. 安排下一次执行,执行时间为 countdown 毫秒后
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
// 5. 更新偏移量
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
}
// 之后再执行
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
// 更新偏移量
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
}
else {
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error(...);
}
}
}
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
这个方法虽然有点长,但逻辑很清晰,执行过程如下:
获取消息写入时间,就是写入到 commitLog
的时间计算投递时间,投递时间 = 消息写入时间 + 延迟级别对应的时间,如果当前时间大于等于投递时间,就表示消息需要进行投递操作 如果消息满足投递时间,就进行投递操作,所谓的投递操作,就是将消息写入到真正的 topic
与queueId
的队列中如果当前消息不满足投递时间,就表明该队列上之后的消息也不会投递时间,就计算投递时间与当前时间的差值,这个差值就是下次执行 executeOnTimeup()
方法的时间更新偏移量,就是记录当前队列的消费位置
我们来看看偏移量的更新操作,进入ScheduleMessageService#updateOffset
方法:
public class ScheduleMessageService extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final long FIRST_DELAY_TIME = 1000L;
private static final long DELAY_FOR_A_WHILE = 100L;
private static final long DELAY_FOR_A_PERIOD = 10000L;
/** 延迟级别对应的延迟时间,单位为毫秒 */
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<Integer, Long>(32);
/** 延迟级别对应的偏移量 */
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<Integer, Long>(32);
private final DefaultMessageStore defaultMessageStore;
private final AtomicBoolean started = new AtomicBoolean(false);
private Timer timer;
private MessageStore writeMessageStore;
private int maxDelayLevel;
...
/**
* 更新偏移量的操作
*/
private void updateOffset(int delayLevel, long offset) {
this.offsetTable.put(delayLevel, offset);
}
...
}
可以看到,这里的更新偏移量,就是将当前延迟级别消费位置的偏移量添加到offsetTable
中进行保存。
3.2 持久化
让我们回到``ScheduleMessageService#start`方法,这个方法中开启了一个持久化任务:
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
该任务会定期执行ConfigManager#persist
方法进行持久化操作:
public synchronized void persist() {
String jsonString = this.encode(true);
if (jsonString != null) {
String fileName = this.configFilePath();
try {
MixAll.string2File(jsonString, fileName);
} catch (IOException e) {
log.error("persist file " + fileName + " exception", e);
}
}
}
这个方法主要进行了两个操作:
调用 this.encode(true)
得到json
字符串将 json
字符串写入到文件中
这个json
字符串是个啥呢?我们进入ScheduleMessageService#encode(boolean)
方法:
public String encode(final boolean prettyFormat) {
DelayOffsetSerializeWrapper delayOffsetSerializeWrapper
= new DelayOffsetSerializeWrapper();
// 这个 offsetTable 就是用来保存消费位置偏移量的
delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable);
return delayOffsetSerializeWrapper.toJson(prettyFormat);
}
从代码来看,这个方法就是将ScheduleMessageService#offsetTable
序列化成json
字符串的, 这个 offsetTable
就是用来保存消费位置偏移量的。由此不难得出这个定时任务的作用:定期将延迟队列的消费位置偏移量持久化到文件中。
4. 总结
RocketMq
支持了18种延迟级别,每个延迟级别对应不同的延迟时间延迟消息对应着一个 topic
,每个延迟级别都对应着该topic
下的一个队列当 broker
收到延迟消息后,会将该消息放入到延迟级别对应的延迟消息中消息投递由定时线程执行,当消息达到投递时间后,会从延迟队列中写入到真正需要投递的队列中
客观来说,开源版 RocketMq 的延迟消息比较简陋,仅支持18种延迟级别,而阿里云版可指定发送时间。
限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。
本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!