一文读懂RocketMQ的高可用机制——集群管理高可用

老周聊架构

共 29477字,需浏览 59分钟

 · 2021-03-26

点击上方老周聊架构关注我



一、前言

在前三篇我们介绍了

一文读懂RocketMQ的高可用机制——消息存储高可用

一文读懂RocketMQ的高可用机制——消息发送高可用

一文读懂RocketMQ的高可用机制——消息消费高可用


这一篇我们来说一下集群管理是如何保证高可用的。

集群管理的高可用主要体现在 NameServer 的设计上,NameServer 承担着路由注册中心的作用。当部分 NameServer 节点宕机时不会有什么糟糕的影响,只剩一个 NameServer 节点 RocketMQ 集群也能正常运行,即使 NameServer 全部宕机,也不影响已经运行的 Broker、Producer 和 Consumer。

在说 NameServer 之前,我们是否有以下几点思考。既然作为路由注册中心,那有哪些路由信息注册到了 NameServer?生产者如何知道消息要发送到哪台消息服务器呢?当 Broker 不可用后,NameServer 并不会立即将变更后的注册信息推送至 Client(Producer/Consumer),此时 RocketMQ 如何保证 Client 正常发送/消费消息?

带着这几个疑问来开启我们的 RocketMQ 集群管理高可用之旅。

二、架构设计

1、NameServer 互相独立,彼此没有通信关系,单台 NameServer 挂掉,不影响其他 NameServer。

2、NameServer 不去连接别的机器,不主动推消息。

3、单个 Broker(Master、Slave) 与所有 NameServer 进行定时注册,以便告知 NameServer 自己还活着。

  • Broker 每隔 30 秒向所有 NameServer 发送心跳,心跳包含了自身的 topic 配置信息。

  • NameServer 每隔 10 秒,扫描所有还存活的 broker 连接,如果某个连接的最后更新时间与当前时间差值超过 2 分钟,则断开此连接,NameServer 也会断开此 broker 下所有与 slave 的连接。同时更新 topic 与队列的对应关系,但不通知生产者和消费者。

  • Broker slave 同步或者异步从 Broker master 上拷贝数据。


4、Consumer 随机与一个 NameServer 建立长连接,如果该 NameServer 断开,则从 NameServer 列表中查找下一个进行连接。

  • Consumer 主要从 NameServer 中根据 Topic 查询 Broker 的地址,查到就会缓存到客户端,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。

  • 如果 Broker 宕机,则 NameServer 会将其剔除,而 Consumer 端的定时任务 MQClientInstance.this.updateTopicRouteInfoFromNameServer 每 30 秒执行一次,将 Topic 对应的 Broker 地址拉取下来,此地址只有 Slave 地址了,此时 Consumer 从 Slave 上消费。

  • 消费者与 Master 和 Slave 都建有连接,在不同场景有不同的消费规则。


5、Producer 随机与一个 NameServer 建立长连接,每隔 30 秒(此处时间可配置)从 NameServer 获取 Topic 的最新队列情况,如果某个 Broker Master 宕机,Producer 最多 30 秒才能感知,在这个期间,发往该 broker master 的消息失败。Producer 向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。

  • 生产者与所有的 master 连接,但不能向 slave 写入。

  • 客户端是先从 NameServer 寻址的,得到可用 Broker 的 IP 和端口信息,然后据此信息连接 broker。


综上所述,NameServer 在 RocketMQ 中的作用:

  • NameServer 用来保存活跃的 broker 列表,包括 Master 和 Slave 。

  • NameServer 用来保存所有 topic 和该 topic 所有队列的列表。

  • NameServer 用来保存所有 broker 的 Filter 列表。

  • 命名服务器为客户端,包括生产者,消费者和命令行客户端提供最新的路由信息。

三、启动流程

NameServer 启动流程重点关注两部分:路由信息维护和网络通信(包括心跳),涉及到的核心类如下图所示。NameServerStartup 是 NameServer 的启动类,负责解析配置文件、加载运行时参数信息和初始化并启动 NameServerController。NameServerController 是 NameServer 的核心控制器,其通过 RouteInfoManager 管理路由信息,通过 RemotingServer 与 RocketMQ 其他组件(Broker、Producer 和 Consumer)通信。


NameServer 的配置参数包括 NamesrvConfig 和 NettyServerConfig:

NamesrvConfig 为 NameServer 业务参数,如 RocketMQ 主目录路径、KV 配置属性持久化路径、是否支持顺序消息等;

NettyServerConfig 为 NameServer 网络参数,如监听端口、IO 线程池线程个数、业务线程池线程个数、是否开启 epoll 等。

在分析源码之前我们先来看张时序图:

启动类:

org.apache.rocketmq.namesrv.NamesrvStartup

1、解析配置文件,填充 NameServerConfig、NettyServerConfig 属性值,并创建 NamesrvController。

代码:

NamesrvStartup#createNamesrvController

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); return null; } // 创建NamesrvConfig final NamesrvConfig namesrvConfig = new NamesrvConfig(); // 创建NettyServerConfig final NettyServerConfig nettyServerConfig = new NettyServerConfig(); // 设置启动端口号 nettyServerConfig.setListenPort(9876); // 解析启动-c参数 if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file); in.close(); } } // 解析启动-p参数 if (commandLine.hasOption('p')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME); MixAll.printObjectProperties(console, namesrvConfig); MixAll.printObjectProperties(console, nettyServerConfig); System.exit(0); } // 将启动参数填充到namesrvConfig,nettyServerConfig MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); }
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); // 创建NameServerController final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard controller.getConfiguration().registerConfig(properties);
return controller;}


2、NamesrvConfig 属性

  • rocketmqHome:rocketmq 主目录

  • kvConfig:NameServer 存储 KV 配置属性的持久化路径

  • configStorePath:nameServer 默认配置文件路径

  • orderMessageEnable:是否支持顺序消息


3、NettyServerConfig 属性

  • listenPort:NameServer 监听端口,该值默认会被初始化为 9876;

  • serverWorkerThreads:Netty 业务线程池线程个数

  • serverCallbackExecutorThreads:Netty public 任务线程池线程个数, Netty 网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型未注册线程池,则由 public 线程池执行

  • serverSelectorThreads:IO 线程池个数,主要是 NameServer、Broker 端解析请求、返回相应的线程个数,这类线程主要是处理网路请求的,解析请求包,然后转发到各个业务线程池完成具体的操作,然后将结果返回给调用方;

  • serverOnewaySemaphoreValue:send oneway 消息请求并发读(Broker端参数);

  • serverAsyncSemaphoreValue:异步消息发送最大并发度;

  • serverChannelMaxIdleTimeSeconds:网络连接最大的空闲时间,默认 120s

  • serverSocketSndBufSize:网络socket发送缓冲区大小

  • serverSocketRcvBufSize:网络接收端缓存区大小

  • serverPooledByteBufAllocatorEnable:ByteBuffer 是否开启缓存;

  • useEpollNativeSelector:是否启用 Epoll IO 模型


4、根据启动属性创建 NamesrvController 实例,并初始化该实例。NameServerController 实例为 NameServer 核心控制器。

代码:

NamesrvController#initialize

public boolean initialize() {    // 加载KV配置    this.kvConfigManager.load();    // 创建NettyServer网络处理对象    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);    // 开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker    this.remotingExecutor =        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); // 开启定时任务:每隔10min打印一次KV配置 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES);
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {        // Register a listener to reload SslContext try { fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, new FileWatchService.Listener() { boolean certChanged, keyChanged = false; @Override public void onChanged(String path) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context"); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true; } if (certChanged && keyChanged) { log.info("The certificate and private key changed, reload the ssl context"); certChanged = keyChanged = false; reloadServerSslContext(); } } private void reloadServerSslContext() { ((NettyRemotingServer) remotingServer).loadSslContext(); } }); } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } }
return true;}


5、在 JVM 进程关闭之前,先将线程池关闭,及时释放资源。

代码:

NamesrvStartup#start

public static NamesrvController start(final NamesrvController controller) throws Exception {    if (null == controller) {        throw new IllegalArgumentException("NamesrvController is null");    }
boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } // 注册JVM钩子函数代码 Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call() throws Exception { // 释放资源 controller.shutdown(); return null; } }));
controller.start();
return controller;}


四、路由管理

NameServer 作为路由注册中心,其核心作用是为 Client 提供消息发送/消费的路由信息。Master Broker 节点和所有 NameServer 建立长连接,每个 NameServer 节点拥有所有 Topic 对应 Queue 以及 Broker 的映射关系,包括路由注册、路由删除等,具体由 RouteInfoManager 负责管理。

1、路由元信息

代码:

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager

  • topicQueueTable

    维护了 Topic 和其对应消息队列的映射关系,QueueData 记录了一条队列的元信息:所在 Broker、读队列数量、写队列数量等。

  • brokerAddrTable

    维护了 Broker Name 和 Broker 元信息的映射关系,Broker 通常以 Master-Slave 架构部署,BrokerData 记录了同一个 Broker Name 下所有节点的地址信息。

  • clusterAddrTable

    维护了 Broker 的集群信息。

  • brokerLiveTable

    维护了 Broker 的存活信息。NameServer 在收到来自 Broker 的心跳消息后,更新 BrokerLiveInfo 中的 lastUpdateTimestamp,如果 NameServer 长时间未收到 Broker 的心跳信息,NameServer 就会将其移除。

  • filterServerTable

    Broker 上的 FilterServer 列表,用于类模式消息过滤。


还是有点抽象是不是,没关系,看下面这张图与上面的存储关系对比下你就很清晰了。

2、NameServer 请求处理

NettyRequestProcessor 定义了 RocketMQ 处理网络请求的接口,DefaultRequestProcessor 实现了该接口并负责处理 NameServer 收到的网络请求。NettyRequestProcessor 定义如下。

// org.apache.rocketmq.remoting.netty.NettyRequestProcessorpublic interface NettyRequestProcessor {    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)        throws Exception;
boolean rejectRequest();}


RemotingCommand 包含请求编码(code),针对不同的请求编码执行对应的请求处理逻辑。请求编码定义在 RequestCode 常量类中,本节以 REGISTER_BROKER(Broker 注册 & 心跳)为例,结合 Broker 的启动流程阐述请求从被 Broker 发出到被 NameServer 处理的流程。

Broker 启动类 BrokerStartup 在初始化核心控制器 BrokerController 阶段注册定时任务,定时发送 HTTP 请求获取 NameServer 地址列表并存储于 remotingClient。此处考虑一个问题:在弃用 ZooKeeper 后,RocketMQ 不存在注册中心供 NameServer 注册,那么 NameServer 地址列表是如何维护的?在获取过时的地址列表后,RocketMQ 如何持续保证可用性?BrokerController 定时获取地址列表核心逻辑精简如下。

org.apache.rocketmq.broker.BrokerController#initialize

org.apache.rocketmq.broker.out.BrokerOuterAPI#fetchNameServerAddr


org.apache.rocketmq.common.namesrv.TopAddressing#fetchNSAddr(boolean, long)

Broker 在获取到 NameServer 地址列表后,针对每个地址开启一个线程,将自身信息同步(默认)注册至 NameServer。Broker 利用 CountDownLatch 等待所有线程注册工作完成后,继续执行后续的工作。下面我们就来看下 Broker 是如何路由注册到 NameServer 上的。

3、路由注册

3.1 发送心跳包

RocketMQ 路由注册是通过 Broker 与 NameServer 的心跳功能实现的。Broker 启动时向集群中所有的 NameServer 发送心跳信息,每隔 30s 向集群中所有 NameServer 发送心跳包,NameServer 收到心跳包时会更新 brokerLiveTable 缓存中 BrokerLiveInfo 的 lastUpdataTimeStamp 信息,然后 NameServer 每隔 10s 扫描 brokerLiveTable,如果连续 120s 没有收到心跳包,NameServer 将移除 Broker 的路由信息同时关闭 Socket 连接。

org.apache.rocketmq.broker.BrokerController#start


org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll

public List<RegisterBrokerResult> registerBrokerAll(    final String clusterName,    final String brokerAddr,    final String brokerName,    final long brokerId,    final String haServerAddr,    final TopicConfigSerializeWrapper topicConfigWrapper,    final List<String> filterServerList,    final boolean oneway,    final int timeoutMills,    final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList(); // 获得nameServer地址信息 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); // 遍历所有nameserver列表 if (nameServerAddressList != null && nameServerAddressList.size() > 0) { // 封装请求头 final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); requestHeader.setCompressed(compressed); // 封装请求体 RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); final byte[] body = requestBody.encode(compressed); final int bodyCrc32 = UtilAll.crc32(body); requestHeader.setBodyCrc32(bodyCrc32); final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { // 分别向NameServer注册 RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null) { registerBrokerResultList.add(result); }
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, {}", namesrvAddr, e); } finally { countDownLatch.countDown(); } } }); }
try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } }
return registerBrokerResultList;}


org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBroker

3.2 处理心跳包

DefaultRequestProcessor 网路处理类解析请求 类型,如果请求类型是为 REGISTER_BROKER,则将请求转发到 RouteInfoManager#regiesterBroker。

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest


org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker

维护路由信息

public RegisterBrokerResult registerBroker(    final String clusterName,    final String brokerAddr,    final String brokerName,    final long brokerId,    final String haServerAddr,    final TopicConfigSerializeWrapper topicConfigWrapper,    final List<String> filterServerList,    final Channel channel) {    RegisterBrokerResult result = new RegisterBrokerResult();    try {        try {            // 加锁            this.lock.writeLock().lockInterruptibly();            // 维护clusterAddrTable            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);            if (null == brokerNames) {                brokerNames = new HashSet<String>();                this.clusterAddrTable.put(clusterName, brokerNames);            }            brokerNames.add(brokerName);
boolean registerFirst = false; // 维护brokerAddrTable BrokerData brokerData = this.brokerAddrTable.get(brokerName); // 第一次注册,则创建brokerData if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>()); this.brokerAddrTable.put(brokerName, brokerData); } // 非第一次注册,更新Broker Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT> //The same IP:PORT must only have one record in brokerAddrTable Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator(); while (it.hasNext()) { Entry<Long, String> item = it.next(); if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) { it.remove(); } }
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr);            // 维护topicQueueTable if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } } // 维护brokerLiveTable BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr)); if (null == prevBrokerLiveInfo) { log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr); } // 维护filterServerList if (filterServerList != null) { if (filterServerList.isEmpty()) { this.filterServerTable.remove(brokerAddr); } else { this.filterServerTable.put(brokerAddr, filterServerList); } }
if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr); if (brokerLiveInfo != null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("registerBroker Exception", e); }
return result;}


4、路由删除

Broker 每隔 30s 向 NameServer 发送一个心跳包,心跳包包含 BrokerId、Broker 地址、Broker 名称, Broker 所属集群名称、 Broker 关联的 FilterServer 列表。但是如果 Broker 宕机,NameServer 无法收到心跳包,此时 NameServer 如何来剔除这些失效的 Broker 呢? NameServer 会每隔 10s 扫描 brokerLiveTable 状态表,如果 BrokerLive 的 lastUpdateTimestamp 的时间戳距当前时间超过 120s,则认为 Broker 失效,移除该 Broker ,关闭与 Broker 连接,同时更新 topicQueueTable、brokerAddrTable 、brokerLiveTable、filterServerTable 。

RocketMQ 有两个触发点来删除路由信息:

  • NameServer 定期扫描 brokerLiveTable 检测上次心跳包与当前系统的时间差,如果时间超过 120s,则需要移除 broker。

  • Broker 在正常关闭的情况下,会执行 unregisterBroker 指令。

这两种方式路由删除的方法都是一样的,就是从相关路由表中删除与该 broker 相关的信息。


org.apache.rocketmq.namesrv.NamesrvController#initialize


public void scanNotActiveBroker() {  // 获得brokerLiveTable  Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();  // 遍历brokerLiveTable  while (it.hasNext()) {      Entry<String, BrokerLiveInfo> next = it.next();      long last = next.getValue().getLastUpdateTimestamp();      // 如果收到心跳包的时间距当时时间是否超过120s      if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {          // 关闭连接          RemotingUtil.closeChannel(next.getValue().getChannel());          // 移除broker          it.remove();          log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);          // 维护路由表          this.onChannelDestroy(next.getKey(), next.getValue().getChannel());      }  }}


public void onChannelDestroy(String remoteAddr, Channel channel) {    String brokerAddrFound = null;    if (channel != null) {        try {            try {                this.lock.readLock().lockInterruptibly();                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =                    this.brokerLiveTable.entrySet().iterator();                while (itBrokerLiveTable.hasNext()) {                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();                    if (entry.getValue().getChannel() == channel) {                        brokerAddrFound = entry.getKey();                        break;                    }                }            } finally {                this.lock.readLock().unlock();            }        } catch (Exception e) {            log.error("onChannelDestroy Exception", e);        }    }
if (null == brokerAddrFound) { brokerAddrFound = remoteAddr; } else { log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound); }
if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
try { try { // 申请写锁,根据brokerAddress从brokerLiveTable和filterServerTable移除 this.lock.writeLock().lockInterruptibly(); this.brokerLiveTable.remove(brokerAddrFound); this.filterServerTable.remove(brokerAddrFound); // 维护brokerAddrTable String brokerNameFound = null; boolean removeBrokerName = false; Iterator<Entry<String, BrokerData>> itBrokerAddrTable = this.brokerAddrTable.entrySet().iterator(); // 遍历brokerAddrTable while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) { BrokerData brokerData = itBrokerAddrTable.next().getValue(); // 遍历broker地址 Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator(); while (it.hasNext()) { Entry<Long, String> entry = it.next(); Long brokerId = entry.getKey(); String brokerAddr = entry.getValue(); // 根据broker地址移除brokerAddr if (brokerAddr.equals(brokerAddrFound)) { brokerNameFound = brokerData.getBrokerName(); it.remove(); log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed", brokerId, brokerAddr); break; } } // 如果当前主题只包含待移除的broker,则移除该topic if (brokerData.getBrokerAddrs().isEmpty()) { removeBrokerName = true; itBrokerAddrTable.remove(); log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", brokerData.getBrokerName()); } } // 维护clusterAddrTable if (brokerNameFound != null && removeBrokerName) { Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator(); // 遍历clusterAddrTable while (it.hasNext()) { Entry<String, Set<String>> entry = it.next(); // 获得集群名称 String clusterName = entry.getKey(); // 获得集群中brokerName集合 Set<String> brokerNames = entry.getValue(); // 从brokerNames中移除brokerNameFound boolean removed = brokerNames.remove(brokerNameFound); if (removed) { log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed", brokerNameFound, clusterName);
if (brokerNames.isEmpty()) { log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster", clusterName); // 如果集群中不包含任何broker,则移除该集群 it.remove(); }
break; } } } // 维护topicQueueTable队列 if (removeBrokerName) { // 遍历topicQueueTable Iterator<Entry<String, List<QueueData>>> itTopicQueueTable = this.topicQueueTable.entrySet().iterator(); while (itTopicQueueTable.hasNext()) { Entry<String, List<QueueData>> entry = itTopicQueueTable.next(); // 主题名称 String topic = entry.getKey(); // 队列集合 List<QueueData> queueDataList = entry.getValue(); // 遍历该主题队列 Iterator<QueueData> itQueueData = queueDataList.iterator(); while (itQueueData.hasNext()) { // 从队列中移除为活跃broker信息 QueueData queueData = itQueueData.next(); if (queueData.getBrokerName().equals(brokerNameFound)) { itQueueData.remove(); log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", topic, queueData); } } // 如果该topic的队列为空,则移除该topic if (queueDataList.isEmpty()) { itTopicQueueTable.remove(); log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed", topic); } } } } finally { // 释放写锁 this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("onChannelDestroy Exception", e); } }}


5、路由发现

RocketMQ 路由发现是非实时的,当 Topic 路由出现变化后,NameServer 不会主动推送给客户端,而是由客户端定时拉取主题最新的路由。

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic


public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,    RemotingCommand request) throws RemotingCommandException {    final RemotingCommand response = RemotingCommand.createResponseCommand(null);    final GetRouteInfoRequestHeader requestHeader =        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);    // 调用RouteInfoManager的方法,从路由表topicQueueTable、brokerAddrTable、 filterServerTable中    // 分别填充TopicRouteData的List<QueueData>、List<BrokerData>、 filterServer    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());    // 如果找到主题对应你的路由信息并且该主题为顺序消息,则从NameServer KVConfig中获取 关于顺序消息相关的配置填充路由信息    if (topicRouteData != null) {        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {            String orderTopicConf =                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,                    requestHeader.getTopic());            topicRouteData.setOrderTopicConf(orderTopicConf);        }
byte[] content = topicRouteData.encode(); response.setBody(content); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic() + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response;}


五、总结

最后我们再来看下我们前言提的那三个问题。既然作为路由注册中心,那有哪些路由信息注册到了 NameServer?生产者如何知道消息要发送到哪台消息服务器呢?当 Broker 不可用后,NameServer 并不会立即将变更后的注册信息推送至 Client(Producer/Consumer),此时 RocketMQ 如何保证 Client 正常发送/消费消息?

第一个问题在给定 Topic 的情况下,Client 根据负载均衡策略选择合适的消息队列,进一步获取到对应的 Broker 地址信息,具体有哪些路由信息注册到了 NameServer,可以看上面的路由元信息 RouteInfoManager 类。

第二个问题可以归纳为由于路由注册、路由删除以及路由发现,使生产者如何知道消息要发送到哪台消息服务器。

第三个问题对于 Broker 无效的场景,RocketMQ 牺牲了 C,选择了 AP,即通过 Client 重试保证可用性,由此产生的重复消息问题,通过 Client 幂等处理逻辑来规避。

最后老周再上一张图,涵盖了上述整个核心流程。到此为止,RocketMQ 的高可用机制——消息存储高可用、消息发送高可用、消息消费高可用以及集群管理高可用都分析完毕,希望对你有帮助。



欢迎大家关注我的公众号【老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。

喜欢的话,点赞、再看、分享三连。

点个在看你最好看



浏览 90
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报