《RabbitMQ》如何保证消息不被重复消费
字节传说
共 3447字,需浏览 7分钟
· 2020-08-06
一 重复消息
为什么会出现消息重复?消息重复的原因有两个:1.生产时消息重复,2.消费时消息重复。
1.1 生产时消息重复
由于生产者发送消息给MQ,在MQ确认的时候出现了网络波动,生产者没有收到确认,实际上MQ已经接收到了消息。这时候生产者就会重新发送一遍这条消息。
生产者中如果消息未被确认,或确认失败,我们可以使用定时任务+(redis/db)来进行消息重试。
@Component
@Slf4J
public class SendMessage {
@Autowired
private MessageService messageService;
@Autowired
private RabbitTemplate rabbitTemplate;
// 最大投递次数
private static final int MAX_TRY_COUNT = 3;
/**
* 每30s拉取投递失败的消息, 重新投递
*/
@Scheduled(cron = "0/30 * * * * ?")
public void resend() {
log.info("开始执行定时任务(重新投递消息)");
List msgLogs = messageService.selectTimeoutMsg();
msgLogs.forEach(msgLog -> {
String msgId = msgLog.getMsgId();
if (msgLog.getTryCount() >= MAX_TRY_COUNT) {
messageService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);
log.info("超过最大重试次数, 消息投递失败, msgId: {}", msgId);
} else {
messageService.updateTryCount(msgId, msgLog.getNextTryTime());// 投递次数+1
CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData);// 重新投递
log.info("第 " + (msgLog.getTryCount() + 1) + " 次重新投递消息");
}
});
log.info("定时任务执行结束(重新投递消息)");
}
}
1.2消费时消息重复
消费者消费成功后,再给MQ确认的时候出现了网络波动,MQ没有接收到确认,为了保证消息被消费,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。
修改消费者,模拟异常
@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public void receive(String message, @Headers Map headers, Channel channel) throws Exception{
System.out.println("重试"+System.currentTimeMillis());
System.out.println(message);
int i = 1 / 0;
}
配置yml重试策略
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者进行重试
max-attempts: 5 # 最大重试次数
initial-interval: 3000 # 重试时间间隔
由于重复消息是由于网络原因造成的,因此不可避免重复消息。但是我们需要保证消息的幂等性。
二 如何保证消息幂等性
让每个消息携带一个全局的唯一ID,即可保证消息的幂等性,具体消费过程为:
消费者获取到消息后先根据id去查询redis/db是否存在该消息。
如果不存在,则正常消费,消费完毕后写入redis/db。
如果存在,则证明消息被消费过,直接丢弃。
生产者
@PostMapping("/send")
public void sendMessage(){
JSONObject jsonObject = new JSONObject();
jsonObject.put("message","Java旅途");
String json = jsonObject.toJSONString();
Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID()+"").build();
amqpTemplate.convertAndSend("javatrip",message);
}
消费者
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public class Consumer {
@RabbitHandler
public void receiveMessage(Message message) throws Exception {
Jedis jedis = new Jedis("localhost", 6379);
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(),"UTF-8");
System.out.println("接收到的消息为:"+msg+"==消息id为:"+messageId);
String messageIdRedis = jedis.get("messageId");
if(messageId == messageIdRedis){
return;
}
JSONObject jsonObject = JSONObject.parseObject(msg);
String email = jsonObject.getString("message");
jedis.set("messageId",messageId);
}
}
如果需要存入db的话,可以直接将这个ID设为消息的主键,下次如果获取到重复消息进行消费时,由于数据库主键的唯一性,则会直接抛出异常。
< END >往期精选☞《RabbitMQ》如何保证消息的可靠性☞ 一文搞懂TCP和UDP的区别☞ 程序员接私活的19个平台☞ Spring AOP实现原理☞ Spring IOC实现原理☞ Nginx超简单教程
评论
偷偷告诉你如何一台电脑开多个微信!
大家好,我是轩辕。前几天在粉丝群里,有人问我是怎么在一台电脑上同时登录两个微信的?正好之前写过一篇文章,分析过原理,分享给没看过的小伙伴学习一下。手机端多开微信估计很多人都知道,像华为、小米等手机系统都对此做了支持,不过在运行Windows系统的电脑上怎么启动两个微信呢?其实很简单,你只需要写一个批
编程技术宇宙
0
测试新人,如何快速上手一个陌生的系统!
大家好,我是狂师!作为刚入行不久的测试新人,面对一个陌生的系统时,可能会感到有些手足无措。面对一个全新的系统系统,如何快速上手并展开有效的测试工作是一个重要的挑战。本文将探讨测试新人如何通过一系列步骤和策略,快速熟悉并掌握新系统的测试要点,从而提高测试效率和质量。本文旨在为测试新手提供一份指导,帮助
测试开发技术
0
光纤详解:光纤跳线如何分类,多向单模转换?
本文来自“光纤详解:光纤跳线如何分类,多向单模转换?”,光纤跳线作为光网络布线最基础的元件之一,被广泛应用于光纤链路的搭建中。如今,光纤制造商根据应用场景的不同推出众多类型的光纤跳线,如MPO/LC/SC/FC/ST光纤跳线,单工/双工光纤跳线,单模/多模光纤跳线等,它们之间各有特色,且不可替代。本
架构师技术联盟
0
如何计算数据中心的冷却需求?
今日分享 【导读】数据中心的冷却要求受多种因素影响,包括设备的热量输出、占地面积、设施设计和电气系统功率额定值等等……众所周知,环境因素会严重影响数据中心设备。过多的热量积聚会损坏服务器,可能导致其自动关闭。经常在高于可接受的温度下运行服务器会缩短其使用
数据中心运维管理
0
5000w+ 的大表如何拆?亿级别大表拆分实战复盘
前言笔者是在两年前接手公司的财务系统的开发和维护工作。在系统移交的初期,笔者和团队就发现,系统内有一张5000W+的大表。跟踪代码发现,该表是用于存储资金流水的表格,关联着众多功能点,同时也有众多的下游系统在使用这张表的数据。进一步的观察发现,这张表还在以每月600W+的数据持续增长,也就是说,不超
码农编程进阶笔记
0
如何做到无感刷新Token?
来源:juejin.cn/post/7316797749517631515为什么需要无感刷新Token?自动刷新token前端token续约疑问及思考图片为什么需要无感刷新Token?「最近浏览到一个文章里面的提问,是这样的:」当我在系统页面上做业务操作的时候会出现突然闪退的情况,然后跳转到登录页面
Java专栏
2
BigDecimal 为什么可以保证精度不丢失?
来源:juejin.cn/post/7348709938023940136👉 欢迎加入小哈的星球 ,你将获得: 专属的项目实战 / Java 学习路线 / 一对一提问 / 学习打卡 / 赠书福利全栈前后端分离博客项目 2.0 版本完结啦, 演示链接
小哈学Java
0
【性能监控】如何有效监测网页静态资源大小?
前言作为前端人员肯定经常遇到这样的场景:需求刚上线,产品拿着手机来找你,为什么页面打开这么慢呀,心想自己开发的时候也有注意性能问题呀,不可能会这么夸张。那没办法只能排查下是哪一块影响了页面的整体性能,打开浏览器控制台一看,页面上的这些配图每张都非常大,心想这些配图都这么大,页面怎么快,那么我们有没有
高级前端进阶
0