rabbitMq工作模式特性及整合springboot

共 52599字,需浏览 106分钟

 ·

2021-04-01 02:38

点击上方蓝色字体,选择“标星公众号”

优质文章,第一时间送达

因为公司项目后面需要用到mq做数据的同步,所以学习mq并在此记录,这里的是rabbitMq


mq(message queue)消息队列

官网:www.rabbitmq.com
使用消息队列的优点:
    1、异步可加快访问速度 (以前一个订单接口需要做下单、库存、付款、快递等相关操作,有了mq只需要给相关信息传入队列,下单、库存、付款、快递等相关操作会自动从队列中收到信息进行异步操作)
    2、解耦下游服务或其他服务或语言可接入
    3、削峰高并发访问量可分摊多个队列分摊
缺点:
    1、系统可用性降低(一旦mq挂了系统就宕机了)
    2、系统复杂性增大 (增加了mq模块需要考虑更多)

RabbitMQ的高级特性

  • 消费端限流

  • TTL 全称time to live(存活时间/过期时间) - 当消息到达存活时间后还没被消费会被丢弃 ttl+死信队列可实现延迟队列效果

  • 死信队列

  • 延迟队列

  • 消息可靠性投递

  • Consumer ACK

rabbitMq为了确保消息投递的可靠性提供了两种方式 confirm和return

rabbitmq整个消息投递的路径为
producer--->rabbitmq broker--->exchange--->queue--->consumer
1.消息从producer到exchange则会返回一个confirmCallback.
2.消息从exchange到queue投递失败则会返回一个returnCallBack.
我们将利用这两个callback控制消息的可靠性投递

Consumer ACK

ack指acknowledge,确认。表示消费者端接收到消息后的确认方式
有三种方式确认:
    自动确认:acknowledge="none"
    手动确认:acknowledge="manual"
    根据异常情况确认:acknowledge="auto"
    
自动确认指,当消息一旦被消费者接收到,则自动确认收到,并将相应的message从mq的消息缓存中移除。
但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
如果设置了手动确认模式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,
则调用channel.basicNack()方法,让其自动重新发送消息。    

我这里学习了前面五种

1:简单模式

2:工作队列模式

3:发布订阅模式

4:路由模式

5:主题模式


简单模式:即一条线一个发送到队列,队列发送到接收者

工作队列模式:即有一个发送者发送信息到队列,队列发给多个接收者,比如群发

发布订阅模式:这个是使用的最多的,发布者需要先发送到交换机,交换机再发送到与之绑定的队列, 然后队列在发送到与之绑定队列的接收者

路由模式:路由模式在发布订阅上增加了条件筛选,在消息到达交换机后发送队列时进行条件匹配,匹配成功才能发送给对应绑定的队列,最后再发送给接收者

主题模式:主题模式在路由模式上面进行升级,条件可进行模糊匹配,通配符规则 #可以匹配多个词 * 只能匹配一个词 如:test.# 匹配 test.one.tow test.one.q.wqe / test.* 匹配 test.one test.two


先安装rabbitMq,不同的环境可安装相关的版本,我这里已经安装好了

然后运行sbin下面的rabbitmq-server.bat


然后网页localhost:15672,如下页面即安装成功


然后去rabbitmq的官网


左边是下载右边是文档


文档中也会有一些代码案例,点击文档可以看到mq有七种方式


第一个是在测试的时候需要引入的包,第二个是在springboot上需要引入的包


com.rabbitmq

amqp-client

5.3.0

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

一:简单模式

我给mq的连接封装在工具类里,一些队列名放在常量类里了

工具类代码:

package com.lansi.realtynavi.test.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Description 描述
 * @Date 2021/3/23 11:22
 * @Created by huyao
 */
public class RabbitUtils {

    public static ConnectionFactory factory = new ConnectionFactory();
    static {
        factory.setHost("localhost");
    }

    public static Connection getConnection() throws Exception{
        Connection connection = null;
        try {
            //获取长连接
            connection  = factory.newConnection();
        }catch (Exception e){
            e.printStackTrace();
        }/*finally {
            connection.close();
        } */
        return connection;
    }


}


常量类代码:

package com.lansi.realtynavi.test.constant;

/**
 * @Description 描述
 * @Date 2021/3/23 11:01
 * @Created by huyao
 */
public class MqConstant {

    public static final String MQ_HELLO_WORD = "helloWord";
    public static final String MQ_PUBLISH = "publish";
    public static final String MQ_ROUTING = "routing";
    public static final String MQ_TOPICS = "topics";
    public static final String MQ_WORK_QUEUES = "workQueues";


    public static final String MQ_QUEUE_BAIDU = "baidu";
    public static final String MQ_QUEUE_XINLANG = "xinlang";



    public static final String MQ_PUBLISH_JHJ = "jiaohuanji";
    public static final String MQ_ROUTING_JHJ = "jiaohuanjiRout";
    public static final String MQ_TOPIC_JHJ = "jiaohuanjiTopic";


}


生产者代码

package com.lansi.realtynavi.test.helloWord;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @Description 简单模式
 * @Date 2021/3/22 17:19
 * @Created by huyao
 */
public class Producer {


    public static void main(String[] args) throws Exception{


        Channel channel = null;

        Connection connection = null;
        try {
            //获取长连接
            connection = RabbitUtils.getConnection();
            channel = connection.createChannel();

            channel.queueDeclare(MqConstant.MQ_HELLO_WORD, falsefalsefalse, null);
            String message = "这是我发送的第三个队列消息";
            //第一个参数是交换机信息   简单队列不需要交换机  第二个参数队列名称 ,第三个额外信息,第四个需要发布的信息
            channel.basicPublish("", MqConstant.MQ_HELLO_WORD, null, message.getBytes());
            System.out.println("[x] Send ‘" + message + "’");
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            channel.close();
            connection.close();
        }

    }

}


消费者代码:

package com.lansi.realtynavi.test.helloWord;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 描述
 * @Date 2021/3/22 17:27
 * @Created by huyao
 */
public class Consumer {

    public static void main(String[] argv) throws Exception {
        //连接
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明并创建一个队列
        //参数1 队列ID
        //参数2 是否持久化,false对应不持久化数据,mq停掉数据就会丢失
        //参数3 是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用
        //参数4 是否自动删除, false代表连接停掉后不自动删除这个队列
        // 其他额外的参数,null
        channel.queueDeclare(MqConstant.MQ_HELLO_WORD, falsefalsefalse, null);


        //从MQ服务器中获取数据

        //创建一个消息消费者
        //参数1:队列ID
        //参数2:代表是否自动确认收到消息,false代表手动编程来确认消息,这是mq的推荐做法
        //参数3:参数要传入的DefaultConsumer的实现类
        channel.basicConsume(MqConstant.MQ_HELLO_WORD, false, new Reciver(channel));
    }
}

class Reciver extends DefaultConsumer {
    private Channel channel;
    //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中用到
    public Reciver(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body);
        System.out.println("消费者接收到的消息:"+message);
        System.out.println("消息的ID:"+envelope.getDeliveryTag());

        //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
        channel.basicAck(envelope.getDeliveryTag(), false);

    }
}


测试的时候队列需要手动去创建,不过springboot的话可以自动创建

这里已经手动创建好了

运行接收者,运行启动者

这里接收者自动接收消息


二:工作队列模式

  一个队列多个接收者


生产者代码:

package com.lansi.realtynavi.test.workQueues;

import com.google.gson.Gson;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @Description 工作队列模式
 * @Date 2021/3/22 17:33
 * @Created by huyao
 */
public class Producer {


    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, falsefalsefalse, null);

        for(int i = 1; i<=20; i++){
            SMS sms = new SMS("乘客" + i, "123456789""你的车票已预订成功");
            String message = new Gson().toJson(sms);
            channel.basicPublish("", MqConstant.MQ_WORK_QUEUES, null, message.getBytes());
        }

        System.out.println("发送数据成功");
        channel.close();
        connection.close();
    }

}


封装对象代码:

package com.lansi.realtynavi.test.workQueues;

/**
 * @Description 描述
 * @Date 2021/3/23 11:28
 * @Created by huyao
 */
public class SMS {

    private String name;
    private String mobile;
    private String content;

    public SMS(String name, String mobile, String content) {
        this.name = name;
        this.mobile = mobile;
        this.content = content;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMobile() {
        return mobile;
    }

    public void setMobile(String mobile) {
        this.mobile = mobile;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}


三个接收者代码


接收者1

package com.lansi.realtynavi.test.workQueues;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 描述
 * @Date 2021/3/23 11:33
 * @Created by huyao
 */
public class Consumer1 {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, falsefalsefalse, null);

        //如果不写baiscQos(1) 则自动mq会将所有请求平均发送给所有消费者
        //baiscQos,mq不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),再从队列中获取一个新的
        channel.basicQos(1);//处理完一个取一个

        channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("smsConsumer1-短信发送成功:"+message);

                //服务器好的话可以在这里睡眠   这里可动态配置开启和设置睡眠时间
                /*try {
                    Thread.sleep(10);
                }catch (Exception e){
                    e.printStackTrace();
                }*/

                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}


接收者2

package com.lansi.realtynavi.test.workQueues;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 描述
 * @Date 2021/3/23 11:40
 * @Created by huyao
 */
public class Consumer2 {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, falsefalsefalse, null);

        channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("smsConsumer2-短信发送成功:"+message);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

}


接收者3

package com.lansi.realtynavi.test.workQueues;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 描述
 * @Date 2021/3/23 11:41
 * @Created by huyao
 */
public class Consumer3 {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, falsefalsefalse, null);

        channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("smsConsumer1-短信发送成功:"+message);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

}


启动三个接收类,启动发送类





三个接收都拿到了数据,我学习的时候队列是以轮询的方式给三个消费者发送数据,这里出现了接收数据不均衡的情况应该是缓存没用清理,给队列删掉重新创建就好了

三:发布订阅模式

生成者代码:

这里和前面两种模式不同,发送者绑定了交换机,没用绑定队列,需要消费者绑定交换机和队列

package com.lansi.realtynavi.test.publish;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.util.Scanner;

/**
 * @Description 发布订阅模式
 * @Date 2021/3/23 13:31
 * @Created by huyao
 */
public class Producer {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_PUBLISH, falsefalsefalse, null);

        String input = new Scanner(System.in).next();


        //第一个参数交换机名字,其他参数和之前一样
        channel.basicPublish(MqConstant.MQ_PUBLISH_JHJ, "", null, input.getBytes());

        channel.close();
        connection.close();

    }
}


接收者1代码:

package com.lansi.realtynavi.test.publish;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 消费者
 * @Date 2021/3/23 13:50
 * @Created by huyao
 */
public class ConsumerXinLang {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, falsefalsefalse, null);


        //队列绑定交换机
        //参数1:队列名,参数2:交换机名,参数3:路由key(目前用不到,路由模式通配符模式使用)
        channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_PUBLISH_JHJ, "");
        channel.basicQos(1);

        channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者新浪收到消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });


    }



}


接收者2代码:

package com.lansi.realtynavi.test.publish;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 消费者
 * @Date 2021/3/23 13:50
 * @Created by huyao
 */
public class ConsumerBaiDu {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, falsefalsefalse, null);


        //队列绑定交换机   目前交换机需要在rabbit也手动创建,在和spring整合的时候spring会自动帮我们创建
        //参数1:队列名,参数2:交换机名,参数3:路由key(目前用不到,路由模式通配符模式使用)
        channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_PUBLISH_JHJ, "");
        channel.basicQos(1);

        channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者百度收到消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });


    }



}


启动生产者消费者,在生产者控制台输入信息:



两个消费者都接收到了



四 :路由模式

路由模式发送需要携带路由key,用作接收者进行判断

生产者代码:

package com.lansi.realtynavi.test.routing;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * @Description 路由模式
 * @Date 2021/3/23 13:31
 * @Created by huyao
 *
 *
 * 交换机类型:fanout广播(发布订阅)   direct转发(路由)  topic通配符(通配模式)
 *
 */
public class Producer {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_PUBLISH, falsefalsefalse, null);
        LinkedHashMap<String, String> map = new LinkedHashMap<>();
        map.put("test1","测试一数据");
        map.put("test2","测试二数据");
        map.put("test3","测试三数据");
        map.put("test4","测试四数据");
        map.put("test5","测试五数据");
        map.put("test6","测试六数据");
        map.put("test7","测试七数据");
        Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
        while (iterator.hasNext()){
            Map.Entry<String, String> next = iterator.next();
            //第一个参数交换机名字,第二个参数指定rout_key
            channel.basicPublish(MqConstant.MQ_ROUTING_JHJ, next.getKey(), null, next.getValue().getBytes());
        }



        channel.close();
        connection.close();

    }



}


接收者1:

package com.lansi.realtynavi.test.routing;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 消费者
 * @Date 2021/3/23 13:50
 * @Created by huyao
 */
public class ConsumerBaiDu {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, falsefalsefalse, null);


        //队列绑定交换机   目前交换机需要在rabbit也手动创建,在和spring整合的时候spring会自动帮我们创建
        //参数1:队列名,参数2:交换机名,参数3:路由key(目前用不到,路由模式通配符模式使用)
        channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_ROUTING_JHJ, "test1");
        channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_ROUTING_JHJ, "test2");
        channel.basicQos(1);

        channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者百度收到消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });


    }



}


接收者二

package com.lansi.realtynavi.test.routing;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 消费者
 * @Date 2021/3/23 13:50
 * @Created by huyao
 */
public class ConsumerXinLang {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, falsefalsefalse, null);


        //队列绑定交换机
        //参数1:队列名,参数2:交换机名,参数3:路由key
        channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test10");
        channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test6");
        channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test5");
        channel.basicQos(1);

        channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者新浪收到消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });


    }



}


在这里看到百度接收者只接受test1、test2,所以只接收到了1和2的数据,新浪同理


五 :主题模式

 在路由的基础上增加了通配符匹配
 通配符规则  #可以匹配多个词  * 只能匹配一个词

生产者代码:

package com.lansi.realtynavi.test.topics;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * @Description 通配符模式
 * @Date 2021/3/23 13:31
 * @Created by huyao
 *
 *
 * 交换机类型:fanout广播(发布订阅)   direct转发(路由)  topic通配符(通配模式)
 *
 *  通配符规则  #可以匹配多个词  * 只能匹配一个词
 *              test.#  test.one.tow  test.one.q.wqe  /  test.*   test.one test.two
 */
public class Producer {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_TOPIC_JHJ, falsefalsefalse, null);
        LinkedHashMap<String, String> map = new LinkedHashMap<>();
        map.put("test.one","测试一数据");
        map.put("test2.two.one","测试二数据");
        map.put("test.wqe","测试三数据");
        map.put("test4.com.hash.oqp","测试四数据");
        map.put("test5.com.code.oqp","测试五数据");
        map.put("test6.com.code.oqp","测试六数据");
        Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
        while (iterator.hasNext()){
            Map.Entry<String, String> next = iterator.next();
            //第一个参数交换机名字,第二个参数指定rout_key
            channel.basicPublish(MqConstant.MQ_TOPIC_JHJ, next.getKey(), null, next.getValue().getBytes());
        }
        channel.close();
        connection.close();

    }



}


接收者1代码:

package com.lansi.realtynavi.test.topics;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 消费者
 * @Date 2021/3/23 13:50
 * @Created by huyao
 */
public class ConsumerBaiDu {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, falsefalsefalse, null);


        //队列绑定交换机   目前交换机需要在rabbit也手动创建,在和spring整合的时候spring会自动帮我们创建
        //参数1:队列名,参数2:交换机名,参数3:路由key(目前用不到,路由模式通配符模式使用)
        channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_TOPIC_JHJ, "*.*.*.oqp");
        channel.basicQos(1);

        channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者百度收到消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });


    }



}


接收者2代码

package com.lansi.realtynavi.test.topics;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 消费者
 * @Date 2021/3/23 13:50
 * @Created by huyao
 */
public class ConsumerXinLang {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, falsefalsefalse, null);


        //队列绑定交换机
        //参数1:队列名,参数2:交换机名,参数3:路由key
        channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_TOPIC_JHJ, "test.#");
        channel.basicQos(1);

        channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者新浪收到消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });


    }



}



最后就是springboot上整合rabbitmq

需要用到的依赖

  <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

然后配置rabbitmq连接

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=111111
#发送者开启confirm确认机制
spring.rabbitmq.publisher-confirms=true
#发送者开启return确认机制
spring.rabbitmq.publisher-returns=true

#开启ack

spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.default-requeue-rejected=false

接下来一个rabbitmq的配置

package com.lansi.realtynavi.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Description mq的配置
 * @Date 2021/3/24 14:19
 * @Created by huyao
 */
@Configuration
public class RabbitMqConfig {

    //定义交换机的名字
    public static final String EXCHANGE_NAME = "boot_topic_exchange";

    public static final String QUEUE_NAME = "boot_queue";

    //1.声明交换机
    @Bean("bootExchange")
    public Exchange bootExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    //2.声明队列
    @Bean("bootQueue")
    public Queue bootQueue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    //3.绑定
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }
}


接收者

package com.lansi.realtynavi.config;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Description mq监听/消费者手动签收消息
 * @Date 2021/3/24 14:44
 * @Created by huyao
 *
 *rabbitmq给了两种消息的可靠性  confirm和return
 *
 */
@Component
public class RabbitMqConsumer {


    //可监听分布式其他项目,只要mq连接的地址相同监听的队列名存在即可
    //消费者
    @RabbitListener(queues = "boot_queue")
    public void ListenerQueue(Message message, Channel channel) throws Exception{

        System.out.println("消费者接收到消息:"+new String(message.getBody()));

        
        try{
            //开始业务处理

            System.out.println("开始业务处理");

            //int i = 5/0;

            System.out.println("业务处理完成");
            //业务处理完成确认收到消息  , 第二个参数为true支持多消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);

        }catch (Exception e){
            System.out.println("业务处理异常");
            //业务异常,拒收消息,请求重发    参数三为true则重回队列发送
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), truetrue);
        }


    }

}


这里的生产者我写的一个controller中的列子(错误示范,只能调用一次)

testTopic1 是测试mq的高级特性,这里只用到testTopic就可以

package com.lansi.realtynavi.rabbitmq;

import com.lansi.realtynavi.config.RabbitMqConfig;
import com.lansi.realtynavi.dev.helloWord.HelloSender;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Description 描述
 * @Date 2021/3/24 13:46
 * @Created by huyao
 */
@RestController
@RequestMapping("api/rabbitMq")
public class RabbitMqController {


    @Autowired
    private HelloSender helloSender;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("helloWorld")
    public void hello(){
        helloSender.send();
    }

    @GetMapping("testTopic")
    public void testTopic(){
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"boot.hhh""topic的mq.......");
    }

    //mq的可靠性机制,必须要在配置文件中开启
    @GetMapping("testTopic1")
    public void testTopic1(){

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("confirm方法被执行了。。。");
                if(b){
                    System.out.println("交换机确认成功!!");
                } else {
                    System.out.println("交换机确认失败!!");
                }

            }
        });

        //设置交换机处理失败消息的模式,为true的时候,消息打到不了队列时,会将消息重新返回给生产者
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

            /**
             * @param message 消息对象
             * @param returnCode 错误码
             * @param returnText 错误信息
             * @param exchange 交换机
             * @param routingKey 路由键
             *
             * */
            @Override
            public void returnedMessage(Message message, int returnCode, String returnText, String exchange,String routingKey) {
                System.out.println("return被执行了。。。");
                System.out.println("message:"+new String(message.getBody()));
                System.out.println("错误码:"+returnCode);
                System.out.println("错误信息:"+returnText);
                System.out.println("交换机:"+exchange);
                System.out.println("路由键:"+routingKey);
            }
        });
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"abc.boot.hhh""topic的mq.......");
    }

}


运行后掉对应的接口,消费者接收



这样rabbitmq就整合进springboot中了

————————————————

版权声明:本文为CSDN博主「oNuoyi」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:

https://blog.csdn.net/qq_41973632/article/details/115233999




锋哥最新SpringCloud分布式电商秒杀课程发布

👇👇👇

👆长按上方微信二维码 2 秒





感谢点赞支持下哈 

浏览 21
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报