RocketMQ这样做,压测后性能提高30%
Hollis
共 3429字,需浏览 7分钟
·
2021-12-14 01:41
从官方这边获悉,RocketMQ在4.9.1版本中对消息发送进行了大量的优化,性能提升十分显著,接下来请跟着我一起来欣赏大神们的杰作。
对WaitNotifyObject的锁进行优化(item2) 移除HAService中的锁(item3) 移除GroupCommitService中的锁(item4) 消除HA中不必要的数组拷贝(item5) 调整消息发送几个参数的默认值(item7) sendMessageThreadPoolNums useReentrantLockWhenPutMessage flushCommitLogTimed endTransactionThreadPoolNums 减少琐的作用范围(item8-12)
移除不必要的锁 降低锁粒度(范围) 修改消息发送相关参数
1、移除不必要的锁
温馨提示:在RocketMQ4.7版本开始对消息发送进行了优化,同步消息发送模型引入了jdk的CompletableFuture实现消息的异步发送。
消息发送线程调用Commitlog的aysncPutMessage方法写入消息。 Commitlog调用submitReplicaRequest方法,将任务提交到GroupTransferService中,并获取一个Future,实现异步编程。值得注意的是这里需要等待,待数据成功写入从节点(内部基于CompletableFuture机制的内部线程池ForkJoin)。 GroupTransferService中对提交的任务依次进行判断,判断对应的请求是否已同步到从节点。 如果已经复制到从节点,则通过Future唤醒,并将结果返回给消息发送端。
为了更加方便大家理解接下来的优化点,首先再总结提炼一下GroupTransferService的设计理念:
首先引入两个List结合,分别命名为读、写链表。 外部调用GroupTransferService的putRequest请求,将存储在写链表中(requestWrite)。 GroupTransferService的run方法从requestRead链表中获取任务,判断这些任务对应的请求的数据是否成功写入到从节点。 每当requestRead中没有数据可读时,两个队列进行交互,从而实现读写分离,降低锁竞争。
更改putRequest的锁类型,用自旋锁替换synchronized 去除doWaitTransfer方法中多余的锁
1.1 使用自旋锁替换synchronized
1.2 去除多余的锁
2、降低锁的范围
3、调整消息发送相关的参数
sendMessageThreadPoolNums Broker端消息发送端线程池数量,该值在4.9.0版本之前默认为1,新版本调整为操作系统的CPU核数,并且不小于4。该参数的调整有利有弊。提高了消息发送的并发度,但同时会导致消息顺序的乱序,其示例图如下同步发送下不会有顺序问题,可放心修改 在顺序消费场景,该参数不建议修改。在实际过程中应该对RocketMQ集群进行治理,顺序消费的场景使用专门集群。 useReentrantLockWhenPutMessage MQ消息写入时对内存加锁使用的锁类型,低版本之前默认为false,表示默认使用自旋锁;新版本使用ReentrantLock。自旋主要的优势是没有线程切换成本,但自旋容易造成CPU的浪费,内存写入大部分情况下是很快,但RocketMQ比较依赖页缓存,如果出现也缓存抖动,带来的CPU浪费是非常不值得,在sendMessageThreadPoolNums设置超过1之后,锁的类型使用ReentrantLock更加稳定。 flushCommitLogTimed 首先我们通过观察源码了解一下该参数的含义: 其主要作用是控制刷盘线程阻塞等待的方式,低版本flushCommitLogTimed为false,默认使用CountDownLatch,而高版本则直接使用Thread.sleep。猜想的原因是刷盘线程比较独立,无需与其他线程进行直接的交互协作,故无需使用CountDownLatch这种专门用来线程协作的“外来和尚”。 endTransactionThreadPoolNums 主要用于设置事务消息线程池的大小。 新版本主要是可通过调整发送线程池来动态调节事务消息的值,这个大家可以根据压测结果动态调整。
有道无术,术可成;有术无道,止于术
欢迎大家关注Java之道公众号
好文章,我在看❤️
评论