你必须知道的消息的推拉机制
共 3666字,需浏览 8分钟
·
2021-11-01 18:36
大家好,我是Captain
我们在之前也说了不少RocketMQ的知识点了,这一篇要说的是RocketMQ的消息的推拉机制,这个应该也是属于面试的热点,学起来吧
先捞一捞之前的文章
我们下面要说的推拉模式指的是broker和consumer之间的,producer和broker之间的模式是推的模式,也就是每次producer每次生产了消息,会主动推给broker
其实这个大家也应该好理解,如果producer和broker之间交互用broker来拉取,就会怪怪的,每次消息都要存储到producer的本地,然后等待broker来拉取,这个要取决于多个producer的可靠性,显然这种设计是很糟糕的
我们下面要讨论的是broker和consumer之间的交互是推还是拉,大家也可以自己先思考下到底是推还是拉
说一下推模式以及优缺点
推模式指的是broker将消息推向Consumer,也就是Consumer是被动的去接收这个消息,broker来将消息主动的去推给Consumer
那么这种模式的优缺点呢,大家可以想一下
很明显的一个优点就是延迟小,实时性比较好,broker接收到消息之后就会立刻推送到Consumer,实时性相对来说是比较高的
还有一个优点其实就是简化了Consumer端的逻辑,消费端不需要自己去处理这个拉取的逻辑,只需要监听这个消息的topic,然后去专心的处理这个消息的业务逻辑即可
上面说的两点是优点,那么有优点就肯定也会伴随相应的缺点
第二点简化了Consumer消费端的逻辑的同时,也就复杂化了broker端的逻辑,这其实也不算是优点或者缺点吧,算是这个模式的一个特点,需要根据场景来选择自己合适的模式
最大的一个缺点就是推送的速率和消费的速率不好去匹配,这样就是很糟糕的,你想,如果broker拿到消息就推给Consumer,不在乎Consumer的消费能力如何,就往Consumer直接扔,那Consumer有可能会崩溃
就像一个生产线,本来只能接收的最大速度是10立方米每秒,结果呢,你每秒往生产线上扔100立方米每秒,那这个生产线可能就因为无法处理而直接崩盘
当推送速率很快的时候,甚至都像DDos的攻击一样,消费者就更难受了,不同的消费者的消费速率也是不一样的,broker也很难平衡每个消费者的速率,如果broker需要记住每个Consumer的消费能力和速度的话,那broker的复杂度可就直线上升
还以一个缺点就是消费者推出去之后,无法保证消息发送成功,push采用的是广播模式,也就是只有服务端和客户端都在同一个频道的时候,推模式才可以成功的将消息推到消费者
分析一下拉模式以及优缺点
拉模式,也是同样的道理,就是Consumer是主动从broker拉取消息,哎,这次我Consumer主动了,我不需要你来喂我了,我每过一段时间去你那里拿消息就好了,你也别在乎我的消费速率了
咋回事知道了,想想这样的优缺点,知道了优缺点就对这个模式肯定了解的八九不离十了
最大的优点就是主动权掌握在Consumer这边了,每个消费者的消费能力可能不一样,消费者可以根据自身的情况来拉取消息的请求,如果消费者真的出现那种忙不过来的情况下,可以根据一定的策略去暂停拉取
服务端也相对来说轻松了,不需要去进行消息的处理逻辑了,你来了我就给你就好了,你要多少我就给你就好了,broker就是一个没得感情的存储机器
拉模式也更适合批量消息的发送,推模式是来一个消息就推一个,当然也可以缓存一部分消息再推送,但是无法确定Consumer是否能够处理这批推送的消息,拉模式则是Consumer主动来告诉broker,这样broker也可以更好的决定缓存多少消息用于批量发送
说完了优点,就需要说缺点了,拉模式需要Consumer对于服务端有一定的了解,主要的缺点就是实时性较差,针对于服务器端的实时更新的信息,客户端还是难以获取实时的信息
毕竟消费者是去拉取消息,消费者怎么知道消息到了呢,所以消费者能做的就是不断的去拉取,但是又不能频繁的去拉取,这样也耗费性能,因此就必须降低请求的频率,请求间隔时间也就意味着消息的延迟
RocketMQ最终的选择呢,为什么是拉模式
RocketMQ最终决定的拉模式,kafka也是如此
RocketMQ的使用的拉模式的使用特点
自己维护offsetStore:用户需要自己保存消费者组的offset,比如存入Redis,或者调用MQ接口将其保存到broker端
自主选择MessageQueue和offset进行消息拉取,用户拉取消息的时候,用户自己决定拉取哪个队列从哪个offset开始,拉取多少消息
为什么拉模式稍微更合适些呢,现在的消息队列都有持久化消息的需求,削峰主要就是靠持久化来削的,也就是本身需要有个存储的功能,它的使命就是接受消息,保存好消息,然后等着消费者来拉取就好了
消费者也是各种各样,消费者的能力也是参差不齐,所以broker不能和Consumer有太多依赖
拉模式也是有缺点的,上面我们也说过了,最大的缺点就是实时性比较差,所以RocketMQ也尽力的去操作减轻这些缺点
broker来消息的时候,broker会去提醒Consumer来消息了,需要来拉取消息了,总之,就是broke和Consumer相互打配合,下面会详细说
RocketMQ是如何实现拉模式的
拉模式指的是Consumer主动去找broker拉取消息,拉取模式分为普通轮询和长轮询两种方式
1、普通轮询也是比较简单的,就是定时发起请求,服务端收到请求之后无论是否有数据更新,都立即回复,也是属于比较好理解的,实现起来也是比较简单的,缺点呢,就是broker比较被动,需要不断的处理客户端连接的,就是服务端属于一种有求必应的方式
2、长轮询就是属于对普通轮询的一种优化,当然也是Consumer向服务端发起请求,而服务端收到后不会立即去响应,而是hold住客户端连接,等待数据产生变更之后才会回复客户端,或者超过指定时间还未产生变更
其实说白了,就是对普通轮询进行一定程度的限制,客户端可以随时请求服务端,但是我并不一定立即回复你
RocketMQ就是使用长轮询来实现拉模式,Consumer发起pull请求之后,broker在处理请求拉取消息的时候,如果没有查询到消息则不会回复消费者任何消息,而是等待触发通知消费者的这个事件
这里会有两种触发事件的条件:
1、DefaultMessageStore.ReputMessageService.run,一个定时任务,1毫秒一次,不断的检查是否有消息的产生,如果检测到了,就会通知消费者,将新消息发送给消费者
2、PullRequestHoldService.run也是定时任务,5秒一次,该任务会逐个的检查其中的请求,判断是否有对应的新消息产生,如果有直接返回消费者,没有就检查该请求是否超过默认的长轮询等待时间(默认15秒),如果超出,则返回消费者
那pushConsumer怎么说?
RocketMQ中的PushConsumer其实底层也是拉模式实现的,只是一层披着拉模式的狼而已
因为RocketMQ后台有个ReblanceService线程会自己偷偷的去找broker请求数据,这个线程会根据topic的队列数量和当前的消费组的消费者个数进行负载均衡,每个队列产生的请求都会放入到阻塞队列中
然后有一个PullMessageService线程不断的从该阻塞队列中获取请求,然后通过网络请求broker,这样算是实现了一种准实时的拉取消息
源码在PullMessageProcessor里面的processRequest方法,用来处理拉消息的请求,有消息返回,没有消息就进入了上述说的长轮询过程,这部分源码我就不截了,大家感兴趣的可以去研究研究
佛系求关注
Captain希望有一天能够靠写作养活自己,现在还在磨练,这个时间可能会持续很久,但是,请看我漂亮的坚持
感谢大家能够做我最初的读者和传播者,请大家相信,只要你给我一份爱,我终究会还你们一页情的。
Captain会持续更新技术文章,和生活中的暴躁文章,欢迎大家关注【Java贼船】,成为船长的学习小伙伴,和船长一起乘千里风、破万里浪
哦对了,后续所有的远程文章都会更新到这里
https://github.com/DayuMM2021/Java