SpringCloud项目:消息中间件中获取消息
点击上方 Java学习之道,选择 设为星标
来源
: blog.csdn.net/yt812100/article/details/111874857
作者: 杨桃桃
Part1外部环境搭建
发送消息到MQ和外部环境的搭建见 : Springcloud项目发送消息到RabbitMQ以及环境搭建。
(注:RabbitMQ是安装在虚拟机上的)
Part2依赖注入
本文不仅导入了上文的amqp依赖坐标还有新的netty依赖坐标
Part3编写配置文件(yaml)
和上文一样。不变的是这个。注意端口是5672,路径看rabbitMQ安装在本机还是虚拟机
Part4业务层逻辑分析
首先声明本文的业务逻辑。各位读者可能遇到的业务逻辑不一样,所以写法会有些许不同。但是大致还是一样,本文在这先声明本文在处理消息发送时候的业务逻辑
业务场景:在用户已经关注了粉丝的情况下,RabbitMQ中已经有了用户的消息队列。那么我只需要在作者发布文章的时候或者点赞的时候,将存入进队列的消息立刻发送给已经登录的用户即可。(注:发送消息参考上文:发送消息至MQ)
那么业务层的处理首先需要准备一下六个类:那么接下来就详解每个类的作用。其中业务逻辑复杂的只有监听器类和业务逻辑类
工具类
“ApplicationContextProvider”:返回一些需要的Bean实例以及上下文对象实例(无需改变)
package com.tensquare.notice.config;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class ApplicationContextProvider implements ApplicationContextAware {
/**
* 上下文对象实例
*/
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
/**
* 获取applicationContext
*
* @return
*/
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
/**
* 通过name获取 Bean.
*
* @param name
* @return
*/
public Object getBean(String name) {
return getApplicationContext().getBean(name);
}
/**
* 通过class获取Bean.
*
* @param clazz
* @param <T>
* @return
*/
public <T> T getBean(Class<T> clazz) {
return getApplicationContext().getBean(clazz);
}
/**
* 通过name,以及Clazz返回指定的Bean
*
* @param name
* @param clazz
* @param <T>
* @return
*/
public <T> T getBean(String name, Class<T> clazz) {
return getApplicationContext().getBean(name, clazz);
}
}
Nettt服务类
“NettyServer”:实现NIO的传输模式 --固定写法,配置端口以及协议名即可(端口自定义,无需改变)
package com.tensquare.notice.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
public class NettyServer {
/**
* 启动netty服务,传递一个端口号
*/
public void start(int port){
System.out.println("准备启动Netty......");
//服务器引导程序
ServerBootstrap serverBootstrap = new ServerBootstrap();
//用来处理新的连接
EventLoopGroup boos = new NioEventLoopGroup();
//用来处理业务逻辑(读写)
EventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap.group(boos,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
//请求消息解码器
ch.pipeline().addLast(new HttpServerCodec());
//将多个消息转为单一的request或者response对象
ch.pipeline().addLast(new HttpObjectAggregator(65536));
//处理websocket的消息事件(websocket服务器协议处理程序)
ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
//创建自己的webscoket处理器,自己用来编写业务逻辑
MyWebSocketHandler myWebSocketHandler = new MyWebSocketHandler();
ch.pipeline().addLast(myWebSocketHandler);
}
}).bind(port);
}
}
Netty配置类
“NettyConfig”:NettyConfig是Springcloud项目中的一种配置文件。自动加载。所以会自动开启线程 因此需要configuration注解以及Bean注解
package com.tensquare.notice.config;
import com.tensquare.notice.netty.NettyServer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class NettyConfig {
@Bean
public NettyServer createNettyServer(){
NettyServer nettyServer = new NettyServer();
//启动netty服务,使用新的线程启动
new Thread(){
@Override
public void run(){
nettyServer.start(1234);
}
}.start();
return nettyServer;
}
}
消息容器配置类:
“RabbitConfig”类:声明出需要的消息容器,(注:与后续的消息监听器相呼应。名称不建议改变)
package com.tensquare.notice.config;
import com.tensquare.notice.listener.SysNoticeListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//配置类
@Configuration
public class RabbitConfig {
@Bean("sysNoticeContainer")
public SimpleMessageListenerContainer create(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//使用Channel
container.setExposeListenerChannel(true);
//设置自己编写的监听器
container.setMessageListener(new SysNoticeListener());
return container;
}
}
通讯处理类
**“MyWebSocketHandler”**类:也就是MQ和WebSocket进行交互
一:MyWebSocketHandler是用来进行通讯处理的,也就是MQ和WebSocket进行交互(通讯处理类–核心业务类) 二:MyWebSocketHandler进行业务处理,获取消息数量(业务场景:获取到消息数量即可) 三:MyWebSocketHandler继承SimpleChannelInboundHandler< TextWebSocketFrame>,重写channelRead0(ChannelHandlerContext 这个参数获取连接,TextWebSocketFrame 这个参数获取页面参数
package com.tensquare.notice.netty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.tensquare.entity.Result;
import com.tensquare.entity.StatusCode;
import com.tensquare.notice.config.ApplicationContextProvider;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
//核心业务类,获取MQ的消息
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
/**
* 创建对象监听器
*/
private static ObjectMapper Mapper = new ObjectMapper();
/**
* 从Spring容器中获取消息监听器容器,处理订阅消息sysNotice
*/
SimpleMessageListenerContainer sysNoticeContainer = (SimpleMessageListenerContainer) ApplicationContextProvider.getApplicationContext().getBean("sysNoticeContainer");
/**
* 从spring容器中获取RabbitTemplate
*
*/
RabbitTemplate rabbitTemplate = ApplicationContextProvider.getApplicationContext().getBean(RabbitTemplate.class);
// @Autowired
// private RabbitTemplate rabbitTemplate;
/**
* 存放WebScoket连接Map,根据用户ID存放
*/
public static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap<>();
/**
*用户请求服务端,执行的方法
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//约定用户第一次请求携带的数据:{"userid":"1"}
//获取用户请求数据并解析
String json = msg.text();
//解析数据获取用户ID
String userId = Mapper.readTree(json).get("userId").asText();
//第一次请求的时候需要建立WebScoket连接
Channel channel = userChannelMap.get(userId);
if (channel==null){
//获取WebScoket连接
channel = ctx.channel();
//把连接放到容器中
userChannelMap.put(userId,channel);
}
//只用完成新消息的提醒即可,只需要获取消息的数量
//获取RabbitMQ的内容,并且发送给用户
RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
//拼接捕获队列的名称
String queueName = "article_subscribe_"+userId;
//获取Rabbit的properties容器 (获取rabbit的属性容器)
Properties queueProperties = rabbitAdmin.getQueueProperties(queueName);
//获取消息数量
int noticeCount = 0;
//判断properties是否不为空
if (queueProperties!=null){
//如果不为空,获取消息数量
noticeCount = (int)queueProperties.get("QUEUE_MESSAGE_COUNT");
}
//----------------------------------
//封装返回的数据
HashMap countMap = new HashMap();
countMap.put("sysNoticeCount",noticeCount);
Result result = new Result(true, StatusCode.OK,"查询成功!!",countMap);
//把数据发送给用户
channel.writeAndFlush(new TextWebSocketFrame(Mapper.writeValueAsString(result)));
//把消息从队列里清空,否则MQ消息监听器会再次消费一次
if (noticeCount>0){
rabbitAdmin.purgeQueue(queueName,true);
}
//为用户的消息队列通知注册监听器,便于用户在线的时候,
//一旦有新消息,可以主动推送给用户,不需要用户请求服务器获取数据
sysNoticeContainer.addQueueNames(queueName);
}
}
接下来就是关于这个类的具体解释了。
务必细看。截图都是从刚刚代码中截取的。和我发的源码是一样的
测试参数是自定义的,真实开发环境不会如此 这个其实就是将参数获取到,然后以id为标识将连接存入连接容器的过程 其中有一个Result类可以不用定义,本文是作测试用的所以定义了通过管家获取到消息的数量发送消息的代码那么以上就是关于整个MyWebSocketHandler类的详解。
监听器:
SysNoticeListener类:判断用户是否在线,发送消息
package com.tensquare.notice.listener;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.tensquare.entity.Result;
import com.tensquare.entity.StatusCode;
import com.tensquare.notice.netty.MyWebSocketHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import java.util.HashMap;
//消息监听器
public class SysNoticeListener implements ChannelAwareMessageListener {
private static ObjectMapper MAPPER = new ObjectMapper();
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//获取用户id,可以通过队列名称获取
String queueName = message.getMessageProperties().getConsumerQueue();
String userId = queueName.substring(queueName.lastIndexOf("_")+1);
io.netty.channel.Channel wsChannel = MyWebSocketHandler.userChannelMap.get(userId);
//判断用户是否在线
if (wsChannel!=null){
//如果连接不为空,代表用户在线
//封装返回数据
HashMap countMap = new HashMap();
countMap.put("sysNoticeCount",1);
Result result = new Result(true, StatusCode.OK,"查询成功",countMap);
//把数据通过WebScoket连接主动推送给用户
wsChannel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result)));
}
}
}
这里与RabbitConfig工具类中相对应 具体作用如注释所说。
测试:
这里将一个静态html页面用作测试,加载服务的静态资源里面即可
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>测试 notice 微服务与页面 websocket 交互</title>
</head>
<body>
<h1>
websocket连接服务器获取mq消息测试
</h1>
<form onSubmit="return false;">
<table><tr>
<td><span>服务器地址:</span></td>
<td><input type="text" id="serverUrl" value="ws://127.0.0.1:1234/ws" /></td>
</tr>
<tr>
<td><input type="button" id="action" value="连接服务器" onClick="connect()" /></td>
<td><input type="text" id="connStatus" value="未连接 ......" /></td>
</tr></table>
<br />
<hr color="blue" />
<div>
<div style="width: 50%;float:left;">
<div>
<table><tr>
<td><h3>发送给服务端的消息</h3></td>
<td><input type="button" value="发送" onClick="send(this.form.message.value)" /></td>
</tr></table>
</div>
<div><textarea type="text" name="message" style="width:500px;height:300px;">
{
"userId":"1"
}
</textarea></div>
</div>
<div style="width: 50%;float:left;">
<div><table>
<tr>
<td><h3>服务端返回的应答消息</h3></td>
</tr>
</table></div>
<div><textarea id="responseText" name="responseText" style="width: 500px;height: 300px;" onfocus="this.scrollTop = this.scrollHeight ">
这里显示服务器推送的信息
</textarea></div>
</div>
</div>
</form>
<script type="text/javascript">
var socket;
var connStatus = document.getElementById('connStatus');;
var respText = document.getElementById('responseText');
var actionBtn = document.getElementById('action');
var sysCount = 0;
var userCount = 0;
function connect() {
connStatus.value = "正在连接 ......";
if(!window.WebSocket){
window.WebSocket = window.MozWebSocket;
}
if(window.WebSocket){
socket = new WebSocket("ws://127.0.0.1:1234/ws");
socket.onmessage = function(event){
respText.scrollTop = respText.scrollHeight;
respText.value += "\r\n" + event.data;
var sysData = JSON.parse(event.data).data.sysNoticeCount;
if(sysData){
sysCount = sysCount + parseInt(sysData)
}
var userData = JSON.parse(event.data).data.userNoticeCount;
if(userData){
userCount = userCount + parseInt(sysData)
}
respText.value += "\r\n现在有" + sysCount + "条订阅新消息";
respText.value += "\r\n现在有" + userCount + "条点赞新消息";
respText.scrollTop = respText.scrollHeight;
};
socket.onopen = function(event){
respText.value += "\r\nWebSocket 已连接";
respText.scrollTop = respText.scrollHeight;
connStatus.value = "已连接 O(∩_∩)O";
actionBtn.value = "断开服务器";
actionBtn.onclick =function(){
disconnect();
};
};
socket.onclose = function(event){
respText.value += "\r\n" + "WebSocket 已关闭";
respText.scrollTop = respText.scrollHeight;
connStatus.value = "已断开";
actionBtn.value = "连接服务器";
actionBtn.onclick = function() {
connect();
};
};
} else {
//alert("您的浏览器不支持WebSocket协议!");
connStatus.value = "您的浏览器不支持WebSocket协议!";
}
}
function disconnect() {
socket.close();
}
function send(message){
if(!window.WebSocket){return;}
if(socket.readyState == WebSocket.OPEN){
socket.send(message);
}else{
alert("WebSocket 连接没有建立成功!");
}
}
</script>
</body>
</html>
端口不需要改变。下图为测试结果
可以看到,我多发送2条文章,由于关联了一个粉丝,所以又多了2条消息 而消息中间件中消息总数始终为01,因为都以及发送出去了
- | 更多精彩文章 -
▽加我微信,交个朋友 长按/扫码添加↑↑↑