RabbitMQ 死信队列 + TTL介绍
共 7806字,需浏览 16分钟
·
2021-03-15 09:25
点击上方蓝色字体,选择“标星公众号”
优质文章,第一时间送达
作者 | 认真对待世界的小白
来源 | urlify.cn/rimuy2
一、RabbitMQ的的死信队列+ TTL
1、什么是TTL
time to live 消息存活时间
如果消息在存活时间内未被消费,则会被清除
RabbitMQ支持两种ttl设置
单独消息进行配置ttl
整个队列进行配置ttl(居多)
2、什么是rabbitmq的死信队列
没有被及时消费的消息存放的队列
3、什么是rabbitmq的死信交换机
Dead Letter Exchange(死信交换机,缩写:DLX)当消息成为死信后,会被重新发送到另⼀个交换机,这个交换机就是DLX死信交换机。
4、消息有哪几种情况成为死信
消费者拒收消息(basic.reject/ basic.nack) ,并且没有重新入队 requeue=false
消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live)
队列的消息长度达到极限
结果:消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
二、RabbitMQ管控台消息TTL测试
队列过期时间使用参数,对整个队列消息统⼀过期
x-message-ttl
单位ms(毫秒)
消息过期时间使用参数(如果队列头部消息未过期,队列中级消息已经过期,消息还在队列里面)
expiration
)
两者都配置的话,时间短的先触发
1、RabbitMQ Web控制台测试
新建死信交换机(和普通没区别)
新建死信队列 (和普通没区别)
死信交换机和队列绑定
新建普通队列,设置过期时间、指定死信交换机
测试:直接web控制台往product_qeueu发送消息即可,会看到消息先是在product_qeueu队列停留10秒(因为没有消费者消费),然后该消息从product_qeueu移入到dead_queue。
三、RabbitMQ的延迟队列和应用场景
1、什么是延迟队列
⼀种带有延迟功能的消息队列, Producer 将消息发送到消息队列服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某⼀个时间投递到Consumer进行消费,该消息即定时消息。
2、使用场景
通过消息触发⼀些定时任务,比如在某⼀固定时间点向用户发送提醒消息
用户登录之后5分钟给用户做分类推送、用户多少天未登录给用户做召回推送;
消息生产和消费有时间窗⼝要求:比如在天猫电商交易中超时未支付关闭订单的场景,在订单创建时会发送⼀条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略
四、实战
1、背景
JD、淘系、天猫、拼多多电商平台,规定新注册的商家,审核通过后需要在【规定时间】内上架商品,否则冻结账号。
2、代码开发
死信交换机和死信队列开发,topic交换机和队列开发,绑定死信交换机
package net.xdclass.xdclasssp.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
//死信队列
public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";
//死信交换机
public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";
//进入死信队列的路由key
public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key";
//创建死信交换机
@Bean
public Exchange lockMerchantDeadExchange() {
return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE, true, false);
}
//创建死信队列
@Bean
public Queue lockMerchantDeadQueue() {
return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
}
//绑定死信交换机和死信队列
@Bean
public Binding lockMerchantBinding() {
return new Binding(LOCK_MERCHANT_DEAD_QUEUE, Binding.DestinationType.QUEUE,
LOCK_MERCHANT_DEAD_EXCHANGE, LOCK_MERCHANT_ROUTING_KEY, null);
}
//普通队列,绑定的个死信交换机
public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue";
//普通的topic交换机
public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";
//路由key
public static final String NEW_MERCHANT_ROUTIING_KEY = "new_merchant_routing_key";
//创建普通交换机
@Bean
public Exchange newMerchantExchange() {
return new TopicExchange(NEW_MERCHANT_EXCHANGE, true, false);
}
//创建普通队列
@Bean
public Queue newMerchantQueue() {
Map<String, Object> args = new HashMap<>(3);
//消息过期后,进入到死信交换机
args.put("x-dead-letter-exchange", LOCK_MERCHANT_DEAD_EXCHANGE);
//消息过期后,进入到死信交换机的路由key
args.put("x-dead-letter-routing-key", LOCK_MERCHANT_ROUTING_KEY);
//过期时间,单位毫秒
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
}
//绑定交换机和队列
@Bean
public Binding newMerchantBinding() {
return new Binding(NEW_MERCHANT_QUEUE, Binding.DestinationType.QUEUE,
NEW_MERCHANT_EXCHANGE, NEW_MERCHANT_ROUTIING_KEY, null);
}
}
消息生产和消费
消息生产
投递到普通的topic交换机
消息过期,进入死信交换机
消息消费
消费者监听死信交换机的队列
MerchantAccountController 模拟请求
@RestController
@RequestMapping("/api/admin/merchant")
public class MerchantAccountController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("check")
public Object check(){
//修改数据库的商家账号状态 TODO
rabbitTemplate.convertAndSend(RabbitMQConfig.NEW_MERCHANT_EXCHANGE,RabbitMQConfig.NEW_MERCHANT_ROUTIING_KEY,"商家账号通过审核");
Map<String,Object> map = new HashMap<>();
map.put("code",0);
map.put("msg","账号审核通过,请10秒内上传1个商品");
return map;
}
}
MerchantMQListener消费
package net.xdclass.xdclasssp.mq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@RabbitListener(queues = "lock_merchant_dead_queue")
public class MerchantMQListener {
@RabbitHandler
public void messageHandler(String body, Message message, Channel channel) throws IOException {
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag="+msgTag);
System.out.println("body="+body);
//做复杂业务逻辑 TODO
//告诉broker,消息已经被确认
channel.basicAck(msgTag,false);
}
}
粉丝福利:Java从入门到入土学习路线图
👇👇👇
👆长按上方微信二维码 2 秒
感谢点赞支持下哈