SpringBoot集成RabbitMQ
共 5684字,需浏览 12分钟
·
2021-01-06 15:05
点击上方蓝色字体,选择“标星公众号”
优质文章,第一时间送达
作者 | 未夏
来源 | urlify.cn/MV3qeq
一、介绍
RabbitMQ是一个实现了AMQP协议(Advanced Message Queue Protocol)的消息队列
AMQP信息模型如下图所示:
RoutingKey
,生产者分布信息时,指定RoutingKey
BindingKey
,表示把队列绑定到交换机的路径名
当一个消息发布到 RabbitMQ 后,首先到达指定的Exchange并从消息中取出RoutingKey
,由Exchange判断同哪个BindingKey
匹配,配对成功后 Exchange 把消息分配给 指定的Queue,消费者从 Queue 中得到消息
Exchange
有4种类型:
Fanout Exchange:忽略key对比,发送Message到Exchange下游绑定的所有Queue
Direct Exchange:比较Message的routing key和Queue的binding key,完全匹配时,Message才会发送到该Queue
Topic Exchange:比较Message的routing key和Queue的binding key,按规则匹配成功时,Message才会发送到该Queue(使用
*
和#
这2个通配符。*
- 匹配一个词,#
- 匹配 0 个或多个词)默认Exchange:比较Message的routing key和Queue的名字,完全匹配时,Message才会发送到该Queue
消息队列是有序的,只有头部的信息被消费后,才能消费下一个消息
二、SpringBoot集成RabbitMQ
2.1引入依赖
org.springframework.boot
spring-boot-starter-amqp
2.2配置RabbitMQ
配置主要分为3步:
声明交换机
声明队列
将队列绑定到队列上
@Configuration
public class RabbitmqConfig {
public final static String EXCHANGE_TOPIC = "topicExchange";
public final static String QUEUE_USER = "user_Queue";
public final static String QUEUE_CITY = "city_Queue";
public final static String QUEUE_DEVICE = "device_Queue";
public final static String BINDINGKEY_ONE = "topic.a";
public final static String BINDINGKEY_OTHER = "other.#";
@Bean
TopicExchange topicExchange() {
return new TopicExchange(EXCHANGE_TOPIC);
}
@Bean
Queue userQueue() {
return new Queue(QUEUE_USER);
}
@Bean
Queue cityQueue() {
return new Queue(QUEUE_CITY);
}
@Bean
Queue deviceQueue() {
return new Queue(QUEUE_DEVICE);
}
@Bean
Binding userBinding(@Qualifier("userQueue") Queue queue, TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with(BINDINGKEY_ONE);
}
@Bean
Binding cityBinding(@Qualifier("cityQueue") Queue queue, TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with(BINDINGKEY_OTHER);
}
@Bean
Binding deviceBinding(@Qualifier("deviceQueue") Queue queue, TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with(BINDINGKEY_OTHER);
}
}
2.3生产信息
Spring提供模板方法AmqpTemplate
来进行信息的发布、接收
AmqpTemplate
的convertAndSend
需要指定发往的交换机名、RoutingKey以及信息内容
下面进行简单封装:
@Component
public class MessageSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String routingkey, String message) {
this.amqpTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPIC, routingkey, message);
}
}
单元测试:
@ExtendWith(SpringExtension.class)
@SpringBootTest
class MessageSenderTest {
@Autowired
private MessageSender messageSender;
@Test
public void userQueueTest() {
messageSender.send("topic.a", "wo is user");
}
@Test
public void cityQueueTest() {
messageSender.send("other.a", "wo is city");
}
@Test
public void deviceQueueTest() {
messageSender.send("other.b", "wo is device");
}
}
2.4消费消息
@RabbitListener注解声明消息监听器,并指定要监听的队列
@RabbitHandler注解声明消息的具体处理方法
@Component
@RabbitListener(queues = RabbitmqConfig.QUEUE_CITY)
public class CityQueueListenter {
@RabbitHandler
public void handle(String message) {
System.out.println("city-queue: " + message);
}
}
三、延时队列
延时队列是一种特殊的消息队列
消息队列是有序的,只有头部的信息被消费后,才能消费下一个消息。延时队列也遵循这个规则,不同于普通队列是队列里面的某个信息一旦到达指定期限将会被提权进行优先处理,而不管队列前面有多少个消息在排队
3.1利用死信队列实现
RabbitMQ中有一个高级特性叫TTL(Time To Live),表示一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒
如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”
“死信”是RabbitMQ中的一种消息机制,同普通的消息队列没有什么区别,只不过消息生成者由RabbitMQ充当。如果队列里的消息出现以下情况:
消息被否定确认(使用
channel.basicNack
或channel.basicReject
),并且此时requeue
属性被设置为false
消息在队列的存活时间超过设置的TTL时间
消息队列的消息数量已经超过最大队列长度
RabbitMQ会将消息从原先的队列抽取出来,发送到死信队列里面
如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃;如果消息单独设置TTL属性,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。
利用死信实现延时队列:
通过
x-message-ttl
为队列设置TTL,一旦消息超时将被转发给死信交换机通过
x-dead-letter-exchange
为消息队列绑定死信交换机,将消息路由至死信队列,然后消费者监听死信队列
Map args = new HashMap<>();
// 设置死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// 给成为死信的消息指定Routing Key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
// 设置队列TTL, 凡进入该队列的消息超时后将成为死信
args.put("x-message-ttl", 6000);
// 返回延时队列
return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build();
仅在声明延时队列时指定上述设置,其余死信交换机、死信队列以及消费消息正常设置即可
3.2利用插件实现
为队列指定TTL实现延时队列的缺点是所有消息的过期时间一样,不够多样化;如果为消息单独指定TTL,无法保证消息超时一定变为死信
RabbitMQ官方提供一个插件bbitmq_delayed_message_exchange
,解决上述问题
点击官网下载插件,放到安装目录plugins
文件夹下,然后转到sbin
目录下执行:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
利用死信实现延时队列需要声明两个交换机、两个队列以及为其中一个队列设置TTL等属性,而使用插件仅需配置一个CustomExchange
类型的交换机即可:
@Configuration
public class RabbitmqConfig {
@Bean
public CustomExchange delayExchange() {
Map args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);
}
@Bean
public Queue queue() {
Queue queue = new Queue("delay_queue_1");
return queue;
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(delayExchange()).with("delay_queue_1").noargs();
}
}
CustomExchange
必须是x-delayed-message
类型
消息生产端,AmqpTemplate
的convertAndSend
方法需要传入一个MessagePostProcessor
实例设置消息的TTL
rabbitTemplate.convertAndSend("delayed_exchange", "delay_queue_1", msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("x-delay", TTL);
return message;
}
});
消费端代码不用变更
四、参考
图解RabbitMQ
一文带你搞定RabbitMQ死信队列
一文带你搞定RabbitMQ延迟队列
粉丝福利:Java从入门到入土学习路线图
???
?长按上方微信二维码 2 秒
感谢点赞支持下哈