RocketMQ 消息丢失场景分析及如何解决!
共 3027字,需浏览 7分钟
·
2020-09-19 11:02
阅读本文大概需要 4 分钟。
来自:https://blog.csdn.net/LO_YUN/article/details/103949317
既然在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题。在一些涉及到了金钱交易的场景下,消息丢失还是很致命的。那么在RocketMQ中存在哪几种消息丢失的场景呢?
先来一张最简单的消费流程图:
生产者产生消息发送给RocketMQ RocketMQ接收到了消息之后,必然需要存到磁盘中,否则断电或宕机之后会造成数据的丢失 消费者从RocketMQ中获取消息消费,消费成功之后,整个流程结束
RocketMQ为了减少磁盘的IO,会先将消息写入到os cache中,而不是直接写入到磁盘中,消费者从os cache中获取消息类似于直接从内存中获取消息,速度更快,过一段时间会由os线程异步的将消息刷入磁盘中,此时才算真正完成了消息的持久化。在这个过程中,如果消息还没有完成异步刷盘,RocketMQ中的Broker宕机的话,就会导致消息丢失 如果消息已经被刷入了磁盘中,但是数据没有做任何备份,一旦磁盘损坏,那么消息也会丢失
首先生产者发送half消息到RocketMQ中,此时消费者是无法消费half消息的,若half消息就发送失败了,则执行相应的回滚逻辑 half消息发送成功之后,且RocketMQ返回成功响应,则执行生产者的核心链路 如果生产者自己的核心链路执行失败,则回滚,并通知RocketMQ删除half消息 如果生产者的核心链路执行成功,则通知RocketMQ commit half消息,让消费者可以消费这条数据
https://blog.csdn.net/LO_YUN/article/details/101673893
//注册消息监听器处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) {
//对消息进行处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//注册消息监听器处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) {
//开启子线程异步处理消息
new Thread() {
public void run() {
//对消息进行处理
}
}.start();
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
使用事务机制传输消息,会比普通的消息传输多出很多步骤,耗费性能 同步刷盘相比异步刷盘,一个是存储在磁盘中,一个存储在内存中,速度完全不是一个数量级 主从机构的话,需要Leader将数据同步给Follower 消费时无法异步消费,只能等待消费完成再通知RocketMQ消费完成
推荐阅读:
微信扫描二维码,关注我的公众号
朕已阅