Rocketmq源码分析13:consumer 消费偏移量

java技术探秘

共 45379字,需浏览 91分钟

 · 2021-04-28

注:本系列源码分析基于RocketMq 4.8.0,gitee仓库链接:https://gitee.com/funcy/rocketmq.git.

接上文,继续分析consumer消费流程。

6. 不重复消费消息:消费位置的偏移量

rocketMq的消费者如何保证不重复消费消息呢?答应就在于偏移量!consumer在拉取消息时,会先获取偏移量信息,然后拉消息时会带上这个偏移量,之后broker则会根据这个偏移量,获取对应的消息返回给consumer

6.1 偏移量的存储初始化

处理偏移量存储的接口为OffsetStore,它有两个实现类:

  • LocalFileOffsetStore:本地文件存储,即存储在本地文件中
  • RemoteBrokerOffsetStore:远程broker存储,即存储在远程broker

这个接口的初始化在DefaultMQPushConsumerImpl#start方法中进行:

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            ...

            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                // 消息模式:广播模式存在本地,集群模式存在远程(broker)
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, 
                            this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, 
                            this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            // 加载消费信息的偏移量
            this.offsetStore.load();
            ...
    }

    ...
}

这个方法中,与偏移量相关的操作有两个:

  1. 初始化:根据消息模式的不同,而初始化不同的OffsetStore,简单来说,广播模式下,偏移量存储在本地,集群模式下,偏移量存储在远程broker
  2. 加载偏移量信息

这里分别来看看两者的加载操作:

LocalFileOffsetStore的加载:

@Override
public void load() throws MQClientException {
    // 读取本地文件
    OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
    if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
        offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
        // 加载每个队列的偏移量
        for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
            AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
        }
    }
}

private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
    String content = null;
    try {
        // 读取文件操作,将文件内容转为String
        content = MixAll.file2String(this.storePath);
    } catch (IOException e) {
        log.warn("Load local offset store file exception", e);
    }
    if (null == content || content.length() == 0) {
        // 读取 bak 文件
        return this.readLocalOffsetBak();
    } else {
        OffsetSerializeWrapper offsetSerializeWrapper = null;
        try {
            offsetSerializeWrapper =
                OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
        } catch (Exception e) {
            log.warn("readLocalOffset Exception, and try to correct", e);
            return this.readLocalOffsetBak();
        }

        return offsetSerializeWrapper;
    }
}

可以看到,这种操作下,仅仅只是读取本地文件而已。

再来看看RemoteBrokerOffsetStore的加载:

@Override
public void load() {
}

远程broker存储时,啥也没做。

6.2 偏移量持久化

偏移量持久化是在定时任务中进行的,定时任务的启动方法为MQClientInstance#startScheduledTask

private void startScheduledTask() {
    // 持久化消费者的消费偏移量,每5秒执行一次
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

}

处理持久化操作的方法为OffsetStore#persistAll,我们先来看看LocalFileOffsetStore的持久化操作:

public void persistAll(Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty())
        return;

    OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
        if (mqs.contains(entry.getKey())) {
            AtomicLong offset = entry.getValue();
            offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
        }
    }

    String jsonString = offsetSerializeWrapper.toJson(true);
    if (jsonString != null) {
        try {
            // 保存到文件
            MixAll.string2File(jsonString, this.storePath);
        } catch (IOException e) {
            log.error("persistAll consumer offset Exception, " + this.storePath, e);
        }
    }
}

这个操作比较简单,就只是将偏移量信息写入到文件中。

再来看看RemoteBrokerOffsetStore的操作:

public void persistAll(Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty())
        return;

    final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();

    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
        MessageQueue mq = entry.getKey();
        AtomicLong offset = entry.getValue();
        if (offset != null) {
            if (mqs.contains(mq)) {
                try {
                    // 更新偏移量信息到broker
                    this.updateConsumeOffsetToBroker(mq, offset.get());
                    log.info(...);
                } catch (Exception e) {
                    log.error(...);
                }
            } else {
                unusedMQ.add(mq);
            }
        }
    }

    if (!unusedMQ.isEmpty()) {
        for (MessageQueue mq : unusedMQ) {
            this.offsetTable.remove(mq);
            log.info("remove unused mq, {}, {}", mq, this.groupName);
        }
    }
}

RemoteBrokerOffsetStore#persistAll方法中,会调用this.updateConsumeOffsetToBroker(...)将持久化信息提交到broker上,更新的操作方法为 MQClientAPIImpl#updateConsumerOffset

public void updateConsumerOffset(
    final String addr,
    final UpdateConsumerOffsetRequestHeader requestHeader,
    final long timeoutMillis
)
 throws RemotingException, MQBrokerException, InterruptedException 
{
    // 更新偏移量的请求 code 为 UPDATE_CONSUMER_OFFSET
    RemotingCommand request = RemotingCommand.createRequestCommand(
        RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
    // 执行 netty 请求
    RemotingCommand response = this.remotingClient.invokeSync(
        MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

broker收到请求后,又是如何处理的呢?这里我们根据UPDATE_CONSUMER_OFFSET找到处理该codeProcessorConsumerManageProcessor,它的ConsumerManageProcessor如下:

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, 
        RemotingCommand request)
 throws RemotingCommandException 
{
    switch (request.getCode()) {
        case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
            return this.getConsumerListByGroup(ctx, request);
        case RequestCode.UPDATE_CONSUMER_OFFSET:
            return this.updateConsumerOffset(ctx, request);
        case RequestCode.QUERY_CONSUMER_OFFSET:
            return this.queryConsumerOffset(ctx, request);
        default:
            break;
    }
    return null;
}

这个方法中会处理三种类型的请求:

  1. GET_CONSUMER_LIST_BY_GROUP:获取指定消费组下的所有消费者
  2. UPDATE_CONSUMER_OFFSET:更新消费位置的偏移量
  3. QUERY_CONSUMER_OFFSET:查询消费位置的偏移量

这里我们只关注更新消费位置的偏移量操作,进入ConsumerManageProcessor#updateConsumerOffset方法:

private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException 
{
    final RemotingCommand response =
        RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
    final UpdateConsumerOffsetRequestHeader requestHeader =
        (UpdateConsumerOffsetRequestHeader) request
            .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
    // 继续处理
    this.brokerController.getConsumerOffsetManager().commitOffset(
        RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
        requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

继续跟进,最终来到ConsumerOffsetManager类:

public class ConsumerOffsetManager extends ConfigManager {
    ...

    /**
     * 存放偏移量的map
     */

    private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

    /**
     * 处理保存操作
     */

    public void commitOffset(final String clientHost, final String group, final String topic, 
            final int queueId,
        final long offset)
 
{
        // topic@group
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        // 继续调用重载方法
        this.commitOffset(clientHost, key, queueId, offset);
    }

    /**
     * 最终处理的地方
     */

    private void commitOffset(final String clientHost, final String key, final int queueId, 
            final long offset)
 
{
        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
        if (null == map) {
            map = new ConcurrentHashMap<Integer, Long>(32);
            map.put(queueId, offset);
            this.offsetTable.put(key, map);
        } else {
            Long storeOffset = map.put(queueId, offset);
            if (storeOffset != null && offset < storeOffset) {
                log.warn(...);
            }
        }
    }

    ...

ConsumerOffsetManager类就是用来处理偏移量的存储的,它使用一个ConcurrentMap来保存消费信息的偏移量,keytopic@groupvalue为消费位置的偏移量。

ConsumerOffsetManager来看,偏移量仅仅只是保存在了内存中,这也就是说,如果整个broker集群停机了,然后再重启,消费位置的偏移量就没有了。

6.3 偏移量的获取

consumer拉取消息时,在最初准备pullRequest对象时,会加载消费信息的偏移量,方法为RebalanceImpl#updateProcessQueueTableInRebalance

private boolean updateProcessQueueTableInRebalance(final String topic, 
        final Set<MessageQueue> mqSet, final boolean isOrder)
 
{
    ...

    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            ...

            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            // 计算消费位置的偏移量
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info(...);
                } else {
                    log.info(...);
                    // 添加 pullRequest
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    // 设置偏移量
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn(...);
            }
        }
    }

    // 发布
    this.dispatchPullRequest(pullRequestList);

    return changed;
}

获取消费位置的偏移量的代码为

// 计算消费位置的偏移量
long nextOffset = this.computePullFromWhere(mq);

调用的方法为RebalancePushImpl#computePullFromWhere,进入其中:

public long computePullFromWhere(MessageQueue mq) {
    long result = -1;
    final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl
        .getDefaultMQPushConsumer().getConsumeFromWhere();
    // 获取 offsetStore
    final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
    switch (consumeFromWhere) {
        case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
        case CONSUME_FROM_MIN_OFFSET:
        case CONSUME_FROM_MAX_OFFSET:
        case CONSUME_FROM_LAST_OFFSET: {
            // 读取操作
            long lastOffset = offsetStore.readOffset(
                mq, ReadOffsetType.READ_FROM_STORE);
            // 省略各种判断
            ...
            break;
        }
        case CONSUME_FROM_FIRST_OFFSET: {
            long lastOffset = offsetStore.readOffset(
                mq, ReadOffsetType.READ_FROM_STORE);
            // 省略各种判断
            ...
            break;
        }
        case CONSUME_FROM_TIMESTAMP: {
            long lastOffset = offsetStore.readOffset(
                mq, ReadOffsetType.READ_FROM_STORE);
            // 省略各种判断
            ...
            break;
        }

        default:
            break;
    }

    return result;
}

读取操作就是在这里进行的,我们直接看看本地存储的读取与远程存储的读取:

本地文件存储,就是直接读取本地文件,进入LocalFileOffsetStore#readOffset方法:

public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
    if (mq != null) {
        switch (type) {
            case MEMORY_FIRST_THEN_STORE:
            case READ_FROM_MEMORY: {
                AtomicLong offset = this.offsetTable.get(mq);
                if (offset != null) {
                    return offset.get();
                } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
                    return -1;
                }
            }
            case READ_FROM_STORE: {
                OffsetSerializeWrapper offsetSerializeWrapper;
                try {
                    // 读取本地文件
                    offsetSerializeWrapper = this.readLocalOffset();
                } catch (MQClientException e) {
                    return -1;
                }
                if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
                    AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
                    if (offset != null) {
                        this.updateOffset(mq, offset.get(), false);
                        return offset.get();
                    }
                }
            }
            default:
                break;
        }
    }

    return -1;
}

/**
 * 读取文件的操作
 */

private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
    String content = null;
    try {
        // 将文件内容转化为字符串
        content = MixAll.file2String(this.storePath);
    } catch (IOException e) {
        log.warn("Load local offset store file exception", e);
    }
    if (null == content || content.length() == 0) {
        return this.readLocalOffsetBak();
    } else {
        OffsetSerializeWrapper offsetSerializeWrapper = null;
        try {
            offsetSerializeWrapper =
                OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
        } catch (Exception e) {
            log.warn("readLocalOffset Exception, and try to correct", e);
            return this.readLocalOffsetBak();
        }

        return offsetSerializeWrapper;
    }
}

再来看看从远程broker是如何获取的,进入RemoteBrokerOffsetStore#readOffset方法:

public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
    if (mq != null) {
        switch (type) {
            case MEMORY_FIRST_THEN_STORE:
            case READ_FROM_MEMORY: {
                AtomicLong offset = this.offsetTable.get(mq);
                if (offset != null) {
                    return offset.get();
                } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
                    return -1;
                }
            }
            case READ_FROM_STORE: {
                try {
                    // 从broker中获取
                    long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
                    AtomicLong offset = new AtomicLong(brokerOffset);
                    this.updateOffset(mq, offset.get(), false);
                    return brokerOffset;
                }
                // No offset in broker
                catch (MQBrokerException e) {
                    return -1;
                }
                //Other exceptions
                catch (Exception e) {
                    log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
                    return -2;
                }
            }
            default:
                break;
        }
    }

    return -1;
}

/**
 * 继续获取操作
 */

private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, 
        MQBrokerException, InterruptedException, MQClientException 
{
    // 找到对应的 broker,只从对应的broker上获取
    FindBrokerResult findBrokerResult = this.mQClientFactory
        .findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }

    if (findBrokerResult != null) {
        QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());
        // 从broker查询
        return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
            findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist"null);
    }
}

查询操作在MQClientAPIImpl#queryConsumerOffset方法中,进入:

public long queryConsumerOffset(
    final String addr,
    final QueryConsumerOffsetRequestHeader requestHeader,
    final long timeoutMillis
)
 throws RemotingException, MQBrokerException, InterruptedException 
{
    RemotingCommand request = RemotingCommand.createRequestCommand(
        RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(
        MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            // 处理结果
            QueryConsumerOffsetResponseHeader responseHeader =
                (QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(
                    QueryConsumerOffsetResponseHeader.class);

            return responseHeader.getOffset();
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

收到该消息后,broker会如何应答呢?

ConsumerManageProcessor#processRequest方法如下:

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, 
        RemotingCommand request)
 throws RemotingCommandException 
{
    switch (request.getCode()) {
        case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
            return this.getConsumerListByGroup(ctx, request);
        case RequestCode.UPDATE_CONSUMER_OFFSET:
            return this.updateConsumerOffset(ctx, request);
        case RequestCode.QUERY_CONSUMER_OFFSET:
            return this.queryConsumerOffset(ctx, request);
        default:
            break;
    }
    return null;
}

处理查询操作在ConsumerManageProcessor#queryConsumerOffset方法中:

private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, 
        RemotingCommand request)
 throws RemotingCommandException 
{
    final RemotingCommand response =
        RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
    final QueryConsumerOffsetResponseHeader responseHeader =
        (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
    final QueryConsumerOffsetRequestHeader requestHeader =
        (QueryConsumerOffsetRequestHeader) request
            .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);

    // 查询操作
    long offset =
        this.brokerController.getConsumerOffsetManager().queryOffset(
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), 
            requestHeader.getQueueId());

    ...

    return response;
}

最终调用ConsumerOffsetManager#queryOffset(...)方法完成查询操作:

public long queryOffset(final String group, final String topic, final int queueId) {
    // topic@group
    String key = topic + TOPIC_GROUP_SEPARATOR + group;
    ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
    if (null != map) {
        Long offset = map.get(queueId);
        if (offset != null)
            return offset;
    }

    return -1;
}

其实就是从ConsumerOffsetManager的成员变量offsetTable中获取数据。


限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。

本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!


浏览 156
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报