程序员都应该知道的常用消息中间件以及RabbitMQ消息中间件

愿天堂没有BUG

共 3055字,需浏览 7分钟

 · 2022-06-20

常用消息中间件

早期使用ActiveMQ作为消息中间件的项目比较多,作为Apache的一个子项目,ActiveMQ支持常用的多种语言:C++、Java、.Net、Python、PHP、Ruby等。

RabbitMQ是一个使用Erlang编写的AMQP(高级消息队列协议)的服务实现。简单来说,它就是一个功能强大的消息队列服务。


Kafka 是 最 早 使 用 Scala 实 现 的 一 个 高 性 能 分 布 式Publish/Subscribe消息队列系统,具有快速持久化、高吞吐、离线堆积等特性,在实时计算、日志采集等场景和大数据领域已经成为业内的标准。

RocketMQ作为一款纯Java、分布式、队列模型的开源消息中间件,参考了优秀的开源消息中间件Kafka,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。目前,在金融公司和以消息可靠性、低延迟为主要诉求的场景下RabbitMQ使用得比较多;而Kafka则在日志、数据流转、大数据传输、高吞吐量等方面更有保障,后面我们还会对这两种典型的消息中间件做进一步介绍。

RabbitMQ消息中间件

RabbitMQ本身支持很多协议:AMQP、XMPP、SMTP、STOMP,也正因如此,它才变得非常重量级,更适合企业级的开发。它的核心思想是生产者不会将消息直接发送给队列,在发送给客户端时消息先在中心队列排队。它对路由、负载均衡、数据持久化都有很好的支持。

RabbitMQ核心组件

RabbitMQ最核心的组件是Exchange和Queue。Exchange和Queue部署在RabbitMQ Broker,而Producer和Consumer部署在应用端。

Virtual Host

Virtual Host(虚拟主机)类似权限控制组,一个Virtual Host里可以有多个Exchange和Queue,权限控制的最小粒度是VirtualHost。

Producer:即数据的发送方(生产者)一般一个Message有两个部分:payload(有效载荷)和label(标签),payload顾名思义就是传输的数据,label是exchange的名字或者说是一个tag,它描述payload,而且RabbitMQ也通过label来决定把Message发给哪个Consumer。

Consumer:即数据的接收方(消费者)如果有多个Consumer同时订阅同一个Queue中的消息,Queue中的消息会被分发给多个Consumer。

Connection与Channel

两者都是RabbitMQ对外提供的API中最基本的对象。Connection是一个TCP的连接,Producer和Consumer都是通过TCP连接到RabbitMQServer的。

一个TCP建立后,客户端会建立一个Channel(通信信道),每个Channel都被指派成唯一的ID。Channel是我们与RabbitMQ打交道的最重要的接口,我们的大部分业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

Queue

Queue(队列)是RabbitMQ的内部对象,用来存储信息。应用程序在权限范围内可以自由地创建、共享、使用和消费Queue。Queue提供有限制的先进先出保证,服务器会将某一个Producer发出的同等优先级消息按照它们进入队列的顺序传递给某一个Consumer,Queue可以是持久的、临时的或者自动删除的。Queue可以保存在内存、硬盘或者两种介质的组合中,在Virtual Host范围之内,Queue保存消息,并将消息分发给一个或者多个订阅客户端。

绑定

所谓绑定就是将一个特定的Exchange与一个特定的Queue绑定起来,绑定的关键字是BindingKey。Exchange和Queue的绑定可以是多对多的关系,每个发送给Exchange的消息都有一个叫作RoutingKey的关键字,Exchange要将该消息转发给特定队列,该队列与交换器的BindingKey必须与消息的RoutingKey相匹配。

Exchange(交换器)

Exchange用于转发消息,但是它不会做存储,如果没有Queue绑定到Exchange,它会直接丢弃Producer发送过来的消息。这里有一个比较重要的概念:路由键(RoutingKey)。当消息发送到Exchange的时候,Exchange会将其转发到对应的队列中,至于转发到哪个队列取决于路由键。

Exchange的功能是接收消息并且转发到绑定的队列,Exchange不存储消息,在启用ACK模式后,Exchange找不到队列会返回错误。

Exchange有四种类型:Direct、Fanout、Topic和Headers。

● Direct(默认):直接交换器

这 种 方 式 类 似 于 单 播 , Exchange 会 将 消 息 发 送 给 完 全 匹 配RoutingKey的Queue。下图是直接交换器的工作方式:当某个消息到达交换器X时,如果它的RoutingKey是orange,它将直接被交给Queue1,如果是green,直接被交给Queue2。


● Fanout:广播式交换器

不管消息的RoutingKey是什么,交换器都会将消息转发给所有绑定的Queue。广播式交换器的工作方式如下图所示:发送到交换器X的所有消息都被无条件地发到所有绑定的Queue上。


● Topic:主题交换器

这种方式类似于组播,交换器会将消息转发给和RoutingKey匹配模式相同的所有队列,比如RoutingKey为orange的Message会转发给绑定匹配模式为*.orange*和#.orange.#的队列(*表示是匹配一个任意词组,#表示匹配0个或多个词组),如下图所示。


● Headers:消息体的Header匹配(ignore)

与Routing不同的地方是,Header模式取消了RoutingKey,使用Header中的Key/value(键/值对)进行匹配。

订阅模式与检索模式

RabbitMQ支持两种消息处理模式,一种是订阅模式(Push模式),由Broker主动将消息推送给订阅队列的Consumer;另一种是检索模式(Pull模式),需要Consumer调用channel.basicGet方法主动从队列中拉取消息。

● 订阅模式(Push)

实现一个Consumer,最容易的方式是继承DefaultConsumer类实现订阅模式,重写其中的方法即可,具体使用示例如下:


这里因为关闭了消息自动确认机制,所以我们必须手动在handleDelivery方法中确认消息已经消费处理成功。

● 检索模式(Pull)

使用Channel.basicGet拉取消息,返回的数据类型是GetResponse实例。


同样,由于这里设置了“autoAck=false;”,我们必须手动确认已经成功接收到了消息。

本文给大家讲解的内容是MOM异步通信,常用消息中间件以及RabbitMQ消息中间件

  1. 下篇文章给大家讲解的内容是MOM异步通信,Kafka消息中间件

  2. 觉得文章不错的朋友可以转发此文关注小编;

  3. 感谢大家的支持!

本文就是愿天堂没有BUG给大家分享的内容,大家有收获的话可以分享下,想学习更多的话可以到微信公众号里找我,我等你哦。

浏览 17
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报