Rocketmq源码分析16:消息过滤
注:本系列源码分析基于RocketMq 4.8.0,gitee仓库链接:https://gitee.com/funcy/rocketmq.git.
在rocketmq
中,消息过滤有两种方式:
tag
sql92
本文将从源码角度来分析消息过滤的一些细节。
1. demo 准备
消息过滤的示例demo位于org.apache.rocketmq.example.filter
包下,这里我们分别来看下tag
与sql
的过滤方式。
1.1 消息过滤producer
public class FilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer
= new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 60; i++) {
Message msg = new Message("TagFilterTest",
// 指定消息的tag
tags[i % tags.length],
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
在producer
中,我们仅是指定了消息的tag
,然后调用send(...)
方法发送该消息。
关于消息过滤,producer
就只是把它当作普通消息发送出去,并没有做什么额外的操作。
1.2 消息过滤consumer
1. tag 过滤
tag
过滤的consumer
示例如下:
public class TagFilterConsumer {
public static void main(String[] args) throws
InterruptedException, MQClientException, IOException {
DefaultMQPushConsumer consumer
= new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.subscribe("TagFilterTest",
// 设置要过滤的tag,多个使用 || 分开
"TagA || TagC");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
在使用时,需要指定过滤的tag
,多个tag
使用||
分开。
2. sql 过滤
sql
过滤的consumer
示例如下:
public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
String nameServer = "localhost:9876";
DefaultMQPushConsumer consumer
= new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("SqlFilterTest",
// sql 过滤语句
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
与tag
过滤不同的是,sql过滤时,需要使用MessageSelector.bySql(...)
指定sql语句。
另外,为了让broker
支持sql
过滤,需要设置属性:enablePropertyFilter=true
,这样broker
才能支持sql
过滤。
从以上代码来看,consumer
会指定过滤规则,告诉broker
自己能接收哪些消息,broker
从而返回对应的消息。
2. 从broker
获取消息
consumer
从broker
拉取消息时,会把自己的过滤规则一并上报,当broker
收到consumer
的消息后,从而为consumer
返回相应的消息,broker
获取消息的方法为PullMessageProcessor#processRequest(...)
:
private RemotingCommand processRequest(final Channel channel, RemotingCommand request,
boolean brokerAllowSuspend) throws RemotingCommandException {
...
// 创建消息过滤的filter
SubscriptionData subscriptionData = null;
ConsumerFilterData consumerFilterData = null;
if (hasSubscriptionFlag) {
try {
// 构建过滤数据
subscriptionData = FilterAPI.build(requestHeader.getTopic(),
requestHeader.getSubscription(), requestHeader.getExpressionType());
// 如果不是tag类型的过滤,创建 consumerFilterData 对象
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
consumerFilterData = ConsumerFilterManager.build(requestHeader.getTopic(),
requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
requestHeader.getExpressionType(), requestHeader.getSubVersion());
assert consumerFilterData != null;
}
} catch (Exception e) {
...
}
} else {
...
}
...
// 消息过滤对象
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
} else {
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
}
// 获取消息
// 1. 根据 topic 与 queueId 获取 ConsumerQueue 文件
// 2. 根据 ConsumerQueue 文件的信息,从 CommitLog 中获取消息内容
final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
...
return response;
}
这个方法是consumer
从broker
拉取消息的核心方法了,不过我们这里仅关注消息过滤相关的操作,因此这里省去了大量代码,仅保留了消息过滤相关的内容。
消息过滤相关的内容如下:
构建 subscriptionData
构建 ConsumerFilterData
:如果不是tag
类型的过滤,创建consumerFilterData
对象创建消息过滤对象 MessageFilter
获取消息,在这里会进行消息过滤,处理方法为 DefaultMessageStore#getMessage
接下来我们就来分别看看这些步骤。
2.1 构建subscriptionData
:FilterAPI#build
构建subscriptionData
的方法为FilterAPI#build
,代码如下:
public static SubscriptionData build(final String topic, final String subString,
final String type) throws Exception {
// 这里是构建tag类型的过滤数据
if (ExpressionType.TAG.equals(type) || type == null) {
return buildSubscriptionData(null, topic, subString);
}
if (subString == null || subString.length() < 1) {
throw new IllegalArgumentException("Expression can't be null! " + type);
}
// 构建sql类型的过滤数据
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);
subscriptionData.setExpressionType(type);
return subscriptionData;
}
/**
* 构建tag过滤消息
*/
public static SubscriptionData buildSubscriptionData(final String consumerGroup,
String topic, String subString) throws Exception {
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);
if (null == subString || subString.equals(SubscriptionData.SUB_ALL)
|| subString.length() == 0) {
subscriptionData.setSubString(SubscriptionData.SUB_ALL);
} else {
// 如果指定了tag,按 || 拆分tag
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {
for (String tag : tags) {
if (tag.length() > 0) {
String trimString = tag.trim();
if (trimString.length() > 0) {
// tag 放入 tagsSet,tag 的 hashCode 放入 codeSet
subscriptionData.getTagsSet().add(trimString);
subscriptionData.getCodeSet().add(trimString.hashCode());
}
}
}
} else {
throw new Exception("subString split error");
}
}
return subscriptionData;
}
从上面的方法来看,构建subscriptionData
时,会根据tag
与非tag
过滤来构建不同的subscriptionData
:
如果是 tag
过滤,则按“||”拆分指定的tag
,得到的tag
放入tagsSet
中,tag
的hash值
放入codeSet
中如果是非 tag
过滤,则不用处理tag
相关操作,设置其他属性即可
2.2 构建ConsumerFilterData
:ConsumerFilterManager#build
对于非tag
过滤的类型,rocketMq
会额外构建ConsumerFilterData
对象,方法为ConsumerFilterManager#build
:
public static ConsumerFilterData build(final String topic, final String consumerGroup,
final String expression, final String type,
final long clientVersion) {
if (ExpressionType.isTagType(type)) {
return null;
}
ConsumerFilterData consumerFilterData = new ConsumerFilterData();
// 设置一系列的属性
consumerFilterData.setTopic(topic);
consumerFilterData.setConsumerGroup(consumerGroup);
consumerFilterData.setBornTime(System.currentTimeMillis());
consumerFilterData.setDeadTime(0);
consumerFilterData.setExpression(expression);
consumerFilterData.setExpressionType(type);
consumerFilterData.setClientVersion(clientVersion);
try {
// 设置处理表达式的过滤器
consumerFilterData.setCompiledExpression(
FilterFactory.INSTANCE.get(type).compile(expression)
);
} catch (Throwable e) {
log.error(...);
return null;
}
return consumerFilterData;
}
这个方法中,关键就是如下一行:
consumerFilterData.setCompiledExpression(
FilterFactory.INSTANCE.get(type).compile(expression)
);
它设置了表达式的解析器,FilterFactory
代码如下:
public class FilterFactory {
/** 单例对象 */
public static final FilterFactory INSTANCE = new FilterFactory();
/** 存放过滤器的map */
protected static final Map<String, FilterSpi> FILTER_SPI_HOLDER
= new HashMap<String, FilterSpi>(4);
static {
FilterFactory.INSTANCE.register(new SqlFilter());
}
/**
* 将 过滤器添加到 FILTER_SPI_HOLDER 中
*/
public void register(FilterSpi filterSpi) {
if (FILTER_SPI_HOLDER.containsKey(filterSpi.ofType())) {
throw new IllegalArgumentException(...);
}
FILTER_SPI_HOLDER.put(filterSpi.ofType(), filterSpi);
}
/**
* 根据类型获取 filter
*/
public FilterSpi get(String type) {
return FILTER_SPI_HOLDER.get(type);
}
...
}
可以看到,整个FILTER_SPI_HOLDERy
就只有一个FilterSpi
实例:SqlFilter
,sql的过滤也是由这个类来处理的。
2.3 创建MessageFilter
对象
这块就是创建了一个MessageFilter
对象,上面创建的subscriptionData
与consumerFilterData
都会被传入这个对象中。
2.4 获取消息
到了这一步,就是真正去commitLog
中获取消息了,获取方法为DefaultMessageStore#getMessage
:
public GetMessageResult getMessage(final String group, final String topic, final int queueId,
final long offset, final int maxMsgNums, final MessageFilter messageFilter) {
...
// 判断消息是否满足过滤条件
if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(
isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
continue;
}
// 获取消息
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (null == selectResult) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
continue;
}
if (messageFilter != null
// 比较sql表达式
&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
// release...
selectResult.release();
continue;
}
...
}
在这个方法里,我们依旧只关注过滤相关流程,该方法所进行的操作如下:
判断消息是否满足过滤条件,这里只过滤 tag
的hashCode
,不满足条件的消息就不会获取到获取消息,就是从 commitlog
文件中获取消息判断消息是否满足过滤条件,这里处理 sql
类型的过滤,不满足条件的消息不会返回
1. 过滤tag
的hashCode
:
broker
处理tag
的操作方法为DefaultMessageFilter#isMatchedByConsumeQueue
,代码如下:
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (null == tagsCode || null == subscriptionData) {
return true;
}
if (subscriptionData.isClassFilterMode()) {
return true;
}
return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)
// 判断是否满足标签的hashcode
|| subscriptionData.getCodeSet().contains(tagsCode.intValue());
}
需要注意的是,这里只判断tag
的hashCode
是否相等,但不同tag
的hashCode
可能相等,真正的tag
过滤是在consumer
中进行的。
2. sql 过滤
从commitlog
中获得消息后,接下来会进行sql
过滤,方法为ExpressionMessageFilter#isMatchedByCommitLog
:
public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
// 省略一些内容
...
Object ret = null;
try {
MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
// 处理值
ret = realFilterData.getCompiledExpression().evaluate(context);
} catch (Throwable e) {
log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
}
log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);
if (ret == null || !(ret instanceof Boolean)) {
return false;
}
return (Boolean) ret;
}
realFilterData
内容如下:
3. consumer
的过滤tag
对于tag
过滤,broker
仅是根据tag
的hashCode
进行过滤了,在consumer
才会根据tag
的内容过滤,我们进入拉取消息的方法 DefaultMQPushConsumerImpl#pullMessage
:
public void pullMessage(final PullRequest pullRequest) {
...
// 消息拉取的回调函数,在拉取到消息后会进入这个方法处理
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
// 处理消息,将二制消息解码为java对象,也会对消息进行tag过滤
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
pullRequest.getMessageQueue(), pullResult, subscriptionData);
...
}
...
}
...
}
}
根据跟进PullAPIWrapper#processPullResult
方法:
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt) pullResult;
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus()) {
// 将二进制数据解码为对象
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
List<MessageExt> msgListFilterAgain = msgList;
// 按 tag 过滤
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
// 根据tag过滤消息
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
...
}
....
}
从代码来看,方法中会根据tag
是否在TagsSet
中来决定该消息是否需要加入msgListFilterAgain
,而msgListFilterAgain
就是过滤的消息列表了。
3. 总结
RocketMq
消息过滤支持tag
与sql
两种方式,
1. tag
方式
在broker
获取消息时,根据tag
的hashCode
过滤一波消息,但这样得到的消息可能并不只是指定tag
的,因此需要在consumer
上做进一步的过滤。
举例来说,consumer
订阅了tag
为tag1
的消息,tag1
与tag11
两者的hashCode
都是100,因此在broker
上过滤时,根据tag
的hashCode
,这两者对应的消息都会发往consumer
,因此consumer
需要再进比较tag
的值,过滤出真正需要的消息。
2. sql
方式
sql
方式的过滤方式,只在broker
中进行。
限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。
本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!