MQ之-RocketMq系列一
共 4209字,需浏览 9分钟
·
2022-08-25 02:13
官方定义:
Apache RocketMQ is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability
核心组件
生产者 : 消息的发送者,
消费者 : 消息接收者
生产者组 : 一类生产者
消费者组 :一类消费者
NameServer :管理Broker、Topic
Broker :存储和收发消息
Topic :一类消息的统称
核心流程
每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer
Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署
Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还 是Slave拉取
如图:
高级特性
延迟消息
是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
level == 0,消息为非延迟消息
1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
level > maxLevel,则level== maxLevel,例如level==20,延迟2h
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
查看SCHEDULE_TOPIC_XXXX 主题信息:
顺序消息
全局有序
需要把topic 的队列设置成一个,product 和 consumer 并发数也要是一个。
部分有序
要保证部分消息有序,需要发送端和消费端配合处理。在发送端,要做到把同一业务ID的消息发送到同一个Message Queue;在消费过程中,要做到从同一个Message Queue读取的消息不被并发处理,这样才能达到部分有序。消费端通过使用MessageListenerOrderly类来解决单Message Queue的消息被并发处理的问题
事务消息
消息存储
提升消息写入CommitLog 的速度至关重要,因为这个部分的性能提升会直接提升Broker处理消息写入的吞吐量,比如你写入一条消息到CommitLog 磁盘文件假设需要10ms,那么每个线程每秒可以处理100个写入消息,假设有100个线程,每秒只能处理1万个写入消息请求。但是如果把写入性能优化为只需要1ms,那么每个线程每秒可以处理1000个消息写入,此时100个线程每秒可以处理10万个写入消息的请求,Broker是基于OS操作系统的PageCache和顺序写两个机制,来提升写入CommitLog文件的性能的。
消费者从OS Cache 读取ConsumeQueue中的offset在从OS Cache 或 磁盘读取消息,因为OS Cache放不下所有commitLog所以,如果你的消费者机器一直快速的拉取和消费,跟上了product写入broker的消息速率,那么每次拉取几乎都是最近刚写入commitLog的数据,几乎都在OS Cache 里面。
Topic 下的每个MessageQueue 都会有一系列的ConsumeQueue文件,ConsumeQueue 也是基于OS Cache,存储的是一条消息对应在commitLog文件中的offset偏移量,实际上在ConsumeQueue中存储的每条数据不只是消息在CommitLog中的offset偏移量,还包含了消息的长度以及tag hashcode,一条数据是20个字节,每个ConsumeQueue文件保存30万条数据大概每个文件是5.72MB。
如图:
刷盘机制
RocketMQ 的所有消息都是持久化的,先写入系统 PageCache,然后刷盘,可以保证内存与磁盘都有一份数据, 访问时,直接从内存读取。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。
同步刷盘
同步刷盘与异步刷盘的唯一区别是异步刷盘写完 PageCache直接返回,而同步刷盘需要等待刷盘完成才返回, 同步刷盘流程如下:
(1). 写入 PageCache后,线程等待,通知刷盘线程刷盘。
(2). 刷盘线程刷盘后,唤醒前端等待线程,可能是一批线程。
(3). 前端等待线程向用户返回成功
异步刷盘
Product 发送消息给Broker,Broker将消息写入OS PageCache中就返回ACK给Product,有丢消息的可能。
零拷贝
Netty 扩展
RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。
一个Reactor主线程负责监听TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。rocketMq会自动根据OS类型选择NIO和Epoll,然后监听真正的网络数据。拿到网络数据后,再交给Reactor线程池,在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给Worker线程池去做。处理业务的操作交给业务线程池执行,根据RomotingComman的业务请求码code去processorTable这个本地缓存变量中找到对应的processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行。
源码环境搭建
源码拉取 https://github.com/apache/rocketmq 版本选择的是4.5.1
导入idea
执行安装 mvn clean install -Dmaven.test.skip=true
在项目根目录下创建文件夹conf和dataDir,从 distribution项目下 拷贝 broker.conf 和 logback_broker.xml 和logback_namesrv.xml 放入conf 目录下。
启动NameServer
控制台打印,说明 NameServer 启动成功。
The Name Server boot success. serializeType=JSON
启动Broker 将conf文件夹下的broker.conf 文件修改配置如下
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# namesrvAddr地址
namesrvAddr=127.0.0.1:9876
# 启用自动创建主题
autoCreateTopicEnable=true
# 存储路径
storePathRootDir=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir
# commitLog路径
storePathCommitLog=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\commitlog
# 消息队列存储路径
storePathConsumeQueue=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\consumequeue
# 消息索引存储路径
storePathIndex=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\index
# checkpoint文件路径
storeCheckpoint=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\checkpoint
# abort文件存储路径
abortFile=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\abort
配置 broker.conf 和 ROCKETMQ_HOME , 并启动 BrokerStartup 。