Gateway绑定MQTT实现发布订阅

SegmentFault

共 17643字,需浏览 36分钟

 ·

2021-01-06 18:42

作者:isWulongbo
来源:SegmentFault 思否社区



前言


实现MQTT协议的中间件有很多,本文使用的是企业级 EMQX EnterPrise,不了解的小伙伴可以翻阅之前的博客。这里,主要介绍SpringBoot2.0集成MQTT实现消息推送的功能。




创建项目


创建父工程


打开 idea 点击 File>New>Project 选择Spring Initializr >JDK版本>Next 并按下图创建项目




点击 next ,开发者工具 Developer Tools我们勾选前两个,Web 我们勾选第一个,安全框架和SQL这里暂时不需要勾选,Messaging中间件,我们同样勾选第一个就好,Cloud组件我们也不用勾选。



依次点击 next finish创建好项目



删除 src ,.gitignore,HELP.md,mvnw和mvnw.cmd 目录,本文采用Gateway绑定的方式,需要引入以下依赖:



    org.springframework.integration
    spring-integration-stream



    org.springframework.integration
    spring-integration-mqtt


父工程pom文件:


"1.0" encoding="UTF-8"?>
"http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    4.0.0
    pom
    
        springboot_emqx_common
        springboot_emqx_publish
        springboot_emqx_subscribe
    

    
        org.springframework.boot
        spring-boot-starter-parent
        2.4.1
         
    

    com.baba.wlb
    springboot_emqx
    1.0-SNAPSHOT
    springboot_emqx
    Demo project for Spring Boot

    
        1.8
    


    
        
            org.springframework.boot
            spring-boot-starter-integration
        


        
            org.springframework.integration
            spring-integration-stream
        


        
            org.springframework.integration
            spring-integration-mqtt
        


        
            org.springframework.boot
            spring-boot-starter-web
        


        
            org.springframework.boot
            spring-boot-devtools
            runtime
            true
        

        
            org.projectlombok
            lombok
            true
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        

        
            org.springframework.integration
            spring-integration-test
            test
        

    


    
        
            
                org.springframework.boot
                spring-boot-maven-plugin








                
                    com.baba.wlb.publish.PublishApplication
                

            

        

    




创建子工程


在父工程中点击New>>module>Next 分别创建三个子工程:


springboot_emqx_common
springboot_emqx_publish
springboot_emqx_subscribe



springboot_emqx_common

在该模块下新建如下package包


注:(config包下暂时没放公共配置,因为我试过好久,发现丢进来的配置只有主类'mainClass'才能加载到,其他模块加载不到通用配置,不清楚是不是我漏了什么注解,望了解这部分的人多多指教!所以只好拆分配置到各个模块中了)



系统常量:Constants.java


package com.baba.wlb.share.common;

/**
 * @Author wulongbo
 * @Date 2020/12/29 13:50
 * @Version 1.0
 */

/**
 * 系统常量
 */
public class Constants {

    public static final String MQTT_PUBLISH_CHANNEL = "mqttPublishChannel";
    public static final String MQTT_SUBSCRIBE_CHANNEL = "mqttSubscribeChannel";

}


Emqx配置类:EmqxMqttProperties.java


package com.baba.wlb.share.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * @Author wulongbo
 * @Date 2020/12/29 11:33
 * @Version 1.0
 */

/**
 * 配置文件
 */

@Data
@Component
@ConfigurationProperties("wulongbo.mqtt.emqx")
public class EmqxMqttProperties {
    private String username;
    private String password;
    private String hostUrl;
    private String clientId;
    private String defaultTopic;
    private Integer timeout;
    private Integer keepAlive;
    private Integer qos;
    private Integer version;
}


在 resource资源目录下新建一个 application-common.yml的yml文件。


注:方法一:以application-*.yml的形式命名。方法二:模块之间并不用写依赖配置,直接在common模块的resource目录,添加一个config文件夹,在里面创建application.yml文件即可


官网是这么介绍的(附上官网地址)

https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#boot-features-external-config-application-property-files


这里选择第一种方式。


yml配置文件:application-common.yml


wulongbo:
  mqtt:
    emqx:
      username: admin
      password: public
      #tcp://ip:port
      host-url: tcp://39.102.56.91:1883
      client-id: wulongbo${random.value}
      default-topic: wulongbo_topic
      #      default-topic: $SYS/brokers/+/clients/#
      timeout: 60
      keep-alive: 60
      # qos:{0:至多一次的传输 /1:至少分发一次,可重复 /2:只分发一次,不可重复}
      qos: 1
      version: 4


注:我自身的EMQX 是启用了Mysql认证登录的,并且关闭了匿名登录的哈,所以需要正确的用户名和密码


springboot_emqx_publish

在该模块下新建如下package包



config类:EmqxMqttConfig.java


package com.baba.wlb.publish.config;

/**
 * @Author wulongbo
 * @Date 2020/12/29 11:38
 * @Version 1.0
 */

import com.baba.wlb.share.common.Constants;
import com.baba.wlb.share.properties.EmqxMqttProperties;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;

/**
 * EMQX配置工具类
 */

@Configuration
@IntegrationComponentScan //消息扫描件
@Slf4j
public class EmqxMqttConfig {

    @Resource
    private EmqxMqttProperties emqxMqttProperties;

    /**
     * MQTT的连接
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        // 设置相关的属性
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(emqxMqttProperties.getUsername());
        mqttConnectOptions.setPassword(emqxMqttProperties.getPassword().toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{emqxMqttProperties.getHostUrl()});
        // 心跳
        mqttConnectOptions.setKeepAliveInterval(emqxMqttProperties.getKeepAlive());
        mqttConnectOptions.setMqttVersion(emqxMqttProperties.getVersion());
        mqttConnectOptions.setConnectionTimeout(emqxMqttProperties.getTimeout());
        // 保留/清空曾经连接的客户端信息
        mqttConnectOptions.setCleanSession(false);
        // qos
        String playload = "设备已断开连接";
        // 遗嘱消息
        mqttConnectOptions.setWill("last_topic", playload.getBytes(), emqxMqttProperties.getQos(), false);
        return mqttConnectOptions;
    }

    /**
     * paho factory,mqtt自定义的连接放入factory工厂中
     */
    @Bean
    public MqttPahoClientFactory getMqttPahoClientFactory() {
        DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();
        defaultMqttPahoClientFactory.setConnectionOptions(getMqttConnectOptions());
        return defaultMqttPahoClientFactory;
    }

    /**
     * 开启连接通道
     */
    @Bean(name = Constants.MQTT_PUBLISH_CHANNEL)
    public MessageChannel getMqttPublishMessageChannel() {
        DirectChannel directChannel = new DirectChannel();
        return directChannel;
    }


//    /**
//     * 开启连接通道
//     */
//    @Bean(name = Constants.MQTT_SUBSCRIBE_CHANNEL)
//    public MessageChannel getMqttSubscribeMessageChannel() {
//        DirectChannel directChannel = new DirectChannel();
//        return directChannel;
//    }
//
//
//
//    /**
//     * 监听topic.订阅者,消费者
//     */
//    @Bean
//    public MessageProducer inbound() {
//        MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
//                emqxMqttProperties.getClientId() + "_wlb", getMqttPahoClientFactory(), emqxMqttProperties.getDefaultTopic().split(",")
//        );
//        mqttPahoMessageDrivenChannelAdapter.setDisconnectCompletionTimeout(emqxMqttProperties.getTimeout());
//        mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
//        mqttPahoMessageDrivenChannelAdapter.setQos(emqxMqttProperties.getQos());
//        mqttPahoMessageDrivenChannelAdapter.setOutputChannel(getMqttPublishMessageChannel());
//        return mqttPahoMessageDrivenChannelAdapter;
//    }

    /**
     * 订阅者,消费者
     */
    @Bean
    @ServiceActivator(inputChannel = Constants.MQTT_PUBLISH_CHANNEL)
    public MessageHandler getMessageHandler() {
        MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(emqxMqttProperties.getClientId(),getMqttPahoClientFactory());
        mqttPahoMessageHandler.setAsync(true);
        mqttPahoMessageHandler.setDefaultQos(emqxMqttProperties.getQos());
        mqttPahoMessageHandler.setDefaultTopic(emqxMqttProperties.getDefaultTopic());
        return mqttPahoMessageHandler;
    }

}

controller类:PublishController.java

package com.baba.wlb.publish.controller;


import com.baba.wlb.publish.service.PublishService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author wulongbo
 * @Date 2020/12/29 13:58
 * @Version 1.0
 */

/**
 * 发送消息的Controller
 */

@RestController
@RequestMapping("/publish")
public class PublishController {

    /**
     * 注入发布者的service服务
     */
    @Autowired
    private PublishService publishService;

    /**
     * 发送消息
     */
    @RequestMapping("/emqxPublish")
    public String emqxPublish(String data,String topic){
        publishService.sendToMqtt(data,topic);
        return "success";
    }
}

service: PublishService.java

package com.baba.wlb.publish.service;

import com.baba.wlb.share.common.Constants;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * @Author wulongbo
 * @Date 2020/12/29 14:00
 * @Version 1.0
 */

@MessagingGateway(defaultRequestChannel = Constants.MQTT_PUBLISH_CHANNEL)
@Component
public interface PublishService {
    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);

    void sendToMqtt(String data);

    void sendToMqtt(@Header(MqttHeaders.TOPIC)String topic, int qos, String data);
}


注:必须加@Header(MqttHeaders.TOPIC)注解哈


application启动类:PublishApplication.java


package com.baba.wlb.publish;

import com.baba.wlb.share.properties.EmqxMqttProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

/**
 * @Author wulongbo
 * @Date 2020/12/29 14:04
 * @Version 1.0
 */

/**
 * emqx 发布者启动程序
 */
@SpringBootApplication
@EnableConfigurationProperties({EmqxMqttProperties.class})
public class PublishApplication {

    public static void main(String[] args) {
        SpringApplication.run(PublishApplication.class,args);
    }
}

注:须加入@EnableConfigurationProperties,才能加载到配置文件

yml文件:application.yml

server:
  port: 1001

#spring:
#  profiles:
#    active: common


注:这里我们因为把publish模块设置成为了主类,所以可引入common yml,也可以不引入


springboot_emqx_subscribe

在该模块下新建如下package包



config类:EmqxMqttConfig.java


package com.baba.wlb.subscribe.config;

/**
 * @Author wulongbo
 * @Date 2020/12/29 11:38
 * @Version 1.0
 */

import com.baba.wlb.share.common.Constants;
import com.baba.wlb.share.properties.EmqxMqttProperties;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;

/**
 * EMQX配置工具类
 */

@Configuration
@IntegrationComponentScan //消息扫描件
@Slf4j
public class EmqxMqttConfig {

    @Resource
    private EmqxMqttProperties emqxMqttProperties;

    /**
     * MQTT的连接
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        // 设置相关的属性
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(emqxMqttProperties.getUsername());
        mqttConnectOptions.setPassword(emqxMqttProperties.getPassword().toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{emqxMqttProperties.getHostUrl()});
        // 心跳
        mqttConnectOptions.setKeepAliveInterval(emqxMqttProperties.getKeepAlive());
        mqttConnectOptions.setMqttVersion(emqxMqttProperties.getVersion());
        mqttConnectOptions.setConnectionTimeout(emqxMqttProperties.getTimeout());
        // 保留/清空曾经连接的客户端信息
        mqttConnectOptions.setCleanSession(false);
        // qos
        String playload = "设备已断开连接";
        // 遗嘱消息
        mqttConnectOptions.setWill("last_topic", playload.getBytes(), emqxMqttProperties.getQos(), false);
        return mqttConnectOptions;
    }

    /**
     * paho factory,mqtt自定义的连接放入factory工厂中
     */
    @Bean
    public MqttPahoClientFactory getMqttPahoClientFactory() {
        DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();
        defaultMqttPahoClientFactory.setConnectionOptions(getMqttConnectOptions());
        return defaultMqttPahoClientFactory;
    }

//    /**
//     * 开启连接通道
//     */
//    @Bean(name = Constants.MQTT_PUBLISH_CHANNEL)
//    public MessageChannel getMqttPublishMessageChannel() {
//        DirectChannel directChannel = new DirectChannel();
//        return directChannel;
//    }


    /**
     * 开启连接通道
     */
    @Bean(name = Constants.MQTT_SUBSCRIBE_CHANNEL)
    public MessageChannel getMqttSubscribeMessageChannel() {
        DirectChannel directChannel = new DirectChannel();
        return directChannel;
    }



    /**
     * 监听topic.订阅者,消费者
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
                emqxMqttProperties.getClientId() + "_wlb", getMqttPahoClientFactory(), emqxMqttProperties.getDefaultTopic().split(",")
        );
        mqttPahoMessageDrivenChannelAdapter.setDisconnectCompletionTimeout(emqxMqttProperties.getTimeout());
        mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
        mqttPahoMessageDrivenChannelAdapter.setQos(emqxMqttProperties.getQos());
        mqttPahoMessageDrivenChannelAdapter.setOutputChannel(getMqttSubscribeMessageChannel());
        return mqttPahoMessageDrivenChannelAdapter;
    }

    /**
     * 发布者,生产者
     */
    @Bean
    @ServiceActivator(inputChannel = Constants.MQTT_SUBSCRIBE_CHANNEL)
    public MessageHandler getMessageHandler() {
        MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(emqxMqttProperties.getClientId(),getMqttPahoClientFactory());
        mqttPahoMessageHandler.setAsync(true);
        mqttPahoMessageHandler.setDefaultQos(emqxMqttProperties.getQos());
        mqttPahoMessageHandler.setDefaultTopic(emqxMqttProperties.getDefaultTopic());
        return mqttPahoMessageHandler;
    }

}


service业务类:SubscribeService.java

package com.baba.wlb.subscribe.service;

import com.baba.wlb.share.common.Constants;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;

/**
 * @Author wulongbo
 * @Date 2020/12/29 14:11
 * @Version 1.0
 */

/**
 * 订阅者
 */
@Service
public class SubscribeService {

    @Bean
    @ServiceActivator(inputChannel = Constants.MQTT_SUBSCRIBE_CHANNEL)
    public MessageHandler messageHandler() {
        MessageHandler messageHandler = new MessageHandler() {
            @Override
            public void handleMessage(Message message) throws MessagingException {
                System.out.println("订阅者订阅消息头是:" + message.getHeaders());
                System.out.println("订阅者订阅消息主体是:" + message.getPayload());
            }
        };
        return messageHandler;
    }
}


注:我们把MessageHandler放入了专门的server做业务处理,其实放config类也是OK的


application启动类:SubscribeApplication.java


package com.baba.wlb.subscribe;

/**
 * @Author wulongbo
 * @Date 2020/12/29 14:16
 * @Version 1.0
 */

import com.baba.wlb.share.properties.EmqxMqttProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

/**
 * 订阅者启动类
 */
@SpringBootApplication
@EnableConfigurationProperties({EmqxMqttProperties.class})
public class SubscribeApplication {
    public static void main(String[] args) {
        SpringApplication.run(SubscribeApplication.class,args);
    }
}


yml配置文件:application.yml


server:
  port: 1002

spring:
  profiles:
    include: common


注:当然我们上面的publish和subscribe模块都是依赖于common模块的我们需要在各个模块上右击--Open Model Settings



并按下图依次来添加模块之间的依赖关系




最后,我们在分别在 publish和subscribe模块的pom文件中引入common依赖就Ok了

    

        
            com.baba.wlb
            springboot_emqx_common
            1.0-SNAPSHOT
        

    




至此,我们多模块用Gateway绑定的方式就集成好了MQTT消息推送和消息订阅功能。




启动项目


分别启动PublishApplication和 SubscribeApplication


端口分别为:1001,1002





PostMan测试


打开postman:发起Get请求


localhost:1001/publish/emqxPublish?topic=wulongbo_topic&data=我是一条消息


可以看到我们订阅者订阅到了这条消息:



至于service业务模块对消息的处理:具体是根据主题来筛选,还是根据playload来区分,看具体的业务场景和设计需要。当然EMQX 有更解耦的方式就是规则引擎来对各个事件响应动作,也有HTTP API供我们调用,读者灵活运用即可。




点击左下角阅读原文,到 SegmentFault 思否社区 和文章作者展开更多互动和交流。

- END -

浏览 7
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报