MQ的路由模式(direct)| RabbitMQ系列

愿天堂没有BUG

共 4545字,需浏览 10分钟

 ·

2021-08-19 12:04

前言

  • 模拟一个场景

    • 现在有三种级别的日志 info、wearing、error

    • 现在想把error日志给保存到本地, info和wearing直接丢弃掉(就是直接消费,不作任何处理,不消费也不行呀,就形成积压了)

    • 这样一想,我们是不是可以定义一个消费者绑定专门处理保存error日志,另一个消费者绑定info和wearing直接消费,不作任何处理

一、生产者

  •     public static void publishMessageIndividually() throws Exception {
           Channel channel = RabbitMqUtils.getChannel();
           channel.exchangeDeclare(ChangeNameConstant.DIRECT_MODEL, BuiltinExchangeType.DIRECT);
           //创建多个 bindingKey
           Map<String, String> bindingKeyMap = new HashMap<>();
           bindingKeyMap.put("info","普通 info 信息");
           bindingKeyMap.put("warning","警告 warning 信息");
           bindingKeyMap.put("error","错误 error 信息");
           //debug 没有消费这接收这个消息 所有就丢失了
           bindingKeyMap.put("debug","调试 debug 信息");
           for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
               String bindingKey = bindingKeyEntry.getKey();
               String message = bindingKeyEntry.getValue();
               channel.basicPublish(ChangeNameConstant.DIRECT_MODEL,bindingKey, null,
                       message.getBytes("UTF-8"));
               System.out.println("生产者发出消息:" + message);
          }
      }
    复制代码
  • 可以看到:direct_pattern交换机上设置了三个路由

二、消费者

  • 消费者A

  • /**
    * 这是一个测试的消费者
    *@author DingYongJun
    *@date 2021/8/1
    */
    public class DyConsumerTest_direct01 {

       public static void main(String[] args) throws Exception{
           //使用工具类来创建通道
           Channel channel = RabbitMqUtils.getChannel();

           String queueName = "disk";
           channel.queueDeclare(queueName, false, false, false, null);
           //这个专门处理error日志,将其保存至本地
           channel.queueBind(queueName, ChangeNameConstant.DIRECT_MODEL, "error");
           System.out.println("A等待接收消息,把接收到的消息打印在屏幕.....");
           DeliverCallback deliverCallback = (consumerTag, delivery) -> {
               String message = new String(delivery.getBody(), "UTF-8");
               System.out.println(message+"已经保存到本地啦");
          };
           channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
               System.out.println("消息中断了~");
          });
      }
    }
    复制代码
  • 消费者B

  • /**
    * 这是一个测试的消费者
    *@author DingYongJun
    *@date 2021/8/1
    */
    public class DyConsumerTest_direct02 {

       public static void main(String[] args) throws Exception{
           //使用工具类来创建通道
           Channel channel = RabbitMqUtils.getChannel();

           String queueName = "console";
           channel.queueDeclare(queueName, false, false, false, null);
           //这个专门处理error日志,将其保存至本地
           channel.queueBind(queueName, ChangeNameConstant.DIRECT_MODEL, "warning");
           channel.queueBind(queueName, ChangeNameConstant.DIRECT_MODEL, "info");
           System.out.println("B等待接收消息,把接收到的消息打印在屏幕.....");
           DeliverCallback deliverCallback = (consumerTag, delivery) -> {
               String message = new String(delivery.getBody(), "UTF-8");
               System.out.println(message+"已经消费完并丢弃了");
          };
           channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
               System.out.println("消息中断了~");
          });
      }
    }
    复制代码
  • 消费者AB都已准备好。

  • 执行结果

    • 生产者

    • 消费者A

    • 消费者B

    • 完美符合我们模拟的场景需求!vnice!

三、总结

  • 多重绑定

    • 在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。

    • 绑定键为 black、green 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。

  • 是不是比发布订阅模式更加智能了呢?

  • 当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同。

  • 在这种情 况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多。

  • 也就是这玩意是复杂模式可以向下兼容简单模式!


路漫漫其修远兮,吾必将上下求索~

如果你认为i博主写的不错!写作不易,请点赞、关注、评论给博主一个鼓励吧~hahah


作者:大鱼丶
链接:https://juejin.cn/post/6995709405085827108
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。



浏览 8
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报