RabbitMQ 死信队列 + TTL介绍

java1234

共 7806字,需浏览 16分钟

 ·

2021-03-15 09:25

点击上方蓝色字体,选择“标星公众号”

优质文章,第一时间送达

  作者 |  认真对待世界的小白

来源 |  urlify.cn/rimuy2

76套java从入门到精通实战课程分享

一、RabbitMQ的的死信队列+ TTL

1、什么是TTL

  • time to live 消息存活时间

  • 如果消息在存活时间内未被消费,则会被清除

  • RabbitMQ支持两种ttl设置

    • 单独消息进行配置ttl

    • 整个队列进行配置ttl(居多)

2、什么是rabbitmq的死信队列

  • 没有被及时消费的消息存放的队列

3、什么是rabbitmq的死信交换机

  • Dead Letter Exchange(死信交换机,缩写:DLX)当消息成为死信后,会被重新发送到另⼀个交换机,这个交换机就是DLX死信交换机。

4、消息有哪几种情况成为死信

  • 消费者拒收消息(basic.reject/ basic.nack) ,并且没有重新入队 requeue=false

  • 消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live)

  • 队列的消息长度达到极限

  • 结果:消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

二、RabbitMQ管控台消息TTL测试

队列过期时间使用参数,对整个队列消息统⼀过期

  • x-message-ttl

  • 单位ms(毫秒)

消息过期时间使用参数(如果队列头部消息未过期,队列中级消息已经过期,消息还在队列里面)

  • expiration

  • )

两者都配置的话,时间短的先触发

1、RabbitMQ Web控制台测试 

新建死信交换机(和普通没区别) 

新建死信队列 (和普通没区别) 

死信交换机和队列绑定 

 

新建普通队列,设置过期时间、指定死信交换机 

测试:直接web控制台往product_qeueu发送消息即可,会看到消息先是在product_qeueu队列停留10秒(因为没有消费者消费),然后该消息从product_qeueu移入到dead_queue。

三、RabbitMQ的延迟队列和应用场景 

1、什么是延迟队列

⼀种带有延迟功能的消息队列, Producer 将消息发送到消息队列服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某⼀个时间投递到Consumer进行消费,该消息即定时消息。

2、使用场景

  • 通过消息触发⼀些定时任务,比如在某⼀固定时间点向用户发送提醒消息

  • 用户登录之后5分钟给用户做分类推送、用户多少天未登录给用户做召回推送;

  • 消息生产和消费有时间窗⼝要求:比如在天猫电商交易中超时未支付关闭订单的场景,在订单创建时会发送⼀条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略

四、实战

1、背景

JD、淘系、天猫、拼多多电商平台,规定新注册的商家,审核通过后需要在【规定时间】内上架商品,否则冻结账号。

2、代码开发

死信交换机和死信队列开发,topic交换机和队列开发,绑定死信交换机

package net.xdclass.xdclasssp.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {
    //死信队列
    public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";

    //死信交换机
    public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";

    //进入死信队列的路由key
    public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key";

    //创建死信交换机
    @Bean
    public Exchange lockMerchantDeadExchange() {
        return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE, truefalse);
    }

    //创建死信队列
    @Bean
    public Queue lockMerchantDeadQueue() {
        return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
    }

    //绑定死信交换机和死信队列
    @Bean
    public Binding lockMerchantBinding() {
        return new Binding(LOCK_MERCHANT_DEAD_QUEUE, Binding.DestinationType.QUEUE,
                LOCK_MERCHANT_DEAD_EXCHANGE, LOCK_MERCHANT_ROUTING_KEY, null);
    }

    //普通队列,绑定的个死信交换机
    public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue";

    //普通的topic交换机
    public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";

    //路由key
    public static final String NEW_MERCHANT_ROUTIING_KEY = "new_merchant_routing_key";

    //创建普通交换机
    @Bean
    public Exchange newMerchantExchange() {
        return new TopicExchange(NEW_MERCHANT_EXCHANGE, truefalse);
    }

    //创建普通队列
    @Bean
    public Queue newMerchantQueue() {
        Map<String, Object> args = new HashMap<>(3);
        //消息过期后,进入到死信交换机
        args.put("x-dead-letter-exchange", LOCK_MERCHANT_DEAD_EXCHANGE);
        //消息过期后,进入到死信交换机的路由key
        args.put("x-dead-letter-routing-key", LOCK_MERCHANT_ROUTING_KEY);
        //过期时间,单位毫秒
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
    }

    //绑定交换机和队列
    @Bean
    public Binding newMerchantBinding() {
        return new Binding(NEW_MERCHANT_QUEUE, Binding.DestinationType.QUEUE,
                NEW_MERCHANT_EXCHANGE, NEW_MERCHANT_ROUTIING_KEY, null);
    }
}

消息生产和消费 

  • 消息生产

    • 投递到普通的topic交换机

    • 消息过期,进入死信交换机

  • 消息消费

    • 消费者监听死信交换机的队列

MerchantAccountController 模拟请求

@RestController
@RequestMapping("/api/admin/merchant")
public class MerchantAccountController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("check")
    public Object check(){

        //修改数据库的商家账号状态  TODO

        rabbitTemplate.convertAndSend(RabbitMQConfig.NEW_MERCHANT_EXCHANGE,RabbitMQConfig.NEW_MERCHANT_ROUTIING_KEY,"商家账号通过审核");

        Map<String,Object> map = new HashMap<>();
        map.put("code",0);
        map.put("msg","账号审核通过,请10秒内上传1个商品");
        return map;
    }
}

MerchantMQListener消费

package net.xdclass.xdclasssp.mq;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@RabbitListener(queues = "lock_merchant_dead_queue")
public class MerchantMQListener {

    @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws IOException {

        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag="+msgTag);
        System.out.println("body="+body);
        //做复杂业务逻辑  TODO

        //告诉broker,消息已经被确认
        channel.basicAck(msgTag,false);
    }
}





粉丝福利:Java从入门到入土学习路线图

👇👇👇

👆长按上方微信二维码 2 秒


感谢点赞支持下哈 

浏览 28
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报