遇到了消息堆积,但是问题不大
共 2838字,需浏览 6分钟
·
2021-11-04 14:30
这一篇我们要说的话题是消息的堆积处理,其实这个话题还是挺大的,因为消息堆积还是真的很令人头疼的,当堆积的量很大的时候,这真的是个很暴躁的问题,不过这时候真考验大家冷静的处理问题的能力了
我们一起来分析分析有关问题吧
大量的消息堆积在MQ中几个小时还没解决怎么办呢
一般这种比较着急的问题,最好的办法就是临时扩容,用更快的速度来消费数据
1、临时建立一个新的Topic,然后调整queue的数量为原来的10倍或者20倍,根据堆积情况来决定
2、然后写一个临时分发消息的consumer程序,这个程序部署上去消费积压的消息,消费的就是刚刚新建的Topic,消费之后不做耗时的处理,只需要直接均匀的轮询将这些消息轮询的写入到临时创建的queue里面即可
3、然后增加相应倍数的机器来部署真正的consumer消费,注意这里的Topic,然后让这些consumer去真正的消费这些临时的queue里面的消息
不知道大家明白没有,很简单的道理,我给大家举个形象的例子
一个topic堵住了,新建一个topic去进行分流,临时将queue资源和consumer资源扩大10倍,将消息平均分配到这些新增的queue资源和consumer资源上,以正常10倍的速度来消费消息,等到这些堆积的消息消费完了,便可以恢复到原来的部署架构
这种只是用于临时解决一些异常情况导致的消息堆积的处理,如果消息经常出现堵塞的情况,那该考虑一下彻底增强系统的部署架构了
消息设置了过期时间,过期就丢了怎么办呢
在rabbitmq中,可以设置过期时间TTL,和Redis的过期时间一样,如果消息在queue中积压超过一定时间就会被rabbitmq清理掉,这个数据就没了
这样可能会造成大量的数据丢失
这种情况下上面的解决方案就不太合适了,可以采取批量重导的方案来解决,在系统流量比较低的时候,用程序去查询丢失的这部分数据,然后将消息重新发送到MQ中,把丢失的数据重新补回来
这也算是一种补偿任务吧,补偿任务一般是用于对定时跑批的一种补偿
分析下RocketMQ中的消息堆积原因
消息的堆积归根到底就是生产者生产消息的速度和消费者消费的速度不匹配导致的,输入的和消费的速度不统一
或许是突然搞了一波促销,系统业务量暴增,导致生产者发消息暴增,消费速度跟不上
也有可能是消费方出现失败的情况,疯狂重试,也或者就是消费方的消费能力太低了
RocketMQ是按照队列进行消息负载的,如果consumer中的一台机器由于硬件各方面原因导致该机器上的消息队列不能及时处理,就会造成整个消息队列的堆积
RocketMQ分为发布方和订阅方,双方都有负载均衡策略,默认都是采用平均分配,producer消息以轮询方式发送到消息队列queue中,broker将这些的queue再平均分配到属于同一个group id的订阅方集群
.如果消费者consumer机器数量和消息队列相等,则消息队列平均分配到每一个consumer上 如果consumer数量大于消息队列数量,则超出消息队列数量的机器没有可以处理的消息队列 若消息队列数量不是consumer的整数倍,则部分consumer会承担跟多的消息队列的消费任务
那说一下解决思路吧
RocketMQ中消费完的消息去了哪里呢
CommitLog文件什么时候进行清除
消息文件过期(默认72小时),且到达清理时点(默认是凌晨4点),删除过期文件。 消息文件过期(默认72小时),且磁盘空间达到了水位线(默认75%),删除过期文件。 磁盘已经达到必须释放的上限(85%水位线)的时候,则开始批量清理文件(无论是否过期),直到空间充足。
为什么这么设计呢
有道无术,术可成;有术无道,止于术
欢迎大家关注Java之道公众号
好文章,我在看❤️