RabbitMQ 线上事故!慌的一批,脑袋一片空白
共 6375字,需浏览 13分钟
·
2020-11-11 04:15
阅读本文大概需要 7 分钟。
来自:不一样的科技宅
前言
事故经过
ready
状态,还有几百条unacked
的消息。unacked
的消息全部变成ready
,但是没过多久又有几百条unacked
的消息了,这个就很明显了能消费,没有进行ack
呀。ack
,让运维老哥检查了一下,发现网络没问题。现在看是真的是傻,网络有问题连接都连不上。由于确定的是无法ack
造成的,立马将ack模式
由原来的manual
改成auto
紧急发布。将所有的节点升级好以后,发现推送正常了。镜像队列
,立即将这台有问题的MQ从集群中移除。直接进行重置,然后加入回集群。这事情算是告一段落了。此时已经接近24:00了。bug
修了紧急发布。吐槽一波公司的ELK,压根就没有收集到这个报错信息,导致我没有及时发现。
事故重现-队列阻塞
MQ配置
spring:
# 消息队列
rabbitmq:
host: 10.0.0.53
username: guest
password: guest
virtual-host: local
port: 5672
# 消息发送确认
publisher-confirm-type: correlated
# 开启发送失败退回
publisher-returns: true
listener:
simple:
# 消费端最小并发数
concurrency: 1
# 消费端最大并发数
max-concurrency: 5
# 一次请求中预处理的消息数量
prefetch: 2
# 手动应答
acknowledge-mode: manual
问题代码
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
@Headers Mapheaders, throws Exception {
Channel channel)
// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
try {
// 模拟推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), JSON.toJSONString(orderDto));
}finally {
// 消息签收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}
}
json
串,然后再使用AES
进行加密,所以这边需要,先进行解密然后在进行解析。才能得到订单数据。失败重发
机制,防止消息丢失,不巧的是重发的时候没有对订单数据进行加密。这就导致推送系统,在解密的时候出异常,从而无法进行ack
。默默的吐槽一句:人在家中坐,锅从天上来。
模拟推送
curl http://localhost:8080/sendMsg/3
curl http://localhost:8080/sendErrorMsg/1
curl http://localhost:8080/sendMsg/3
unacked
的消息。curl http://localhost:8080/sendErrorMsg/1
curl http://localhost:8080/sendMsg/3
RabbitMQ
管控台也可以看到,刚刚发送的的3条消息处于ready
状态。这个时候就如果一直有消息进入,都会堆积在队里里面无法被消费。curl http://localhost:8080/sendMsg/3
分析原因
ack
导致队里阻塞。那么问题来了,这是为什么呢?其实这是RabbitMQ
的一种保护机制。防止当消息激增的时候,海量的消息进入consumer
而引发consumer
宕机。PrefetchCount
实现。consumer
前面加了一个缓冲容器,容器能容纳最大的消息数量就是PrefetchCount
。如果容器没有满RabbitMQ
就会将消息投递到容器内,如果满了就不投递了。当consumer
对消息进行ack
以后就会将此消息移除,从而放入新的消息。listener:
simple:
# 消费端最小并发数
concurrency: 1
# 消费端最大并发数
max-concurrency: 5
# 一次处理的消息数量
prefetch: 2
# 手动应答
acknowledge-mode: manual
prefetch参数就是PrefetchCount
prefetch
我只配置了2,并且concurrency
配置的只有1,所以当我发送了2条错误消息以后,由于解密失败这2条消息一直没有被ack
。将缓冲区沾满了,这个时候RabbitMQ
认为这个consumer
已经没有消费能力了就不继续给它推送消息了,所以就造成了队列阻塞。判断队列是否有阻塞的风险。
ack
模式为manual
,并且线上出现了unacked
消息,这个时候不用慌。由于QOS是限制信道channel
上的消费者所能保持的最大未确认的数量。所以允许出现unacked
的数量可以通过channelCount * prefetchCount * 节点数量
得出。channlCount
就是由concurrency
,max-concurrency
决定的。
min
=concurrency * prefetch * 节点数量
max
=max-concurrency * prefetch * 节点数量
unacked_msg_count
<min
队列不会阻塞。但需要及时处理unacked
的消息。unacked_msg_count
>=min
可能会出现堵塞。unacked_msg_count
>=max
队列一定阻塞。
处理方法
try catch
中就解决了这样不管解密正常与否,消息都会被签收。如果出错将会输出错误日志,让开发人员进行处理了。对于这个就需要有日志监控系统,来及时告警了。
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
@Headers Mapheaders, throws Exception {
Channel channel)
try {
// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
// 模拟推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);
}finally {
// 消息签收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}
}
注意的点
unacked
的消息在consumer
切断连接后(重启),会自动回到队头。事故重现-磁盘占用飙升
ack
所以将ack
模式改成auto
自动,紧急升级了,这样不管正常与否,消息都会被签收,所以在当时确实是解决了问题。ack
模式改成auto
自动,这样会使QOS不生效。会出现大量消息涌入consumer
从而造成consumer
宕机,可以是因为当时在晚上,交易比较少,并且推送系统有多个节点,才没出现问题。问题代码
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
@Headers Mapheaders, throws Exception {
Channel channel)
// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
try {
// 模拟推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);
}finally {
// 消息签收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}
}
listener:
simple:
# 消费端最小并发数
concurrency: 1
# 消费端最大并发数
max-concurrency: 5
# 一次处理的消息数量
prefetch: 2
# 手动应答
acknowledge-mode: auto
curl http://localhost:8080/sendErrorMsg/1
原因
RabbitMQ
消息监听程序异常时,consumer
会向rabbitmq server
发送Basic.Reject
,表示消息拒绝接受,由于Spring
默认requeue-rejected
配置为true
,消息会重新入队,然后rabbitmq server
重新投递。就相当于死循环了,所以控制台在疯狂刷错误日志造成磁盘利用率飙升的原因。解决方法
default-requeue-rejected: false
即可。总结
个人建议,生产环境不建议使用自动ack,这样会QOS无法生效。 在使用手动ack的时候,需要非常注意消息签收。 其实在将有问题的MQ重置时,是将错误的消息给清除才没有问题了,相当于是消息丢失了。
try {
// 业务逻辑。
}catch (Exception e){
// 输出错误日志。
}finally {
// 消息签收。
}
参考资料
RabbitMQ消息监听异常问题探究
代码地址
https://gitee.com/huangxunhui/rabbitmq_accdient.git
结尾
推荐阅读:
微信扫描二维码,关注我的公众号
朕已阅