快手基于 RocketMQ 的在线消息系统建设实践

共 5176字,需浏览 11分钟

 ·

2021-03-04 00:07


作者:黄理,10 多年软件开发和架构经验,热衷于代码和性能优化,开发和参与过多个开源项目。曾在淘宝任业务架构师多年,当前在快手负责在线消息系统建设工作。
 

为什么建设在线消息系统



在引入 RocketMQ 之前,快手已经在大量的使用 Kafka 了,但并非所有情况下 Kafka 都是最合适的,比如以下场景:

  • 业务希望个别消费失败以后可以重试,并且不堵塞后续其它消息的消费。
  • 业务希望消息可以延迟一段时间再投递。
  • 业务需要发送的时候保证数据库操作和消息发送是一致的(也就是事务发送)。
  • 为了排查问题,有的时候业务需要一定的单个消息查询能力。

为了应对以上这类场景,我们需要建设一个主要面向在线业务的消息系统,作为 Kafka 的补充。在考察的一些消息中间件中,RocketMQ 和业务需求匹配度比较高,同时部署结构简单,使用的公司也比较多,于是最后我们就采用了 RocketMQ



部署模式和落地策略



在一个已有的体系内落地一个开源软件,通常大概有两种方式:

 
方式一:在开源软件的基础上做深度修改,很容易实现公司内需要的定制功能。但和社区开源版本分道扬镳,以后如何升级?

方式二:尽量不修改社区版本(或减少不兼容的修改),而是在它的外围或者上层进一步包装来实现公司内部需要的定制功能。

注:上图方式一的图画的比较极端,实际上很多公司是方式一、方式二结合的。
 
我们选择了方式二。最早的时候,我们使用的是 4.5.2 版本,后来社区 4.7 版本大幅减小了同步复制的延迟,正好我们的部署模式就是同步复制,于是就很轻松的升级了 4.7 系列,享受了新版本的红利。
 
在部署集群的时候,还会面临很多部署策略的选择:

  • 大集群 vs 小集群
  • 选择副本数
  • 同步刷盘 vs 异步刷盘
  • 同步复制  vs 异步复制
  • SSD vs 机械硬盘

大集群会有更好的性能弹性,而小集群具有更好的隔离型,此外小集群可以不需要跨可用区 /IDC 部署,所以会有更好的健壮性。我们非常看重稳定性,因此选择了小集群。集群同步复制异步刷盘,首选 SSD。


客户端封装策略



如上所述,我们没有在 RocketMQ 里面做深度修改,所以需要提供一个 SDK 来实现公司内需要的定制功能,这个 SDK 大概是这样的:
 


对外只提供最基本的 API,所有访问必须经过我们提供的接口。简洁的 API 就像冰山的一个角,除了对外的简单接口,下面所有的东西都可以升级更换,而不会破坏兼容性。
 
业务开发起来也很简单,只要需要提供 Topic(全局唯一)和 Group 就可以生产和消费,不用提供环境、NameServer 地址等。SDK 内部会根据 Topic 解析出集群 NameServer 的地址,然后连接相应的集群。生产环境和测试环境环境会解析出不同的地址,从而实现了隔离。
 
上图分为 3 层,第二层是通用的,第三层才对应具体的 MQ 实现,因此,理论上可以更换为其它消息中间件,而客户端程序不需要修改。
 
SDK 内部集成了热变更机制,可以在不重启 Client 的情况下做动态配置,比如下发路由策略(更换集群 NameServer 的地址,或者连接到别的集群去),Client 的线程数、超时时间等。通过 Maven 强制更新机制,可以保证业务使用的 SDK 基本上是最新的。


集群负载均衡 & 机房灾备



所有的 Topic 默认都分配到两个可用区,生产者和消费者会同时连接至少两个独立集群(分布在不同的可用区),如下图:

 
生产者同时连接两个集群,如果可用区 A 出现故障,流量就会自动切换到可用区 B 的集群 2 去。我们开发了一个小组件来实现自适应的集群负载均衡,它包含以下能力:

  • 千万级 OPS

  • 灵活的权重调整策略

  • 健康检查支持/事件通知

  • 并发度控制(自动降低响应慢的服务器的请求数)

  • 资源优先级(类似 Envoy,实现本地机房优先,或是被调服务器很多的时候选取一个子集来调用)

  • 自动优先级管理

  • 增量热变更


实际上它并不仅仅用于消息生产者,而是一个通用的主调方负载均衡类库,可以在 Github 上找到:https://github.com/PhantomThief/simple-failover-java。

核心的 SimpleFailover 接口和 PriorityFailover 类没有传递第三方依赖,非常容易整合。


多样的消息功能



延迟消息


延迟消息是非常重要的业务功能,不过 RocketMQ 内置的延迟消息只能支持几个固定的延迟级别,所以我们又开发了单独的 Delay Server 来调度延迟消息:


上图这个结构没有直接将延迟消息发到 Delay Server,而是更换 Topic 以后存入 RocketMQ。这样的好处是可以复用现有的消息发送接口(以及上面的所有扩展能力)。对业务来说,只需要在构造消息的时候额外指定一个延迟时间字段即可,其它用法都不变。


事务消息


RocketMQ 4.3 版本以后支持了事务消息,可以保证本地事务和消费发送同时成功或者失败,对于一些业务场景很有帮助。事务消息的用法和原理有很多资料,这里就不细述了。但关于事务消息的实践网上资料较少,我们可以给出一些建议。
 
首先,事务消息功能一直在不断完善,应该使用最新的版本,至少是 4.6.1 以后的版本,可以避免很多问题。
 
其次,事务消息性能是不如普通消息的,它在内部实际上会生成 3 个消息(一阶段 1 个,二阶段 2 个),所以性能大约只有普通消息的 1/3,如果事务消息量大的话,要做好容量规划。回查调度线程也只有 1 个,不要用极限压力去考验它。
 
最后有一些参数注意事项。在 Broker 的配置中:

  • transientStorePoolEnable 这个参数必须保持默认值 false,否则会有严重的问题。
  • endTransactionThreadPoolNums是事务消息二阶段处理线程大小,sendMessageThreadPoolNums 则指定一阶段处理线程池大小。如果二阶段的处理速度跟不上一阶段,就会造成二阶段消息丢失导致大量回查,所以建议 endTransactionThreadPoolNums 应该大于 sendMessageThreadPoolNums,建议至少 4 倍。
  • useReentrantLockWhenPutMessage 设置为 true(默认值是 false),以免线程抢锁出现严重的不公平,导致二阶段处理线程长时间抢不到锁。
  • transactionTimeOut 默认值 6 秒太短了,如果事务执行时间超过 6 秒,就可能导致消息丢失。建议改到 1 分钟左右。
 
生产者 Client 也有一个注意事项,如果有多组 Broker,并且是 2 副本(有 1 个 Slave),应该打开 retryAnotherBrokerWhenNotStoreOK,以免某个 Slave 出现故障以后,大量消息发送失败。
 

分布式对账监控



除了比较一些常规的监控手段以外,我们开发了一个监控程序做分布式对账。可以发现我们的集群以及我们提供的 SDK 是否有异常。
 

 
具体做法是在每个 Broker 上都建立一个监控专用的 Topic,监控程序使用我们自己提供的 SDK 框架来连接集群(就像我们的业务用户那样),监控生产者会给每个集群发送少量消息。然后检查发送是否成功:

发送成功

成功

刷盘超时

Slave 超时

Slave 不可用

发送失败

具体错误码


生产者只对这些结果进行打点,不判断是否正常,具体到监控(或者演练)场景可以配置不同的报警规则。
 
消费者收到了消息会通过 TCP 旁路 ACK 生产者,生产者这边会做分布式对账,将对账结果打点:

  • 收到消息
  • 消息丢失(或超时未收到消息)
  • 重复收到消息
  • 消息生成到最终消费的时间差
  • ACK 生产者失败(由消费者打点)
 
同样监控程序只负责打点,报警规则可另外配置。
 
这套机制也可以用于分布式性能压测和故障演练。在做压测的时候,每个消息都 ACK 的话,对生产者的内存压力很大,因为它发出去的消息,需要在内存中保留一段时间(直到到达这个消息的对账时间),这段时间消费者 ACK 或者重复 ACK 都需要记录。所以我们实现了按比例抽样对账的功能,开启以后只有需要对账的消息才会在内存中保留一段时间。
 
顺便说一下,我们做压测时,合格的标准是异步生产不失败、消费不延迟、每一个消息都不丢失。这样做是为了保证压测时能给出更加准确的,可供线上系统参考的性能数字,而不是制造理想条件,追求一个大的数字。比如异步生产比同步生产更脆弱(压测 Client 如果同步生产,Broker 抖动的时候,同步 Client 会被堵塞导致发送速度降低,于是降低了 Broker 压力,消息发送不容易失败,但是会看到发送速率在波动),更贴近生产环境的实际情况,我们就选择异步生产来评估。
 

性能优化



Broker 默认的参数在我们的场景下(SSD、同步复制、异步刷盘)不是最优的,有的参数也许在大多数场景下都不是最优的。我们列出一些重要的参数,供大家参考:
 
参数
默认值
说明
flushCommitLogTimed
False
默认值不合理,异步刷盘这个参数应该设置成 true,导致频繁刷盘,对性能影响极大。
deleteWhen
04
几点删除过期文件的时间,删除文件时有很多磁盘读,这个默认值是合理的,有条件的话还是建议低峰删除。
sendMessageThreadPoolNums
1
处理生产消息的线程数,这个线程干的事情很多,建议设置为 2~4,但太多也没有什么用。因为最终写 commit log 的时候只有一个线程能拿到锁。
useReentrantLockWhenPutMessage
False
如果前一个参数设置比较大,这个最好设置为 true,避免高负载下自旋锁空转消耗 CPU。
sendThreadPoolQueueCapacity
10000
处理生产消息的队列大小,默认值可能有点小,比如 5 万 TPS(异步发送)的情况下,卡 200ms 就会爆。设置比较小的数字可能是担心有大量大消息撑爆内存(比如 100K 的话, 1 万个的消息大概占用 1G 内存,也还好),具体可以自己算,如果都是小消息,可以把这个数字改大。可以修改 Broker 参数限制 Client 发送大消息。
brokerFastFailureEnable
True
Broker 端快速失败(限流),和下面两个参数配合。这个机制可能有争议,client 设置了超时时间,如果 client 还愿意等,并且 sendThreadPoolQueue 还没有满,不应该失败,sendThreadPoolQueue 满了自然会拒绝新的请求。但如果 Client 设置的超时时间很短,没有这个机制可能导致消息重复。可以自行决定是否开启。理想情况下,能根据 Client 设置的超时时间来清理队列是最好的。
waitTimeMillsInSendQueue
200
200ms 很容易导致发送失败,建议改大,比如 1000ms。
osPageCacheBusyTimeOutMills
1000
Page cache 超时时间,如果内存比较多,比如 32G 以上,建议改大点。


总结


得益于简单、几乎 0 依赖的部署模式,使得我们部署小集群的成本非常低;不对社区版本进行魔改,保证我们可以及时升级;统一 SDK 入口方便集群维护和功能升级;通过复合小集群+自动负载均衡实现多机房多活;充分利用 RocketMQ 的功能,比如事务消息、延迟消息(增强)来满足业务的多样性需求;通过自动的分布式对账,对每一个 Broker 以及我们的 SDK 进行正确性监控。

本文也进行了一些性能参数的分享,但写的比较简单,基本只说了怎么调,但没能细说为什么,以后我们会另写文章详述。目前 RocketMQ 已经应用在公司在大多数业务线,期待将来会有更好的发展!



Kafka Connect | 无缝结合Kafka构建高效ETL方案

浅谈RabbitMQ的基石—高级消息队列协议(AMQP)



欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! 
浏览 19
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报