Rocketmq源码分析17:RocketMq 知识点总结
架构总览
RocketMQ架构上主要分为四部分,如上图所示:
Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证
topic、borker与queue三者关系
三者关系如下图:
在图中,共有2个broker,2个topic,每个topic都有4个队列。在producer发送消息时,是发送到具体的队列上,consumer获取消息时,也是从队列上获取。
注:RocketMq
的 topic 可以在控制台手动创建,也可以自动创建(需要开启配置autoCreateTopicEnable=true
),官方建议生产环境下关闭自动创建。
消息处理流程
RocketMq消息处理整个流程如下:
消息接收:消息接收是指接收 producer
的消息,处理类是SendMessageProcessor
,将消息写入到commigLog
文件后,接收流程处理完毕;消息分发: broker
处理消息分发的类是ReputMessageService
,它会启动一个线程,不断地将commitLong
分到到对应的consumerQueue
,这一步操作会写两个文件:consumerQueue
与indexFile
,写入后,消息分发流程处理 完毕;消息投递:消息投递是指将消息发往 consumer
的流程,consumer
会发起获取消息的请求,broker
收到请求后,调用PullMessageProcessor
类处理,从consumerQueue
文件获取消息,返回给consumer
后,投递流程处理完毕。
具体分析可参考
三高保证
高并发
netty 高性能传输:
producer
、broker
、comsumer
之间使用netty通信,高性能传输;业务处理时,使用的是自定义的工作线程池,最终处理操作在NettyServerHandler
中丢给工作线程池。自旋锁减少上下文切换:RocketMQ 的 CommitLog 为了避免并发写入,使用一个
PutMessageLock
。PutMessageLock
有 2个实现版本:PutMessageReentrantLock
和PutMessageSpinLock
。PutMessageReentrantLock
是基于 java 的同步等待唤醒机制;PutMessageSpinLock
使用 Java 的 CAS 原语,通过自旋设值实现上锁和解锁。RocketMQ 默认使用PutMessageSpinLock
以提高高并发写入时候的上锁解锁效率,并减少线程上下文切换次数。顺序写文件:写入
commitLog
时,使用的是顺序写入,比随机写入的性能高很多,写入commitLog
时,并不是直接写入磁盘的,而是先写入PageCache
,最后由操作系统异步将PageCache
的数据刷到磁盘中MappedFile
预热和零拷贝机制
:Linux 系统在写数据时候不会直接把数据写到磁盘上,而是写到磁盘对应的PageCache
中,并把该页标记为脏页。当脏页累计到一定程度或者一定时间后再把数据 flush 到磁盘(当然在此期间如果系统掉电,会导致脏页数据丢失)。多
broker
多Queue
模式:使用多broker
多Queue
的模式,提高消息的并行处理能力。
高可用
RocketMq 的高可用由 DLedger 提供,整个 broker 的高可用架构如下:
DLedger
是一个多节点的集群,内部使用raft
算法选举leader
节点,由该leader
节点对broker
中的节点进行故障转移
多 NameServer
避免NameServer
的单点故障多个 broker
集群,当一个broker
集群出现故障时,其他broker
集群也能正常工作每个 broker
集群有一个master
节点和多个slave
节点,当master
节点出现故障,DLedger
在感知到故障后,会将其中一个slave
节点切换为master
节点,保证该集群继续正常工作
高扩展
broker
与producer
/consumer
没有耦合关系,需要添加broker
集群(1主多从)时,只需配置好nameServer
的地址,然后添加即可,理论上broker
可任意扩展。
当broker
添加到集群后,新加入的 broker
集群会被注册到nameServer
上,producer
/consumer
就能发现该broker
集群了。
消息可靠性
RocketMq
的消息可靠性分为如下几个阶段:
消息发送阶段的可靠性 消息存储阶段可靠性 消息消费阶段的可靠性
下面我们将介绍这3个阶段的可靠性是如何做到的。
消息发送阶段的可靠性
消息发送阶段的可靠性由producer
来处理,rocketmq
主要支持三种消息发送方式
同步:消息发放后,线程会阻塞,直到返回结果 异步:在发送消息时,可以设置消息发送结果的监听,消息发送后,线程不会阻塞,消息发送完成后,发送结果会被监听到 单向:消息发送完成后,线程不会阻塞,不会有结果返回,也无法设置发送结果的监听,即发送就可以,不关心发送结果,不关心是否发送成功
在消息可靠性方面,
同步发送:消息发送失败时,内部会「重试」(默认1次发送+2次失败重试,共3次),另外,由于发送完成后可以得到发送结果,因此也「可对失败的结果进行自主处理」异步发送:消息发送失败时,同时有内部「重试」(默认1次发送+2次失败重试,共3次),另外,发送消息时可以设置消息的监听规则,当发送失败时,可以「在监听代码中自主对失败的消息进行处理」单向发送:该模式下,消息发送失败时「无重试」(只是打出一条warn级别的日志),且「无发送结果返回、无结果监听」
消息存储阶段可靠性
消息存储阶段可靠性由broker
来保证,
在单master
架构的broker
中,消息先写入内存的PageCache
中,然后再进行刷盘,刷盘方式有两种:
SYNC_FLUSH
(同步刷盘):消息写入内存的 PageCache后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。这种方式可以保证数据绝对安全,但是吞吐量不大。ASYNC_FLUSH
(异步刷盘(默认)):消息写入到内存的 PageCache中,就立刻给客户端返回写操作成功,当 PageCache中的消息积累到一定的量时,触发一次写操作,或者定时等策略将 PageCache中的消息写入到磁盘中。这种方式吞吐量大,性能高,但是 PageCache 中的数据可能丢失,不能保证数据绝对的安全。
小结:同步刷盘,不丢失数据但影响性能;异步刷盘性能高,但如果在消息刷盘前发生断电意外,消息就会丢失。
如果一主多从的broker
架构中,master
节点有两种角色选择:
SYNC_MASTER
(同步主机):当接收到消息后,立即同步到slave
节点,当slave
节点同步成功后,才返回成功,可靠性高ASYNC_MASTER
(异步主机):当接收到消息,并不立即同步给slave
节点,同步操作由后台线程进行,如果在发生主从切换时,同步操作还未进行,就有可能会丢失数据
小结:同步主机可靠性高,发生主从切换时不会丢失数据,但由于需要等待slave
节点同步成功后才返回,因此性能略低;异步主机性能高,但如果在同步操作前发生了主从切换,原master
上的数据可能并没有同步给slave
,因此会造成消息丢失
总结:如果要保证消息的可靠性,单master
节点的刷新方式可选择SYNC_FLUSH
(同步刷盘)方式;一主多从的broker
架构中,master
节点的刷新方式可选择ASYNC_FLUSH
(异步刷盘)方式,master
节点的角色使用SYNC_MASTER
(同步主机),实际中就结合具体场景进行合理选择。
消息消费阶段的可靠性
消息消费阶段的可靠性由comsumer
来保证。在消费消息时,可返回两种结果:
CONSUME_SUCCESS
:消费成功RECONSUME_LATER
:消费失败,稍后再消费
Consumer
消费消息失败后,RocketMq
会提供一种重试机制,令消息再消费一次。Consumer
消费消息失败通常可以认为有以下几种情况:
由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。 由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。
RocketMQ会为每个消费组都设置一个Topic名称为%RETRY%+consumerGroup
的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为SCHEDULE_TOPIC_XXXX
的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至%RETRY%+consumerGroup
的重试队列中。
负载均衡
producer 负载均衡
Producer端在发送消息的时候,会先根据Topic找到指定的TopicPublishInfo
,在获取了TopicPublishInfo
路由信息后,RocketMQ的客户端在默认方式下selectOneMessageQueue()
方法会从TopicPublishInfo
中的messageQueueList
中选择一个队列(MessageQueue)进行发送消息。具体的容错策略均在MQFaultStrategy
这个类中定义。
这里有一个sendLatencyFaultEnable
开关变量,如果开启,在随机递增取模的基础上,再过滤掉not available
的Broker代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550Lms,就退避3000Lms;超过1000L,就退避60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue
)来发送消息,latencyFaultTolerance
机制是实现消息发送高可用的核心关键所在。
consumer 负载均衡
在RocketMQ中,Consumer端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端在知道从Broker端的哪一个消息队列—队列中去获取消息。因此,有必要在Consumer端来做负载均衡,即Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费。
Consumer端的心跳包发送
在Consumer启动后,它就会通过定时任务不断地向RocketMQ集群中的所有Broker实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable中,为之后做Consumer端的负载均衡提供可以依据的元数据信息。
Consumer端实现负载均衡的核心类—RebalanceImpl
在Consumer实例的启动流程中的启动MQClientInstance实例部分,会完成负载均衡服务线程—RebalanceService的启动(每隔20s执行一次)。通过查看源码可以发现,RebalanceService线程的run()方法最终调用的是RebalanceImpl类的rebalanceByTopic()方法,该方法是实现Consumer端负载均衡的核心。这里,rebalanceByTopic()方法会根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理。这里主要来看下集群模式下的主要处理流程:
(1) 从rebalanceImpl实例的本地缓存变量—topicSubscribeInfoTable中,获取该Topic主题下的消息消费队列集合(mqSet);
(2) 根据topic和consumerGroup为参数调用mQClientFactory.findConsumerIdList()方法向Broker端发送获取该消费组下消费者Id列表的RPC通信请求(Broker端基于前面Consumer端上报的心跳包数据而构建的consumerTable做出响应返回,业务请求码:GET_CONSUMER_LIST_BY_GROUP);
(3) 先对Topic下的消息消费队列、消费者Id排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列。这里的平均分配算法,类似于分页的算法,将所有MessageQueue排好序类似于记录,将所有消费端Consumer排好序类似页数,并求出每一页需要包含的平均size和每个页面记录的范围range,最后遍历整个range而计算出当前Consumer端应该分配到的记录(这里即为:MessageQueue)。
(4) 然后,调用updateProcessQueueTableInRebalance()方法,具体的做法是,先将分配到的消息队列集合(mqSet)与processQueueTable做一个过滤比对。
上图中processQueueTable标注的红色部分,表示与分配到的消息队列集合mqSet互不包含。将这些队列设置Dropped属性为true,然后查看这些队列是否可以移除出processQueueTable缓存变量,这里具体执行removeUnnecessaryMessageQueue()方法,即每隔1s 查看是否可以获取当前消费处理队列的锁,拿到的话返回true。如果等待1s后,仍然拿不到当前消费处理队列的锁则返回false。如果返回true,则从processQueueTable缓存变量中移除对应的Entry;
上图中processQueueTable的绿色部分,表示与分配到的消息队列集合mqSet的交集。判断该ProcessQueue是否已经过期了,在Pull模式的不用管,如果是Push模式的,设置Dropped属性为true,并且调用removeUnnecessaryMessageQueue()方法,像上面一样尝试移除Entry;
最后,为过滤后的消息队列集合(mqSet)中的每个MessageQueue创建一个ProcessQueue对象并存入RebalanceImpl的processQueueTable队列中(其中调用RebalanceImpl实例的computePullFromWhere(MessageQueue mq)方法获取该MessageQueue对象的下一个进度消费值offset,随后填充至接下来要创建的pullRequest对象属性中),并创建拉取请求对象—pullRequest添加到拉取列表—pullRequestList中,最后执行dispatchPullRequest()方法,将Pull消息的请求对象PullRequest依次放入PullMessageService服务线程的阻塞队列pullRequestQueue中,待该服务线程取出后向Broker端发起Pull消息的请求。其中,可以重点对比下,RebalancePushImpl和RebalancePullImpl两个实现类的dispatchPullRequest()方法不同,RebalancePullImpl类里面的该方法为空,这样子也就回答了上一篇中最后的那道思考题了。
消息消费队列在同一消费组不同消费者之间的负载均衡,其核心设计理念是在一个消息消费队列在同一时间只允许被同一消费组内的一个消费者消费,一个消息消费者能同时消费多个消息队列。
广播模式与集群模式
广播模式
广播模式下,同一条消息会被同一consumerGroup
下的每个consumer
消费
如图,同一topic
下有3个MessageQueue
,且有一个consumerGroup
,组内有两个consumer
,在广播模式下,consumer1
与consumer2
都会消费MessageQueue1
、MessageQueue2
与 MessageQueue3
的消息。
集群模式
集群模式下,同一条消息只会被同一consumerGroup
下的一个consumer
消费
如图所示,同一topic
下有3个MessageQueue
,有两个consumerGroup
去消费这3个MessageQueue
上的消息,consumerGroup1
中的consumer1
会消息MessageQueue1
、MessageQueue2
上的消息,consumerGroup1
中的consumer2
会消息MessageQueue3
上的消息;consumerGroup2
中的consumer1
会消息MessageQueue1
、MessageQueue2
上的消息,consumerGroup2
中的consumer2
会消息MessageQueue3
上的消息。
顺序消息
消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。
RocketMQ可以严格的保证消息有序。
顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
全局顺序:对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景 分区顺序:对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
事务消息
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
事务消息发送及提交:
发送消息(half消息)。 服务端响应消息写入结果。 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
补偿流程:
对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查” Producer收到回查消息,检查回查消息对应的本地事务的状态 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit
或者Rollback
发生超时或者失败的情况。
延迟消息
rocketmq
在实现延迟消息时,默认18个延迟级别,这些级别对应的延迟时间如下:
1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1s | 5s | 10s | 30s | 1m | 2m | 3m | 4m | 5m | 6m | 7m | 8m | 9m | 10m | 20m | 30m | 1h | 2h |
发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
level == 0,消息为非延迟消息 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s level > maxLevel,则level== maxLevel,例如level==20,延迟2h
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
消息过滤
RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基于Tag的消息过滤正式基于这个字段值的。
主要支持如下2种的过滤方式
Tag过滤方式:Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。其中,Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。
SQL92的过滤方式:这种方式的大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样,真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。SQL92的表达式上下文为消息的属性。
参考:
https://github.com/apache/rocketmq/blob/master/docs/cn/design.md。 https://github.com/apache/rocketmq/blob/rocketmq-all-4.8.0-LEARN/docs/cn/design.md
限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。
本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!