消息队列简介及 RabbitMQ 的使用方法
什么是消息队列
消息队列拆开了看,就是消息 + 队列,消息是什么?其实就是程序之间通讯所用到的数据,消息从生产者那里产生,进入队列后,安装设计好的规则出队,由消费者消费。仅此而已。
为什么需要消息队列
消息队列,最重要的是队列,可以想象一下没有队列的场景,你去银行办业务的时候,大家都不排队的场景,大家都堆在一起,个子小没力气的根本办不了业务。
如果没有消息队列,你的系统将严重耦合,在升级维护的时候牵一发而动全身。
如果没有消息队列,你的系统的很多功能都是同步的,同步意味着前面的事件完成后,才可以进行后续的操作,前端用户的会觉得卡顿,体验很差。
如果没有消息队列,web 系统突然面对高并发的访问请求,可能会崩溃。
有了消息队列,系统解耦、异步通信、流量削峰、延迟通知、最终一致性保证、顺序消息、流式处理等需求都可以轻松解决。
常见的消息队列
比较常见的消息队列产品主要有 ActiveMQ、RabbitMQ、ZeroMQ、Kafka、RocketMQ 等。
ActiveMQ
Apache ActiveMQ 是 Apache 软件基金会所研发的开放源码消息中间件;由于 ActiveMQ 是一个纯Java程序,因此只需要操作系统支持 Java 虚拟机,ActiveMQ 便可执行。
支持Java消息服务 (JMS) 1.1 版本 Spring Framework 集群 (Clustering) 协议支持包括:OpenWire、REST、STOMP、WS-Notification、MQTT、XMPP 以及 AMQP
RabbitMQ
RabbitMQ 实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ 服务器是用高性能、健壮以及可伸缩性出名的 Erlang 语言编写的,支持所有主流的操作系统如 Linux,Windows,MacOS。客户端支持所有主要的编程语言。
可伸缩性:集群服务 消息持久化:从内存持久化消息到硬盘,再从硬盘加载到内存
ZeroMQZeroMQ(也拼写作 0MQ 或 ZMQ )是一个为可伸缩的分布式或并发应用程序设计的高性能异步消息库。它提供一个消息队列, 但是与面向消息的中间件不同,ZeroMQ 的运行不需要专门的消息代理(message broker)。该库设计成常见的套接字风格的API。
ZeroMQ 是由 iMatix 公司和大量贡献者组成的社群共同开发的。ZeroQ 通过许多第三方软件支持大部分流行的编程语言,从 Java 和Python 到 Erlang 和 Haskell。
Kafka
Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。此外,Kafka 可以通过 Kafka Connect 连接到外部系统(用于数据输入/输出),并提供了 Kafka Streams 的流式处理库。该设计受事务日志的影响较大。
RocketMQ
RocketMQ 是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。RocketMQ 是 2012 年阿里巴巴开源的第三代分布式消息中间件,2016 年 11 月 21 日,阿里巴巴向 Apache 软件基金会捐赠了 RocketMQ;第二年 2 月 20 日,Apache 软件基金会宣布 Apache RocketMQ 成为顶级项目。
消息队列的选型需要根据具体应用需求而定,ZeroMQ 小而美,RabbitMQ 大而稳,Kakfa 和 RocketMQ 快而强劲。
RabbitMQ 的部署和使用
推荐 Docker 部署,在安装 Docker 的环境下,执行:
docker run -d --hostname my-rabbit -p 15672:15672 -p 5672:5672 -- name rabbit-server -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management
现在在浏览器中打开 localhost:15672 。用户名是 user ,密码是 password
接下来,让我们创建一个队列,名字叫 my_app
创建一个 Exchange,名字叫 my_exchange
点击 Exchange 标签页,点击 my_exchange 进入详情页, 将 my_exchange 和 my_app 绑定,路由密钥设置为 test:
Python 编写生产者
现在可以使用 Python 编写生产者,来生产一条消息放入队列,并观察 Web 页面的变动情况:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', pika.PlainCredentials('user', 'password')))
channel = connection.channel()
channel.basic_publish(exchange='my_exchange', routing_key='test', body='Test!')
connection.close()
执行上面的代码,即可将消息放入队列,这里我执行了四次,可以看到有四条消息:
消息将保留在队列中,直到消费者把它取出,接下来我们写一个消费消息的程序。
Python 编写消费者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', pika.PlainCredentials("user", "password")))
channel = connection.channel()
def callback(ch, method, properties, body):
print(f'{body} is received')
channel.basic_consume(queue="my_app", on_message_callback=callback, auto_ack=True)
channel.start_consuming()
执行:
此时,队列已经空了:
这段代码最低限度地演示了如何将消息发布到 RabbitMQ 中,更多用法还请移步到官方文档。
最后的话
消息队列可以进行系统模块之间的解耦,但自己就成了关键节点,在集群部署和故障转移方面,需要系统管理员的大量关注。本文简要介绍了什么是消息队列,为什么需要消息队列,常见的消息队列有哪些,RabbitMQ 的部署和使用,如果对你有所帮助,请点赞支持,欢迎留言讨论。