Rocketmq源码分析16:消息过滤

共 30475字,需浏览 61分钟

 ·

2021-05-02 15:10

注:本系列源码分析基于RocketMq 4.8.0,gitee仓库链接:https://gitee.com/funcy/rocketmq.git.

rocketmq中,消息过滤有两种方式:

  • tag
  • sql92

本文将从源码角度来分析消息过滤的一些细节。

1. demo 准备

消息过滤的示例demo位于org.apache.rocketmq.example.filter包下,这里我们分别来看下tagsql的过滤方式。

1.1 消息过滤producer

public class FilterProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer 
            = new DefaultMQProducer("please_rename_unique_group_name");
        producer.start();
        String[] tags = new String[] {"TagA""TagB""TagC"};

        for (int i = 0; i < 60; i++) {
            Message msg = new Message("TagFilterTest",
                // 指定消息的tag
                tags[i % tags.length],
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

producer中,我们仅是指定了消息的tag,然后调用send(...)方法发送该消息。

关于消息过滤,producer就只是把它当作普通消息发送出去,并没有做什么额外的操作。

1.2 消息过滤consumer

1. tag 过滤

tag过滤的consumer示例如下:

public class TagFilterConsumer {
    public static void main(String[] args) throws 
            InterruptedException, MQClientException, IOException 
{

        DefaultMQPushConsumer consumer 
            = new DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.subscribe("TagFilterTest"
            // 设置要过滤的tag,多个使用 || 分开
            "TagA || TagC");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context)
 
{
                System.out.printf("%s Receive New Messages: %s %n"
                    Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

在使用时,需要指定过滤的tag,多个tag使用||分开。

2. sql 过滤

sql过滤的consumer示例如下:

public class SqlFilterConsumer {

    public static void main(String[] args) throws Exception {
        String nameServer = "localhost:9876";
        DefaultMQPushConsumer consumer 
            = new DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.setNamesrvAddr(nameServer);
        consumer.subscribe("SqlFilterTest",
            // sql 过滤语句
            MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
                "and (a is not null and a between 0 and 3)"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context)
 
{
                System.out.printf("%s Receive New Messages: %s %n"
                    Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

tag过滤不同的是,sql过滤时,需要使用MessageSelector.bySql(...)指定sql语句。

另外,为了让broker支持sql过滤,需要设置属性:enablePropertyFilter=true,这样broker才能支持sql过滤。

从以上代码来看,consumer会指定过滤规则,告诉broker自己能接收哪些消息,broker从而返回对应的消息。

2. 从broker获取消息

consumerbroker拉取消息时,会把自己的过滤规则一并上报,当broker收到consumer的消息后,从而为consumer返回相应的消息,broker获取消息的方法为PullMessageProcessor#processRequest(...)

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, 
        boolean brokerAllowSuspend)
 throws RemotingCommandException 
{
    ...

    // 创建消息过滤的filter
    SubscriptionData subscriptionData = null;
    ConsumerFilterData consumerFilterData = null;
    if (hasSubscriptionFlag) {
        try {
            // 构建过滤数据
            subscriptionData = FilterAPI.build(requestHeader.getTopic(), 
                requestHeader.getSubscription(), requestHeader.getExpressionType());
            // 如果不是tag类型的过滤,创建 consumerFilterData 对象
            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                consumerFilterData = ConsumerFilterManager.build(requestHeader.getTopic(), 
                    requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
                    requestHeader.getExpressionType(), requestHeader.getSubVersion());
                assert consumerFilterData != null;
            }
        } catch (Exception e) {
            ...
        }
    } else {
        ...
    }

    ...

    // 消息过滤对象
    MessageFilter messageFilter;
    if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
        messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    } else {
        messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    }

    // 获取消息
    // 1. 根据 topic 与 queueId 获取 ConsumerQueue 文件
    // 2. 根据 ConsumerQueue 文件的信息,从 CommitLog 中获取消息内容
    final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(
        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), 
        requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    ...
    return response;
}

这个方法是consumerbroker拉取消息的核心方法了,不过我们这里仅关注消息过滤相关的操作,因此这里省去了大量代码,仅保留了消息过滤相关的内容。

消息过滤相关的内容如下:

  1. 构建subscriptionData
  2. 构建ConsumerFilterData:如果不是tag类型的过滤,创建 consumerFilterData 对象
  3. 创建消息过滤对象MessageFilter
  4. 获取消息,在这里会进行消息过滤,处理方法为DefaultMessageStore#getMessage

接下来我们就来分别看看这些步骤。

2.1 构建subscriptionDataFilterAPI#build

构建subscriptionData的方法为FilterAPI#build,代码如下:

public static SubscriptionData build(final String topic, final String subString,
    final String type)
 throws Exception 
{
    // 这里是构建tag类型的过滤数据
    if (ExpressionType.TAG.equals(type) || type == null) {
        return buildSubscriptionData(null, topic, subString);
    }

    if (subString == null || subString.length() < 1) {
        throw new IllegalArgumentException("Expression can't be null! " + type);
    }
    // 构建sql类型的过滤数据
    SubscriptionData subscriptionData = new SubscriptionData();
    subscriptionData.setTopic(topic);
    subscriptionData.setSubString(subString);
    subscriptionData.setExpressionType(type);
    return subscriptionData;
}

/**
 * 构建tag过滤消息
 */

public static SubscriptionData buildSubscriptionData(final String consumerGroup, 
        String topic, String subString)
 throws Exception 
{
    SubscriptionData subscriptionData = new SubscriptionData();
    subscriptionData.setTopic(topic);
    subscriptionData.setSubString(subString);

    if (null == subString || subString.equals(SubscriptionData.SUB_ALL) 
            || subString.length() == 0) {
        subscriptionData.setSubString(SubscriptionData.SUB_ALL);
    } else {
        // 如果指定了tag,按 || 拆分tag
        String[] tags = subString.split("\\|\\|");
        if (tags.length > 0) {
            for (String tag : tags) {
                if (tag.length() > 0) {
                    String trimString = tag.trim();
                    if (trimString.length() > 0) {
                        // tag 放入 tagsSet,tag 的 hashCode 放入 codeSet
                        subscriptionData.getTagsSet().add(trimString);
                        subscriptionData.getCodeSet().add(trimString.hashCode());
                    }
                }
            }
        } else {
            throw new Exception("subString split error");
        }
    }

    return subscriptionData;
}

从上面的方法来看,构建subscriptionData时,会根据tag与非tag过滤来构建不同的subscriptionData

  1. 如果是tag过滤,则按“||”拆分指定的tag,得到的tag 放入 tagsSet中,taghash值 放入 codeSet
  2. 如果是非tag过滤,则不用处理tag相关操作,设置其他属性即可

2.2 构建ConsumerFilterDataConsumerFilterManager#build

对于非tag过滤的类型,rocketMq会额外构建ConsumerFilterData对象,方法为ConsumerFilterManager#build

public static ConsumerFilterData build(final String topic, final String consumerGroup,
        final String expression, final String type,
        final long clientVersion)
 
{
        if (ExpressionType.isTagType(type)) {
            return null;
        }

        ConsumerFilterData consumerFilterData = new ConsumerFilterData();
        // 设置一系列的属性
        consumerFilterData.setTopic(topic);
        consumerFilterData.setConsumerGroup(consumerGroup);
        consumerFilterData.setBornTime(System.currentTimeMillis());
        consumerFilterData.setDeadTime(0);
        consumerFilterData.setExpression(expression);
        consumerFilterData.setExpressionType(type);
        consumerFilterData.setClientVersion(clientVersion);
        try {
            // 设置处理表达式的过滤器
            consumerFilterData.setCompiledExpression(
                FilterFactory.INSTANCE.get(type).compile(expression)
            );
        } catch (Throwable e) {
            log.error(...);
            return null;
        }

        return consumerFilterData;
    }

这个方法中,关键就是如下一行:

consumerFilterData.setCompiledExpression(
    FilterFactory.INSTANCE.get(type).compile(expression)
);

它设置了表达式的解析器,FilterFactory代码如下:

public class FilterFactory {

    /** 单例对象 */
    public static final FilterFactory INSTANCE = new FilterFactory();

    /** 存放过滤器的map */
    protected static final Map<String, FilterSpi> FILTER_SPI_HOLDER 
        = new HashMap<String, FilterSpi>(4);

    static {
        FilterFactory.INSTANCE.register(new SqlFilter());
    }

    /**
     * 将 过滤器添加到 FILTER_SPI_HOLDER 中
     */

    public void register(FilterSpi filterSpi) {
        if (FILTER_SPI_HOLDER.containsKey(filterSpi.ofType())) {
            throw new IllegalArgumentException(...);
        }

        FILTER_SPI_HOLDER.put(filterSpi.ofType(), filterSpi);
    }

    /**
     * 根据类型获取 filter
     */

    public FilterSpi get(String type) {
        return FILTER_SPI_HOLDER.get(type);
    }

    ...
}

可以看到,整个FILTER_SPI_HOLDERy就只有一个FilterSpi实例:SqlFilter,sql的过滤也是由这个类来处理的。

2.3 创建MessageFilter对象

这块就是创建了一个MessageFilter对象,上面创建的subscriptionDataconsumerFilterData都会被传入这个对象中。

2.4 获取消息

到了这一步,就是真正去commitLog中获取消息了,获取方法为DefaultMessageStore#getMessage

public GetMessageResult getMessage(final String group, final String topic, final int queueId, 
        final long offset, final int maxMsgNums, final MessageFilter messageFilter)
 
{
    ...

    // 判断消息是否满足过滤条件
    if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(
                isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.NO_MATCHED_MESSAGE;
        }
        continue;
    }

    // 获取消息
    SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
    if (null == selectResult) {
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.MESSAGE_WAS_REMOVING;
        }

        nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
        continue;
    }

    if (messageFilter != null
        // 比较sql表达式    
        && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.NO_MATCHED_MESSAGE;
        }
        // release...
        selectResult.release();
        continue;
    }
    ...
}

在这个方法里,我们依旧只关注过滤相关流程,该方法所进行的操作如下:

  1. 判断消息是否满足过滤条件,这里只过滤taghashCode,不满足条件的消息就不会获取到
  2. 获取消息,就是从commitlog文件中获取消息
  3. 判断消息是否满足过滤条件,这里处理sql类型的过滤,不满足条件的消息不会返回
1. 过滤taghashCode

broker处理tag的操作方法为DefaultMessageFilter#isMatchedByConsumeQueue,代码如下:

@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
    if (null == tagsCode || null == subscriptionData) {
        return true;
    }

    if (subscriptionData.isClassFilterMode()) {
        return true;
    }

    return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)
            // 判断是否满足标签的hashcode
            || subscriptionData.getCodeSet().contains(tagsCode.intValue());
}

需要注意的是,这里只判断taghashCode是否相等,但不同taghashCode可能相等,真正的tag过滤是在consumer中进行的。

2. sql 过滤

commitlog中获得消息后,接下来会进行sql过滤,方法为ExpressionMessageFilter#isMatchedByCommitLog

public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
    // 省略一些内容
    ...

    Object ret = null;
    try {
        MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
        // 处理值
        ret = realFilterData.getCompiledExpression().evaluate(context);
    } catch (Throwable e) {
        log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
    }

    log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);

    if (ret == null || !(ret instanceof Boolean)) {
        return false;
    }

    return (Boolean) ret;
}

realFilterData内容如下:

3. consumer的过滤tag

对于tag过滤,broker仅是根据taghashCode进行过滤了,在consumer才会根据tag的内容过滤,我们进入拉取消息的方法 DefaultMQPushConsumerImpl#pullMessage

   public void pullMessage(final PullRequest pullRequest) {
       ...
       // 消息拉取的回调函数,在拉取到消息后会进入这个方法处理
        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    // 处理消息,将二制消息解码为java对象,也会对消息进行tag过滤
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                        pullRequest.getMessageQueue(), pullResult, subscriptionData);
                    ...
                }
                ...
            }
            ...
        }
   }

根据跟进PullAPIWrapper#processPullResult方法:

public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
    final SubscriptionData subscriptionData)
 
{
    PullResultExt pullResultExt = (PullResultExt) pullResult;

    this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
    if (PullStatus.FOUND == pullResult.getPullStatus()) {
        // 将二进制数据解码为对象
        ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
        List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

        List<MessageExt> msgListFilterAgain = msgList;
        // 按 tag 过滤
        if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
            msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
            for (MessageExt msg : msgList) {
                if (msg.getTags() != null) {
                    // 根据tag过滤消息
                    if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                        msgListFilterAgain.add(msg);
                    }
                }
            }
        }
        ...
    }
    ....
}

从代码来看,方法中会根据tag是否在TagsSet中来决定该消息是否需要加入msgListFilterAgain,而msgListFilterAgain就是过滤的消息列表了。

3. 总结

RocketMq消息过滤支持tagsql两种方式,

1. tag 方式

broker获取消息时,根据taghashCode过滤一波消息,但这样得到的消息可能并不只是指定tag的,因此需要在consumer上做进一步的过滤。

举例来说,consumer订阅了tagtag1的消息,tag1tag11两者的hashCode都是100,因此在broker上过滤时,根据taghashCode,这两者对应的消息都会发往consumer,因此consumer需要再进比较tag的值,过滤出真正需要的消息。

2. sql 方式

sql方式的过滤方式,只在broker中进行。


限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。

本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!


浏览 37
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报