Springboot 支持多MQ数据源

共 13847字,需浏览 28分钟

 ·

2021-03-03 11:00

点击上方蓝色字体,选择“标星公众号”

优质文章,第一时间送达

76套java从入门到精通实战课程分享

maven依赖:

config set maxmemory-policy allkeys-lru
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.2.5-RELEASE</version>
        </dependency>

MQ配置(配置 normal、soft 两个 MQ 数据源):

config set maxmemory-policy allkeys-lru
spring:
  rabbitmq:
    normal:
      host: 192.168.96.8
      port: 5672
      username: guest
      password: guest
      template:
        retry:
          enabled: true   #失败重试
          initial-interval: 10000ms    #第一次重试间隔时长
          max-interval: 30000ms        #最大重试间隔时长
          multiplier: 2    # 下次重试间隔的倍数 2:重试间隔是上次的2倍
      listener:
        simple:
          acknowledge-mode: manual  # 手动确认ack
    soft:
      host: 192.168.96.6
      port: 5672
      username: guest
      password: guest
      template:
        retry:
          enabled: true   #失败重试
          initial-interval: 10000ms    #第一次重试间隔时长
          max-interval: 30000ms        #最大重试间隔时长
          multiplier: 2    # 下次重试间隔的倍数 2:重试间隔是上次的2倍
      listener:
        simple:
          acknowledge-mode: manual  # 手动确认ack

config配置:

config set maxmemory-policy allkeys-lru
package com.xiaodeai.iot.parser.mq.conf;
 
import com.rabbitmq.client.Channel;
import com.xiaodeai.common.core.constant.LogTypeEnum;
import com.xiaodeai.common.core.utils.LoggerUtil;
import com.xiaodeai.iot.core.constants.IotConstants;
import com.xiaodeai.iot.core.enums.MqQueue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;
 
 
/**
 * rabbitmq 多数据源配置类
 **/
@Configuration
public class MultiRabbitMqConfig {
 
    @Bean(name = "normalConnectionFactory")
    @Primary
    public ConnectionFactory normalConnectionFactory(
            @Value("${spring.rabbitmq.normal.host}") String host, @Value("${spring.rabbitmq.normal.port}") int port,
            @Value("${spring.rabbitmq.normal.username}") String username, @Value("${spring.rabbitmq.normal.password}") String password) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }
 
    @Bean(name = "softConnectionFactory")
    public ConnectionFactory softConnectionFactory(
            @Value("${spring.rabbitmq.soft.host}") String host,
            @Value("${spring.rabbitmq.soft.port}") int port,
            @Value("${spring.rabbitmq.soft.username}") String username,
            @Value("${spring.rabbitmq.soft.password}") String password
    ) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }
 
    @Bean(name = "normalRabbitTemplate")
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)//必须是prototype类型
    @Primary
    public RabbitTemplate normalRabbitTemplate(
            @Qualifier("normalConnectionFactory") ConnectionFactory connectionFactory
    ) {
        RabbitTemplate normalRabbitTemplate = new RabbitTemplate(connectionFactory);
        return normalRabbitTemplate;
    }
 
    @Bean(name = "softRabbitTemplate")
    public RabbitTemplate softRabbitTemplate(
            @Qualifier("softConnectionFactory") ConnectionFactory connectionFactory
    ) {
        RabbitTemplate softRabbitTemplate = new RabbitTemplate(connectionFactory);
        return softRabbitTemplate;
    }
 
    @Bean(name = "normalFactory")
    public SimpleRabbitListenerContainerFactory normalFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("normalConnectionFactory") ConnectionFactory connectionFactory
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }
 
    @Bean(name = "softFactory")
    public SimpleRabbitListenerContainerFactory softFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("softConnectionFactory") ConnectionFactory connectionFactory
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }
 
    /**  
     * 绑定 queue、exchange、routing
     */
    @Bean
    public Channel runNormalQueue(@Qualifier("normalConnectionFactory") ConnectionFactory connectionFactory) {
        Connection connection = connectionFactory.createConnection();
        Channel channel = connection.createChannel(false);
        try {
            channel.queueDeclare("hello1"truefalsefalse, null);
            channel.queueDeclare("hello2"truefalsefalse, null);
            
            channel.queueBind("hello1""test_exchange""hello1.#");
            channel.queueBind("hello2""test_exchange""hello2.#");
        } catch (Exception e) {
            
        }
 
        return channel;
    }
 
    @Bean
    public Channel runSoftQueue(@Qualifier("softConnectionFactory") ConnectionFactory connectionFactory) {
        Connection connection = connectionFactory.createConnection();
        Channel channel = connection.createChannel(false);
        try {
            channel.queueDeclare("hello3"truefalsefalse, null);
            channel.queueBind("hello3""test_exchange_1""hello3.#");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return channel;
    }}


代码调用:

config set maxmemory-policy allkeys-lru
    @Resource(name = "normalRabbitTemplate")
    private RabbitTemplate normalRabbitTemplate;
 
    @Resource(name = "softRabbitTemplate")
    private RabbitTemplate softRabbitTemplate;
 
    /**
     * rabbitmq 消息推送
     *
     * @param exchange 路由
     * @param routing mq路由
     * @param msg 消息内容
     */
    public void sendMsg(String exchange, String routing, Object msg) {
//        normalRabbitTemplate.convertAndSend(exchange, routing, msg);
        softRabbitTemplate.convertAndSend(exchange, routing, msg);
    }

————————————————

版权声明:本文为CSDN博主「scanklm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:

https://blog.csdn.net/sxg0205/article/details/114079321





粉丝福利:Java从入门到入土学习路线图

👇👇👇

👆长按上方微信二维码 2 秒


感谢点赞支持下哈 

浏览 94
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报