NameServer剖析
前言
到现在为止,RocketMQ已经更了5篇文章:回过头看,里面基本就是跟了跟源码,并没有对重要的知识做剖析,为了让文章不太臃肿,所以我将跟源码和知识的剖析分开来写,今天先来剖析NameServer(以下简称namesrv)。一、namesrv的作用
1.1、路由元信息
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap
> topicQueueTable; private final HashMap
brokerAddrTable; private final HashMap
> clusterAddrTable; private final HashMap
brokerLiveTable; private final HashMap
/* Filter Server */> filterServerTable;
BROKER_CHANNEL_EXPIRED_TIME:broker启动时会向集群中所有的namesrv发送心跳,之后每隔30s再次发送,如果namesrv在连续120s内没有收到broker发送的心跳,那么此broker就会被namesrv移除。
lock:读写锁,读写路由信息时候是需要加锁的,不然并发访问时会有问题。
topicQueueTable:topic消息队列路由信息,消息发送时根据路由表进行负载均衡。
brokerAddrTable:broker的基础信息,包含brokerName、所属集群名称、主从broker的地址。
clusterAddrTable:broker集群信息,存储集群中所有broker的名称。
brokerLiveTable:broker状态信息,namesrv每次收到broker发送的心跳包时都会更新该信息。
filterServerTable:broker上的FilterServer列表,用于类模式消费过滤。
QueueData:见名猜意,队列数据,该类是topic对应的队列信息的抽象
readQueueNums:读队列个数,一个broker默认为每个topic创建4个读队列writeQueueNums:写队列个数, 一个broker默认为每个topic创建4个写队列perm:读写权限,一般设置为6,6:同时支持读写, 4:禁写,2:禁读topicSynFlag:topic同步标记,同步复制还是异步复制
BrokerData:broker数据的抽象
brokerName:broker的名称
brokerAddrs:broker的ip集合,为什么是集合,因为互为主从的broker的brokerName是相同的,key是brokerId(0表示master,大于0表示slave),value是broker的ip地址。
注:一个broker集群包含多个broker主从,一个broker主从中的主和从的brokerName是一样的。
random:随机数生成器,当一条消息发送到broker集群,就是根据随机数生成来选择消息存储到哪个broker实例上的。BrokerLiveInfo:存活的broker信息,就是和namesrv有心跳连接的broker
注:这里是为了看map中的集群信息,所以利用了centos中搭建的mq集群。
1.2、路由注册
org.apache.rocketmq.broker.BrokerStartup#main
org.apache.rocketmq.broker.BrokerStartup#start
org.apache.rocketmq.broker.BrokerController#start
org.apache.rocketmq.broker.BrokerController#registerBrokerAll
org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
接下来本地启动broker,debug跟下看看它给namesrv发送请求的过程public List
registerBrokerAll( final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List
filterServerList, final boolean oneway,
final int timeoutMills,
final boolean compressed) {
final List
registerBrokerResultList = Lists.newArrayList(); //获取所有namesrv的地址:ip端口
List
nameServerAddressList = this.remotingClient.getNameServerAddressList(); //遍历namesrv集合
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
//封装请求头
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
//broker地址
requestHeader.setBrokerAddr(brokerAddr);
//brokerId,0:master,大于0:slave
requestHeader.setBrokerId(brokerId);
//broker名称
requestHeader.setBrokerName(brokerName);
//所属集群的名称
requestHeader.setClusterName(clusterName);
//当前broker所属主从中主节点的地址
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
////封装请求体
RegisterBrokerBody requestBody = new RegisterBrokerBody();
/**主题配置,topicConfigWrapper内部封装的是topicConfigManager中的
topicConfigTable,内部存储的是broker启动时默认的一些topic,
,MixAll.SELF_TEST_TOPIC、MixAll.DEFAULT_TOPIC(AutoCreateTopic
-Enable=true)、MixAll.BENCHMARK_TOPIC、MixAll.OFFSET_MOVED_EVNET、
BrokerConfig中的brokerClusterName、brokerName。Broker中Topic
默认存储在${Rocket_Home}/store/config/topic.json中*/
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
//通过netty向namesrv发送网络请求,注册broker
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker to name server {} OK", namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
注:为了简洁明了,直接将断点打到
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll方法中,着重看下封装的参数即可
注:broker每隔30s会发一个心跳,所以你会看到请求不断地进入
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll方法,为了排除干扰,我们将心跳时间改长点,然后重启broker
1.3、路由删除
org.apache.rocketmq.namesrv.NamesrvStartup#main
org.apache.rocketmq.namesrv.NamesrvStartup#main0
org.apache.rocketmq.namesrv.NamesrvStartup#start
org.apache.rocketmq.namesrv.NamesrvController#initialize
你会发现这个方法逻辑很简单,就是遍历brokerLiveTable,判断每一个brokerLiveInfo的lastUpdateTimestamp和当前时间戳的差距是不是大于120s,如果是,就关闭和此broker的socket连接,并将此brokerLiveInfo从brokerLiveTable中删除,然后删除与该broker相关的路由信息,路由的具体删除逻辑在onChannelDestroy方法中public void scanNotActiveBroker() {
Iterator
> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) {
Entry
next = it.next(); long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
根据brokerAddress从brokerLiveTable、filterServerTable移除
1.4、路由发现
方法主要逻辑如下:public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
//1:
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
//2:
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
//3:
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
调用pickupTopicRouteData方法,从路由表topicQueueTable、brokerAddrTable、filterServerTable中分别填充到TopicRouteData中的queueData、brokerDatas、filterServerTable属性中。
List
:topic队列元数据 List
:topic分布的broker元数据 filterServer:broker上过滤服务器地址列表
如果找到topic对应的路由信息并且该topic为顺序消息,则从NameServerKVconfig中获取关于顺序消息相关的配置填充路由信息。
如果找不到路由信息就返回ResponseCode.TOPIC_NOT_EXIST
二、总结