NameServer剖析
前言
到现在为止,RocketMQ已经更了5篇文章:回过头看,里面基本就是跟了跟源码,并没有对重要的知识做剖析,为了让文章不太臃肿,所以我将跟源码和知识的剖析分开来写,今天先来剖析NameServer(以下简称namesrv)。一、namesrv的作用
1.1、路由元信息
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap
> topicQueueTable; private final HashMap
brokerAddrTable; private final HashMap
> clusterAddrTable; private final HashMap
brokerLiveTable; private final HashMap
/* Filter Server */> filterServerTable;
BROKER_CHANNEL_EXPIRED_TIME:broker启动时会向集群中所有的namesrv发送心跳,之后每隔30s再次发送,如果namesrv在连续120s内没有收到broker发送的心跳,那么此broker就会被namesrv移除。
lock:读写锁,读写路由信息时候是需要加锁的,不然并发访问时会有问题。
topicQueueTable:topic消息队列路由信息,消息发送时根据路由表进行负载均衡。
brokerAddrTable:broker的基础信息,包含brokerName、所属集群名称、主从broker的地址。
clusterAddrTable:broker集群信息,存储集群中所有broker的名称。
brokerLiveTable:broker状态信息,namesrv每次收到broker发送的心跳包时都会更新该信息。
filterServerTable:broker上的FilterServer列表,用于类模式消费过滤。
QueueData:见名猜意,队列数据,该类是topic对应的队列信息的抽象
readQueueNums:读队列个数,一个broker默认为每个topic创建4个读队列writeQueueNums:写队列个数, 一个broker默认为每个topic创建4个写队列perm:读写权限,一般设置为6,6:同时支持读写, 4:禁写,2:禁读topicSynFlag:topic同步标记,同步复制还是异步复制
BrokerData:broker数据的抽象
brokerName:broker的名称
brokerAddrs:broker的ip集合,为什么是集合,因为互为主从的broker的brokerName是相同的,key是brokerId(0表示master,大于0表示slave),value是broker的ip地址。
注:一个broker集群包含多个broker主从,一个broker主从中的主和从的brokerName是一样的。
random:随机数生成器,当一条消息发送到broker集群,就是根据随机数生成来选择消息存储到哪个broker实例上的。BrokerLiveInfo:存活的broker信息,就是和namesrv有心跳连接的broker
注:这里是为了看map中的集群信息,所以利用了centos中搭建的mq集群。
然后在org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest中打上断点,这个就是namesrv处理来自broker、producer、namesrv请求的类上面说了,broker是每隔30s向namesrv发送一次心跳,那么我们就在心跳请求的过程中窥探下那五个map中的内容可以看到,请求进来了,接着让它进入org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBrokerWithFilterServer中的第219行停住,你就可以清晰的看到里面的内容了。
1.2、路由注册
org.apache.rocketmq.broker.BrokerStartup#main
org.apache.rocketmq.broker.BrokerStartup#start
org.apache.rocketmq.broker.BrokerController#start
其实BrokerController中第814行到823行的定时任务就是心跳的核心代码,brokerConfig.getRegisterNameServerPeriod()就是30s的时间间隔当然,此时间可以配置,默认是30s。定时任务中执行的方法其实就是注册的核心方法了,看下注册方法的调用过程:
org.apache.rocketmq.broker.BrokerController#registerBrokerAll
org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
接下来本地启动broker,debug跟下看看它给namesrv发送请求的过程public List
registerBrokerAll( final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List
filterServerList, final boolean oneway,
final int timeoutMills,
final boolean compressed) {
final List
registerBrokerResultList = Lists.newArrayList(); //获取所有namesrv的地址:ip端口
List
nameServerAddressList = this.remotingClient.getNameServerAddressList(); //遍历namesrv集合
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
//封装请求头
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
//broker地址
requestHeader.setBrokerAddr(brokerAddr);
//brokerId,0:master,大于0:slave
requestHeader.setBrokerId(brokerId);
//broker名称
requestHeader.setBrokerName(brokerName);
//所属集群的名称
requestHeader.setClusterName(clusterName);
//当前broker所属主从中主节点的地址
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
////封装请求体
RegisterBrokerBody requestBody = new RegisterBrokerBody();
/**主题配置,topicConfigWrapper内部封装的是topicConfigManager中的
topicConfigTable,内部存储的是broker启动时默认的一些topic,
,MixAll.SELF_TEST_TOPIC、MixAll.DEFAULT_TOPIC(AutoCreateTopic
-Enable=true)、MixAll.BENCHMARK_TOPIC、MixAll.OFFSET_MOVED_EVNET、
BrokerConfig中的brokerClusterName、brokerName。Broker中Topic
默认存储在${Rocket_Home}/store/config/topic.json中*/
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
//通过netty向namesrv发送网络请求,注册broker
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker to name server {} OK", namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
注:为了简洁明了,直接将断点打到
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll方法中,着重看下封装的参数即可
放行请求,它将到达namesrv的org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest方法注:broker每隔30s会发一个心跳,所以你会看到请求不断地进入
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll方法,为了排除干扰,我们将心跳时间改长点,然后重启broker
接下来就详细看下namesrv是怎么处理心跳包的接着看org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBrokerWithFilterServer方法先校验请求头和请求体
接着调用org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker方法,核心逻辑来了先加锁(读写锁),避免并发操作路由表引起问题然后根据集群名称(clusterName)从clusterAddrTable(五个map中的其中一个)取brokerNames,可以看到这里取出来的是一个,就是我本地起的broker,然后将传过来的brokerName添加到brokerNames中接着默认不是第一次注册,再根据传过来的brokerName从brokerAddrTable中查询brokerData,如果取出来是空,就以传过来的brokerName为key,构造一个brokerData放入brokerAddrTable中;如果存在,直接替换原先的,然后把registerFirst标识置为false,表示非第一次注册。接着判断当前broker是不是master(就是主从中的主),如果是主节点,在判断,如果topic的配置有更改或者是第一次注册,则需要创建或更新topic路由元数据,填充topicQueueTable,其实就是为默认topic自动注册路由信息,其中包括MixAll.DEFAULT_TOPIC的路由信息,当消息生产者发送topic时,如果该主题未创建并且BrokerConfig中的autoCreateTopicEnable为true时,将返回MixAll.DEFAULT_TOPIC的路由信息。
然后根据broker的地址去brokerLiveTable(存活的broker信息表)中取broker最近一次的心跳信息,如果取出来是空,打印一条新注册的日志。然后检查有没有注册broker的过滤器如果当前broker是从节点,则需要查找该broker的master的节点信息,并更新对应的masterAddr和haServerAddr属性最后注册完,解锁,完事总结:namesrv和broker保持长连接,broker的存活状态存储在brokerLiveTable中,namesrv每收到一个心跳包,就会更新brokerLiveTable中broker的状态信息以及其他路由表(topicQueueTable、brokerAddrTable、filterServerTable),在更新这些路由表时使用了读写锁,允许多个消息发送者并发读,保证消息发送时的高并发,但是同一时刻namesrv只处理一个broker心跳请求,多个心跳请求串行执行,这是读写锁的经典使用场景。
1.3、路由删除
org.apache.rocketmq.namesrv.NamesrvStartup#main
org.apache.rocketmq.namesrv.NamesrvStartup#main0
org.apache.rocketmq.namesrv.NamesrvStartup#start
org.apache.rocketmq.namesrv.NamesrvController#initialize
你会发现这个方法逻辑很简单,就是遍历brokerLiveTable,判断每一个brokerLiveInfo的lastUpdateTimestamp和当前时间戳的差距是不是大于120s,如果是,就关闭和此broker的socket连接,并将此brokerLiveInfo从brokerLiveTable中删除,然后删除与该broker相关的路由信息,路由的具体删除逻辑在onChannelDestroy方法中public void scanNotActiveBroker() {
Iterator
> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) {
Entry
next = it.next(); long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
根据brokerAddress从brokerLiveTable、filterServerTable移除
遍历brokerAddrTable,从BrokerData的brokerAddrs中找到具体的broker,从brokerData中移除,如果移除后在brokerData中不再包含其他broker,则在brokerAddrTable中移除该brokerName对应的条目。根据brokerName从clusterAddrTable中找到broker并从集群中删除,如果移除后,集群中不包含任何的broker,则将该集群从clusterAddrTable中移除。根据brokerName,遍历所有topic的队列,如果队列中包含了当前broker的队列,则移除,如果topic只包含待移除broker的队列的话,从路由表中删除该topic。1.4、路由发现
方法主要逻辑如下:public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
//1:
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
//2:
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
//3:
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
调用pickupTopicRouteData方法,从路由表topicQueueTable、brokerAddrTable、filterServerTable中分别填充到TopicRouteData中的queueData、brokerDatas、filterServerTable属性中。
List
:topic队列元数据 List
:topic分布的broker元数据 filterServer:broker上过滤服务器地址列表
如果找到topic对应的路由信息并且该topic为顺序消息,则从NameServerKVconfig中获取关于顺序消息相关的配置填充路由信息。
如果找不到路由信息就返回ResponseCode.TOPIC_NOT_EXIST
二、总结