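ES Series (4): An Analysis of the HTTP Request Dispatch Framework
The previous article walked through how ES implements its network communication module and gave a rough picture of how it works. To summarize: ES builds its communication layer on the Netty programming model, and in the end we are left with a few important handlers: Netty4HttpPipeliningHandler, Netty4HttpRequestHandler, Netty4MessageChannelHandler, ...
In fact, boilerplate of this kind does not deserve too much attention, because it matters little for understanding what the system actually does. (As an aside: isn't Lucene, the core of ES, more deserving of that effort?) Still, we can now go one step further and look at how ES handles HTTP requests, so that the later study of each implementation detail has a foundation to build on.
1. Starting from the handler
Since we are dealing with HTTP requests, we naturally start at the network entry point. And because Netty is used, that entry point must show up as a handler, namely the Netty4HttpRequestHandler seen in the previous article. Let us review it.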
// org.elasticsearch.http.netty4.Netty4HttpServerTransport.HttpChannelHandler#HttpChannelHandler
protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) {
    this.transport = transport;
    this.handlingSettings = handlingSettings;
    this.byteBufSizer = new NettyByteBufSizer();
    this.requestCreator = new Netty4HttpRequestCreator();
    this.requestHandler = new Netty4HttpRequestHandler(transport);
    this.responseCreator = new Netty4HttpResponseCreator();
}

// org.elasticsearch.http.netty4.Netty4HttpServerTransport.HttpChannelHandler#initChannel
@Override
protected void initChannel(Channel ch) throws Exception {
    Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
    // Quite a few handlers are configured here, naturally because the functionality is complex
    ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
    ch.pipeline().addLast("byte_buf_sizer", byteBufSizer);
    ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
    final HttpRequestDecoder decoder = new HttpRequestDecoder(
        handlingSettings.getMaxInitialLineLength(),
        handlingSettings.getMaxHeaderSize(),
        handlingSettings.getMaxChunkSize());
    decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
    ch.pipeline().addLast("decoder", decoder);
    ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
    ch.pipeline().addLast("encoder", new HttpResponseEncoder());
    final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
    aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
    ch.pipeline().addLast("aggregator", aggregator);
    if (handlingSettings.isCompression()) {
        ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
    }
    ch.pipeline().addLast("request_creator", requestCreator);
    ch.pipeline().addLast("response_creator", responseCreator);
    // The last two handlers, pipelining and handler, do the real business processing
    ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
    // Netty4HttpRequestHandler
    ch.pipeline().addLast("handler", requestHandler);
    transport.serverAcceptedChannel(nettyHttpChannel);
}
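Setting pipelining aside, only handler remains as the core processor. We can see that handler is an instance of Netty4HttpRequestHandler whose constructor takes a transport. The transport holds a good deal of context, including configuration and the dispatch logic; the details need no elaboration here.
One thing in it does deserve a mention, though: the dispatcher, because it determines how subsequent requests are handled.
In fact, a dispatcher is created when the node is initialized. It is a RestController instance, and other components use it later when they register themselves, because request dispatch is controlled by RestController. It is initialized as follows: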
// org.elasticsearch.node.Node#Node
protected Node(final Environment initialEnvironment,
               Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
    ...
    ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
        settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
        threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService, systemIndices);
    modules.add(actionModule);
    // restController is initialized in ActionModule and used by networkModule
    final RestController restController = actionModule.getRestController();
    final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
        threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
        networkService, restController, clusterService.getClusterSettings());
    Collection<UnaryOperator<Map<String, IndexTemplateMetadata>>> indexTemplateMetadataUpgraders =
        pluginsService.filterPlugins(Plugin.class).stream()
            .map(Plugin::getIndexTemplateMetadataUpgrader)
            .collect(Collectors.toList());
    ...
}

// org.elasticsearch.action.ActionModule#ActionModule
public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
                    IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
                    ThreadPool threadPool, List<ActionPlugin> actionPlugins, NodeClient nodeClient,
                    CircuitBreakerService circuitBreakerService, UsageService usageService, SystemIndices systemIndices) {
    this.transportClient = transportClient;
    this.settings = settings;
    this.indexNameExpressionResolver = indexNameExpressionResolver;
    this.indexScopedSettings = indexScopedSettings;
    this.clusterSettings = clusterSettings;
    this.settingsFilter = settingsFilter;
    this.actionPlugins = actionPlugins;
    this.threadPool = threadPool;
    actions = setupActions(actionPlugins);
    actionFilters = setupActionFilters(actionPlugins);
    autoCreateIndex = transportClient
        ? null
        : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices);
    destructiveOperations = new DestructiveOperations(settings, clusterSettings);
    Set<RestHeaderDefinition> headers = Stream.concat(
        actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
        Stream.of(new RestHeaderDefinition(Task.X_OPAQUE_ID, false))
    ).collect(Collectors.toSet());
    UnaryOperator<RestHandler> restWrapper = null;
    for (ActionPlugin plugin : actionPlugins) {
        UnaryOperator<RestHandler> newRestWrapper = plugin.getRestHandlerWrapper(threadPool.getThreadContext());
        if (newRestWrapper != null) {
            logger.debug("Using REST wrapper from plugin " + plugin.getClass().getName());
            if (restWrapper != null) {
                throw new IllegalArgumentException("Cannot have more than one plugin implementing a REST wrapper");
            }
            restWrapper = newRestWrapper;
        }
    }
    mappingRequestValidators = new RequestValidators<>(
        actionPlugins.stream().flatMap(p -> p.mappingRequestValidators().stream()).collect(Collectors.toList()));
    indicesAliasesRequestRequestValidators = new RequestValidators<>(
        actionPlugins.stream().flatMap(p -> p.indicesAliasesRequestValidators().stream()).collect(Collectors.toList()));
    if (transportClient) {
        restController = null;
    } else {
        // Instantiate restController directly
        restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
    }
}
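Once the dispatcher has been instantiated, it needs to accept registrations from other components. (Two components accept registrations: RestController and Transport$RequestHandlers.)
RequestHandlers is keyed by action names such as internal:transport/handshake, cluster:admin/snapshot/status[nodes] and indices:admin/close[s][p], while RestController is keyed by paths such as /{index}/_doc/{id}, /_nodes/{nodeId}/{metrics}, /_cluster/allocation/explain and /{index}/_alias/{name}.
1.1. Registering RequestHandlers
Let us first look at how RequestHandlers are registered. These registrations mostly serve communication inside the cluster: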
// org.elasticsearch.transport.TransportService#registerRequestHandler
/**
 * Registers a new request handler
 *
 * @param action The action the request handler is associated with
 * @param requestReader The request class that will be used to construct new instances for streaming
 * @param executor The executor the request handling will be executed on
 * @param forceExecution Force execution on the executor queue and never reject it
 * @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached.
 * @param handler The handler itself that implements the request handling
 */
public <Request extends TransportRequest> void registerRequestHandler(String action,
                                                                      String executor, boolean forceExecution,
                                                                      boolean canTripCircuitBreaker,
                                                                      Writeable.Reader<Request> requestReader,
                                                                      TransportRequestHandler<Request> handler) {
    validateActionName(action);
    handler = interceptor.interceptHandler(action, executor, forceExecution, handler);
    RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
        action, requestReader, taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
    transport.registerRequestHandler(reg);
}

// org.elasticsearch.transport.Transport#registerRequestHandler
/**
 * Registers a new request handler
 */
default <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
    getRequestHandlers().registerHandler(reg);
}

// org.elasticsearch.transport.Transport.RequestHandlers#registerHandler
synchronized <Request extends TransportRequest> void registerHandler(RequestHandlerRegistry<Request> reg) {
    if (requestHandlers.containsKey(reg.getAction())) {
        throw new IllegalArgumentException("transport handlers for action " + reg.getAction() + " is already registered");
    }
    requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
}

// Registration example 1
// org.elasticsearch.transport.TransportService#TransportService
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
                        Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
                        Set<String> taskHeaders, ConnectionManager connectionManager) {
    ...
    registerRequestHandler(
        HANDSHAKE_ACTION_NAME,
        ThreadPool.Names.SAME,
        false, false,
        HandshakeRequest::new,
        (request, channel, task) -> channel.sendResponse(
            new HandshakeResponse(localNode.getVersion(), Build.CURRENT.hash(), localNode, clusterName)));
    ...
}

// Registration example 2
// org.elasticsearch.repositories.VerifyNodeRepositoryAction#VerifyNodeRepositoryAction
public VerifyNodeRepositoryAction(TransportService transportService, ClusterService clusterService,
                                  RepositoriesService repositoriesService) {
    this.transportService = transportService;
    this.clusterService = clusterService;
    this.repositoriesService = repositoriesService;
    transportService.registerRequestHandler(ACTION_NAME, ThreadPool.Names.SNAPSHOT, VerifyNodeRepositoryRequest::new,
        new VerifyNodeRepositoryRequestHandler());
}

// Registration example 3
// org.elasticsearch.discovery.PeerFinder#PeerFinder
public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
                  ConfiguredHostsResolver configuredHostsResolver) {
    this.settings = settings;
    findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
    requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings);
    this.transportService = transportService;
    this.transportAddressConnector = transportAddressConnector;
    this.configuredHostsResolver = configuredHostsResolver;
    transportService.registerRequestHandler(REQUEST_PEERS_ACTION_NAME, Names.GENERIC, false, false,
        PeersRequest::new,
        (request, channel, task) -> channel.sendResponse(handlePeersRequest(request)));
    transportService.registerRequestHandler(UnicastZenPing.ACTION_NAME, Names.GENERIC, false, false,
        UnicastZenPing.UnicastPingRequest::new, new Zen1UnicastPingRequestHandler());
}
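RequestHandlers registration as a whole relies on a synchronized method to keep writes thread-safe, and on a copy-on-write-like mechanism to keep reads safe: every registration builds a new immutable map from the old one and replaces the reference. In the end the handlers simply live in a map keyed by action name, which balances performance and safety.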
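The copy-on-write idea can be sketched in plain Java. This is a minimal illustration, not the ES implementation: the class name CopyOnWriteRegistry and the use of Function<String, String> as a stand-in for a handler are assumptions made for the example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a handler registry; names are illustrative only.
final class CopyOnWriteRegistry {
    // volatile so that readers always see the most recently published snapshot
    private volatile Map<String, Function<String, String>> handlers = Collections.emptyMap();

    // Writers are serialized; each registration publishes a brand-new unmodifiable map.
    synchronized void register(String action, Function<String, String> handler) {
        if (handlers.containsKey(action)) {
            throw new IllegalArgumentException("handler for action " + action + " is already registered");
        }
        Map<String, Function<String, String>> copy = new HashMap<>(handlers);
        copy.put(action, handler);
        handlers = Collections.unmodifiableMap(copy);
    }

    // Readers take no lock: they only dereference the current snapshot.
    Function<String, String> get(String action) {
        return handlers.get(action);
    }
}
```

Registration is rare and lookup happens on every request, so paying for a full map copy on each write in exchange for lock-free reads is a reasonable trade.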
1.2. Registering with RestController
Registration with RestController is slightly different. It mainly serves client requests such as /{index}/_doc/{id}.
// org.elasticsearch.rest.RestController#registerHandler
/**
 * Registers a REST handler with the controller. The REST handler declares the {@code method}
 * and {@code path} combinations.
 */
public void registerHandler(final RestHandler restHandler) {
    restHandler.routes().forEach(route -> registerHandler(route.getMethod(), route.getPath(), restHandler));
    restHandler.deprecatedRoutes().forEach(route ->
        registerAsDeprecatedHandler(route.getMethod(), route.getPath(), restHandler, route.getDeprecationMessage()));
    restHandler.replacedRoutes().forEach(route -> registerWithDeprecatedHandler(route.getMethod(), route.getPath(),
        restHandler, route.getDeprecatedMethod(), route.getDeprecatedPath()));
}

/**
 * Registers a REST handler to be executed when one of the provided methods and path match the request.
 *
 * @param path Path to handle (e.g., "/{index}/{type}/_bulk")
 * @param handler The handler to actually execute
 * @param method GET, POST, etc.
 */
protected void registerHandler(RestRequest.Method method, String path, RestHandler handler) {
    if (handler instanceof BaseRestHandler) {
        // A BaseRestHandler is also added to the usageService
        usageService.addRestHandler((BaseRestHandler) handler);
    }
    registerHandlerNoWrap(method, path, handlerWrapper.apply(handler));
}

private void registerHandlerNoWrap(RestRequest.Method method, String path, RestHandler maybeWrappedHandler) {
    // Here handlers = new PathTrie<>(RestUtils.REST_DECODER);
    handlers.insertOrUpdate(path, new MethodHandlers(path, maybeWrappedHandler, method),
        (mHandlers, newMHandler) -> mHandlers.addMethods(maybeWrappedHandler, method));
}

// org.elasticsearch.usage.UsageService#addRestHandler
/**
 * Add a REST handler to this service.
 *
 * @param handler the {@link BaseRestHandler} to add to the usage service.
 */
public void addRestHandler(BaseRestHandler handler) {
    Objects.requireNonNull(handler);
    if (handler.getName() == null) {
        throw new IllegalArgumentException("handler of type [" + handler.getClass().getName() + "] does not have a name");
    }
    // The name-to-handler relation is stored directly in a HashMap
    final BaseRestHandler maybeHandler = handlers.put(handler.getName(), handler);
    /*
     * Handlers will be registered multiple times, once for each route that the handler handles. This means that we will see handlers
     * multiple times, so we do not have a conflict if we are seeing the same instance multiple times. So, we only reject if a handler
     * with the same name was registered before, and it is not the same instance as before.
     */
    if (maybeHandler != null && maybeHandler != handler) {
        final String message = String.format(
            Locale.ROOT,
            "handler of type [%s] conflicts with handler of type [%s] as they both have the same name [%s]",
            handler.getClass().getName(),
            maybeHandler.getClass().getName(),
            handler.getName()
        );
        throw new IllegalArgumentException(message);
    }
}
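The key point of registerHandlerNoWrap is that one path holds several handlers, one per HTTP method. A minimal sketch of that relationship follows, assuming a flat map keyed by the exact path instead of a PathTrie; RouteTable, SketchMethod and the handler names are hypothetical.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical HTTP method enum for the sketch.
enum SketchMethod { GET, POST, PUT, DELETE }

// Simplified counterpart of "path -> MethodHandlers": one handler per method for each path.
final class RouteTable {
    private final Map<String, Map<SketchMethod, String>> routes = new HashMap<>();

    // Insert the path if absent, otherwise add the method to the existing entry.
    void register(SketchMethod method, String path, String handlerName) {
        Map<SketchMethod, String> byMethod = routes.computeIfAbsent(path, p -> new EnumMap<>(SketchMethod.class));
        String existing = byMethod.putIfAbsent(method, handlerName);
        if (existing != null) {
            throw new IllegalArgumentException("handler already registered for [" + method + " " + path + "]");
        }
    }

    // Returns null when either the path or the method is unknown.
    String find(SketchMethod method, String path) {
        Map<SketchMethod, String> byMethod = routes.get(path);
        return byMethod == null ? null : byMethod.get(method);
    }
}
```

Keeping the method map on the path entry also makes it cheap to tell "path unknown" apart from "path known, method not supported", which is what a 405 response needs.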
The details of PathTrie insertion and update follow, for those interested:
// org.elasticsearch.common.path.PathTrie#insertOrUpdate
/**
 * Insert a value for the given path. If the path already exists, replace the value with:
 * <pre>
 * value = updater.apply(oldValue, newValue);
 * </pre>
 * allowing the value to be updated if desired.
 */
public void insertOrUpdate(String path, T value, BiFunction<T, T, T> updater) {
    String[] strings = path.split(SEPARATOR);
    if (strings.length == 0) {
        if (rootValue != null) {
            rootValue = updater.apply(rootValue, value);
        } else {
            rootValue = value;
        }
        return;
    }
    int index = 0;
    // Supports initial delimiter.
    if (strings[0].isEmpty()) {
        index = 1;
    }
    root.insertOrUpdate(strings, index, value, updater);
}

// A trie-like implementation for path lookup.
// The core idea is to split the path on '/' and store the segments, front to back, as keys in per-node HashMaps.
// Segments of the form {xxx} are stored specially, under the wildcard key.
// Ordering in this trie follows the order of the '/'-separated segments, not alphabetical order.
// org.elasticsearch.common.path.PathTrie.TrieNode#insertOrUpdate
private synchronized void insertOrUpdate(String[] path, int index, T value, BiFunction<T, T, T> updater) {
    if (index >= path.length)
        return;
    String token = path[index];
    String key = token;
    // A format such as {index}
    if (isNamedWildcard(token)) {
        key = wildcard;
    }
    TrieNode node = children.get(key);
    if (node == null) {
        T nodeValue = index == path.length - 1 ? value : null;
        node = new TrieNode(token, nodeValue, wildcard);
        addInnerChild(key, node);
    } else {
        if (isNamedWildcard(token)) {
            node.updateKeyWithNamedWildcard(token);
        }
        /*
         * If the target node already exists, but is without a value,
         * then the value should be updated.
         */
        if (index == (path.length - 1)) {
            if (node.value != null) {
                node.value = updater.apply(node.value, value);
            } else {
                node.value = value;
            }
        }
    }
    node.insertOrUpdate(path, index + 1, value, updater);
}
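With the addresses registered as above, request dispatch has something to go on.
2. The request dispatch process
When a request reaches ES, how does ES process it? That of course differs from scenario to scenario. What we want to look at here, though, is how ES executes a request from the framework's point of view. It is bound to involve the RestController and RequestHandlers covered in the previous section.
Readers interested in how a request actually reaches RestController can read the code below; otherwise skip straight to RestController.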
// org.elasticsearch.http.netty4.Netty4HttpRequestHandler#channelRead0
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest httpRequest) {
    final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
    boolean success = false;
    try {
        serverTransport.incomingRequest(httpRequest, channel);
        success = true;
    } finally {
        if (success == false) {
            httpRequest.release();
        }
    }
}

// org.elasticsearch.http.AbstractHttpServerTransport#incomingRequest
/**
 * This method handles an incoming http request.
 *
 * @param httpRequest that is incoming
 * @param httpChannel that received the http request
 */
public void incomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel) {
    final long startTime = threadPool.relativeTimeInMillis();
    try {
        handleIncomingRequest(httpRequest, httpChannel, httpRequest.getInboundException());
    } finally {
        final long took = threadPool.relativeTimeInMillis() - startTime;
        final long logThreshold = slowLogThresholdMs;
        if (logThreshold > 0 && took > logThreshold) {
            logger.warn("handling request [{}][{}][{}][{}] took [{}ms] which is above the warn threshold of [{}ms]",
                httpRequest.header(Task.X_OPAQUE_ID), httpRequest.method(), httpRequest.uri(), httpChannel, took, logThreshold);
        }
    }
}

private void handleIncomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel, final Exception exception) {
    if (exception == null) {
        HttpResponse earlyResponse = corsHandler.handleInbound(httpRequest);
        if (earlyResponse != null) {
            httpChannel.sendResponse(earlyResponse, earlyResponseListener(httpRequest, httpChannel));
            httpRequest.release();
            return;
        }
    }
    Exception badRequestCause = exception;
    /*
     * We want to create a REST request from the incoming request from Netty. However, creating this request could fail if there
     * are incorrectly encoded parameters, or the Content-Type header is invalid. If one of these specific failures occurs, we
     * attempt to create a REST request again without the input that caused the exception (e.g., we remove the Content-Type header,
     * or skip decoding the parameters). Once we have a request in hand, we then dispatch the request as a bad request with the
     * underlying exception that caused us to treat the request as bad.
     */
    final RestRequest restRequest;
    {
        RestRequest innerRestRequest;
        try {
            innerRestRequest = RestRequest.request(xContentRegistry, httpRequest, httpChannel);
        } catch (final RestRequest.ContentTypeHeaderException e) {
            badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
            innerRestRequest = requestWithoutContentTypeHeader(httpRequest, httpChannel, badRequestCause);
        } catch (final RestRequest.BadParameterException e) {
            badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
            innerRestRequest = RestRequest.requestWithoutParameters(xContentRegistry, httpRequest, httpChannel);
        }
        restRequest = innerRestRequest;
    }
    final HttpTracer trace = tracer.maybeTraceRequest(restRequest, exception);
    /*
     * We now want to create a channel used to send the response on. However, creating this channel can fail if there are invalid
     * parameter values for any of the filter_path, human, or pretty parameters. We detect these specific failures via an
     * IllegalArgumentException from the channel constructor and then attempt to create a new channel that bypasses parsing of these
     * parameter values.
     */
    final RestChannel channel;
    {
        RestChannel innerChannel;
        ThreadContext threadContext = threadPool.getThreadContext();
        try {
            innerChannel = new DefaultRestChannel(
                httpChannel, httpRequest, restRequest, bigArrays, handlingSettings, threadContext, corsHandler, trace);
        } catch (final IllegalArgumentException e) {
            badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
            final RestRequest innerRequest = RestRequest.requestWithoutParameters(xContentRegistry, httpRequest, httpChannel);
            innerChannel = new DefaultRestChannel(
                httpChannel, httpRequest, innerRequest, bigArrays, handlingSettings, threadContext, corsHandler, trace);
        }
        channel = innerChannel;
    }
    // Dispatch the request to the controller
    dispatchRequest(restRequest, channel, badRequestCause);
}

// org.elasticsearch.http.AbstractHttpServerTransport#dispatchRequest
// Visible for testing
void dispatchRequest(final RestRequest restRequest, final RestChannel channel, final Throwable badRequestCause) {
    final ThreadContext threadContext = threadPool.getThreadContext();
    try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
        if (badRequestCause != null) {
            dispatcher.dispatchBadRequest(channel, threadContext, badRequestCause);
        } else {
            // Dispatch to restController
            dispatcher.dispatchRequest(restRequest, channel, threadContext);
        }
    }
}
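The "build a degraded request, then dispatch it as a bad request" pattern in handleIncomingRequest can be illustrated in isolation. The sketch below is an assumption-laden miniature: LenientRequestParsing, its Request type and the string results are all hypothetical, and it uses URLDecoder.decode(String, Charset), which requires Java 10 or later.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class LenientRequestParsing {
    // Hypothetical minimal request: a path plus decoded query parameters.
    static final class Request {
        final String path;
        final Map<String, String> params;

        Request(String path, Map<String, String> params) {
            this.path = path;
            this.params = params;
        }
    }

    private static String pathOf(String uri) {
        int q = uri.indexOf('?');
        return q < 0 ? uri : uri.substring(0, q);
    }

    // Strict parse: URLDecoder throws IllegalArgumentException on malformed percent-encoding.
    static Request parse(String uri) {
        Map<String, String> params = new HashMap<>();
        int q = uri.indexOf('?');
        if (q >= 0 && q + 1 < uri.length()) {
            for (String pair : uri.substring(q + 1).split("&")) {
                int eq = pair.indexOf('=');
                String key = eq < 0 ? pair : pair.substring(0, eq);
                String value = eq < 0 ? "" : pair.substring(eq + 1);
                params.put(URLDecoder.decode(key, StandardCharsets.UTF_8), URLDecoder.decode(value, StandardCharsets.UTF_8));
            }
        }
        return new Request(pathOf(uri), params);
    }

    // On failure, rebuild the request without the offending input and remember the cause.
    static String handle(String uri) {
        Exception badRequestCause = null;
        Request request;
        try {
            request = parse(uri);
        } catch (IllegalArgumentException e) {
            badRequestCause = e;
            request = new Request(pathOf(uri), new HashMap<>());
        }
        if (badRequestCause != null) {
            return "400 " + request.path; // dispatched as a bad request
        }
        return "200 " + request.path + " " + request.params;
    }
}
```

The point of the degraded request is that an error response can still be produced through the normal channel machinery, rather than failing before any request object exists.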
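RestController takes dispatchRequest() as its entry point. It is responsible for finding the handler that can process the request and then dispatching to it, which is natural enough.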
// org.elasticsearch.rest.RestController#dispatchRequest
@Override
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
    try {
        // Try all possible handlers
        tryAllHandlers(request, channel, threadContext);
    } catch (Exception e) {
        try {
            // On an exception, respond with the error information
            channel.sendResponse(new BytesRestResponse(channel, e));
        } catch (Exception inner) {
            inner.addSuppressed(e);
            logger.error(() ->
                new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
        }
    }
}

// org.elasticsearch.rest.RestController#tryAllHandlers
private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {
    // Read the header information
    for (final RestHeaderDefinition restHeader : headersToCopy) {
        final String name = restHeader.getName();
        final List<String> headerValues = request.getAllHeaderValues(name);
        if (headerValues != null && headerValues.isEmpty() == false) {
            final List<String> distinctHeaderValues = headerValues.stream().distinct().collect(Collectors.toList());
            if (restHeader.isMultiValueAllowed() == false && distinctHeaderValues.size() > 1) {
                channel.sendResponse(
                    BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, "multiple values for single-valued header [" + name + "]."));
                return;
            } else {
                threadContext.putHeader(name, String.join(",", distinctHeaderValues));
            }
        }
    }
    // error_trace cannot be used when we disable detailed errors
    // we consume the error_trace parameter first to ensure that it is always consumed
    if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) {
        channel.sendResponse(
            BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, "error traces in responses are disabled."));
        return;
    }
    final String rawPath = request.rawPath();
    final String uri = request.uri();
    final RestRequest.Method requestMethod;
    try {
        // Resolves the HTTP method and fails if the method is invalid
        requestMethod = request.method();
        // Loop through all possible handlers, attempting to dispatch the request
        // Fetch the candidate handlers; because of wildcard path variables such as {index}, one path may match several handlers
        Iterator<MethodHandlers> allHandlers = getAllHandlers(request.params(), rawPath);
        while (allHandlers.hasNext()) {
            final RestHandler handler;
            // One MethodHandlers entry supports several request methods
            final MethodHandlers handlers = allHandlers.next();
            if (handlers == null) {
                handler = null;
            } else {
                handler = handlers.getHandler(requestMethod);
            }
            if (handler == null) {
                // No handler found does not yet mean the request cannot be handled; the search may need to continue.
                // If it definitely cannot be handled, respond to the client and return.
                if (handleNoHandlerFound(rawPath, requestMethod, uri, channel)) {
                    return;
                }
            } else {
                // A handler was found; invoke it
                dispatchRequest(request, channel, handler);
                return;
            }
        }
    } catch (final IllegalArgumentException e) {
        handleUnsupportedHttpMethod(uri, null, channel, getValidHandlerMethodSet(rawPath), e);
        return;
    }
    // If request has not been handled, fallback to a bad request error.
    // Fallback call
    handleBadRequest(uri, requestMethod, channel);
}

// org.elasticsearch.rest.RestController#getAllHandlers
Iterator<MethodHandlers> getAllHandlers(@Nullable Map<String, String> requestParamsRef, String rawPath) {
    final Supplier<Map<String, String>> paramsSupplier;
    if (requestParamsRef == null) {
        paramsSupplier = () -> null;
    } else {
        // Between retrieving the correct path, we need to reset the parameters,
        // otherwise parameters are parsed out of the URI that aren't actually handled.
        final Map<String, String> originalParams = new HashMap<>(requestParamsRef);
        paramsSupplier = () -> {
            // PathTrie modifies the request, so reset the params between each iteration
            requestParamsRef.clear();
            requestParamsRef.putAll(originalParams);
            return requestParamsRef;
        };
    }
    // we use rawPath since we don't want to decode it while processing the path resolution
    // so we can handle things like:
    // my_index/my_type/http%3A%2F%2Fwww.google.com
    return handlers.retrieveAll(rawPath, paramsSupplier);
}

// org.elasticsearch.common.path.PathTrie#retrieveAll
/**
 * Returns an iterator of the objects stored in the {@code PathTrie}, using
 * all possible {@code TrieMatchingMode} modes. The {@code paramSupplier}
 * is called between each invocation of {@code next()} to supply a new map
 * of parameters.
 */
public Iterator<T> retrieveAll(String path, Supplier<Map<String, String>> paramSupplier) {
    return new Iterator<T>() {

        private int mode;

        @Override
        public boolean hasNext() {
            return mode < TrieMatchingMode.values().length;
        }

        @Override
        public T next() {
            if (hasNext() == false) {
                throw new NoSuchElementException("called next() without validating hasNext()! no more modes available");
            }
            return retrieve(path, paramSupplier.get(), TrieMatchingMode.values()[mode++]);
        }
    };
}
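Why the parameters have to be reset between matching attempts is easier to see in a stripped-down form. In this sketch the Matcher interface stands in for one PathTrie matching mode; ParamResetSketch, Matcher and findHandler are hypothetical names, and a failed attempt is allowed to leave stray entries in the parameter map.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

final class ParamResetSketch {
    // Hypothetical matcher: returns a handler name or null, and may write captured values into params.
    interface Matcher {
        String match(String path, Map<String, String> params);
    }

    // Lazily tries each matcher in order; the supplier is asked for the params map before every attempt.
    static Iterator<String> retrieveAll(String path, List<Matcher> matchers, Supplier<Map<String, String>> paramSupplier) {
        return new Iterator<String>() {
            private int mode;

            @Override
            public boolean hasNext() {
                return mode < matchers.size();
            }

            @Override
            public String next() {
                if (hasNext() == false) {
                    throw new NoSuchElementException("no more matchers available");
                }
                return matchers.get(mode++).match(path, paramSupplier.get());
            }
        };
    }

    static String findHandler(String path, Map<String, String> requestParams, List<Matcher> matchers) {
        final Map<String, String> original = new HashMap<>(requestParams);
        // Restore the caller's map to its original content before each attempt.
        Supplier<Map<String, String>> supplier = () -> {
            requestParams.clear();
            requestParams.putAll(original);
            return requestParams;
        };
        Iterator<String> candidates = retrieveAll(path, matchers, supplier);
        while (candidates.hasNext()) {
            String handler = candidates.next();
            if (handler != null) {
                return handler;
            }
        }
        return null;
    }
}
```

Without the reset, a value captured by an attempt that ultimately failed (say index=_cat) would leak into the request seen by the handler that finally matches.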
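The path matching here is a trie-based implementation that returns the node matching the path. The algorithm is worth a little study; read on as needed.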
// org.elasticsearch.common.path.PathTrie.TrieNode#retrieve
public T retrieve(String[] path, int index, Map<String, String> params, TrieMatchingMode trieMatchingMode) {
    if (index >= path.length)
        return null;
    String token = path[index];
    TrieNode node = children.get(token);
    boolean usedWildcard;
    if (node == null) {
        if (trieMatchingMode == TrieMatchingMode.WILDCARD_NODES_ALLOWED) {
            node = children.get(wildcard);
            if (node == null) {
                return null;
            }
            usedWildcard = true;
        } else if (trieMatchingMode == TrieMatchingMode.WILDCARD_ROOT_NODES_ALLOWED && index == 1) {
            /*
             * Allow root node wildcard matches.
             */
            node = children.get(wildcard);
            if (node == null) {
                return null;
            }
            usedWildcard = true;
        } else if (trieMatchingMode == TrieMatchingMode.WILDCARD_LEAF_NODES_ALLOWED && index + 1 == path.length) {
            /*
             * Allow leaf node wildcard matches.
             */
            node = children.get(wildcard);
            if (node == null) {
                return null;
            }
            usedWildcard = true;
        } else {
            return null;
        }
    } else {
        if (index + 1 == path.length && node.value == null && children.get(wildcard) != null
            && EXPLICIT_OR_ROOT_WILDCARD.contains(trieMatchingMode) == false) {
            /*
             * If we are at the end of the path, the current node does not have a value but
             * there is a child wildcard node, use the child wildcard node.
             */
            node = children.get(wildcard);
            usedWildcard = true;
        } else if (index == 1 && node.value == null && children.get(wildcard) != null
            && trieMatchingMode == TrieMatchingMode.WILDCARD_ROOT_NODES_ALLOWED) {
            /*
             * If we are at the root, and root wildcards are allowed, use the child wildcard
             * node.
             */
            node = children.get(wildcard);
            usedWildcard = true;
        } else {
            usedWildcard = token.equals(wildcard);
        }
    }
    // Question to consider: what is params for? (It collects the values matched by named wildcards such as {index}.)
    put(params, node, token);
    if (index == (path.length - 1)) {
        return node.value;
    }
    T nodeValue = node.retrieve(path, index + 1, params, trieMatchingMode);
    if (nodeValue == null && !usedWildcard && trieMatchingMode != TrieMatchingMode.EXPLICIT_NODES_ONLY) {
        node = children.get(wildcard);
        if (node != null) {
            put(params, node, token);
            nodeValue = node.retrieve(path, index + 1, params, trieMatchingMode);
        }
    }
    return nodeValue;
}
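To make the algorithm easier to follow, here is a much reduced path trie with insertion and lookup. It is a sketch under simplifying assumptions, not the ES PathTrie: there are no matching modes, MiniPathTrie and its members are hypothetical names, and parameters are recorded only once a full match is known instead of eagerly.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified path trie: explicit segments are tried first, then the wildcard child.
final class MiniPathTrie<T> {
    private static final String WILDCARD = "*";

    private final class Node {
        final Map<String, Node> children = new HashMap<>();
        String paramName; // set when this node was registered as {name}
        T value;
    }

    private final Node root = new Node();

    // Split on '/', walk or create one node per segment; {xxx} segments share the wildcard key.
    void insert(String path, T value) {
        Node node = root;
        for (String token : split(path)) {
            boolean named = token.startsWith("{") && token.endsWith("}");
            String key = named ? WILDCARD : token;
            Node child = node.children.get(key);
            if (child == null) {
                child = new Node();
                node.children.put(key, child);
            }
            if (named) {
                child.paramName = token.substring(1, token.length() - 1);
            }
            node = child;
        }
        node.value = value;
    }

    T retrieve(String path, Map<String, String> params) {
        return retrieve(root, split(path), 0, params);
    }

    private T retrieve(Node node, String[] tokens, int index, Map<String, String> params) {
        if (index == tokens.length) {
            return node.value;
        }
        String token = tokens[index];
        Node explicit = node.children.get(token);
        if (explicit != null) {
            T found = retrieve(explicit, tokens, index + 1, params);
            if (found != null) {
                return found;
            }
        }
        // Backtrack: the explicit branch failed or does not exist, so try the wildcard branch.
        Node wild = node.children.get(WILDCARD);
        if (wild != null) {
            T found = retrieve(wild, tokens, index + 1, params);
            if (found != null) {
                params.put(wild.paramName, token); // capture {name} -> actual segment
                return found;
            }
        }
        return null;
    }

    private static String[] split(String path) {
        String trimmed = path.startsWith("/") ? path.substring(1) : path;
        return trimmed.isEmpty() ? new String[0] : trimmed.split("/");
    }
}
```

With /{index}/_doc/{id} and /_cat/nodes registered, a lookup of /_cat/_doc/7 first follows the explicit _cat branch, fails there, and then falls back to the wildcard branch, ending with index=_cat and id=7 in the parameter map.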
Once the handler has been found, the core call follows.
// org.elasticsearch.rest.RestController#dispatchRequest
private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler) throws Exception {
    final int contentLength = request.contentLength();
    if (contentLength > 0) {
        final XContentType xContentType = request.getXContentType();
        if (xContentType == null) {
            sendContentTypeErrorMessage(request.getAllHeaderValues("Content-Type"), channel);
            return;
        }
        if (handler.supportsContentStream() && xContentType != XContentType.JSON && xContentType != XContentType.SMILE) {
            channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, RestStatus.NOT_ACCEPTABLE,
                "Content-Type [" + xContentType + "] does not support stream parsing. Use JSON or SMILE instead"));
            return;
        }
    }
    RestChannel responseChannel = channel;
    try {
        // Circuit breaker check
        if (handler.canTripCircuitBreaker()) {
            inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
        } else {
            inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
        }
        // iff we could reserve bytes for the request we need to send the response also over this channel
        responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
        // TODO: Count requests double in the circuit breaker if they need copying?
        if (handler.allowsUnsafeBuffers() == false) {
            request.ensureSafeBuffers();
        }
        if (handler.allowSystemIndexAccessByDefault() == false && request.header(ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER) == null) {
            // The ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER indicates that the request is coming from an Elastic product with a plan
            // to move away from direct access to system indices, and thus deprecation warnings should not be emitted.
            // This header is intended for internal use only.
            client.threadPool().getThreadContext().putHeader(SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY, Boolean.FALSE.toString());
        }
        // Invoke the handler; it may be wrapped by a filter that runs first
        handler.handleRequest(request, responseChannel, client);
    } catch (Exception e) {
        responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
    }
}

// org.elasticsearch.xpack.security.rest.SecurityRestFilter#handleRequest
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
    if (licenseState.isSecurityEnabled() && request.method() != Method.OPTIONS) {
        // CORS - allow for preflight unauthenticated OPTIONS request
        if (extractClientCertificate) {
            HttpChannel httpChannel = request.getHttpChannel();
            SSLEngineUtils.extractClientCertificates(logger, threadContext, httpChannel);
        }
        final String requestUri = request.uri();
        authenticationService.authenticate(maybeWrapRestRequest(request), ActionListener.wrap(
            authentication -> {
                if (authentication == null) {
                    logger.trace("No authentication available for REST request [{}]", requestUri);
                } else {
                    logger.trace("Authenticated REST request [{}] as {}", requestUri, authentication);
                }
                secondaryAuthenticator.authenticateAndAttachToContext(request, ActionListener.wrap(
                    secondaryAuthentication -> {
                        if (secondaryAuthentication != null) {
                            logger.trace("Found secondary authentication {} in REST request [{}]", secondaryAuthentication, requestUri);
                        }
                        RemoteHostHeader.process(request, threadContext);
                        restHandler.handleRequest(request, channel, client);
                    },
                    e -> handleException("Secondary authentication", request, channel, e)));
            }, e -> handleException("Authentication", request, channel, e)));
    } else {
        // Forward to the next handler in the chain of responsibility
        restHandler.handleRequest(request, channel, client);
    }
}

// Take RestNodesAction as an example, i.e. a request to /_cat/nodes, and see how the handler processes it
// org.elasticsearch.rest.BaseRestHandler#handleRequest
@Override
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
    // prepare the request for execution; has the side effect of touching the request parameters
    // It is called "prepare", but it is actually very important: it assembles the logic that runs afterwards
    final RestChannelConsumer action = prepareRequest(request, client);
    // validate unconsumed params, but we must exclude params used to format the response
    // use a sorted set so the unconsumed parameters appear in a reliable sorted order
    final SortedSet<String> unconsumedParams =
        request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));
    // validate the non-response params
    if (!unconsumedParams.isEmpty()) {
        final Set<String> candidateParams = new HashSet<>();
        candidateParams.addAll(request.consumedParams());
        candidateParams.addAll(responseParams());
        throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
    }
    if (request.hasContent() && request.isContentConsumed() == false) {
        throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
    }
    usageCount.increment();
    // execute the action
    // i.e. this merely invokes the action prepared above
    action.accept(channel);
}

// org.elasticsearch.rest.action.cat.AbstractCatAction#prepareRequest
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    boolean helpWanted = request.paramAsBoolean("help", false);
    // Print help information
    if (helpWanted) {
        return channel -> {
            Table table = getTableWithHeader(request);
            int[] width = buildHelpWidths(table, request);
            BytesStream bytesOutput = Streams.flushOnCloseStream(channel.bytesOutput());
            UTF8StreamWriter out = new UTF8StreamWriter().setOutput(bytesOutput);
            for (Table.Cell cell : table.getHeaders()) {
                // need to do left-align always, so create new cells
                pad(new Table.Cell(cell.value), width[0], request, out);
                out.append(" | ");
                pad(new Table.Cell(cell.attr.containsKey("alias") ? cell.attr.get("alias") : ""), width[1], request, out);
                out.append(" | ");
                pad(new Table.Cell(cell.attr.containsKey("desc") ? cell.attr.get("desc") : "not available"), width[2], request, out);
                out.append("\n");
            }
            out.close();
            channel.sendResponse(new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, bytesOutput.bytes()));
        };
    } else {
        return doCatRequest(request, client);
    }
}

// org.elasticsearch.rest.action.cat.RestNodesAction#doCatRequest
@Override
public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) {
    final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
    clusterStateRequest.clear().nodes(true);
    if (request.hasParam("local")) {
        deprecationLogger.deprecate("cat_nodes_local_parameter", LOCAL_DEPRECATED_MESSAGE);
    }
    clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
    clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout()));
    final boolean fullId = request.paramAsBoolean("full_id", false);
    // Send requests to the cluster to obtain the state of each node
    return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
        @Override
        public void processResponse(final ClusterStateResponse clusterStateResponse) {
            NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
            nodesInfoRequest.clear().addMetrics(
                NodesInfoRequest.Metric.JVM.metricName(),
                NodesInfoRequest.Metric.OS.metricName(),
                NodesInfoRequest.Metric.PROCESS.metricName(),
                NodesInfoRequest.Metric.HTTP.metricName());
            client.admin().cluster().nodesInfo(nodesInfoRequest, new RestActionListener<NodesInfoResponse>(channel) {
                @Override
                public void processResponse(final NodesInfoResponse nodesInfoResponse) {
                    NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
                    nodesStatsRequest.clear().indices(true).addMetrics(
                        NodesStatsRequest.Metric.JVM.metricName(),
                        NodesStatsRequest.Metric.OS.metricName(),
                        NodesStatsRequest.Metric.FS.metricName(),
                        NodesStatsRequest.Metric.PROCESS.metricName(),
                        NodesStatsRequest.Metric.SCRIPT.metricName());
                    client.admin().cluster().nodesStats(nodesStatsRequest, new RestResponseListener<NodesStatsResponse>(channel) {
                        @Override
                        public RestResponse buildResponse(NodesStatsResponse nodesStatsResponse) throws Exception {
                            return RestTable.buildResponse(
                                buildTable(fullId, request, clusterStateResponse, nodesInfoResponse, nodesStatsResponse), channel);
                        }
                    });
                }
            });
        }
    });
}
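The above is the overall processing framework for /_cat/nodes: it first issues a state request, then a nodesInfo request, and finally a nodesStats request, and only then sends one combined response. We will therefore analyze it in three stages.
2.1. How state is processed
state fetches the current cluster state; its entry point is in ClusterAdmin. Since this is our first contact with the ES processing flow, we also walk through the plumbing inside the abstract methods. Later sections skip that part and look only at the core business logic.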
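The three-stage chaining can be illustrated with a minimal sketch. It is not the ES code: the three methods below are hypothetical stand-ins that complete immediately, and `Consumer<String>` stands in for ES's listener types. It only shows that each request is issued from inside the previous one's callback, and that the response is built once, in the innermost callback.

```java
import java.util.function.Consumer;

final class ChainedCatNodesSketch {
    // Hypothetical stand-ins for the three asynchronous client calls.
    // Each one "completes" immediately by invoking its listener.
    static void state(Consumer<String> listener) { listener.accept("state"); }
    static void nodesInfo(Consumer<String> listener) { listener.accept("nodesInfo"); }
    static void nodesStats(Consumer<String> listener) { listener.accept("nodesStats"); }

    // Issues the three calls in sequence; the "table" is only built
    // in the innermost callback, where all three responses are in scope.
    static String handle() {
        StringBuilder table = new StringBuilder();
        state(stateResponse ->
            nodesInfo(infoResponse ->
                nodesStats(statsResponse ->
                    table.append(stateResponse).append(" -> ")
                         .append(infoResponse).append(" -> ")
                         .append(statsResponse))));
        return table.toString();
    }

    public static void main(String[] args) {
        System.out.println(handle());
    }
}
```

Because the earlier responses are captured by the nested lambdas, the last stage can combine all three, which is what buildTable does with clusterStateResponse, nodesInfoResponse and nodesStatsResponse.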
// org.elasticsearch.client.support.AbstractClient.ClusterAdmin#state
@Override
public void state(final ClusterStateRequest request, final ActionListener<ClusterStateResponse> listener) {
    execute(ClusterStateAction.INSTANCE, request, listener);
}

// org.elasticsearch.client.support.AbstractClient.ClusterAdmin#execute
@Override
public <Request extends ActionRequest, Response extends ActionResponse> void execute(
    ActionType<Response> action, Request request, ActionListener<Response> listener) {
    client.execute(action, request, listener);
}

// org.elasticsearch.client.support.AbstractClient#execute
/**
 * This is the single execution point of *all* clients.
 */
@Override
public final <Request extends ActionRequest, Response extends ActionResponse> void execute(
    ActionType<Response> action, Request request, ActionListener<Response> listener) {
    try {
        listener = threadedWrapper.wrap(listener);
        doExecute(action, request, listener);
    } catch (Exception e) {
        assert false : new AssertionError(e);
        listener.onFailure(e);
    }
}

// org.elasticsearch.client.node.NodeClient#doExecute
@Override
public <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(ActionType<Response> action, Request request, ActionListener<Response> listener) {
    // Discard the task because the Client interface doesn't use it.
    try {
        executeLocally(action, request, listener);
    } catch (TaskCancelledException | IllegalArgumentException | IllegalStateException e) {
        // #executeLocally returns the task and throws TaskCancelledException if it fails to register the task because the parent
        // task has been cancelled, IllegalStateException if the client was not in a state to execute the request because it was not
        // yet properly initialized or IllegalArgumentException if header validation fails we forward them to listener since this API
        // does not concern itself with the specifics of the task handling
        listener.onFailure(e);
    }
}

/**
 * Execute an {@link ActionType} locally, returning that {@link Task} used to track it, and linking an {@link ActionListener}.
 * Prefer this method if you don't need access to the task when listening for the response. This is the method used to implement
 * the {@link Client} interface.
 *
 * @throws TaskCancelledException if the request's parent task has been cancelled already
 */
public <Request extends ActionRequest, Response extends ActionResponse>
Task executeLocally(ActionType<Response> action, Request request, ActionListener<Response> listener) {
    return transportAction(action).execute(request, listener);
}

/**
 * Get the {@link TransportAction} for an {@link ActionType}, throwing exceptions if the action isn't available.
 */
@SuppressWarnings("unchecked")
private <Request extends ActionRequest, Response extends ActionResponse>
TransportAction<Request, Response> transportAction(ActionType<Response> action) {
    if (actions == null) {
        throw new IllegalStateException("NodeClient has not been initialized");
    }
    // action = "cluster:monitor/state"
    // actions is the set of actions registered earlier
    // here we get TransportClusterStateAction
    TransportAction<Request, Response> transportAction = actions.get(action);
    if (transportAction == null) {
        throw new IllegalStateException("failed to find action [" + action + "] to execute");
    }
    return transportAction;
}

// org.elasticsearch.action.support.TransportAction#execute
/**
 * Use this method when the transport action call should result in creation of a new task associated with the call.
 *
 * This is a typical behavior.
 */
public final Task execute(Request request, ActionListener<Response> listener) {
    /*
     * While this version of execute could delegate to the TaskListener
     * version of execute that'd add yet another layer of wrapping on the
     * listener and prevent us from using the listener bare if there isn't a
     * task. That just seems like too many objects. Thus the two versions of
     * this method.
     */
    final Releasable unregisterChildNode = registerChildNode(request.getParentTask());
    final Task task;
    try {
        // Register the task with the taskManager
        task = taskManager.register("transport", actionName, request);
    } catch (TaskCancelledException e) {
        unregisterChildNode.close();
        throw e;
    }
    execute(task, request, new ActionListener<Response>() {
        @Override
        public void onResponse(Response response) {
            try {
                Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
            } finally {
                listener.onResponse(response);
            }
        }

        @Override
        public void onFailure(Exception e) {
            try {
                Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
            } finally {
                listener.onFailure(e);
            }
        }
    });
    return task;
}

// org.elasticsearch.tasks.TaskManager#register
/**
 * Registers a task without parent task
 */
public Task register(String type, String action, TaskAwareRequest request) {
    Map<String, String> headers = new HashMap<>();
    long headerSize = 0;
    long maxSize = maxHeaderSize.getBytes();
    ThreadContext threadContext = threadPool.getThreadContext();
    for (String key : taskHeaders) {
        String httpHeader = threadContext.getHeader(key);
        if (httpHeader != null) {
            headerSize += key.length() * 2 + httpHeader.length() * 2;
            if (headerSize > maxSize) {
                throw new IllegalArgumentException("Request exceeded the maximum size of task headers " + maxHeaderSize);
            }
            headers.put(key, httpHeader);
        }
    }
    Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers);
    Objects.requireNonNull(task);
    assert task.getParentTaskId().equals(request.getParentTask()) : "Request [ " + request + "] didn't preserve it parentTaskId";
    if (logger.isTraceEnabled()) {
        logger.trace("register {} [{}] [{}] [{}]", task.getId(), type, action, task.getDescription());
    }

    if (task instanceof CancellableTask) {
        registerCancellableTask(task);
    } else {
        Task previousTask = tasks.put(task.getId(), task);
        assert previousTask == null;
    }
    return task;
}

// org.elasticsearch.action.support.TransportAction#execute
/**
 * Use this method when the transport action should continue to run in the context of the current task
 */
public final void execute(Task task, Request request, ActionListener<Response> listener) {
    ActionRequestValidationException validationException = request.validate();
    if (validationException != null) {
        listener.onFailure(validationException);
        return;
    }

    if (task != null && request.getShouldStoreResult()) {
        listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
    }

    RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
    requestFilterChain.proceed(task, actionName, request, listener);
}

// org.elasticsearch.action.support.TransportAction.RequestFilterChain#proceed
@Override
public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
    int i = index.getAndIncrement();
    try {
        if (i < this.action.filters.length) {
            // Run the filters first, then the action
            // each filter implements ActionFilter
            this.action.filters[i].apply(task, actionName, request, listener, this);
        } else if (i == this.action.filters.length) {
            this.action.doExecute(task, request, listener);
        } else {
            listener.onFailure(new IllegalStateException("proceed was called too many times"));
        }
    } catch (Exception e) {
        logger.trace("Error during transport action execution.", e);
        listener.onFailure(e);
    }
}

// org.elasticsearch.action.support.master.TransportMasterNodeAction#doExecute
@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
    // Get the state
    ClusterState state = clusterService.state();
    logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
    if (task != null) {
        request.setParentTask(clusterService.localNode().getId(), task.getId());
    }
    // Start a new state-fetching request
    new AsyncSingleAction(task, request, listener).doStart(state);
}

// org.elasticsearch.cluster.service.ClusterService#state
/**
 * The currently applied cluster state.
 * TODO: Should be renamed to appliedState / appliedClusterState
 */
public ClusterState state() {
    return clusterApplierService.state();
}

// org.elasticsearch.cluster.service.ClusterApplierService#state
/**
 * The current cluster state.
 * Should be renamed to appliedClusterState
 */
public ClusterState state() {
    assert assertNotCalledFromClusterStateApplier("the applied cluster state is not yet available");
    // An AtomicReference holding the current cluster state
    ClusterState clusterState = this.state.get();
    assert clusterState != null : "initial cluster state not set yet";
    return clusterState;
}

// org.elasticsearch.action.support.master.TransportMasterNodeAction.AsyncSingleAction#doStart
protected void doStart(ClusterState clusterState) {
    try {
        final DiscoveryNodes nodes = clusterState.nodes();
        // If the local node is the master, handle it directly; otherwise send the request to the remote master
        if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
            // check for block, if blocked, retry, else, execute locally
            final ClusterBlockException blockException = checkBlock(request, clusterState);
            if (blockException != null) {
                if (!blockException.retryable()) {
                    listener.onFailure(blockException);
                } else {
                    logger.debug("can't execute due to a cluster block, retrying", blockException);
                    retry(clusterState, blockException, newState -> {
                        try {
                            ClusterBlockException newException = checkBlock(request, newState);
                            return (newException == null || !newException.retryable());
                        } catch (Exception e) {
                            // accept state as block will be rechecked by doStart() and listener.onFailure() then called
                            logger.trace("exception occurred during cluster block checking, accepting state", e);
                            return true;
                        }
                    });
                }
            } else {
                ActionListener<Response> delegate = ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
                    if (t instanceof FailedToCommitClusterStateException || t instanceof NotMasterException) {
                        logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or " +
                            "stepped down before publishing action [{}], scheduling a retry", actionName), t);
                        retryOnMasterChange(clusterState, t);
                    } else {
                        delegatedListener.onFailure(t);
                    }
                });
                threadPool.executor(executor)
                    .execute(ActionRunnable.wrap(delegate, l -> masterOperation(task, request, clusterState, l)));
            }
        } else {
            if (nodes.getMasterNode() == null) {
                logger.debug("no known master node, scheduling a retry");
                retryOnMasterChange(clusterState, null);
            } else {
                DiscoveryNode masterNode = nodes.getMasterNode();
                final String actionName = getMasterActionName(masterNode);
                transportService.sendRequest(masterNode, actionName, request,
                    new ActionListenerResponseHandler<Response>(listener, responseReader) {
                        @Override
                        public void handleException(final TransportException exp) {
                            Throwable cause = exp.unwrapCause();
                            if (cause instanceof ConnectTransportException ||
                                (exp instanceof RemoteTransportException && cause instanceof NodeClosedException)) {
                                // we want to retry here a bit to see if a new master is elected
                                logger.debug("connection exception while trying to forward request with action name [{}] to " +
                                        "master node [{}], scheduling a retry. Error: [{}]",
                                    actionName, nodes.getMasterNode(), exp.getDetailedMessage());
                                retryOnMasterChange(clusterState, cause);
                            } else {
                                listener.onFailure(exp);
                            }
                        }
                    });
            }
        }
    } catch (Exception e) {
        listener.onFailure(e);
    }
}
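The filter-then-action dispatch in RequestFilterChain#proceed can be sketched in isolation. This is a simplified illustration under assumptions, not the ES classes: the `Filter` and `Chain` types, the "auth"/"audit" filters, and the trace list are hypothetical. What it keeps from the original is the shared index counter: each call to proceed advances it, runs the next filter, and falls through to the action exactly once after the last filter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class FilterChainSketch {
    // Hypothetical filter contract: do some work, then decide whether to continue the chain.
    interface Filter {
        void apply(String request, List<String> trace, Chain chain);
    }

    static final class Chain {
        private final Filter[] filters;
        private final AtomicInteger index = new AtomicInteger();

        Chain(Filter... filters) {
            this.filters = filters;
        }

        void proceed(String request, List<String> trace) {
            int i = index.getAndIncrement();
            if (i < filters.length) {
                // Still filters left: run the next one, handing it this chain.
                filters[i].apply(request, trace, this);
            } else if (i == filters.length) {
                // All filters passed: run the action itself (stand-in for doExecute).
                trace.add("action:" + request);
            } else {
                throw new IllegalStateException("proceed was called too many times");
            }
        }
    }

    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        Filter auth = (req, t, chain) -> { t.add("auth"); chain.proceed(req, t); };
        Filter audit = (req, t, chain) -> { t.add("audit"); chain.proceed(req, t); };
        new Chain(auth, audit).proceed(request, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("cluster:monitor/state"));
    }
}
```

A filter that does not call proceed simply stops the chain, which is how a filter can reject a request before the action runs.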
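The above is how a state() call is processed. In summary: the concrete handler resolves the matching action, picks the thread pool to run on, and then queries the state on the local node and returns it; if the local node is not the master (and the request may not be served locally), the request is forwarded to the master node to obtain the current cluster state. As the implementation shows, state() is only the first step; the nodesInfo and nodesStats requests follow.
2.2. How nodesInfo is processed
Once state has been handled, a nodesInfo request is issued; this one returns information about the individual nodes.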
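The routing decision made in AsyncSingleAction#doStart can be reduced to a small pure function. This is a sketch under assumptions: the `Route` enum and the parameter names are hypothetical, and the cluster-block check and retry handling of the real method are deliberately left out.

```java
final class MasterRoutingSketch {
    // Hypothetical outcome labels for the three branches of doStart.
    enum Route { EXECUTE_LOCALLY, FORWARD_TO_MASTER, RETRY_ON_MASTER_CHANGE }

    static Route route(boolean localNodeIsMaster, boolean localExecute, String knownMasterNodeId) {
        if (localNodeIsMaster || localExecute) {
            // Served on this node (the real code also checks cluster blocks first).
            return Route.EXECUTE_LOCALLY;
        }
        if (knownMasterNodeId == null) {
            // No master known yet: wait for a cluster state change and try again.
            return Route.RETRY_ON_MASTER_CHANGE;
        }
        // Otherwise the request is sent to the master over the transport layer.
        return Route.FORWARD_TO_MASTER;
    }

    public static void main(String[] args) {
        System.out.println(route(false, false, "node-1"));
    }
}
```

The second flag corresponds to localExecute(request), which is why a request with local=true can be answered without contacting the master.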
// org.elasticsearch.client.support.AbstractClient.ClusterAdmin#nodesInfo
@Override
public void nodesInfo(final NodesInfoRequest request, final ActionListener<NodesInfoResponse> listener) {
    execute(NodesInfoAction.INSTANCE, request, listener);
}

// This eventually calls the doExecute method of TransportNodesAction
// org.elasticsearch.action.support.nodes.TransportNodesAction#doExecute
@Override
protected void doExecute(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
    new AsyncAction(task, request, listener).start();
}

// org.elasticsearch.action.support.nodes.TransportNodesAction.AsyncAction#start
void start() {
    final DiscoveryNode[] nodes = request.concreteNodes();
    if (nodes.length == 0) {
        // nothing to notify
        threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
        return;
    }
    final TransportRequestOptions transportRequestOptions = TransportRequestOptions.timeout(request.timeout());
    // Send the request to each node in turn, and collect the results at the end
    for (int i = 0; i < nodes.length; i++) {
        final int idx = i;
        final DiscoveryNode node = nodes[i];
        final String nodeId = node.getId();
        try {
            TransportRequest nodeRequest = newNodeRequest(request);
            if (task != null) {
                nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
            }
            // action = cluster:monitor/nodes/info[n]
            transportService.sendRequest(node, getTransportNodeAction(node), nodeRequest, transportRequestOptions,
                new TransportResponseHandler<NodeResponse>() {
                    @Override
                    public NodeResponse read(StreamInput in) throws IOException {
                        return newNodeResponse(in);
                    }

                    // Called back once the remote node has responded
                    @Override
                    public void handleResponse(NodeResponse response) {
                        onOperation(idx, response);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        onFailure(idx, node.getId(), exp);
                    }
                });
        } catch (Exception e) {
            onFailure(idx, nodeId, e);
        }
    }
}
On completion we get a NodesInfoResponse, which is kept for later use.
2.3. How nodesStats is processed
A cluster has one cluster state, n nodes, and n sets of node statistics. Once the node info has come back, a nodesStats request is issued to pull the statistics of each node. In fact, both requests run through the same method logic.
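The collection side of this fan-out (what onOperation and onFailure feed into) can be sketched as follows. This is an assumption-laden illustration rather than the ES implementation: the `Collector` class and the node names are hypothetical. The idea it shows is that each node's result is stored at its own index, and the listener fires exactly once, when the counter reaches the number of nodes, regardless of arrival order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;

final class FanOutSketch {
    static final class Collector {
        private final AtomicReferenceArray<Object> responses;
        private final AtomicInteger counter = new AtomicInteger();
        private final Consumer<List<Object>> listener;

        Collector(int nodeCount, Consumer<List<Object>> listener) {
            this.responses = new AtomicReferenceArray<>(nodeCount);
            this.listener = listener;
        }

        // A node answered: keep its response in the slot for that node.
        void onOperation(int idx, String response) {
            responses.set(idx, response);
            finishIfDone();
        }

        // A node failed: keep the failure in the same slot so it is still counted.
        void onFailure(int idx, Exception e) {
            responses.set(idx, e);
            finishIfDone();
        }

        private void finishIfDone() {
            if (counter.incrementAndGet() == responses.length()) {
                List<Object> all = new ArrayList<>();
                for (int i = 0; i < responses.length(); i++) {
                    all.add(responses.get(i));
                }
                listener.accept(all);
            }
        }
    }

    // Simulates three nodes answering out of order, one of them with a failure.
    static List<Object> demo() {
        List<Object> result = new ArrayList<>();
        Collector collector = new Collector(3, result::addAll);
        collector.onOperation(2, "node-c");
        collector.onOperation(0, "node-a");
        collector.onFailure(1, new IllegalStateException("node-b unreachable"));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Storing failures alongside successes means one unreachable node does not prevent the combined response from being produced.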
// org.elasticsearch.action.support.nodes.TransportNodesAction#doExecute
@Override
protected void doExecute(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
    new AsyncAction(task, request, listener).start();
}

// org.elasticsearch.action.support.nodes.TransportNodesAction.AsyncAction#start
void start() {
    final DiscoveryNode[] nodes = request.concreteNodes();
    if (nodes.length == 0) {
        // nothing to notify
        threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
        return;
    }
    final TransportRequestOptions transportRequestOptions = TransportRequestOptions.timeout(request.timeout());
    for (int i = 0; i < nodes.length; i++) {
        final int idx = i;
        final DiscoveryNode node = nodes[i];
        final String nodeId = node.getId();
        try {
            TransportRequest nodeRequest = newNodeRequest(request);
            if (task != null) {
                nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
            }
            // action = cluster:monitor/nodes/stats[n]
            transportService.sendRequest(node, getTransportNodeAction(node), nodeRequest, transportRequestOptions,
                new TransportResponseHandler<NodeResponse>() {
                    @Override
                    public NodeResponse read(StreamInput in) throws IOException {
                        return newNodeResponse(in);
                    }

                    @Override
                    public void handleResponse(NodeResponse response) {
                        onOperation(idx, response);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        onFailure(idx, node.getId(), exp);
                    }
                });
        } catch (Exception e) {
            onFailure(idx, nodeId, e);
        }
    }
}

// org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction#nodeOperation
@Override
protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
    NodesStatsRequest request = nodeStatsRequest.request;
    Set<String> metrics = request.requestedMetrics();
    return nodeService.stats(
        request.indices(),
        NodesStatsRequest.Metric.OS.containedIn(metrics),
        NodesStatsRequest.Metric.PROCESS.containedIn(metrics),
        NodesStatsRequest.Metric.JVM.containedIn(metrics),
        NodesStatsRequest.Metric.THREAD_POOL.containedIn(metrics),
        NodesStatsRequest.Metric.FS.containedIn(metrics),
        NodesStatsRequest.Metric.TRANSPORT.containedIn(metrics),
        NodesStatsRequest.Metric.HTTP.containedIn(metrics),
        NodesStatsRequest.Metric.BREAKER.containedIn(metrics),
        NodesStatsRequest.Metric.SCRIPT.containedIn(metrics),
        NodesStatsRequest.Metric.DISCOVERY.containedIn(metrics),
        NodesStatsRequest.Metric.INGEST.containedIn(metrics),
        NodesStatsRequest.Metric.ADAPTIVE_SELECTION.containedIn(metrics),
        NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics),
        NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics));
}

// org.elasticsearch.transport.RequestHandlerRegistry#processMessageReceived
public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
    final Task task = taskManager.register(channel.getChannelType(), action, request);
    Releasable unregisterTask = () -> taskManager.unregister(task);
    try {
        if (channel instanceof TcpTransportChannel && task instanceof CancellableTask) {
            final TcpChannel tcpChannel = ((TcpTransportChannel) channel).getChannel();
            final Releasable stopTracking = taskManager.startTrackingCancellableChannelTask(tcpChannel, (CancellableTask) task);
            unregisterTask = Releasables.wrap(unregisterTask, stopTracking);
        }
        final TaskTransportChannel taskTransportChannel = new TaskTransportChannel(channel, unregisterTask);
        handler.messageReceived(request, taskTransportChannel, task);
        unregisterTask = null;
    } finally {
        Releasables.close(unregisterTask);
    }
}

// org.elasticsearch.action.support.nodes.TransportNodesAction.NodeTransportHandler
class NodeTransportHandler implements TransportRequestHandler<NodeRequest> {
    @Override
    public void messageReceived(NodeRequest request, TransportChannel channel, Task task) throws Exception {
        // Send the node's result back to the requester
        channel.sendResponse(nodeOperation(request, task));
    }
}

// org.elasticsearch.rest.action.RestResponseListener#processResponse
@Override
protected final void processResponse(Response response) throws Exception {
    // Respond to the client
    channel.sendResponse(buildResponse(response));
}
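That completes the overall ES request processing framework, with /_cat/nodes as the example of how ES gathers information about the cluster's nodes. One point is worth repeating: ES state operations sometimes send requests to every node in the cluster and sometimes do not. For example, fetching the cluster state does not send a request to all nodes; it only asks the master (or the local node) for its current state.
3. Responding to the client
We have in fact seen client responses many times in the processing above, just never pulled them out for a full discussion. For example: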
// On an exception, respond with the exception details
channel.sendResponse(new BytesRestResponse(channel, e));

// Send an error message directly
channel.sendResponse(
    BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, "error traces in responses are disabled."));

/**
 * Handle HTTP OPTIONS requests to a valid REST endpoint. A 200 HTTP
 * response code is returned, and the response 'Allow' header includes a
 * list of valid HTTP methods for the endpoint (see
 * <a href="https://tools.ietf.org/html/rfc2616#section-9.2">HTTP/1.1 - 9.2
 * - Options</a>).
 */
private void handleOptionsRequest(RestChannel channel, Set<RestRequest.Method> validMethodSet) {
    BytesRestResponse bytesRestResponse = new BytesRestResponse(OK, TEXT_CONTENT_TYPE, BytesArray.EMPTY);
    // When we have an OPTIONS HTTP request and no valid handlers, simply send OK by default (with the Access Control Origin header
    // which gets automatically added).
    if (validMethodSet.isEmpty() == false) {
        bytesRestResponse.addHeader("Allow", Strings.collectionToDelimitedString(validMethodSet, ","));
    }
    // Respond with the headers only
    channel.sendResponse(bytesRestResponse);
}

// org.elasticsearch.transport.TaskTransportChannel#sendResponse
@Override
public void sendResponse(TransportResponse response) throws IOException {
    try {
        onTaskFinished.close();
    } finally {
        channel.sendResponse(response);
    }
}
More of the response-writing logic is shown below for further reading; in one sentence, it all ends in Netty's channel.write().
// org.elasticsearch.transport.TransportService.DirectResponseChannel#sendResponse
@Override
public void sendResponse(TransportResponse response) throws IOException {
    service.onResponseSent(requestId, action, response);
    final TransportResponseHandler handler = service.responseHandlers.onResponseReceived(requestId, service);
    // ignore if its null, the service logs it
    if (handler != null) {
        final String executor = handler.executor();
        if (ThreadPool.Names.SAME.equals(executor)) {
            processResponse(handler, response);
        } else {
            threadPool.executor(executor).execute(new Runnable() {
                @Override
                public void run() {
                    processResponse(handler, response);
                }

                @Override
                public String toString() {
                    return "delivery of response to [" + requestId + "][" + action + "]: " + response;
                }
            });
        }
    }
}

// org.elasticsearch.transport.Transport.ResponseHandlers#onResponseReceived
/**
 * called by the {@link Transport} implementation when a response or an exception has been received for a previously
 * sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not
 * found.
 */
public TransportResponseHandler<? extends TransportResponse> onResponseReceived(
    final long requestId, final TransportMessageListener listener) {
    ResponseContext<? extends TransportResponse> context = handlers.remove(requestId);
    listener.onResponseReceived(requestId, context);
    if (context == null) {
        return null;
    } else {
        return context.handler();
    }
}

// org.elasticsearch.transport.TransportService.DirectResponseChannel#processResponse
@SuppressWarnings("unchecked")
protected void processResponse(TransportResponseHandler handler, TransportResponse response) {
    try {
        handler.handleResponse(response);
    } catch (Exception e) {
        processException(handler, wrapInRemote(new ResponseHandlerFailureTransportException(e)));
    }
}

// org.elasticsearch.transport.TransportService.ContextRestoreResponseHandler#handleResponse
@Override
public void handleResponse(T response) {
    if (handler != null) {
        handler.cancel();
    }
    try (ThreadContext.StoredContext ignore = contextSupplier.get()) {
        delegate.handleResponse(response);
    }
}
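The request-id bookkeeping behind onResponseReceived can be sketched on its own. This is a simplified model under assumptions: `ResponseHandlersSketch` and its `add`/`deliver` methods are hypothetical, and `Consumer<String>` stands in for TransportResponseHandler. The point it illustrates is that a handler is registered under a fresh request id when the request is sent, and removed when the response arrives, so each handler runs at most once and unknown ids are ignored.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

final class ResponseHandlersSketch {
    private final ConcurrentMap<Long, Consumer<String>> handlers = new ConcurrentHashMap<>();
    private final AtomicLong requestIdGenerator = new AtomicLong();

    // Called when a request is sent: remember who wants the answer.
    long add(Consumer<String> handler) {
        long requestId = requestIdGenerator.incrementAndGet();
        handlers.put(requestId, handler);
        return requestId;
    }

    // Called when a response arrives: remove and return the handler, or null if unknown.
    Consumer<String> onResponseReceived(long requestId) {
        return handlers.remove(requestId);
    }

    // Looks up the handler and hands it the response; returns false if none was registered.
    boolean deliver(long requestId, String response) {
        Consumer<String> handler = onResponseReceived(requestId);
        if (handler == null) {
            return false;
        }
        handler.accept(response);
        return true;
    }

    public static void main(String[] args) {
        ResponseHandlersSketch registry = new ResponseHandlersSketch();
        long id = registry.add(response -> System.out.println("got " + response));
        registry.deliver(id, "nodes stats");
    }
}
```

Removing the entry on lookup is what makes a duplicate or late response for the same id fall into the "handler is null" branch.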

Author: 等你归去来
Source: https://www.cnblogs.com/yougewe/p/14732025.html
