RocketMQ Source Code Analysis 13: Consumer Consumption Offsets
Note: this series of source-code analyses is based on RocketMQ 4.8.0, Gitee repository: https://gitee.com/funcy/rocketmq.git.
Continuing from the previous article, we keep analyzing the consumer's consumption flow.
6. Avoiding duplicate consumption: the consumption offset
How does a RocketMQ consumer avoid consuming the same messages more than once? The answer lies in the offset! Before pulling messages, the consumer first obtains the offset and carries it in the pull request; the broker then uses this offset to locate the corresponding messages and return them to the consumer.
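For context, here is a minimal push-consumer setup (the group name, topic and nameserver address below are made-up values). The message model set here is exactly the knob that decides where offsets are stored, as section 6.1 shows:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class OffsetDemoConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // CLUSTERING (the default) keeps offsets on the broker; BROADCASTING keeps them in a local file
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe("DemoTopic", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // returning CONSUME_SUCCESS lets the client advance the offset past these messages
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}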
6.1 Initialization of the offset store
The interface that handles offset storage is OffsetStore, and it has two implementations:
LocalFileOffsetStore: local file storage, i.e. offsets are stored in a local file
RemoteBrokerOffsetStore: remote broker storage, i.e. offsets are stored on the remote broker
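For reference, the core methods of OffsetStore look roughly like this (a simplified paraphrase; see org.apache.rocketmq.client.consumer.store.OffsetStore in 4.8.0 for the exact signatures):
import java.util.Set;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;

// simplified sketch of the OffsetStore contract (the real interface has a few more methods)
public interface OffsetStore {
    void load() throws MQClientException;                                   // load persisted offsets
    void updateOffset(MessageQueue mq, long offset, boolean increaseOnly);  // update the in-memory table
    long readOffset(MessageQueue mq, ReadOffsetType type);                  // read from memory and/or the store
    void persistAll(Set<MessageQueue> mqs);                                 // flush all offsets (file or broker)
    void persist(MessageQueue mq);                                          // flush a single queue's offset
    void removeOffset(MessageQueue mq);                                     // drop a queue no longer consumed
}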
This interface is initialized in the DefaultMQPushConsumerImpl#start method:
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
...
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
// Message model: BROADCASTING stores offsets locally, CLUSTERING stores them on the remote broker
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// Load the consumption offsets
this.offsetStore.load();
...
}
...
}
This method does two offset-related things:
Initialization: a different OffsetStore is created depending on the message model; in short, under broadcasting the offsets are stored locally, and under clustering they are stored on the remote broker
Loading the offset information
Let's look at each implementation's load operation in turn.
LocalFileOffsetStore's load:
@Override
public void load() throws MQClientException {
// Read the local offsets file
OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
// Iterate over each queue's offset (in the source this loop just logs each entry)
for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
}
}
}
private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
String content = null;
try {
// Read the file and return its content as a String
content = MixAll.file2String(this.storePath);
} catch (IOException e) {
log.warn("Load local offset store file exception", e);
}
if (null == content || content.length() == 0) {
// Fall back to the .bak file
return this.readLocalOffsetBak();
} else {
OffsetSerializeWrapper offsetSerializeWrapper = null;
try {
offsetSerializeWrapper =
OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
} catch (Exception e) {
log.warn("readLocalOffset Exception, and try to correct", e);
return this.readLocalOffsetBak();
}
return offsetSerializeWrapper;
}
}
As you can see, loading here simply reads the local offsets file.
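Where does that local file live? LocalFileOffsetStore builds the path from a root directory (overridable with the rocketmq.client.localOffsetStoreDir system property), the clientId and the consumer group. A small sketch of how the default path is assembled (the clientId below is a made-up example):
import java.io.File;

public class LocalOffsetPathDemo {
    public static void main(String[] args) {
        // default root dir used by LocalFileOffsetStore unless the system property overrides it
        String rootDir = System.getProperty("rocketmq.client.localOffsetStoreDir",
                System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
        String clientId = "192.168.1.10@12345";   // hypothetical clientId (ip@instance)
        String group = "demo_consumer_group";
        String storePath = rootDir + File.separator + clientId
                + File.separator + group + File.separator + "offsets.json";
        // e.g. ~/.rocketmq_offsets/192.168.1.10@12345/demo_consumer_group/offsets.json
        System.out.println(storePath);
    }
}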
Now let's look at RemoteBrokerOffsetStore's load:
@Override
public void load() {
}
For remote broker storage, load() does nothing: the offsets live on the broker and are only fetched later, when the pull position is computed (see section 6.3).
6.2 Offset persistence
Offsets are persisted by a scheduled task, which is started in MQClientInstance#startScheduledTask:
private void startScheduledTask() {
// Persist the consumer's consumption offsets, every 5 seconds by default
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
}
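The initial delay is 10 seconds, and the period is persistConsumerOffsetInterval from the client config, which defaults to 5000 ms. Since DefaultMQPushConsumer extends ClientConfig, it can be tuned directly on the consumer, for example (reusing the consumer from the sketch in section 6):
// persist offsets every 10 seconds instead of the default 5 seconds
consumer.setPersistConsumerOffsetInterval(10 * 1000);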
The method that performs the persistence is OffsetStore#persistAll. Let's look at LocalFileOffsetStore's implementation first:
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;
OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
if (mqs.contains(entry.getKey())) {
AtomicLong offset = entry.getValue();
offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
}
}
String jsonString = offsetSerializeWrapper.toJson(true);
if (jsonString != null) {
try {
// Write to the offsets file
MixAll.string2File(jsonString, this.storePath);
} catch (IOException e) {
log.error("persistAll consumer offset Exception, " + this.storePath, e);
}
}
}
This is straightforward: the offsets are serialized to JSON and written to the local offsets file.
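One detail worth noting: MixAll.string2File does not overwrite the file in place; it first writes the new content to a temporary file and keeps the previous content as a .bak file before swapping, which is why load() above could fall back to readLocalOffsetBak(). A rough, self-contained sketch of that write pattern (not the exact library code):
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class SafeFileWrite {
    // write-new-then-swap, keeping the old content as a .bak fallback
    static void writeWithBackup(String content, String fileName) throws IOException {
        Path target = Paths.get(fileName);
        Path tmp = Paths.get(fileName + ".tmp");
        Path bak = Paths.get(fileName + ".bak");
        Files.createDirectories(target.toAbsolutePath().getParent());
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));       // 1. write new content to .tmp
        if (Files.exists(target)) {
            Files.copy(target, bak, StandardCopyOption.REPLACE_EXISTING); // 2. back up the old content
        }
        Files.deleteIfExists(target);
        Files.move(tmp, target);                                          // 3. promote .tmp to the real file
    }

    public static void main(String[] args) throws IOException {
        writeWithBackup("{\"offsetTable\":{}}", "/tmp/offsets-demo/offsets.json");
    }
}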
Now let's look at RemoteBrokerOffsetStore's version:
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;
final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
if (offset != null) {
if (mqs.contains(mq)) {
try {
// Push the offset to the broker
this.updateConsumeOffsetToBroker(mq, offset.get());
log.info(...);
} catch (Exception e) {
log.error(...);
}
} else {
unusedMQ.add(mq);
}
}
}
if (!unusedMQ.isEmpty()) {
for (MessageQueue mq : unusedMQ) {
this.offsetTable.remove(mq);
log.info("remove unused mq, {}, {}", mq, this.groupName);
}
}
}
In RemoteBrokerOffsetStore#persistAll, this.updateConsumeOffsetToBroker(...) is called to submit the offsets to the broker. The update ultimately goes through MQClientAPIImpl#updateConsumerOffset:
public void updateConsumerOffset(
final String addr,
final UpdateConsumerOffsetRequestHeader requestHeader,
final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
// The request code for updating offsets is UPDATE_CONSUMER_OFFSET
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
// Send the request synchronously over netty
RemotingCommand response = this.remotingClient.invokeSync(
MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
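A small aside on MixAll.brokerVIPChannel: when the VIP channel is enabled, the client targets the broker's "VIP" port, which by convention is the normal listen port minus 2 (for example 10911 becomes 10909). A simplified sketch of that address rewrite (not the exact library code):
public class VipChannelDemo {
    // when enabled, rewrite the broker address to its VIP port (normal port - 2)
    static String brokerVIPChannel(boolean vipChannelEnabled, String brokerAddr) {
        if (!vipChannelEnabled) {
            return brokerAddr;
        }
        int idx = brokerAddr.lastIndexOf(':');
        String host = brokerAddr.substring(0, idx);
        int port = Integer.parseInt(brokerAddr.substring(idx + 1));
        return host + ":" + (port - 2);
    }

    public static void main(String[] args) {
        System.out.println(brokerVIPChannel(true, "192.168.1.20:10911")); // 192.168.1.20:10909
    }
}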
How does the broker handle this request? Looking up the processor registered for the UPDATE_CONSUMER_OFFSET code, we find ConsumerManageProcessor; its processRequest method is as follows:
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
return this.getConsumerListByGroup(ctx, request);
case RequestCode.UPDATE_CONSUMER_OFFSET:
return this.updateConsumerOffset(ctx, request);
case RequestCode.QUERY_CONSUMER_OFFSET:
return this.queryConsumerOffset(ctx, request);
default:
break;
}
return null;
}
This method handles three kinds of requests:
GET_CONSUMER_LIST_BY_GROUP: get all consumers in the given consumer group
UPDATE_CONSUMER_OFFSET: update the consumption offset
QUERY_CONSUMER_OFFSET: query the consumption offset
Here we only care about updating the consumption offset, so let's enter ConsumerManageProcessor#updateConsumerOffset:
private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
final UpdateConsumerOffsetRequestHeader requestHeader =
(UpdateConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
// Delegate to ConsumerOffsetManager
this.brokerController.getConsumerOffsetManager().commitOffset(
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
Following the call chain, we finally arrive at the ConsumerOffsetManager class:
public class ConsumerOffsetManager extends ConfigManager {
...
/**
* Map holding the offsets
*/
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
/**
* Handles the commit
*/
public void commitOffset(final String clientHost, final String group, final String topic,
final int queueId,
final long offset) {
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
// Delegate to the overloaded method
this.commitOffset(clientHost, key, queueId, offset);
}
/**
* Where the update actually happens
*/
private void commitOffset(final String clientHost, final String key, final int queueId,
final long offset) {
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
if (null == map) {
map = new ConcurrentHashMap<Integer, Long>(32);
map.put(queueId, offset);
this.offsetTable.put(key, map);
} else {
Long storeOffset = map.put(queueId, offset);
if (storeOffset != null && offset < storeOffset) {
log.warn(...);
}
}
}
...
The ConsumerOffsetManager class is what stores offsets on the broker side. It keeps them in a ConcurrentMap whose key is topic@group and whose value is another map from queue id to that queue's consumption offset.
Judging from commitOffset alone, the offset only ends up in memory. Note, however, that ConsumerOffsetManager extends ConfigManager: the broker periodically persists this offsetTable to the consumerOffset.json config file and loads it back on startup, so consumption offsets survive a normal broker restart.
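That persistence is driven by the broker itself: BrokerController schedules a task that calls consumerOffsetManager.persist() every flushConsumerOffsetInterval (5 seconds by default), serializing offsetTable into the consumerOffset.json file under the broker's store root. A minimal, self-contained sketch of that idea (not the broker's actual code):
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class OffsetFlushSketch {
    // in-memory table: topic@group -> (queueId -> offset), as in ConsumerOffsetManager
    static final Map<String, Map<Integer, Long>> offsetTable = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                // the broker calls consumerOffsetManager.persist() here: serialize offsetTable
                // to JSON and write it to the consumerOffset.json config file
                System.out.println("flush offsets: " + offsetTable);
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }, 10_000, 5_000 /* flushConsumerOffsetInterval default */, TimeUnit.MILLISECONDS);
    }
}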
6.3 Reading the offset
When the consumer pulls messages, the consumption offset is loaded while the pullRequest objects are first being prepared, in RebalanceImpl#updateProcessQueueTableInRebalance:
private boolean updateProcessQueueTableInRebalance(final String topic,
final Set<MessageQueue> mqSet, final boolean isOrder) {
...
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
...
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
// Compute the offset to start pulling from
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info(...);
} else {
log.info(...);
// Build the pullRequest
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
// Set the offset
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn(...);
}
}
}
// Dispatch the pull requests
this.dispatchPullRequest(pullRequestList);
return changed;
}
The offset is obtained by this line:
// compute the offset to start pulling from
long nextOffset = this.computePullFromWhere(mq);
The method invoked is RebalancePushImpl#computePullFromWhere; let's step into it:
public long computePullFromWhere(MessageQueue mq) {
long result = -1;
final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl
.getDefaultMQPushConsumer().getConsumeFromWhere();
// Get the offsetStore
final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
switch (consumeFromWhere) {
case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
case CONSUME_FROM_MIN_OFFSET:
case CONSUME_FROM_MAX_OFFSET:
case CONSUME_FROM_LAST_OFFSET: {
// The read
long lastOffset = offsetStore.readOffset(
mq, ReadOffsetType.READ_FROM_STORE);
// various checks omitted
...
break;
}
case CONSUME_FROM_FIRST_OFFSET: {
long lastOffset = offsetStore.readOffset(
mq, ReadOffsetType.READ_FROM_STORE);
// various checks omitted
...
break;
}
case CONSUME_FROM_TIMESTAMP: {
long lastOffset = offsetStore.readOffset(
mq, ReadOffsetType.READ_FROM_STORE);
// various checks omitted
...
break;
}
default:
break;
}
return result;
}
The actual read happens here via offsetStore.readOffset; the surrounding checks (omitted above) decide what to do when no offset has been stored yet, based on the consumer's ConsumeFromWhere setting, which can be configured as shown below. Next, let's look at the local read and the remote read.
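ConsumeFromWhere only takes effect when the group has no stored offset yet (readOffset returning -1); once an offset exists, the consumer simply resumes from it. It can be configured on the consumer, for example (reusing the consumer from the sketch in section 6; the timestamp format is yyyyMMddHHmmss):
// start from the earliest available offset when the group has no stored offset yet
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

// or: start from a point in time (only meaningful with CONSUME_FROM_TIMESTAMP)
// consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
// consumer.setConsumeTimestamp("20240101000000");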
For local file storage, reading simply means reading the local file; enter LocalFileOffsetStore#readOffset:
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
if (mq != null) {
switch (type) {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY: {
AtomicLong offset = this.offsetTable.get(mq);
if (offset != null) {
return offset.get();
} else if (ReadOffsetType.READ_FROM_MEMORY == type) {
return -1;
}
}
case READ_FROM_STORE: {
OffsetSerializeWrapper offsetSerializeWrapper;
try {
// Read the local offsets file
offsetSerializeWrapper = this.readLocalOffset();
} catch (MQClientException e) {
return -1;
}
if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
if (offset != null) {
this.updateOffset(mq, offset.get(), false);
return offset.get();
}
}
}
default:
break;
}
}
return -1;
}
/**
* Reads the local offsets file
*/
private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
String content = null;
try {
// Read the file content as a String
content = MixAll.file2String(this.storePath);
} catch (IOException e) {
log.warn("Load local offset store file exception", e);
}
if (null == content || content.length() == 0) {
return this.readLocalOffsetBak();
} else {
OffsetSerializeWrapper offsetSerializeWrapper = null;
try {
offsetSerializeWrapper =
OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
} catch (Exception e) {
log.warn("readLocalOffset Exception, and try to correct", e);
return this.readLocalOffsetBak();
}
return offsetSerializeWrapper;
}
}
Now let's see how the offset is fetched from the remote broker; enter RemoteBrokerOffsetStore#readOffset:
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
if (mq != null) {
switch (type) {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY: {
AtomicLong offset = this.offsetTable.get(mq);
if (offset != null) {
return offset.get();
} else if (ReadOffsetType.READ_FROM_MEMORY == type) {
return -1;
}
}
case READ_FROM_STORE: {
try {
// Fetch from the broker
long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
AtomicLong offset = new AtomicLong(brokerOffset);
this.updateOffset(mq, offset.get(), false);
return brokerOffset;
}
// No offset in broker
catch (MQBrokerException e) {
return -1;
}
//Other exceptions
catch (Exception e) {
log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
return -2;
}
}
default:
break;
}
}
return -1;
}
/**
* Fetches the offset from the broker
*/
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
// Find the broker that owns this queue; the offset is only fetched from that broker
FindBrokerResult findBrokerResult = this.mQClientFactory
.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (findBrokerResult != null) {
QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
// Query the broker
return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
The query is sent in MQClientAPIImpl#queryConsumerOffset:
public long queryConsumerOffset(
final String addr,
final QueryConsumerOffsetRequestHeader requestHeader,
final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(
MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
// Parse the result
QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(
QueryConsumerOffsetResponseHeader.class);
return responseHeader.getOffset();
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
How does the broker answer this request? The same ConsumerManageProcessor#processRequest we saw earlier dispatches it:
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
return this.getConsumerListByGroup(ctx, request);
case RequestCode.UPDATE_CONSUMER_OFFSET:
return this.updateConsumerOffset(ctx, request);
case RequestCode.QUERY_CONSUMER_OFFSET:
return this.queryConsumerOffset(ctx, request);
default:
break;
}
return null;
}
The query itself is handled in ConsumerManageProcessor#queryConsumerOffset:
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
final QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader) response.readCustomHeader();
final QueryConsumerOffsetRequestHeader requestHeader =
(QueryConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
// The lookup
long offset =
this.brokerController.getConsumerOffsetManager().queryOffset(
requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId());
...
return response;
}
Finally, ConsumerOffsetManager#queryOffset(...) performs the lookup:
public long queryOffset(final String group, final String topic, final int queueId) {
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
if (null != map) {
Long offset = map.get(queueId);
if (offset != null)
return offset;
}
return -1;
}
In other words, the value is simply read from ConsumerOffsetManager's offsetTable field, returning -1 when there is no record for that queue yet.
Given the author's limited experience, there are bound to be mistakes in this article; corrections are welcome! Original writing is not easy: please contact the author for authorization before any commercial reposting, and credit the source for non-commercial reposting.
This article was first published on the WeChat official account "Java技术探秘". If you enjoyed it, you are welcome to follow the account and explore the world of technology together!