Apache RocketMQ只用了7个类就实现了Nameserver,你知道是如何做到的吗?
1

2
让你来设计NameServer
该如何考虑?

3
一个具体请求示例,
来看看NameServer的源码实现
public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {if (ctx != null) {log.debug("receive request, {} {} {}",request.getCode(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),request);}switch (request.getCode()) {……//Broker注册请求处理,可以看到对于不对版本的Broker调用了不同的方法进行处理,测试中版本大于3.0.11,所以我们直接看registerBrokerWithFilterServer方法调用case RequestCode.REGISTER_BROKER:Version brokerVersion = MQVersion.value2Version(request.getVersion());if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {return this.registerBrokerWithFilterServer(ctx, request);} else {return this.registerBroker(ctx, request);}……case RequestCode.GET_ROUTEINFO_BY_TOPIC:return this.getRouteInfoByTopic(ctx, request);……default:break;}return null;}
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {//构建了请求返回信息final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();//解码Broker注册请求头信息,头信息里包含了Broker地址、名称、brokerId等信息final RegisterBrokerRequestHeader requestHeader =(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);if (!checksum(ctx, request, requestHeader)) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("crc32 not match");return response;}RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();if (request.getBody() != null) {try {//解码Broker注册请求体信息,请求体中包含了所有的Topic信息registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());} catch (Exception e) {throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);}} else {registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);}//调用了RouteInfoManager#registerBroker方法,存储Broker相关信息到路由管理类中RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(requestHeader.getClusterName(),requestHeader.getBrokerAddr(),requestHeader.getBrokerName(),requestHeader.getBrokerId(),requestHeader.getHaServerAddr(),registerBrokerBody.getTopicConfigSerializeWrapper(),registerBrokerBody.getFilterServerList(),ctx.channel());//构建返回请求responseHeader.setHaServerAddr(result.getHaServerAddr());responseHeader.setMasterAddr(result.getMasterAddr());byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);response.setBody(jsonValue);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}
网络连接及请求处理的能力是注册中心首先需要具备的功能。
其次需要有连接管理的能力,能够对超时连接,断开的连接进行处理。
路由信息管理能力是Nameserver的核心业务能力。
为了满足软件的灵活性,我们还需要有配置功能。
面向对象思想,类功能的合理分工,是所有软件开发都需要考虑的。
希望今天的内容能对大家有所帮助。
聊技术,不止于技术。
评论
