Rocketmq源码分析16:消息过滤
注:本系列源码分析基于RocketMq 4.8.0,gitee仓库链接:https://gitee.com/funcy/rocketmq.git.
在rocketmq中,消息过滤有两种方式:
tagsql92
本文将从源码角度来分析消息过滤的一些细节。
1. demo 准备
消息过滤的示例demo位于org.apache.rocketmq.example.filter包下,这里我们分别来看下tag与sql的过滤方式。
1.1 消息过滤producer
public class FilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer
= new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 60; i++) {
Message msg = new Message("TagFilterTest",
// 指定消息的tag
tags[i % tags.length],
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
在producer中,我们仅是指定了消息的tag,然后调用send(...)方法发送该消息。
关于消息过滤,producer就只是把它当作普通消息发送出去,并没有做什么额外的操作。
1.2 消息过滤consumer
1. tag 过滤
tag过滤的consumer示例如下:
public class TagFilterConsumer {
public static void main(String[] args) throws
InterruptedException, MQClientException, IOException {
DefaultMQPushConsumer consumer
= new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.subscribe("TagFilterTest",
// 设置要过滤的tag,多个使用 || 分开
"TagA || TagC");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
在使用时,需要指定过滤的tag,多个tag使用||分开。
2. sql 过滤
sql过滤的consumer示例如下:
public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
String nameServer = "localhost:9876";
DefaultMQPushConsumer consumer
= new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("SqlFilterTest",
// sql 过滤语句
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
与tag过滤不同的是,sql过滤时,需要使用MessageSelector.bySql(...)指定sql语句。
另外,为了让broker支持sql过滤,需要设置属性:enablePropertyFilter=true,这样broker才能支持sql过滤。
从以上代码来看,consumer会指定过滤规则,告诉broker自己能接收哪些消息,broker从而返回对应的消息。
2. 从broker获取消息
consumer从broker拉取消息时,会把自己的过滤规则一并上报,当broker收到consumer的消息后,从而为consumer返回相应的消息,broker获取消息的方法为PullMessageProcessor#processRequest(...):
private RemotingCommand processRequest(final Channel channel, RemotingCommand request,
boolean brokerAllowSuspend) throws RemotingCommandException {
...
// 创建消息过滤的filter
SubscriptionData subscriptionData = null;
ConsumerFilterData consumerFilterData = null;
if (hasSubscriptionFlag) {
try {
// 构建过滤数据
subscriptionData = FilterAPI.build(requestHeader.getTopic(),
requestHeader.getSubscription(), requestHeader.getExpressionType());
// 如果不是tag类型的过滤,创建 consumerFilterData 对象
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
consumerFilterData = ConsumerFilterManager.build(requestHeader.getTopic(),
requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
requestHeader.getExpressionType(), requestHeader.getSubVersion());
assert consumerFilterData != null;
}
} catch (Exception e) {
...
}
} else {
...
}
...
// 消息过滤对象
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
} else {
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
}
// 获取消息
// 1. 根据 topic 与 queueId 获取 ConsumerQueue 文件
// 2. 根据 ConsumerQueue 文件的信息,从 CommitLog 中获取消息内容
final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
...
return response;
}
这个方法是consumer从broker拉取消息的核心方法了,不过我们这里仅关注消息过滤相关的操作,因此这里省去了大量代码,仅保留了消息过滤相关的内容。
消息过滤相关的内容如下:
构建 subscriptionData构建 ConsumerFilterData:如果不是tag类型的过滤,创建consumerFilterData对象创建消息过滤对象 MessageFilter获取消息,在这里会进行消息过滤,处理方法为 DefaultMessageStore#getMessage
接下来我们就来分别看看这些步骤。
2.1 构建subscriptionData:FilterAPI#build
构建subscriptionData的方法为FilterAPI#build,代码如下:
public static SubscriptionData build(final String topic, final String subString,
final String type) throws Exception {
// 这里是构建tag类型的过滤数据
if (ExpressionType.TAG.equals(type) || type == null) {
return buildSubscriptionData(null, topic, subString);
}
if (subString == null || subString.length() < 1) {
throw new IllegalArgumentException("Expression can't be null! " + type);
}
// 构建sql类型的过滤数据
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);
subscriptionData.setExpressionType(type);
return subscriptionData;
}
/**
* 构建tag过滤消息
*/
public static SubscriptionData buildSubscriptionData(final String consumerGroup,
String topic, String subString) throws Exception {
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);
if (null == subString || subString.equals(SubscriptionData.SUB_ALL)
|| subString.length() == 0) {
subscriptionData.setSubString(SubscriptionData.SUB_ALL);
} else {
// 如果指定了tag,按 || 拆分tag
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {
for (String tag : tags) {
if (tag.length() > 0) {
String trimString = tag.trim();
if (trimString.length() > 0) {
// tag 放入 tagsSet,tag 的 hashCode 放入 codeSet
subscriptionData.getTagsSet().add(trimString);
subscriptionData.getCodeSet().add(trimString.hashCode());
}
}
}
} else {
throw new Exception("subString split error");
}
}
return subscriptionData;
}
从上面的方法来看,构建subscriptionData时,会根据tag与非tag过滤来构建不同的subscriptionData:
如果是 tag过滤,则按“||”拆分指定的tag,得到的tag放入tagsSet中,tag的hash值放入codeSet中如果是非 tag过滤,则不用处理tag相关操作,设置其他属性即可
2.2 构建ConsumerFilterData:ConsumerFilterManager#build
对于非tag过滤的类型,rocketMq会额外构建ConsumerFilterData对象,方法为ConsumerFilterManager#build:
public static ConsumerFilterData build(final String topic, final String consumerGroup,
final String expression, final String type,
final long clientVersion) {
if (ExpressionType.isTagType(type)) {
return null;
}
ConsumerFilterData consumerFilterData = new ConsumerFilterData();
// 设置一系列的属性
consumerFilterData.setTopic(topic);
consumerFilterData.setConsumerGroup(consumerGroup);
consumerFilterData.setBornTime(System.currentTimeMillis());
consumerFilterData.setDeadTime(0);
consumerFilterData.setExpression(expression);
consumerFilterData.setExpressionType(type);
consumerFilterData.setClientVersion(clientVersion);
try {
// 设置处理表达式的过滤器
consumerFilterData.setCompiledExpression(
FilterFactory.INSTANCE.get(type).compile(expression)
);
} catch (Throwable e) {
log.error(...);
return null;
}
return consumerFilterData;
}
这个方法中,关键就是如下一行:
consumerFilterData.setCompiledExpression(
FilterFactory.INSTANCE.get(type).compile(expression)
);
它设置了表达式的解析器,FilterFactory代码如下:
public class FilterFactory {
/** 单例对象 */
public static final FilterFactory INSTANCE = new FilterFactory();
/** 存放过滤器的map */
protected static final Map<String, FilterSpi> FILTER_SPI_HOLDER
= new HashMap<String, FilterSpi>(4);
static {
FilterFactory.INSTANCE.register(new SqlFilter());
}
/**
* 将 过滤器添加到 FILTER_SPI_HOLDER 中
*/
public void register(FilterSpi filterSpi) {
if (FILTER_SPI_HOLDER.containsKey(filterSpi.ofType())) {
throw new IllegalArgumentException(...);
}
FILTER_SPI_HOLDER.put(filterSpi.ofType(), filterSpi);
}
/**
* 根据类型获取 filter
*/
public FilterSpi get(String type) {
return FILTER_SPI_HOLDER.get(type);
}
...
}
可以看到,整个FILTER_SPI_HOLDERy就只有一个FilterSpi实例:SqlFilter,sql的过滤也是由这个类来处理的。
2.3 创建MessageFilter对象
这块就是创建了一个MessageFilter对象,上面创建的subscriptionData与consumerFilterData都会被传入这个对象中。
2.4 获取消息
到了这一步,就是真正去commitLog中获取消息了,获取方法为DefaultMessageStore#getMessage:
public GetMessageResult getMessage(final String group, final String topic, final int queueId,
final long offset, final int maxMsgNums, final MessageFilter messageFilter) {
...
// 判断消息是否满足过滤条件
if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(
isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
continue;
}
// 获取消息
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (null == selectResult) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
continue;
}
if (messageFilter != null
// 比较sql表达式
&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
// release...
selectResult.release();
continue;
}
...
}
在这个方法里,我们依旧只关注过滤相关流程,该方法所进行的操作如下:
判断消息是否满足过滤条件,这里只过滤 tag的hashCode,不满足条件的消息就不会获取到获取消息,就是从 commitlog文件中获取消息判断消息是否满足过滤条件,这里处理 sql类型的过滤,不满足条件的消息不会返回
1. 过滤tag的hashCode:
broker处理tag的操作方法为DefaultMessageFilter#isMatchedByConsumeQueue,代码如下:
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (null == tagsCode || null == subscriptionData) {
return true;
}
if (subscriptionData.isClassFilterMode()) {
return true;
}
return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)
// 判断是否满足标签的hashcode
|| subscriptionData.getCodeSet().contains(tagsCode.intValue());
}
需要注意的是,这里只判断tag的hashCode是否相等,但不同tag的hashCode可能相等,真正的tag过滤是在consumer中进行的。
2. sql 过滤
从commitlog中获得消息后,接下来会进行sql过滤,方法为ExpressionMessageFilter#isMatchedByCommitLog:
public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
// 省略一些内容
...
Object ret = null;
try {
MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
// 处理值
ret = realFilterData.getCompiledExpression().evaluate(context);
} catch (Throwable e) {
log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
}
log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);
if (ret == null || !(ret instanceof Boolean)) {
return false;
}
return (Boolean) ret;
}
realFilterData内容如下:


3. consumer的过滤tag
对于tag过滤,broker仅是根据tag的hashCode进行过滤了,在consumer才会根据tag的内容过滤,我们进入拉取消息的方法 DefaultMQPushConsumerImpl#pullMessage:
public void pullMessage(final PullRequest pullRequest) {
...
// 消息拉取的回调函数,在拉取到消息后会进入这个方法处理
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
// 处理消息,将二制消息解码为java对象,也会对消息进行tag过滤
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
pullRequest.getMessageQueue(), pullResult, subscriptionData);
...
}
...
}
...
}
}
根据跟进PullAPIWrapper#processPullResult方法:
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt) pullResult;
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus()) {
// 将二进制数据解码为对象
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
List<MessageExt> msgListFilterAgain = msgList;
// 按 tag 过滤
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
// 根据tag过滤消息
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
...
}
....
}
从代码来看,方法中会根据tag是否在TagsSet中来决定该消息是否需要加入msgListFilterAgain,而msgListFilterAgain就是过滤的消息列表了。
3. 总结
RocketMq消息过滤支持tag与sql两种方式,
1. tag 方式
在broker获取消息时,根据tag的hashCode过滤一波消息,但这样得到的消息可能并不只是指定tag的,因此需要在consumer上做进一步的过滤。
举例来说,consumer订阅了tag为tag1的消息,tag1与tag11两者的hashCode都是100,因此在broker上过滤时,根据tag的hashCode,这两者对应的消息都会发往consumer,因此consumer需要再进比较tag的值,过滤出真正需要的消息。
2. sql 方式
sql方式的过滤方式,只在broker中进行。
限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。
本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!
