MQ之-RocketMq系列一

凯哥java

共 4209字,需浏览 9分钟

 ·

2022-08-25 02:13

官方定义:

Apache RocketMQ is a distributed messaging and streaming platform with low latency,  high performance and reliability, trillion-level capacity and flexible  scalability

核心组件

  • 生产者 : 消息的发送者,

  • 消费者 : 消息接收者

  • 生产者组 : 一类生产者

  • 消费者组 :一类消费者

  • NameServer :管理Broker、Topic

  • Broker :存储和收发消息

  • Topic :一类消息的统称

核心流程

  1. 每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer

  2. Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署

  3. Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还 是Slave拉取

如图:

高级特性

延迟消息

是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:

  • level == 0,消息为非延迟消息

  • 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s

  • level > maxLevel,则level== maxLevel,例如level==20,延迟2h

定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。

查看SCHEDULE_TOPIC_XXXX 主题信息:


顺序消息
全局有序

需要把topic 的队列设置成一个,product 和 consumer 并发数也要是一个。

部分有序

要保证部分消息有序,需要发送端和消费端配合处理。在发送端,要做到把同一业务ID的消息发送到同一个Message Queue;在消费过程中,要做到从同一个Message Queue读取的消息不被并发处理,这样才能达到部分有序。消费端通过使用MessageListenerOrderly类来解决单Message Queue的消息被并发处理的问题


事务消息

消息存储
  1. 提升消息写入CommitLog 的速度至关重要,因为这个部分的性能提升会直接提升Broker处理消息写入的吞吐量,比如你写入一条消息到CommitLog 磁盘文件假设需要10ms,那么每个线程每秒可以处理100个写入消息,假设有100个线程,每秒只能处理1万个写入消息请求。但是如果把写入性能优化为只需要1ms,那么每个线程每秒可以处理1000个消息写入,此时100个线程每秒可以处理10万个写入消息的请求,Broker是基于OS操作系统的PageCache和顺序写两个机制,来提升写入CommitLog文件的性能的。

  1. 消费者从OS Cache 读取ConsumeQueue中的offset在从OS Cache 或 磁盘读取消息,因为OS Cache放不下所有commitLog所以,如果你的消费者机器一直快速的拉取和消费,跟上了product写入broker的消息速率,那么每次拉取几乎都是最近刚写入commitLog的数据,几乎都在OS Cache 里面。

  1. Topic 下的每个MessageQueue 都会有一系列的ConsumeQueue文件,ConsumeQueue 也是基于OS Cache,存储的是一条消息对应在commitLog文件中的offset偏移量,实际上在ConsumeQueue中存储的每条数据不只是消息在CommitLog中的offset偏移量,还包含了消息的长度以及tag hashcode,一条数据是20个字节,每个ConsumeQueue文件保存30万条数据大概每个文件是5.72MB。

如图:



刷盘机制

RocketMQ 的所有消息都是持久化的,先写入系统 PageCache,然后刷盘,可以保证内存与磁盘都有一份数据, 访问时,直接从内存读取。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。

同步刷盘

同步刷盘与异步刷盘的唯一区别是异步刷盘写完 PageCache直接返回,而同步刷盘需要等待刷盘完成才返回, 同步刷盘流程如下:

(1). 写入 PageCache后,线程等待,通知刷盘线程刷盘。

(2). 刷盘线程刷盘后,唤醒前端等待线程,可能是一批线程。

(3). 前端等待线程向用户返回成功

异步刷盘

Product 发送消息给Broker,Broker将消息写入OS PageCache中就返回ACK给Product,有丢消息的可能。

零拷贝

Netty 扩展

RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。

一个Reactor主线程负责监听TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。rocketMq会自动根据OS类型选择NIO和Epoll,然后监听真正的网络数据。拿到网络数据后,再交给Reactor线程池,在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给Worker线程池去做。处理业务的操作交给业务线程池执行,根据RomotingComman的业务请求码code去processorTable这个本地缓存变量中找到对应的processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行。


源码环境搭建

  1. 源码拉取 https://github.com/apache/rocketmq 版本选择的是4.5.1

  2. 导入idea

  3. 执行安装 mvn clean install -Dmaven.test.skip=true

  4. 在项目根目录下创建文件夹conf和dataDir,从 distribution项目下 拷贝 broker.conf 和 logback_broker.xml 和logback_namesrv.xml 放入conf 目录下。

  5. 启动NameServer

控制台打印,说明 NameServer 启动成功。

The Name Server boot success. serializeType=JSON
  1. 启动Broker 将conf文件夹下的broker.conf 文件修改配置如下

    brokerClusterName = DefaultCluster
    brokerName = broker-a
    brokerId = 0
    deleteWhen = 04
    fileReservedTime = 48
    brokerRole = ASYNC_MASTER
    flushDiskType = ASYNC_FLUSH
    # namesrvAddr地址
    namesrvAddr=127.0.0.1:9876
    # 启用自动创建主题
    autoCreateTopicEnable=true
    # 存储路径
    storePathRootDir=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir
    # commitLog路径
    storePathCommitLog=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\commitlog
    # 消息队列存储路径
    storePathConsumeQueue=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\consumequeue
    # 消息索引存储路径
    storePathIndex=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\index
    # checkpoint文件路径
    storeCheckpoint=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\checkpoint
    # abort文件存储路径
    abortFile=D:\\study\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\abort
  1. 配置 broker.conf 和 ROCKETMQ_HOME , 并启动 BrokerStartup 。


浏览 48
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报