RabbiMQ 的 6 种模式的基本应用
共 18658字,需浏览 38分钟
·
2021-07-22 12:53
咱们从今天开始进入开源组件的学习,一边学习一边总结一边分享
文章提纲如下:
RabbitMQ 成员组成
RabbitMQ 的六种工作模式编码
RabbitMQ 成员组成
生产者 producer
消费者 consumer
交换机 exchange
用于接受、分配消息
消息 message
队列 queue
用于存储生产者的消息
信道 channel AMQP
消息推送使用的通道
连接 connections
生成者或者消费者与Rabbit 建立的TCP 连接
路由键 routingKey
用于把生成者的数据分配到交换器上
绑定键 BindingKey
用于把交换器的消息绑定到队列上
连接管理器 ConnectionFactory
应用程序与 Rabbit 之间建立连接的管理器,程序代码中使用
RabbitMQ 的六种工作模式编码
single 模式
消息产生者将消息放入队列
消息的消费者监听消息队列,如果队列中有消息就消费掉
目录如下:
.
├── consumer.go
├── go.mod
├── go.sum
├── main.go
└── xmtmq
└── xmtmq.go
复制代码
实际编码如下:
每种模式的编码思路如下:
生产者 / 消费者
连接 RabbitMQ 的 server
初始化连接 connection
初始化通道 channel
初始化交换机 exchange
初始化队列 queue
使用路由key,绑定队列 bind , key
生产消息 / 消费消息 produce , consume
消息xmtmq.go
package xmtmq
import (
"github.com/streadway/amqp"
"log"
)
// single 模式
// 定义 RabbitMQ 的数据结构
// go get github.com/streadway/amqp
type RabbitMQ struct {
conn *amqp.Connection // 连接
channel *amqp.Channel // 通道
QueueName string // 队列名
Exchange string // 交换机
Key string // 路由键
MQUrl string // MQ的虚拟机地址
}
// New 一个 RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
if rbt == nil || rbt.QueueName == "" || rbt.MQUrl == "" {
log.Panic("please check QueueName,Exchange,MQUrl ...")
}
conn, err := amqp.Dial(rbt.MQUrl)
if err != nil {
log.Panicf("amqp.Dial error : %v", err)
}
rbt.conn = conn
channel, err := rbt.conn.Channel()
if err != nil {
log.Panicf("rbt.conn.Channel error : %v", err)
}
rbt.channel = channel
}
func RabbitMQFree(rbt *RabbitMQ){
if rbt == nil{
log.Printf("rbt is nil,free failed")
return
}
rbt.channel.Close()
rbt.conn.Close()
}
func (rbt *RabbitMQ) Init() {
// 申请队列
_, err := rbt.channel.QueueDeclare(
rbt.QueueName, // 队列名
true, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否阻塞
nil, // 其他参数
)
if err != nil {
log.Printf("rbt.channel.QueueDeclare error : %v", err)
return
}
}
// 生产消息
func (rbt *RabbitMQ) Produce(data []byte) {
// 向队列中加入数据
err := rbt.channel.Publish(
rbt.Exchange, // 交换机
rbt.QueueName, // 队列名
false, // 若为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
false, // 若为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: data,
},
)
if err != nil {
log.Printf("rbt.channel.Publish error : %v", err)
return
}
return
}
// 消费消息
func (rbt *RabbitMQ) Consume() {
// 消费数据
msg, err := rbt.channel.Consume(
rbt.QueueName, // 队列名
"xmt", // 消费者的名字
true, // 是否自动应答
false, // 是否排他
false, // 若为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
false, // 是否阻塞
nil, // 其他属性
)
if err != nil {
log.Printf("rbt.channel.Consume error : %v", err)
return
}
for data := range msg {
log.Printf("received data is %v", string(data.Body))
}
}
复制代码
main.go
package main
import (
"fmt"
"log"
"time"
"xmt/xmtmq"
)
/*
RabbimtMQ single 模式 案例
应用场景:简单消息队列的使用,一个生产者一个消费者
生产消息
*/
func main() {
// 设置日志
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
QueueName: "xmtqueue",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
var index = 0
for {
// 生产消息
rbt.Produce([]byte(fmt.Sprintf("hello wolrd %d ", index)))
log.Println("发送成功 ", index)
index++
time.Sleep(1 * time.Second)
}
}
复制代码
consumer.go
package main
import (
"log"
"xmt/xmtmq"
)
func main() {
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
QueueName: "xmtqueue",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.Consume()
}
复制代码
运行的时候,打开2个终端
终端1:go run main.go
终端2:go run consumer.go
work 模式
多个消费端消费同一个队列中的消息,队列采用轮询的方式将消息是平均发送给消费者,此处的资源是竞争关系
当生产者生产消息的速度大于消费者消费的速度,就要考虑用 work 工作模式,这样能提高处理速度提高负载
work 模式与 single 模式类似, 只是work 模式比 single 模式多了一些消费者
基于single
模式,开一个终端3 :go run consumer.go
publish / subscribe 模式
publish / subscribe
发布订阅模式 , 相对于Work queues模式多了一个交换机,此处的资源是共享的
用于场景
邮件群发
群聊天
广播(广告等)
目录和上述编码保持一致:
xmtmq.go
开始用到交换机 exchange ,fanout 类型
生产端先把消息发送到交换机,再由交换机把消息发送到绑定的队列中,每个绑定的队列都能收到由生产端发送的消息
package xmtmq
import (
"github.com/streadway/amqp"
"log"
)
// publish 模式
// 定义 RabbitMQ 的数据结构
// go get github.com/streadway/amqp
type RabbitMQ struct {
conn *amqp.Connection // 连接
channel *amqp.Channel // 通道
QueueName string // 队列名
Exchange string // 交换机
Key string // 路由键
MQUrl string // MQ的虚拟机地址
}
// New 一个 RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
if rbt == nil || rbt.Exchange == "" || rbt.MQUrl == "" {
log.Panic("please check Exchange,MQUrl ...")
}
conn, err := amqp.Dial(rbt.MQUrl)
if err != nil {
log.Panicf("amqp.Dial error : %v", err)
}
rbt.conn = conn
channel, err := rbt.conn.Channel()
if err != nil {
log.Panicf("rbt.conn.Channel error : %v", err)
}
rbt.channel = channel
}
func RabbitMQFree(rbt *RabbitMQ) {
if rbt == nil {
log.Printf("rbt is nil,free failed")
return
}
rbt.channel.Close()
rbt.conn.Close()
}
func (rbt *RabbitMQ) Init() {
// 1、创建交换机
err := rbt.channel.ExchangeDeclare(
rbt.Exchange, // 交换机
amqp.ExchangeFanout, // 交换机类型
true, // 是否持久化
false, //是否自动删除
false, //true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false, // 是否阻塞
nil, // 其他属性
)
if err != nil {
log.Printf("rbt.channel.ExchangeDeclare error : %v", err)
return
}
}
// 生产消息 publish
func (rbt *RabbitMQ) PublishMsg(data []byte) {
// 1、向队列中加入数据
err := rbt.channel.Publish(
rbt.Exchange, // 交换机
"", // 队列名
false, // 若为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
false, // 若为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: data,
},
)
if err != nil {
log.Printf("rbt.channel.Publish error : %v", err)
return
}
return
}
// 消费消息
func (rbt *RabbitMQ) SubscribeMsg() {
// 1、创建队列
q, err := rbt.channel.QueueDeclare(
"", // 此处我们传入的是空,则是随机产生队列的名称
true,
false,
false,
false,
nil,
)
if err != nil {
log.Printf("rbt.channel.QueueDeclare error : %v", err)
return
}
// 2、绑定队列
err = rbt.channel.QueueBind(
q.Name, // 队列名字
"", // 在publish模式下,这里key 为空
rbt.Exchange, // 交换机名称
false, // 是否阻塞
nil, // 其他属性
)
if err != nil {
log.Printf("rbt.channel.QueueBind error : %v", err)
return
}
// 3、消费数据
msg, err := rbt.channel.Consume(
q.Name, // 队列名
"xmt", // 消费者的名字
true, // 是否自动应答
false, // 是否排他
false, // 若为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
false, // 是否阻塞
nil, // 其他属性
)
if err != nil {
log.Printf("rbt.channel.Consume error : %v", err)
return
}
for data := range msg {
log.Printf("received data is %v", string(data.Body))
}
}
复制代码
main.go
package main
import (
"fmt"
"log"
"time"
"xmt/xmtmq"
)
/*
RabbimtMQ publish 模式 案例
应用场景:邮件群发,群聊天,广播(广告)
生产消息
*/
func main() {
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
Exchange: "xmtPubEx",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.Init()
var index = 0
for {
rbt.PublishMsg([]byte(fmt.Sprintf("hello wolrd %d ", index)))
log.Println("发送成功 ", index)
index++
time.Sleep(1 * time.Second)
}
xmtmq.RabbitMQFree(rbt)
}
复制代码
consumer.go
package main
import (
"log"
"xmt/xmtmq"
)
func main() {
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
Exchange: "xmtPubEx",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.SubscribeMsg()
xmtmq.RabbitMQFree(rbt)
}
复制代码
执行的操作和上述保持一致
终端1:go run main.go
终端2:go run consumer.go
终端3:go run consumer.go
效果和上述single
模式和 work
模式的明显区别是:发布订阅模式的案例,生产者生产的消息,对应的消费者消费其生产的内容
routing 模式
消息生产者将消息发送给交换机按照路由判断,路由是字符串 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息
**应用场景:**从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景,例如处理错误,处理特定消息等
生产者处理流程:
声明队列并声明交换机 -> 创建连接 -> 创建通道 -> 通道声明交换机 -> 通道声明队列 -> 通过通道使队列绑定到交换机并指定该队列的routingkey(通配符) -> 制定消息 -> 发送消息并指定routingkey(通配符)
复制代码
消费者处理流程:
声明队列并声明交换机 -> 创建连接 -> 创建通道 -> 通道声明交换机 -> 通道声明队列 -> 通过通道使队列绑定到交换机并指定routingkey(通配符) -> 重写消息消费方法 -> 执行消息方法
复制代码
目录结构如下:
.
├── consumer2.go
├── consumer.go
├── go.mod
├── go.sum
├── main.go
└── xmtmq
└── xmtmq.go
复制代码
xmtmq.go
用到交换机 为
direct
类型用到路由键
package xmtmq
import (
"github.com/streadway/amqp"
"log"
)
// routing 模式
// 定义 RabbitMQ 的数据结构
// go get github.com/streadway/amqp
type RabbitMQ struct {
conn *amqp.Connection // 连接
channel *amqp.Channel // 通道
QueueName string // 队列名
Exchange string // 交换机
Key string // 路由键
MQUrl string // MQ的虚拟机地址
}
// New 一个 RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
if rbt == nil || rbt.Exchange == "" || rbt.QueueName == "" || rbt.Key == "" || rbt.MQUrl == "" {
log.Panic("please check Exchange,,QueueName,Key,MQUrl ...")
}
conn, err := amqp.Dial(rbt.MQUrl)
if err != nil {
log.Panicf("amqp.Dial error : %v", err)
}
rbt.conn = conn
channel, err := rbt.conn.Channel()
if err != nil {
log.Panicf("rbt.conn.Channel error : %v", err)
}
rbt.channel = channel
}
func RabbitMQFree(rbt *RabbitMQ) {
if rbt == nil {
log.Printf("rbt is nil,free failed")
return
}
rbt.channel.Close()
rbt.conn.Close()
}
func (rbt *RabbitMQ) Init() {
// 1、创建交换机
err := rbt.channel.ExchangeDeclare(
rbt.Exchange, // 交换机
amqp.ExchangeDirect, // 交换机类型
true, // 是否持久化
false, //是否自动删除
false, //true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false, // 是否阻塞
nil, // 其他属性
)
if err != nil {
log.Printf("rbt.channel.ExchangeDeclare error : %v", err)
return
}
// 2、创建队列
_, err = rbt.channel.QueueDeclare(
rbt.QueueName, // 此处我们传入的是空,则是随机产生队列的名称
true,
false,
false,
false,
nil,
)
if err != nil {
log.Printf("rbt.channel.QueueDeclare error : %v", err)
return
}
// 3、绑定队列
err = rbt.channel.QueueBind(
rbt.QueueName, // 队列名字
rbt.Key, // routing,这里key 需要填
rbt.Exchange, // 交换机名称
false, // 是否阻塞
nil, // 其他属性
)
if err != nil {
log.Printf("rbt.channel.QueueBind error : %v", err)
return
}
}
// 生产消息 publish
func (rbt *RabbitMQ) ProduceRouting(data []byte) {
// 1、向队列中加入数据
err := rbt.channel.Publish(
rbt.Exchange, // 交换机
rbt.Key, // key
false, // 若为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
false, // 若为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: data,
},
)
if err != nil {
log.Printf("rbt.channel.Publish error : %v", err)
return
}
return
}
// 消费消息
func (rbt *RabbitMQ) ConsumeRoutingMsg() {
// 4、消费数据
msg, err := rbt.channel.Consume(
rbt.QueueName, // 队列名
"", // 消费者的名字
true, // 是否自动应答
false, // 是否排他
false, // 若为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
false, // 是否阻塞
nil, // 其他属性
)
if err != nil {
log.Printf("rbt.channel.Consume error : %v", err)
return
}
for data := range msg {
log.Printf("received data is %v", string(data.Body))
}
}
复制代码
main.go
package main
import (
"fmt"
"log"
"time"
"xmt/xmtmq"
)
/*
RabbimtMQ routing 模式 案例
应用场景:从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景,例如处理错误,处理特定消息等
生产消息
*/
func main() {
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt1 := &xmtmq.RabbitMQ{
Exchange: "xmtPubEx2",
Key: "xmt1",
QueueName: "Routingqueuexmt1",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt1)
rbt1.Init()
rbt2 := &xmtmq.RabbitMQ{
Exchange: "xmtPubEx2",
Key: "xmt2",
QueueName: "Routingqueuexmt2",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt2)
rbt2.Init()
var index = 0
for {
rbt1.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt1 %d ", index)))
log.Println("发送成功xmt1 ", index)
rbt2.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt2 %d ", index)))
log.Println("发送成功xmt2 ", index)
index++
time.Sleep(1 * time.Second)
}
xmtmq.RabbitMQFree(rbt1)
xmtmq.RabbitMQFree(rbt2)
}
复制代码
consumer.go
package main
import (
"log"
"xmt/xmtmq"
)
func main() {
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
Exchange: "xmtPubEx2",
Key: "xmt1",
QueueName: "Routingqueuexmt1",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.ConsumeRoutingMsg()
xmtmq.RabbitMQFree(rbt)
}
复制代码
consumer2.go
package main
import (
"log"
"xmt/xmtmq"
)
func main() {
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
Exchange: "xmtPubEx2",
Key: "xmt2",
QueueName: "Routingqueuexmt2",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.ConsumeRoutingMsg()
xmtmq.RabbitMQFree(rbt)
}
复制代码
topic 模式
话题模式,一个消息被多个消费者获取,消息的目标 queue 可用 BindingKey
的通配符
Topics 模式实际上是路由模式的一种
他俩的最大的区别是 :Topics 模式发送消息和消费消息的时候是通过通配符去进行匹配的
*号代表可以同通配一个单词
#号代表可以通配零个或多个单词
编码的案例与上述 routing 模式保持一直,只是 exchange 为 topic
类型
如下是上述几种模式涉及到的交换机
和队列
rpc 模式
RPC
远程过程调用,客户端远程调用服务端的方法 ,使用 MQ
可以实现 RPC
的异步调用
目录结构为:
.
├── consumer.go
├── go.mod
├── go.sum
├── main.go
└── xmtmq
└── xmtmq.go
复制代码
客户端即是生产者也是消费者,向
RPC
请求队列发送RPC
调用消息,同时监听RPC
响应队列服务端监听
RPC
请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果服务端将
RPC
方法 的结果发送到RPC
响应队列。客户端监听
RPC
响应队列,接收到RPC
调用结果
xmtmq.go
package xmtmq
import (
"github.com/streadway/amqp"
"log"
"math/rand"
)
// rpc 模式
// 定义 RabbitMQ 的数据结构
// go get github.com/streadway/amqp
type RabbitMQ struct {
conn *amqp.Connection // 连接
channel *amqp.Channel // 通道
QueueName string // 队列名
Exchange string // 交换机
Key string // 路由键
MQUrl string // MQ的虚拟机地址
}
// New 一个 RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
if rbt == nil || rbt.QueueName == "" || rbt.MQUrl == "" {
log.Panic("please check QueueName,Exchange,MQUrl ...")
}
conn, err := amqp.Dial(rbt.MQUrl)
if err != nil {
log.Panicf("amqp.Dial error : %v", err)
}
rbt.conn = conn
channel, err := rbt.conn.Channel()
if err != nil {
log.Panicf("rbt.conn.Channel error : %v", err)
}
rbt.channel = channel
}
func RabbitMQFree(rbt *RabbitMQ) {
if rbt == nil {
log.Printf("rbt is nil,free failed")
return
}
rbt.channel.Close()
rbt.conn.Close()
}
// 生产消息
func (rbt *RabbitMQ) Produce(data []byte) {
// 申请队列
q, err := rbt.channel.QueueDeclare(
rbt.QueueName, // 队列名
true, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否阻塞
nil, // 其他参数
)
if err != nil {
log.Printf("rbt.channel.QueueDeclare error : %v", err)
return
}
err = rbt.channel.Qos(1, 0, false)
if err != nil {
log.Printf("rbt.channel.Qos error : %v", err)
return
}
d, err := rbt.channel.Consume(
q.Name,
"",
false,
false,
false,
false,
nil)
if err != nil {
log.Printf("rbt.channel.Consume error : %v", err)
return
}
for msg := range d {
log.Println("received msg is ", string(msg.Body))
err := rbt.channel.Publish(
"",
msg.ReplyTo,
false,
false,
amqp.Publishing{
ContentType: "test/plain",
CorrelationId: msg.CorrelationId,
Body: data,
})
if err != nil {
log.Printf("rbt.channel.Publish error : %v", err)
return
}
msg.Ack(false)
log.Println("svr response ok ")
}
return
}
func randomString(l int) string {
bytes := make([]byte, l)
for i := 0; i < l; i++ {
bytes[i] = byte(rand.Intn(l))
}
return string(bytes)
}
// 消费消息
func (rbt *RabbitMQ) Consume() {
// 申请队列
q, err := rbt.channel.QueueDeclare(
"", // 队列名
true, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否阻塞
nil, // 其他参数
)
if err != nil {
log.Printf("rbt.channel.QueueDeclare error : %v", err)
return
}
// 消费数据
msg, err := rbt.channel.Consume(
q.Name, // 队列名
"xmt", // 消费者的名字
true, // 是否自动应答
false, // 是否排他
false, // 若为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
false, // 是否阻塞
nil, // 其他属性
)
if err != nil {
log.Printf("rbt.channel.Consume error : %v", err)
return
}
id := randomString(32)
err = rbt.channel.Publish(
"",
rbt.QueueName,
false,
false,
amqp.Publishing{
ContentType: "test/plain",
CorrelationId: id,
ReplyTo: q.Name,
Body: []byte("321"),
})
if err != nil {
log.Printf("rbt.channel.Publish error : %v", err)
return
}
for data := range msg {
log.Printf("received data is %v", string(data.Body))
}
}
复制代码
main.go
package main
import (
"fmt"
"log"
"xmt/xmtmq"
)
/*
RabbimtMQ rpc 模式 案例
应用场景:简单消息队列的使用,一个生产者一个消费者
生产消息
*/
func main() {
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
QueueName: "xmtqueue",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.Produce([]byte(fmt.Sprintf("hello wolrd")))
}
复制代码
consumer.go
package main
import (
"log"
"math/rand"
"time"
"xmt/xmtmq"
)
func main() {
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rand.Seed(time.Now().UTC().UnixNano())
rbt := &xmtmq.RabbitMQ{
QueueName: "xmtqueue",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.Consume()
}
复制代码
咱们先运行消费者,多运行几个,可以看到咱们的队列中已经有数据了,咱们运行的是2个消费者,因此此处是 2
再运行生产者,就能看到生产者将消费者发送的消息消费掉,并且通过 CorrelationId
找到对应消费者监听的队列,将数据发送到队列中
消费者监听的队列有数据了,消费者就取出来进行消费
总结
RabbitMQ
的六种工作模式:
single 模式
work 模式
publish / subscribe 模式
routing 模式
topic 模式
rpc 模式
参考资料:
RabbitMQ Tutorials
欢迎点赞,关注,收藏
朋友们,你的支持和鼓励,是我坚持分享,提高质量的动力
好了,本次就到这里
技术是开放的,我们的心态,更应是开放的。拥抱变化,向阳而生,努力向前行。
欢迎点赞关注收藏,下次见~
作者:小魔童哪吒
链接:https://juejin.cn/post/6985182716358557733
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。