NameServer剖析
前言
到现在为止,RocketMQ已经更了5篇文章:回过头看,里面基本就是跟了跟源码,并没有对重要的知识做剖析,为了让文章不太臃肿,所以我将跟源码和知识的剖析分开来写,今天先来剖析NameServer(以下简称namesrv)。一、namesrv的作用
1.1、路由元信息
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;private final ReadWriteLock lock = new ReentrantReadWriteLock();private final HashMap> topicQueueTable; private final HashMapbrokerAddrTable; private final HashMap> clusterAddrTable; private final HashMapbrokerLiveTable; private final HashMap/* Filter Server */> filterServerTable;
BROKER_CHANNEL_EXPIRED_TIME:broker启动时会向集群中所有的namesrv发送心跳,之后每隔30s再次发送,如果namesrv在连续120s内没有收到broker发送的心跳,那么此broker就会被namesrv移除。
lock:读写锁,读写路由信息时候是需要加锁的,不然并发访问时会有问题。
topicQueueTable:topic消息队列路由信息,消息发送时根据路由表进行负载均衡。
brokerAddrTable:broker的基础信息,包含brokerName、所属集群名称、主从broker的地址。
clusterAddrTable:broker集群信息,存储集群中所有broker的名称。
brokerLiveTable:broker状态信息,namesrv每次收到broker发送的心跳包时都会更新该信息。
filterServerTable:broker上的FilterServer列表,用于类模式消费过滤。
QueueData:见名猜意,队列数据,该类是topic对应的队列信息的抽象

readQueueNums:读队列个数,一个broker默认为每个topic创建4个读队列writeQueueNums:写队列个数, 一个broker默认为每个topic创建4个写队列perm:读写权限,一般设置为6,6:同时支持读写, 4:禁写,2:禁读topicSynFlag:topic同步标记,同步复制还是异步复制
BrokerData:broker数据的抽象

brokerName:broker的名称
brokerAddrs:broker的ip集合,为什么是集合,因为互为主从的broker的brokerName是相同的,key是brokerId(0表示master,大于0表示slave),value是broker的ip地址。
注:一个broker集群包含多个broker主从,一个broker主从中的主和从的brokerName是一样的。
random:随机数生成器,当一条消息发送到broker集群,就是根据随机数生成来选择消息存储到哪个broker实例上的。BrokerLiveInfo:存活的broker信息,就是和namesrv有心跳连接的broker

注:这里是为了看map中的集群信息,所以利用了centos中搭建的mq集群。




1.2、路由注册
org.apache.rocketmq.broker.BrokerStartup#main
org.apache.rocketmq.broker.BrokerStartup#start
org.apache.rocketmq.broker.BrokerController#start


org.apache.rocketmq.broker.BrokerController#registerBrokerAll
org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
接下来本地启动broker,debug跟下看看它给namesrv发送请求的过程public ListregisterBrokerAll( final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final ListfilterServerList, final boolean oneway,final int timeoutMills,final boolean compressed) {final ListregisterBrokerResultList = Lists.newArrayList(); //获取所有namesrv的地址:ip端口ListnameServerAddressList = this.remotingClient.getNameServerAddressList(); //遍历namesrv集合if (nameServerAddressList != null && nameServerAddressList.size() > 0) {//封装请求头final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();//broker地址requestHeader.setBrokerAddr(brokerAddr);//brokerId,0:master,大于0:slaverequestHeader.setBrokerId(brokerId);//broker名称requestHeader.setBrokerName(brokerName);//所属集群的名称requestHeader.setClusterName(clusterName);//当前broker所属主从中主节点的地址requestHeader.setHaServerAddr(haServerAddr);requestHeader.setCompressed(compressed);////封装请求体RegisterBrokerBody requestBody = new RegisterBrokerBody();/**主题配置,topicConfigWrapper内部封装的是topicConfigManager中的topicConfigTable,内部存储的是broker启动时默认的一些topic,,MixAll.SELF_TEST_TOPIC、MixAll.DEFAULT_TOPIC(AutoCreateTopic-Enable=true)、MixAll.BENCHMARK_TOPIC、MixAll.OFFSET_MOVED_EVNET、BrokerConfig中的brokerClusterName、brokerName。Broker中Topic默认存储在${Rocket_Home}/store/config/topic.json中*/requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);requestBody.setFilterServerList(filterServerList);final byte[] body = requestBody.encode(compressed);final int bodyCrc32 = UtilAll.crc32(body);requestHeader.setBodyCrc32(bodyCrc32);final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());for (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(new Runnable() {@Overridepublic void run() {try {//通过netty向namesrv发送网络请求,注册brokerRegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);if (result != null) {registerBrokerResultList.add(result);}log.info("register broker to name server {} OK", namesrvAddr);} catch (Exception e) {log.warn("registerBroker Exception, {}", namesrvAddr, e);} finally {countDownLatch.countDown();}}});}try {countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {}}return registerBrokerResultList;}
注:为了简洁明了,直接将断点打到
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll方法中,着重看下封装的参数即可


注:broker每隔30s会发一个心跳,所以你会看到请求不断地进入
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll方法,为了排除干扰,我们将心跳时间改长点,然后重启broker










1.3、路由删除
org.apache.rocketmq.namesrv.NamesrvStartup#main
org.apache.rocketmq.namesrv.NamesrvStartup#main0
org.apache.rocketmq.namesrv.NamesrvStartup#start
org.apache.rocketmq.namesrv.NamesrvController#initialize

你会发现这个方法逻辑很简单,就是遍历brokerLiveTable,判断每一个brokerLiveInfo的lastUpdateTimestamp和当前时间戳的差距是不是大于120s,如果是,就关闭和此broker的socket连接,并将此brokerLiveInfo从brokerLiveTable中删除,然后删除与该broker相关的路由信息,路由的具体删除逻辑在onChannelDestroy方法中public void scanNotActiveBroker() {Iterator> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) {Entrynext = it.next(); long last = next.getValue().getLastUpdateTimestamp();if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {RemotingUtil.closeChannel(next.getValue().getChannel());it.remove();log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);this.onChannelDestroy(next.getKey(), next.getValue().getChannel());}}}
根据brokerAddress从brokerLiveTable、filterServerTable移除




1.4、路由发现

方法主要逻辑如下:public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final GetRouteInfoRequestHeader requestHeader =(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);//1:TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());//2:if (topicRouteData != null) {if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {String orderTopicConf =this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,requestHeader.getTopic());topicRouteData.setOrderTopicConf(orderTopicConf);}byte[] content = topicRouteData.encode();response.setBody(content);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}//3:response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));return response;}
调用pickupTopicRouteData方法,从路由表topicQueueTable、brokerAddrTable、filterServerTable中分别填充到TopicRouteData中的queueData、brokerDatas、filterServerTable属性中。
List
:topic队列元数据 List
:topic分布的broker元数据 filterServer:broker上过滤服务器地址列表
如果找到topic对应的路由信息并且该topic为顺序消息,则从NameServerKVconfig中获取关于顺序消息相关的配置填充路由信息。
如果找不到路由信息就返回ResponseCode.TOPIC_NOT_EXIST
二、总结

