RabbitMQ简介以及应用
阅读本文大概需要 7 分钟。
作者:会跳舞的机器人
一、简要介绍
开源AMQP实现,Erlang语言编写,支持多种客户端 分布式、高可用、持久化、可靠、安全 支持多种协议:AMQP、STOMP、MQTT、HTTP 适用于多系统之间的业务解耦的消息中间件
二、基本概念
1、exchange:交换器,负责接收消息,转发消息至绑定的队列,有四种类型:
direct:完全匹配的路由 topic:模式匹配的路由 fanout:广播模式 headers:键值对匹配路由
持久化:如果启用,那么rabbit服务重启之后仍然存在 自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身
2、Queue:队列,rabbitmq的内部对象,用于存储消息,其属性类似于Exchange,同样可以设置是否持久化、自动删除等。
3、Binding:绑定,根据路由规则绑定交换器与队列
4、Routing:路由键,路由的关键字
三、消息的可靠性
Message acknowledgment:消息确认,在消息确认机制下,收到回执才会删除消息,未收到回执而断开了连接,消息会转发给其他消费者,如果忘记回执,会导致消息堆积,消费者重启后会重复消费这些消息并重复执行业务逻辑。
Message durability:消息持久化,设置消息持久化可以避免绝大部分消息丢失,比如rabbitmq服务重启,但是采用非持久化可以提升队列的处理效率。如果要确保消息的持久化,那么消息对应的Exchange和Queue同样要设置为持久化。
Prefetch count,每次发送给消费者消息的数量,默认为1
四、简单应用
高性能,它的实现语言是天生具备高并发高可用的erlang 语言 支持消息的持久化,即使服务器挂了,也不会丢失消息 消息应答(ack)机制,消费者消费完消息后发送一个消息应答,rabbitmq才会删除消息,确保消息的可靠性 支持高可用集群 灵活的路由
EXCHANGE.ORDER_CREATE
交换器,该交换器绑定了两个队列,QUEUE.ORDER_INCREASESCORE
、QUEUE.ORDER_NOTIFY
,消费者订阅这两个队列分别用来处理增加积分、发送用户通知。EXCHANGE.ORDER_CREATE
即可。package com.robot.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
/**
* @author: 会跳舞的机器人
* @date: 2017/10/13 10:46
* @description: 模拟用户下单之后发送rabbitmq消息
*/
public class OrderCreator {
// 交换器名称
private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
// 消息内容
private static String msg = "create order success";
/**
* 模拟创建订单后发送mq消息
*/
public void createOrder() {
System.out.println("下单成功,开始发送rabbitmq消息");
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.12.44");
connectionFactory.setPort(56720);
connectionFactory.setUsername("baibei");
connectionFactory.setPassword("baibei");
Connection connection;
Channel channel;
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// 持久化
boolean durable = true;
// topic类型
String type = "topic";
// 声明交换器,如果交换器不存在则创建之
channel.exchangeDeclare(EXCHANGE, type, durable);
String messgeId = UUID.randomUUID().toString();
// deliveryMode>=2表示设置消息持久化
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2).messageId(messgeId).build();
// 发布消息
String routingKey = "order_create";
channel.basicPublish(EXCHANGE, routingKey, props, msg.getBytes("utf-8"));
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
package com.robot.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author: 会跳舞的机器人
* @date: 2017/10/13 16:02
* @description: rabbitmq消费者,模拟下单成功后给用户增加积分
*/
public class IncreaseScoreConsumer implements Consumer {
private Connection connection;
private Channel channel;
// 交换器名称
private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
// 增加积分队列名称
private static final String QUEUENAME = "QUEUE.ORDER_INCREASESCORE";
public void consume() {
// 初始化rabbitmq连接信息
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.12.44");
connectionFactory.setPort(56720);
connectionFactory.setUsername("baibei");
connectionFactory.setPassword("baibei");
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// 声明交换器
channel.exchangeDeclare(EXCHANGE, "topic", true);
// 声明队列
channel.queueDeclare(QUEUENAME, true, false, false, null);
// 交换器与队列绑定并设置routingKey
channel.queueBind(QUEUENAME, EXCHANGE, "order_create");
// 消费消息,callback是该类,关闭自动确认消息,在完成业务逻辑后手动确认确认
channel.basicConsume(QUEUENAME, false, this);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("《积分系统》收到订单消息:" + msg + ",给用户增加积分......");
// 手动确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
/**
* channel.basicReject(envelope.getDeliveryTag(), false);该方法会丢弃掉队列中的这条消息
* channel.basicReject(envelope.getDeliveryTag(), true);该方法会把消息重新放回队列
* 一般系统会设定一个重试次数,如果超过重试次数,则会丢弃消息,反之则会把消息再放入队列
*/
}
public void handleConsumeOk(String consumerTag) {
}
public void handleCancelOk(String consumerTag) {
}
public void handleCancel(String consumerTag) throws IOException {
}
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
}
public void handleRecoverOk(String consumerTag) {
}
}
package com.robot.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author: 会跳舞的机器人
* @date: 2017/10/13 16:20
* @description: rabbitmq消费者,模拟下单成功后给用户发送通知
*/
public class NotifyConsumer implements Consumer {
private Connection connection;
private Channel channel;
// 交换器名称
private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
// 通知用户下单成功通知队列名称
private static final String QUEUENAME = "QUEUE.ORDER_NOTIFY";
public void consume() {
// 初始化rabbitmq连接信息
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.12.44");
connectionFactory.setPort(56720);
connectionFactory.setUsername("baibei");
connectionFactory.setPassword("baibei");
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// 声明交换器
channel.exchangeDeclare(EXCHANGE, "topic", true);
// 声明队列
channel.queueDeclare(QUEUENAME, true, false, false, null);
// 交换器与队列绑定并设置routingKey
channel.queueBind(QUEUENAME, EXCHANGE, "order_create");
// 消费消息,callback是该类,关闭自动确认消息,在完成业务逻辑后手动确认确认
channel.basicConsume(QUEUENAME, false, this);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("《通知系统》收到订单消息:" + msg + ",开始给用户发送通知......");
// 手动确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
/**
* channel.basicReject(envelope.getDeliveryTag(), false);该方法会丢弃掉队列中的这条消息
* channel.basicReject(envelope.getDeliveryTag(), true);该方法会把消息重新放回队列
* 一般系统会设定一个重试次数,如果超过重试次数,则会丢弃消息,反之则会把消息再放入队列
*/
}
public void handleConsumeOk(String consumerTag) {
}
public void handleCancelOk(String consumerTag) {
}
public void handleCancel(String consumerTag) throws IOException {
}
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
}
public void handleRecoverOk(String consumerTag) {
}
}
package com.robot.rabbitmq;
/**
* @author: 会跳舞的机器人
* @date: 2017/10/13 16:27
* @description:
*/
public class Test {
public static void main(String[] args) {
IncreaseScoreConsumer increaseScoreConsumer = new IncreaseScoreConsumer();
increaseScoreConsumer.consume();
NotifyConsumer notifyConsumer = new NotifyConsumer();
notifyConsumer.consume();
OrderCreator orderCreator = new OrderCreator();
for (int i = 0; i < 3; i++) {
orderCreator.createOrder();
}
}
}
下单成功,开始发送rabbitmq消息
《积分系统》收到订单消息:create order success,给用户增加积分......
《通知系统》收到订单消息:create order success,开始给用户发送通知......
下单成功,开始发送rabbitmq消息
《积分系统》收到订单消息:create order success,给用户增加积分......
《通知系统》收到订单消息:create order success,开始给用户发送通知......
下单成功,开始发送rabbitmq消息
《积分系统》收到订单消息:create order success,给用户增加积分......
《通知系统》收到订单消息:create order success,开始给用户发送通知......
推荐阅读:
微信扫描二维码,关注我的公众号
朕已阅