ES Series (1): Build Setup and the Server Startup Process
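ES is a powerful and popular search engine component, and it gives us convenient, high-performance search. It is a pleasure to use in practice, but understanding it a little more deeply is even better. Plenty of articles online already explain in full how ES achieves high performance and high availability, and there are many guides to common pitfalls. In the end, though, those remain mostly descriptive.
So let's take the source code as our starting point and find out what ES is really doing, even if our view may turn out to be a partial one.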
1. Building and preparing ES
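Honestly, we'd rather not build it ourselves. First, there isn't much point. Second, ES is written in Java, so once packaged it is essentially just a bundle of jar files. Third, the build has to pull in many external jar dependencies hosted on overseas servers, and they download painfully slowly.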
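The simple route is to download an installation package straight from the official site: https://www.elastic.co/cn/downloads/elasticsearch . That is the usual path for real-world use, but it is not the path for learning.
If you do want to build it yourself, that works too: get the source from https://github.com/elastic/elasticsearch.git ; download Gradle from https://gradle.org/releases/ ; install JDK 11+...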
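To set it up, change into the elasticsearch source directory and run gradlew idea (./gradlew idea on Linux/macOS). That's all. This is a good deal simpler than ./configure; make; make install; because Gradle resolves almost all the dependencies for you, so you don't have to track them down yourself!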
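Of course, you will hit plenty of problems along the way, and they are mostly network problems: the build depends on many resources hosted on overseas sites. You can change those as follows: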
Search for every repositories { block and replace its URLs with the Aliyun mirrors, as follows:

```groovy
repositories {
    // jcenter()
    maven { url 'https://maven.aliyun.com/repository/gradle-plugin' }
    maven { url 'https://maven.aliyun.com/repository/google' }
    maven { url 'https://maven.aliyun.com/nexus/content/groups/public/' }
    maven { url 'https://maven.aliyun.com/repository/jcenter' }
}
```
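Then run gradlew idea again. If it still fails, switch to a faster network and retry until it succeeds.
You can also import the source straight into IDEA and build it from the IDE; again, the dependency repository URLs need to be replaced. Once the import finishes, you can see the whole Gradle project tree, as shown below: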

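2. The ES server startup process
With the environment ready, we can finally get to the main topic. In principle an application's startup flow is never very complicated, so let's just take a rough look.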
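2.1. The role of the Elasticsearch entry class
Since the entry class is Elasticsearch, starting from it tells us how startup begins, whether this class does all the work itself, and to whom it hands off what comes next. So we go straight to the entry point: Elasticsearch#main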
```java
// org.elasticsearch.bootstrap.Elasticsearch#main
/**
 * Main entry point for starting elasticsearch
 */
public static void main(final String[] args) throws Exception {
    overrideDnsCachePolicyProperties();
    /*
     * We want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
     * presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy). This
     * forces such policies to take effect immediately.
     */
    System.setSecurityManager(new SecurityManager() {

        @Override
        public void checkPermission(Permission perm) {
            // grant all permissions so that we can later set the security manager to the one that we want
        }

    });
    LogConfigurator.registerErrorListener();
    final Elasticsearch elasticsearch = new Elasticsearch();
    // Looks like control is handed over to another main() method
    int status = main(args, elasticsearch, Terminal.DEFAULT);
    // A status other than OK means something went wrong, so the JVM is terminated right here.
    // Otherwise the es process keeps running in the background.
    if (status != ExitCodes.OK) {
        final String basePath = System.getProperty("es.logs.base_path");
        // It's possible to fail before logging has been configured, in which case there's no point
        // suggesting that the user look in the log file.
        if (basePath != null) {
            Terminal.DEFAULT.errorPrintln(
                "ERROR: Elasticsearch did not exit normally - check the logs at "
                    + basePath
                    + System.getProperty("file.separator")
                    + System.getProperty("es.logs.cluster_name") + ".log"
            );
        }
        exit(status);
    }
}

static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception {
    return elasticsearch.main(args, terminal);
}
```
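The entry point is fairly simple, and at first glance it doesn't reveal much. Roughly, it instantiates an Elasticsearch object and then calls that object's main() method. What is the benefit? It lets the code use the instance fields defined in Elasticsearch (such as the option specs registered in its constructor), instead of being limited to static methods and fields.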
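To make the "instance instead of static" point concrete, here is a minimal sketch of the same shape. EntryPointSketch, its flag list and the exit-code values are hypothetical stand-ins, not ES classes; the only thing it mirrors is the structure: a static main builds an instance, delegates to an instance method that returns a status, and calls System.exit only on failure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the entry-point shape (not an ES class):
// static main() -> new instance -> instance method that returns an exit code.
class EntryPointSketch {
    static final int OK = 0;     // assumed exit-code values, for illustration only
    static final int USAGE = 64;

    // instance state that a purely static entry point would not have
    private final List<String> knownFlags = new ArrayList<>();

    EntryPointSketch() {
        // like the real constructor, register the accepted options up front
        knownFlags.add("-d");
        knownFlags.add("-q");
    }

    // instance-level "main": validates args against instance state and returns a status
    int run(String[] args) {
        for (String arg : args) {
            if (!knownFlags.contains(arg)) {
                return USAGE;
            }
        }
        return OK;
    }

    // same shape as Elasticsearch.main(args, elasticsearch, terminal)
    static int main(String[] args, EntryPointSketch app) {
        return app.run(args);
    }

    public static void main(String[] args) {
        int status = main(args, new EntryPointSketch());
        // exit explicitly only on failure; on success just return normally
        if (status != OK) {
            System.exit(status);
        }
    }
}
```

Returning a status from the instance method, rather than exiting inside it, also keeps the logic testable without killing the JVM.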
Let's first look at the class hierarchy of Elasticsearch: Elasticsearch extends EnvironmentAwareCommand extends Command implements Closeable
The Elasticsearch constructor also does some work: it registers parsing rules for several command-line options:
```java
// visible for testing
Elasticsearch() {
    // beforeMain is an empty lambda (a no-op)
    super("Starts Elasticsearch", () -> {}); // we configure logging later so we override the base class from configuring logging
    versionOption = parser.acceptsAll(Arrays.asList("V", "version"),
        "Prints Elasticsearch version information and exits");
    daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"),
        "Starts Elasticsearch in the background")
        .availableUnless(versionOption);
    pidfileOption = parser.acceptsAll(Arrays.asList("p", "pidfile"),
        "Creates a pid file in the specified path on start")
        .availableUnless(versionOption)
        .withRequiredArg()
        .withValuesConvertedBy(new PathConverter());
    quietOption = parser.acceptsAll(Arrays.asList("q", "quiet"),
        "Turns off standard output/error streams logging in console")
        .availableUnless(versionOption)
        .availableUnless(daemonizeOption);
}
```
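The constructor relies on joptsimple's availableUnless(...) and withRequiredArg(). To illustrate what those two rules mean (not how joptsimple implements them), here is a hypothetical mini parser. It only knows the short names V, d, p and q, and its error messages are made up.

```java
import java.util.*;

// Hypothetical mini option parser (NOT joptsimple) illustrating two rules from the
// constructor above: availableUnless(...) and withRequiredArg().
class MiniOptionParser {
    private final Map<String, Set<String>> unavailableIf = new HashMap<>();
    private final Set<String> needsArg = new HashSet<>();

    MiniOptionParser accept(String name, boolean requiredArg, String... unlessPresent) {
        unavailableIf.put(name, new HashSet<>(Arrays.asList(unlessPresent)));
        if (requiredArg) {
            needsArg.add(name);
        }
        return this;
    }

    /** Parses args like ["-d", "-p", "/tmp/es.pid"] into option -> value ("" for plain flags). */
    Map<String, String> parse(String... args) {
        Map<String, String> seen = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i++) {
            String name = args[i].replaceFirst("^-+", "");
            if (!unavailableIf.containsKey(name)) {
                throw new IllegalArgumentException(name + " is not a recognized option");
            }
            String value = "";
            if (needsArg.contains(name)) {
                // withRequiredArg(): the next token is the option's value
                if (i + 1 >= args.length) {
                    throw new IllegalArgumentException("Option " + name + " requires an argument");
                }
                value = args[++i];
            }
            seen.put(name, value);
        }
        // availableUnless(...): reject an option if any of its blocking options is present
        for (String name : seen.keySet()) {
            for (String blocker : unavailableIf.get(name)) {
                if (seen.containsKey(blocker)) {
                    throw new IllegalArgumentException("Option " + name + " is unavailable together with " + blocker);
                }
            }
        }
        return seen;
    }

    // the same relationships as the Elasticsearch constructor, short names only
    static MiniOptionParser elasticsearchLike() {
        return new MiniOptionParser()
            .accept("V", false)
            .accept("d", false, "V")
            .accept("p", true, "V")
            .accept("q", false, "V", "d");
    }
}
```

With these Elasticsearch-like rules, -d -p /tmp/es.pid parses, while -d -q is rejected.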
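elasticsearch.main(), in turn, calls the generic method defined in Command, whose purpose is to run every command through a common template method. The whole Elasticsearch class can be viewed as the facade for startup: it does a lot of preparation and validation work, such as creating the configuration context and validating command-line arguments. So by following its execution we learn, roughly, which parameters deserve attention and which errors users are likely to run into.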
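"Template method" here means that Command#main fixes the overall flow (parse, dispatch, turn a UserException into an exit code) and subclasses only fill in execute. A minimal sketch of that chain follows; the *Sketch classes are hypothetical stand-ins for Command, EnvironmentAwareCommand and Elasticsearch, and the exit-code values and the fake "environment" string are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the Command -> EnvironmentAwareCommand -> Elasticsearch chain.
// The *Sketch classes are stand-ins, not the real ES types.
abstract class CommandSketch {
    static final int OK = 0, USAGE = 64; // assumed exit-code values

    static class UserExceptionSketch extends Exception {
        final int exitCode;

        UserExceptionSketch(int exitCode, String message) {
            super(message);
            this.exitCode = exitCode;
        }
    }

    final List<String> stderr = new ArrayList<>(); // stands in for Terminal#errorPrintln

    // The fixed skeleton: final, so subclasses can only plug in execute().
    final int main(String[] args) {
        try {
            execute(args);
        } catch (UserExceptionSketch e) {
            stderr.add("ERROR: " + e.getMessage());
            return e.exitCode; // user errors become exit codes instead of stack traces
        }
        return OK;
    }

    protected abstract void execute(String[] args) throws UserExceptionSketch;
}

abstract class EnvironmentAwareCommandSketch extends CommandSketch {
    @Override
    protected void execute(String[] args) throws UserExceptionSketch {
        // the real class validates -E settings and builds an Environment here
        String env = "env(" + args.length + " args)";
        execute(args, env);
    }

    protected abstract void execute(String[] args, String env) throws UserExceptionSketch;
}

class ElasticsearchSketch extends EnvironmentAwareCommandSketch {
    String initCalledWith; // records what init() would have received

    @Override
    protected void execute(String[] args, String env) throws UserExceptionSketch {
        for (String arg : args) {
            if (!arg.startsWith("-")) {
                throw new UserExceptionSketch(USAGE, "Positional arguments not allowed, found [" + arg + "]");
            }
        }
        initCalledWith = env; // the real code calls init(...) and from there Bootstrap.init(...)
    }
}
```

Because main() is final in the base class, every command gets the same error handling, and each subclass only adds one layer of work.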
```java
// org.elasticsearch.cli.Command#main
/** Parses options for this command from args and executes it. */
public final int main(String[] args, Terminal terminal) throws Exception {
    if (addShutdownHook()) {
        // Register a shutdown hook that closes resources to avoid data corruption or loss;
        // here, though, close() is effectively a no-op
        shutdownHookThread = new Thread(() -> {
            try {
                this.close();
            } catch (final IOException e) {
                try (
                    StringWriter sw = new StringWriter();
                    PrintWriter pw = new PrintWriter(sw)) {
                    e.printStackTrace(pw);
                    terminal.errorPrintln(sw.toString());
                } catch (final IOException impossible) {
                    // StringWriter#close declares a checked IOException from the Closeable interface but the Javadocs for StringWriter
                    // say that an exception here is impossible
                    throw new AssertionError(impossible);
                }
            }
        });
        Runtime.getRuntime().addShutdownHook(shutdownHookThread);
    }

    // a no-op here
    beforeMain.run();

    try {
        // also implemented in Command (package-private)
        mainWithoutErrorHandling(args, terminal);
    } catch (OptionException e) {
        // print help to stderr on exceptions
        printHelp(terminal, true);
        terminal.errorPrintln(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
        return ExitCodes.USAGE;
    } catch (UserException e) {
        if (e.exitCode == ExitCodes.USAGE) {
            printHelp(terminal, true);
        }
        if (e.getMessage() != null) {
            terminal.errorPrintln(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
        }
        // error exit code
        return e.exitCode;
    }
    // normally returns OK
    return ExitCodes.OK;
}

/**
 * Executes the command, but all errors are thrown.
 */
void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {
    // parse the command-line options
    final OptionSet options = parser.parse(args);

    // -h prints the help text
    if (options.has(helpOption)) {
        printHelp(terminal, false);
        return;
    }

    if (options.has(silentOption)) {
        terminal.setVerbosity(Terminal.Verbosity.SILENT);
    } else if (options.has(verboseOption)) {
        terminal.setVerbosity(Terminal.Verbosity.VERBOSE);
    } else {
        terminal.setVerbosity(Terminal.Verbosity.NORMAL);
    }

    // call back into Elasticsearch, going through EnvironmentAwareCommand first
    execute(terminal, options);
}

// org.elasticsearch.cli.EnvironmentAwareCommand#execute(org.elasticsearch.cli.Terminal, joptsimple.OptionSet)
@Override
protected void execute(Terminal terminal, OptionSet options) throws Exception {
    final Map<String, String> settings = new HashMap<>();
    for (final KeyValuePair kvp : settingOption.values(options)) {
        // empty values are rejected
        if (kvp.value.isEmpty()) {
            throw new UserException(ExitCodes.USAGE, "setting [" + kvp.key + "] must not be empty");
        }
        // duplicate settings are rejected
        if (settings.containsKey(kvp.key)) {
            final String message = String.format(
                Locale.ROOT,
                "setting [%s] already set, saw [%s] and [%s]",
                kvp.key,
                settings.get(kvp.key),
                kvp.value);
            throw new UserException(ExitCodes.USAGE, message);
        }
        settings.put(kvp.key, kvp.value);
    }

    putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
    putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
    putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");

    // call Elasticsearch's execute implementation
    execute(terminal, options, createEnv(settings));
}

// org.elasticsearch.bootstrap.Elasticsearch#execute
@Override
protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
    if (options.nonOptionArguments().isEmpty() == false) {
        throw new UserException(ExitCodes.USAGE, "Positional arguments not allowed, found " + options.nonOptionArguments());
    }
    // print the version
    if (options.has(versionOption)) {
        final String versionOutput = String.format(
            Locale.ROOT,
            "Version: %s, Build: %s/%s/%s/%s, JVM: %s",
            Build.CURRENT.getQualifiedVersion(),
            Build.CURRENT.flavor().displayName(),
            Build.CURRENT.type().displayName(),
            Build.CURRENT.hash(),
            Build.CURRENT.date(),
            JvmInfo.jvmInfo().version()
        );
        terminal.println(versionOutput);
        return;
    }

    final boolean daemonize = options.has(daemonizeOption);
    final Path pidFile = pidfileOption.value(options);
    final boolean quiet = options.has(quietOption);

    // a misconfigured java.io.tmpdir can cause hard-to-diagnose problems later, so reject it immediately
    try {
        env.validateTmpFile();
    } catch (IOException e) {
        throw new UserException(ExitCodes.CONFIG, e.getMessage());
    }

    try {
        // initialize
        init(daemonize, pidFile, quiet, env);
    } catch (NodeValidationException e) {
        throw new UserException(ExitCodes.CONFIG, e.getMessage());
    }
}

// Elasticsearch.init
void init(final boolean daemonize, final Path pidFile, final boolean quiet, Environment initialEnv)
    throws NodeValidationException, UserException {
    try {
        Bootstrap.init(!daemonize, pidFile, quiet, initialEnv);
    } catch (BootstrapException | RuntimeException e) {
        // format exceptions to the console in a special way
        // to avoid 2MB stacktraces from guice, etc.
        throw new StartupException(e);
    }
}
```
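That is the whole mission of the Elasticsearch class. Three key points: 1. create the configuration environment context; 2. validate the command-line arguments passed in; 3. hand the start command over to the Bootstrap class.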
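Point 2, argument validation, boils down to a few rules in EnvironmentAwareCommand#execute. The sketch below restates them in plain Java under some assumptions. The -E values arrive as already-split key=value strings. System properties are passed in as a Properties object so the logic can be tested. How the real putSystemPropertyIfSettingIsMissing handles a conflict between the two sources is not modelled. SettingsCheckSketch is a hypothetical name.

```java
import java.util.*;

// Sketch of the settings checks in EnvironmentAwareCommand#execute, rewritten without joptsimple.
// Input: "key=value" strings, i.e. what each -E option would carry (an assumption of this sketch).
class SettingsCheckSketch {
    static Map<String, String> collect(List<String> keyValues, Properties sysProps) {
        Map<String, String> settings = new HashMap<>();
        for (String kv : keyValues) {
            int eq = kv.indexOf('=');
            if (eq < 1) {
                throw new IllegalArgumentException("setting [" + kv + "] must be in the form key=value");
            }
            String key = kv.substring(0, eq);
            String value = kv.substring(eq + 1);
            // rule 1: no empty values
            if (value.isEmpty()) {
                throw new IllegalArgumentException("setting [" + key + "] must not be empty");
            }
            // rule 2: no duplicate keys
            if (settings.containsKey(key)) {
                throw new IllegalArgumentException(String.format(Locale.ROOT,
                    "setting [%s] already set, saw [%s] and [%s]", key, settings.get(key), value));
            }
            settings.put(key, value);
        }
        // rule 3: fill path.* from the es.path.* system properties when the setting is absent
        putIfMissing(settings, sysProps, "path.data", "es.path.data");
        putIfMissing(settings, sysProps, "path.home", "es.path.home");
        putIfMissing(settings, sysProps, "path.logs", "es.path.logs");
        return settings;
    }

    // simplified: conflict handling of the real helper is not modelled here
    private static void putIfMissing(Map<String, String> settings, Properties sysProps, String setting, String key) {
        String value = sysProps.getProperty(key);
        if (value != null && !settings.containsKey(setting)) {
            settings.put(setting, value);
        }
    }
}
```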
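2.2. The Bootstrap startup process
As the name suggests, Bootstrap is the real workhorse of startup. Having received the configuration context from the Elasticsearch class, how does Bootstrap carry the work forward? Let's take a look.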
```java
// The entry point is the static method init()
// org.elasticsearch.bootstrap.Bootstrap#init
/**
 * This method is invoked by {@link Elasticsearch#main(String[])} to startup elasticsearch.
 */
static void init(
        final boolean foreground,
        final Path pidFile,
        final boolean quiet,
        final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
    // force the class initializer for BootstrapInfo to run before
    // the security manager is installed
    BootstrapInfo.init();

    INSTANCE = new Bootstrap();

    final SecureSettings keystore = loadSecureSettings(initialEnv);
    final Environment environment = createEnvironment(pidFile, keystore, initialEnv.settings(), initialEnv.configFile());

    // the LogConfigurator will replace System.out and System.err with redirects to our logfile, so we need to capture
    // the stream objects before calling LogConfigurator to be able to close them when appropriate
    final Runnable sysOutCloser = getSysOutCloser();
    final Runnable sysErrorCloser = getSysErrorCloser();

    LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings()));
    try {
        LogConfigurator.configure(environment);
    } catch (IOException e) {
        throw new BootstrapException(e);
    }
    if (environment.pidFile() != null) {
        try {
            PidFile.create(environment.pidFile(), true);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
    }

    try {
        final boolean closeStandardStreams = (foreground == false) || quiet;
        if (closeStandardStreams) {
            final Logger rootLogger = LogManager.getRootLogger();
            final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
            if (maybeConsoleAppender != null) {
                Loggers.removeAppender(rootLogger, maybeConsoleAppender);
            }
            sysOutCloser.run();
        }

        // fail if somebody replaced the lucene jars
        checkLucene();

        // install the default uncaught exception handler; must be done before security is
        // initialized as we do not want to grant the runtime permission
        // setDefaultUncaughtExceptionHandler
        Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());

        // check and prepare the environment
        INSTANCE.setup(true, environment);

        try {
            // any secure settings must be read during node construction
            IOUtils.close(keystore);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }

        // start the service
        INSTANCE.start();

        // We don't close stderr if `--quiet` is passed, because that
        // hides fatal startup errors. For example, if Elasticsearch is
        // running via systemd, the init script only specifies
        // `--quiet`, not `-d`, so we want users to be able to see
        // startup errors via journalctl.
        if (foreground == false) {
            sysErrorCloser.run();
        }

    } catch (NodeValidationException | RuntimeException e) {
        // disable console logging, so user does not see the exception twice (jvm will show it already)
        final Logger rootLogger = LogManager.getRootLogger();
        final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
        if (foreground && maybeConsoleAppender != null) {
            Loggers.removeAppender(rootLogger, maybeConsoleAppender);
        }
        Logger logger = LogManager.getLogger(Bootstrap.class);
        // HACK, it sucks to do this, but we will run users out of disk space otherwise
        if (e instanceof CreationException) {
            // guice: log the shortened exc to the log file
            ByteArrayOutputStream os = new ByteArrayOutputStream();
            PrintStream ps = null;
            try {
                ps = new PrintStream(os, false, "UTF-8");
            } catch (UnsupportedEncodingException uee) {
                assert false;
                e.addSuppressed(uee);
            }
            new StartupException(e).printStackTrace(ps);
            ps.flush();
            try {
                logger.error("Guice Exception: {}", os.toString("UTF-8"));
            } catch (UnsupportedEncodingException uee) {
                assert false;
                e.addSuppressed(uee);
            }
        } else if (e instanceof NodeValidationException) {
            logger.error("node validation exception\n{}", e.getMessage());
        } else {
            // full exception
            logger.error("Exception", e);
        }
        // re-enable it if appropriate, so they can see any logging during the shutdown process
        if (foreground && maybeConsoleAppender != null) {
            Loggers.addAppender(rootLogger, maybeConsoleAppender);
        }

        throw e;
    }
}
```
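That is Bootstrap's startup skeleton. Roughly, it goes through these steps:
1. Instantiate Bootstrap and store it in INSTANCE;
2. Load secure settings such as passwords (the keystore);
3. Re-create its own environment context, mainly to fold in extra configuration such as those secure settings;
4. Configure logging;
5. Create the pid file;
6. Check the Lucene version, to catch errors caused by replaced jars;
7. Bootstrap does its preparation work (setup);
8. Bootstrap does its start work (start);
9. Startup is complete.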
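Step 5 is small enough to sketch. Assuming Java 9+ (for ProcessHandle), writing a pid file comes down to writing the current process id and, optionally, registering the file for deletion when the JVM exits. PidFileSketch is a hypothetical illustration of that idea, not ES's PidFile class.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Minimal sketch of step 5 (pid file), assuming Java 9+ for ProcessHandle.
// It mirrors the idea behind PidFile.create(path, deleteOnExit); it is not the ES implementation.
class PidFileSketch {
    static long writePid(Path pidFile, boolean deleteOnExit) {
        long pid = ProcessHandle.current().pid();
        try {
            Path parent = pidFile.toAbsolutePath().getParent();
            if (parent != null) {
                Files.createDirectories(parent); // make sure the target directory exists
            }
            // default options: create the file, or truncate it if it already exists
            Files.write(pidFile, Long.toString(pid).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (deleteOnExit) {
            // best effort: this does not run if the JVM is killed with SIGKILL
            pidFile.toFile().deleteOnExit();
        }
        return pid;
    }
}
```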
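As you can see, the skeleton is quite clear, yet it still leaves you wanting more. That's only natural: a skeleton conveys the general idea and won't connect all the dots for you. Setting aside the various checks, the core is the preparation work and the start work. Let's take them apart one by one below.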
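2.3. Bootstrap preparation in detail
Bootstrap's preparation is one of the two core parts named in the previous section. So what exactly does it prepare?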
```java
// org.elasticsearch.bootstrap.Bootstrap#setup
private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
    Settings settings = environment.settings();

    try {
        // spawn the modules' native controllers as separate processes
        spawner.spawnNativeControllers(environment, true);
    } catch (IOException e) {
        throw new BootstrapException(e);
    }

    // initialize native resources
    initializeNatives(
        environment.tmpFile(),
        BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
        BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
        BootstrapSettings.CTRLHANDLER_SETTING.get(settings));

    // initialize probes before the security manager is installed
    // (loads the required probe singletons so that they are ready to use)
    /*
    static void initializeProbes() {
        // Force probes to be loaded
        ProcessProbe.getInstance();
        OsProbe.getInstance();
        JvmInfo.jvmInfo();
    }
    */
    initializeProbes();

    // shutdown hook that makes sure the node and the external controller processes are shut down cleanly
    if (addShutdownHook) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    IOUtils.close(node, spawner);
                    LoggerContext context = (LoggerContext) LogManager.getContext(false);
                    Configurator.shutdown(context);
                    if (node != null && node.awaitClose(10, TimeUnit.SECONDS) == false) {
                        throw new IllegalStateException("Node didn't stop within 10 seconds. " +
                            "Any outstanding requests or tasks might get killed.");
                    }
                } catch (IOException ex) {
                    throw new ElasticsearchException("failed to stop node", ex);
                } catch (InterruptedException e) {
                    LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown.");
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    try {
        // look for jar hell
        // (i.e. check for duplicate classes)
        final Logger logger = LogManager.getLogger(JarHell.class);
        JarHell.checkJarHell(logger::debug);
    } catch (IOException | URISyntaxException e) {
        throw new BootstrapException(e);
    }

    // Log ifconfig output before SecurityManager is installed
    IfConfig.logIfNecessary();

    // install SM after natives, shutdown hooks, etc.
    try {
        // configure the SecurityManager
        Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
    } catch (IOException | NoSuchAlgorithmException e) {
        throw new BootstrapException(e);
    }

    // finally, the node is instantiated
    node = new Node(environment) {
        @Override
        protected void validateNodeBeforeAcceptingRequests(
            final BootstrapContext context,
            final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
            // node validation is delegated to the bootstrap checks
            BootstrapChecks.check(context, boundTransportAddress, checks);
        }
    };
}

// org.elasticsearch.bootstrap.Spawner#spawnNativeControllers
/**
 * Spawns the native controllers for each module.
 *
 * @param environment The node environment
 * @param inheritIo   Should the stdout and stderr of the spawned process inherit the
 *                    stdout and stderr of the JVM spawning it?
 * @throws IOException if an I/O error occurs reading the module or spawning a native process
 */
void spawnNativeControllers(final Environment environment, final boolean inheritIo) throws IOException {
    if (spawned.compareAndSet(false, true) == false) {
        throw new IllegalStateException("native controllers already spawned");
    }
    if (Files.exists(environment.modulesFile()) == false) {
        throw new IllegalStateException("modules directory [" + environment.modulesFile() + "] not found");
    }
    /*
     * For each module, attempt to spawn the controller daemon. Silently ignore any module that doesn't include a controller for the
     * correct platform.
     */
    // list the module directories
    List<Path> paths = PluginsService.findPluginDirs(environment.modulesFile());
    for (final Path modules : paths) {
        // read plugin-descriptor.properties
        final PluginInfo info = PluginInfo.readFromProperties(modules);
        final Path spawnPath = Platforms.nativeControllerPath(modules);
        if (Files.isRegularFile(spawnPath) == false) {
            continue;
        }
        if (info.hasNativeController() == false) {
            final String message = String.format(
                Locale.ROOT,
                "module [%s] does not have permission to fork native controller",
                modules.getFileName());
            throw new IllegalArgumentException(message);
        }
        // start the module's controller process
        final Process process = spawnNativeController(spawnPath, environment.tmpFile(), inheritIo);
        processes.add(process);
    }
}

// org.elasticsearch.bootstrap.Spawner#spawnNativeController
/**
 * Attempt to spawn the controller daemon for a given module. The spawned process will remain connected to this JVM via its stdin,
 * stdout, and stderr streams, but the references to these streams are not available to code outside this package.
 */
private Process spawnNativeController(
        final Path spawnPath,
        final Path tmpPath,
        final boolean inheritIo) throws IOException {
    final String command;
    if (Constants.WINDOWS) {
        /*
         * We have to get the short path name or starting the process could fail due to max path limitations. The underlying issue here
         * is that starting the process on Windows ultimately involves the use of CreateProcessW. CreateProcessW has a limitation that
         * if its first argument (the application name) is null, then its second argument (the command line for the process to start) is
         * restricted in length to 260 characters (cf. https://msdn.microsoft.com/en-us/library/windows/desktop/ms682425.aspx). Since
         * this is exactly how the JDK starts the process on Windows (cf.
         * http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/687fd7c7986d/src/windows/native/java/lang/ProcessImpl_md.c#l319), this
         * limitation is in force. As such, we use the short name to avoid any such problems.
         */
        command = Natives.getShortPathName(spawnPath.toString());
    } else {
        command = spawnPath.toString();
    }
    final ProcessBuilder pb = new ProcessBuilder(command);

    // the only environment variable passes on the path to the temporary directory
    pb.environment().clear();
    pb.environment().put("TMPDIR", tmpPath.toString());

    // The process _shouldn't_ write any output via its stdout or stderr, but if it does then
    // it will block if nothing is reading that output. To avoid this we can inherit the
    // JVM's stdout and stderr (which are redirected to files in standard installations).
    if (inheritIo) {
        pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
    }

    // the output stream of the process object corresponds to the daemon's stdin
    return pb.start();
}
```
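Stripped of the Windows short-path workaround, the process setup in spawnNativeController is plain ProcessBuilder usage. The sketch below only builds the ProcessBuilder and never calls start(), so it can be inspected without a real controller binary. ControllerLaunchSketch is a hypothetical name.

```java
import java.nio.file.Path;

// Sketch of how Spawner#spawnNativeController configures the child process (Windows
// short-path handling omitted). It only builds the ProcessBuilder; start() is left to the caller.
class ControllerLaunchSketch {
    static ProcessBuilder build(Path controller, Path tmpDir, boolean inheritIo) {
        ProcessBuilder pb = new ProcessBuilder(controller.toString());
        // start from an empty environment: the child only learns where the temp dir is
        pb.environment().clear();
        pb.environment().put("TMPDIR", tmpDir.toString());
        if (inheritIo) {
            // share the JVM's stdout/stderr so unexpected output cannot fill a pipe and block the child
            pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
            pb.redirectError(ProcessBuilder.Redirect.INHERIT);
        }
        // stdin keeps the default Redirect.PIPE, so Process#getOutputStream() feeds the child's stdin
        return pb;
    }
}
```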
Interested readers can dig into the details of reading the descriptor file. It mostly comes down to which properties get parsed and how.
```java
// 1. Parse the plugin-descriptor.properties file
// org.elasticsearch.plugins.PluginInfo#readFromProperties
/**
 * Reads the plugin descriptor file.
 *
 * @param path the path to the root directory for the plugin
 * @return the plugin info
 * @throws IOException if an I/O exception occurred reading the plugin descriptor
 */
public static PluginInfo readFromProperties(final Path path) throws IOException {
    final Path descriptor = path.resolve(ES_PLUGIN_PROPERTIES);

    final Map<String, String> propsMap;
    {
        final Properties props = new Properties();
        try (InputStream stream = Files.newInputStream(descriptor)) {
            props.load(stream);
        }
        propsMap = props.stringPropertyNames().stream().collect(Collectors.toMap(Function.identity(), props::getProperty));
    }

    final String name = propsMap.remove("name");
    if (name == null || name.isEmpty()) {
        throw new IllegalArgumentException(
            "property [name] is missing in [" + descriptor + "]");
    }
    final String description = propsMap.remove("description");
    if (description == null) {
        throw new IllegalArgumentException(
            "property [description] is missing for plugin [" + name + "]");
    }
    final String version = propsMap.remove("version");
    if (version == null) {
        throw new IllegalArgumentException(
            "property [version] is missing for plugin [" + name + "]");
    }

    final String esVersionString = propsMap.remove("elasticsearch.version");
    if (esVersionString == null) {
        throw new IllegalArgumentException(
            "property [elasticsearch.version] is missing for plugin [" + name + "]");
    }
    final Version esVersion = Version.fromString(esVersionString);
    final String javaVersionString = propsMap.remove("java.version");
    if (javaVersionString == null) {
        throw new IllegalArgumentException(
            "property [java.version] is missing for plugin [" + name + "]");
    }
    JarHell.checkVersionFormat(javaVersionString);
    final String extendedString = propsMap.remove("extended.plugins");
    final List<String> extendedPlugins;
    if (extendedString == null) {
        extendedPlugins = Collections.emptyList();
    } else {
        extendedPlugins = Arrays.asList(Strings.delimitedListToStringArray(extendedString, ","));
    }

    final boolean hasNativeController = parseBooleanValue(name, "has.native.controller",
        propsMap.remove("has.native.controller"));

    final PluginType type = getPluginType(name, propsMap.remove("type"));

    final String classname = getClassname(name, type, propsMap.remove("classname"));

    final String javaOpts = propsMap.remove("java.opts");

    if (type != PluginType.BOOTSTRAP && Strings.isNullOrEmpty(javaOpts) == false) {
        throw new IllegalArgumentException(
            "[java.opts] can only have a value when [type] is set to [bootstrap] for plugin [" + name + "]"
        );
    }

    boolean isLicensed = parseBooleanValue(name, "licensed", propsMap.remove("licensed"));

    if (propsMap.isEmpty() == false) {
        throw new IllegalArgumentException("Unknown properties for plugin [" + name + "] in plugin descriptor: " + propsMap.keySet());
    }

    return new PluginInfo(name, description, version, esVersion, javaVersionString,
        classname, extendedPlugins, hasNativeController, type, javaOpts, isLicensed);
}

// 2. Resolve the native controller path
/**
 * The path to the native controller for a plugin with native components.
 */
public static Path nativeControllerPath(Path plugin) {
    if (Constants.MAC_OS_X) {
        return plugin
            .resolve("platform")
            .resolve(PLATFORM_NAME)
            .resolve(PROGRAM_NAME + ".app")
            .resolve("Contents")
            .resolve("MacOS")
            .resolve(PROGRAM_NAME);
    }
    // the file name depends on the platform: controller.exe on Windows, controller elsewhere
    return plugin
        .resolve("platform")
        .resolve(PLATFORM_NAME)
        .resolve("bin")
        .resolve(PROGRAM_NAME);
}
```
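The parsing strategy above is worth noting: each known key is removed from the map, so anything still left at the end must be unknown and triggers an error. Below is a simplified sketch of that strategy. It assumes the descriptor text is already in a String and skips the per-key validation the real method does (version formats, strict booleans, plugin type). DescriptorSketch is a hypothetical name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;

// Simplified sketch of the "remove known keys, reject leftovers" strategy of
// PluginInfo#readFromProperties (hypothetical class; per-key validation omitted).
class DescriptorSketch {
    static Map<String, String> parse(String descriptorText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(descriptorText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> remaining = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            remaining.put(key, props.getProperty(key));
        }

        Map<String, String> known = new LinkedHashMap<>();
        // required keys: a missing one is an error
        for (String key : Arrays.asList("name", "description", "version", "elasticsearch.version", "java.version")) {
            String value = remaining.remove(key);
            if (value == null) {
                throw new IllegalArgumentException("property [" + key + "] is missing");
            }
            known.put(key, value);
        }
        // optional keys: consumed if present
        for (String key : Arrays.asList("classname", "extended.plugins", "has.native.controller", "type", "java.opts", "licensed")) {
            String value = remaining.remove(key);
            if (value != null) {
                known.put(key, value);
            }
        }
        // whatever was not consumed is unknown
        if (!remaining.isEmpty()) {
            throw new IllegalArgumentException("Unknown properties in plugin descriptor: " + new TreeSet<>(remaining.keySet()));
        }
        return known;
    }
}
```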
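Next comes the initialization of native resources, such as refusing to run as root and detecting which system features and interfaces are supported: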
```java
// org.elasticsearch.bootstrap.Bootstrap#initializeNatives
/** initialize native resources */
public static void initializeNatives(Path tmpFile, boolean mlockAll, boolean systemCallFilter, boolean ctrlHandler) {
    final Logger logger = LogManager.getLogger(Bootstrap.class);

    // check if the user is running as root, and bail
    // (skipped on Windows; on Linux this is decided via the native call JNACLibrary.geteuid() == 0)
    if (Natives.definitelyRunningAsRoot()) {
        throw new RuntimeException("can not run elasticsearch as root");
    }

    // enable system call filter
    // (on Linux, for example, this ends up in SystemCallFilter#linuxImpl())
    if (systemCallFilter) {
        Natives.tryInstallSystemCallFilter(tmpFile);
    }

    // mlockall if requested
    if (mlockAll) {
        if (Constants.WINDOWS) {
            Natives.tryVirtualLock();
        } else {
            Natives.tryMlockall();
        }
    }

    // listener for windows close event
    if (ctrlHandler) {
        Natives.addConsoleCtrlHandler(new ConsoleCtrlHandler() {
            @Override
            public boolean handle(int code) {
                if (CTRL_CLOSE_EVENT == code) {
                    logger.info("running graceful exit on windows");
                    try {
                        Bootstrap.stop();
                    } catch (IOException e) {
                        throw new ElasticsearchException("failed to stop node", e);
                    }
                    return true;
                }
                return false;
            }
        });
    }

    // force remainder of JNA to be loaded (if available).
    try {
        /*
        private static final class Holder {
            private static final JNAKernel32Library instance = new JNAKernel32Library();
        }
        */
        JNAKernel32Library.getInstance();
    } catch (Exception ignored) {
        // we've already logged this.
    }

    Natives.trySetMaxNumberOfThreads();

    Natives.trySetMaxSizeVirtualMemory();

    Natives.trySetMaxFileSize();

    // init lucene random seed. it will use /dev/urandom where available:
    StringHelper.randomId();
}
```
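As its name says, this method initializes a series of native support resources. In practice it probes whether each facility on the running platform is usable, as a kind of warm-up.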
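One detail above is easy to reproduce without JNA. ES decides "running as root" through the native geteuid() call. As a swapped-in, Linux-only alternative (an assumption for illustration, not what ES does), the effective UID can also be read from the Uid: line of /proc/self/status, whose four fields are the real, effective, saved and filesystem UIDs. RootCheckSketch is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

// Swapped-in technique, NOT what ES does (ES calls geteuid() through JNA).
// Linux only: /proc/self/status has a line "Uid:\t<real>\t<effective>\t<saved>\t<fs>".
class RootCheckSketch {
    static int effectiveUid(List<String> statusLines) {
        for (String line : statusLines) {
            if (line.startsWith("Uid:")) {
                String[] fields = line.substring(4).trim().split("\\s+");
                return Integer.parseInt(fields[1]); // second field = effective UID
            }
        }
        throw new IllegalStateException("no Uid: line found");
    }

    /** True only if we can positively tell we run as root; false when unknown (e.g. not Linux). */
    static boolean definitelyRoot() {
        try {
            return effectiveUid(Files.readAllLines(Paths.get("/proc/self/status"))) == 0;
        } catch (IOException | RuntimeException e) {
            return false;
        }
    }
}
```

Like the "definitely" in Natives.definitelyRunningAsRoot, the sketch answers false when it cannot tell, rather than guessing.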
Below is a quick look at how the system call filter and mlockall are set up on Linux (JNA is required).
```java
// org.elasticsearch.bootstrap.SystemCallFilter#init
/**
 * Attempt to drop the capability to execute for the process.
 * <p>
 * This is best effort and OS and architecture dependent. It may throw any Throwable.
 * @return 0 if we can do this for application threads, 1 for the entire process
 */
static int init(Path tmpFile) throws Exception {
    if (Constants.LINUX) {
        return linuxImpl();
    } else if (Constants.MAC_OS_X) {
        // try to enable both mechanisms if possible
        bsdImpl();
        macImpl(tmpFile);
        return 1;
    } else if (Constants.SUN_OS) {
        solarisImpl();
        return 1;
    } else if (Constants.FREE_BSD || OPENBSD) {
        bsdImpl();
        return 1;
    } else if (Constants.WINDOWS) {
        windowsImpl();
        return 1;
    } else {
        throw new UnsupportedOperationException("syscall filtering not supported for OS: '" + Constants.OS_NAME + "'");
    }
}

// org.elasticsearch.bootstrap.SystemCallFilter#linuxImpl
/** try to install our BPF filters via seccomp() or prctl() to block execution */
private static int linuxImpl() {
    // first be defensive: we can give nice errors this way, at the very least.
    // also, some of these security features get backported to old versions, checking kernel version here is a big no-no!
    final Arch arch = ARCHITECTURES.get(Constants.OS_ARCH);
    boolean supported = Constants.LINUX && arch != null;
    if (supported == false) {
        throw new UnsupportedOperationException("seccomp unavailable: '" + Constants.OS_ARCH + "' architecture unsupported");
    }

    // we couldn't link methods, could be some really ancient kernel (e.g. < 2.1.57) or some bug
    if (linux_libc == null) {
        throw new UnsupportedOperationException("seccomp unavailable: could not link methods. requires kernel 3.5+ " +
            "with CONFIG_SECCOMP and CONFIG_SECCOMP_FILTER compiled in");
    }

    // try to check system calls really are who they claim
    // you never know (e.g. https://chromium.googlesource.com/chromium/src.git/+/master/sandbox/linux/seccomp-bpf/sandbox_bpf.cc#57)
    final int bogusArg = 0xf7a46a5c;

    // test seccomp(BOGUS)
    long ret = linux_syscall(arch.seccomp, bogusArg);
    if (ret != -1) {
        throw new UnsupportedOperationException("seccomp unavailable: seccomp(BOGUS_OPERATION) returned " + ret);
    } else {
        int errno = Native.getLastError();
        switch (errno) {
            case ENOSYS: break; // ok
            case EINVAL: break; // ok
            default: throw new UnsupportedOperationException("seccomp(BOGUS_OPERATION): " + JNACLibrary.strerror(errno));
        }
    }

    // test seccomp(VALID, BOGUS)
    ret = linux_syscall(arch.seccomp, SECCOMP_SET_MODE_FILTER, bogusArg);
    if (ret != -1) {
        throw new UnsupportedOperationException("seccomp unavailable: seccomp(SECCOMP_SET_MODE_FILTER, BOGUS_FLAG) returned " + ret);
    } else {
        int errno = Native.getLastError();
        switch (errno) {
            case ENOSYS: break; // ok
            case EINVAL: break; // ok
            default: throw new UnsupportedOperationException("seccomp(SECCOMP_SET_MODE_FILTER, BOGUS_FLAG): "
                + JNACLibrary.strerror(errno));
        }
    }

    // test prctl(BOGUS)
    ret = linux_prctl(bogusArg, 0, 0, 0, 0);
    if (ret != -1) {
        throw new UnsupportedOperationException("seccomp unavailable: prctl(BOGUS_OPTION) returned " + ret);
    } else {
        int errno = Native.getLastError();
        switch (errno) {
            case ENOSYS: break; // ok
            case EINVAL: break; // ok
            default: throw new UnsupportedOperationException("prctl(BOGUS_OPTION): " + JNACLibrary.strerror(errno));
        }
    }

    // now just normal defensive checks

    // check for GET_NO_NEW_PRIVS
    switch (linux_prctl(PR_GET_NO_NEW_PRIVS, 0, 0, 0, 0)) {
        case 0: break; // not yet set
        case 1: break; // already set by caller
        default:
            int errno = Native.getLastError();
            if (errno == EINVAL) {
                // friendly error, this will be the typical case for an old kernel
                throw new UnsupportedOperationException("seccomp unavailable: requires kernel 3.5+ with" +
                    " CONFIG_SECCOMP and CONFIG_SECCOMP_FILTER compiled in");
            } else {
                throw new UnsupportedOperationException("prctl(PR_GET_NO_NEW_PRIVS): " + JNACLibrary.strerror(errno));
            }
    }
    // check for SECCOMP
    switch (linux_prctl(PR_GET_SECCOMP, 0, 0, 0, 0)) {
        case 0: break; // not yet set
        case 2: break; // already in filter mode by caller
        default:
            int errno = Native.getLastError();
            if (errno == EINVAL) {
                throw new UnsupportedOperationException("seccomp unavailable: CONFIG_SECCOMP not compiled into kernel," +
                    " CONFIG_SECCOMP and CONFIG_SECCOMP_FILTER are needed");
            } else {
                throw new UnsupportedOperationException("prctl(PR_GET_SECCOMP): " + JNACLibrary.strerror(errno));
            }
    }
    // check for SECCOMP_MODE_FILTER
    if (linux_prctl(PR_SET_SECCOMP, SECCOMP_MODE_FILTER, 0, 0, 0) != 0) {
        int errno = Native.getLastError();
        switch (errno) {
            case EFAULT: break; // available
            case EINVAL: throw new UnsupportedOperationException("seccomp unavailable: CONFIG_SECCOMP_FILTER not" +
                " compiled into kernel, CONFIG_SECCOMP and CONFIG_SECCOMP_FILTER are needed");
            default: throw new UnsupportedOperationException("prctl(PR_SET_SECCOMP): " + JNACLibrary.strerror(errno));
        }
    }

    // ok, now set PR_SET_NO_NEW_PRIVS, needed to be able to set a seccomp filter as ordinary user
    if (linux_prctl(PR_SET_NO_NEW_PRIVS, 1, 0, 0, 0) != 0) {
        throw new UnsupportedOperationException("prctl(PR_SET_NO_NEW_PRIVS): " + JNACLibrary.strerror(Native.getLastError()));
    }

    // check it worked
    if (linux_prctl(PR_GET_NO_NEW_PRIVS, 0, 0, 0, 0) != 1) {
        throw new UnsupportedOperationException("seccomp filter did not really succeed: prctl(PR_GET_NO_NEW_PRIVS): " +
            JNACLibrary.strerror(Native.getLastError()));
    }

    // BPF installed to check arch, limit, then syscall.
    // See https://www.kernel.org/doc/Documentation/prctl/seccomp_filter.txt for details.
    SockFilter insns[] = {
        /* 1  */ BPF_STMT(BPF_LD + BPF_W + BPF_ABS, SECCOMP_DATA_ARCH_OFFSET),               //
        /* 2  */ BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, arch.audit, 0, 7),                     // if (arch != audit) goto fail;
        /* 3  */ BPF_STMT(BPF_LD + BPF_W + BPF_ABS, SECCOMP_DATA_NR_OFFSET),                 //
        /* 4  */ BPF_JUMP(BPF_JMP + BPF_JGT + BPF_K, arch.limit, 5, 0),                     // if (syscall > LIMIT) goto fail;
        /* 5  */ BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, arch.fork, 4, 0),                      // if (syscall == FORK) goto fail;
        /* 6  */ BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, arch.vfork, 3, 0),                     // if (syscall == VFORK) goto fail;
        /* 7  */ BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, arch.execve, 2, 0),                    // if (syscall == EXECVE) goto fail;
        /* 8  */ BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, arch.execveat, 1, 0),                  // if (syscall == EXECVEAT) goto fail;
        /* 9  */ BPF_STMT(BPF_RET + BPF_K, SECCOMP_RET_ALLOW),                               // pass: return OK;
        /* 10 */ BPF_STMT(BPF_RET + BPF_K, SECCOMP_RET_ERRNO | (EACCES & SECCOMP_RET_DATA)), // fail: return EACCES;
    };
    // seccomp takes a long, so we pass it one explicitly to keep the JNA simple
    SockFProg prog = new SockFProg(insns);
    prog.write();
    long pointer = Pointer.nativeValue(prog.getPointer());

    int method = 1;
    // install filter, if this works, after this there is no going back!
    // first try it with seccomp(SECCOMP_SET_MODE_FILTER), falling back to prctl()
    if (linux_syscall(arch.seccomp, SECCOMP_SET_MODE_FILTER, SECCOMP_FILTER_FLAG_TSYNC, new NativeLong(pointer)) != 0) {
        method = 0;
        int errno1 = Native.getLastError();
        if (logger.isDebugEnabled()) {
            logger.debug("seccomp(SECCOMP_SET_MODE_FILTER): {}, falling back to prctl(PR_SET_SECCOMP)...",
                JNACLibrary.strerror(errno1));
        }
        if (linux_prctl(PR_SET_SECCOMP, SECCOMP_MODE_FILTER, pointer, 0, 0) != 0) {
            int errno2 = Native.getLastError();
            throw new UnsupportedOperationException("seccomp(SECCOMP_SET_MODE_FILTER): " + JNACLibrary.strerror(errno1) +
                ", prctl(PR_SET_SECCOMP): " + JNACLibrary.strerror(errno2));
        }
    }

    // now check that the filter was really installed, we should be in filter mode.
    if (linux_prctl(PR_GET_SECCOMP, 0, 0, 0, 0) != 2) {
        throw new UnsupportedOperationException("seccomp filter installation did not really succeed. seccomp(PR_GET_SECCOMP): "
            + JNACLibrary.strerror(Native.getLastError()));
    }

    logger.debug("Linux seccomp filter installation successful, threads: [{}]", method == 1 ? "all" : "app");
    return method;
}

// org.elasticsearch.bootstrap.Natives#tryMlockall
static void tryMlockall() {
    if (JNA_AVAILABLE == false) {
        logger.warn("cannot mlockall because JNA is not available");
        return;
    }
    JNANatives.tryMlockall();
}

// org.elasticsearch.bootstrap.JNANatives#tryMlockall
static void tryMlockall() {
    int errno = Integer.MIN_VALUE;
    String errMsg = null;
    boolean rlimitSuccess = false;
    long softLimit = 0;
    long hardLimit = 0;

    try {
        int result = JNACLibrary.mlockall(JNACLibrary.MCL_CURRENT);
        if (result == 0) {
            LOCAL_MLOCKALL = true;
            return;
        }

        errno = Native.getLastError();
        errMsg = JNACLibrary.strerror(errno);
        if (Constants.LINUX || Constants.MAC_OS_X) {
            // we only know RLIMIT_MEMLOCK for these two at the moment.
            JNACLibrary.Rlimit rlimit = new JNACLibrary.Rlimit();
            if (JNACLibrary.getrlimit(JNACLibrary.RLIMIT_MEMLOCK, rlimit) == 0) {
                rlimitSuccess = true;
                softLimit = rlimit.rlim_cur.longValue();
                hardLimit = rlimit.rlim_max.longValue();
            } else {
                logger.warn("Unable to retrieve resource limits: {}", JNACLibrary.strerror(Native.getLastError()));
            }
        }
    } catch (UnsatisfiedLinkError e) {
        // this will have already been logged by CLibrary, no need to repeat it
        return;
    }

    // mlockall failed for some reason
    logger.warn("Unable to lock JVM Memory: error={}, reason={}", errno, errMsg);
    logger.warn("This can result in part of the JVM being swapped out.");
    if (errno == JNACLibrary.ENOMEM) {
        if (rlimitSuccess) {
            logger.warn("Increase RLIMIT_MEMLOCK, soft limit: {}, hard limit: {}", rlimitToString(softLimit),
                rlimitToString(hardLimit));
            if (Constants.LINUX) {
                // give specific instructions for the linux case to make it easy
                String user = System.getProperty("user.name");
                logger.warn("These can be adjusted by modifying /etc/security/limits.conf, for example: \n" +
                    "\t# allow user '{}' mlockall\n" +
                    "\t{} soft memlock unlimited\n" +
                    "\t{} hard memlock unlimited",
                    user, user, user
                );
                logger.warn("If you are logged in interactively, you will have to re-login for the new limits to take effect.");
            }
        } else {
            logger.warn("Increase RLIMIT_MEMLOCK (ulimit).");
        }
    }
}
```
The duplicate-class check (JarHell) mainly looks for duplicate jar files on the classpath and for duplicate class names:
// org.elasticsearch.bootstrap.JarHell#checkJarHell
/**
 * Checks the current classpath for duplicate classes
 * @param output A {@link String} {@link Consumer} to which debug output will be sent
 * @throws IllegalStateException if jar hell was found
 */
public static void checkJarHell(Consumer<String> output) throws IOException, URISyntaxException {
    ClassLoader loader = JarHell.class.getClassLoader();
    output.accept("java.class.path: " + System.getProperty("java.class.path"));
    output.accept("sun.boot.class.path: " + System.getProperty("sun.boot.class.path"));
    if (loader instanceof URLClassLoader) {
        output.accept("classloader urls: " + Arrays.toString(((URLClassLoader)loader).getURLs()));
    }
    checkJarHell(parseClassPath(), output);
}

/**
 * Checks the set of URLs for duplicate classes
 * @param urls A set of URLs from the classpath to be checked for conflicting jars
 * @param output A {@link String} {@link Consumer} to which debug output will be sent
 * @throws IllegalStateException if jar hell was found
 */
@SuppressForbidden(reason = "needs JarFile for speed, just reading entries")
public static void checkJarHell(Set<URL> urls, Consumer<String> output) throws URISyntaxException, IOException {
    // we don't try to be sneaky and use deprecated/internal/not portable stuff
    // like sun.boot.class.path, and with jigsaw we don't yet have a way to get
    // a "list" at all. So just exclude any elements underneath the java home
    String javaHome = System.getProperty("java.home");
    output.accept("java.home: " + javaHome);
    final Map<String,Path> clazzes = new HashMap<>(32768);
    Set<Path> seenJars = new HashSet<>();
    for (final URL url : urls) {
        final Path path = PathUtils.get(url.toURI());
        // exclude system resources
        if (path.startsWith(javaHome)) {
            output.accept("excluding system resource: " + path);
            continue;
        }
        if (path.toString().endsWith(".jar")) {
            // the same jar appears twice on the classpath
            if (seenJars.add(path) == false) {
                throw new IllegalStateException("jar hell!" + System.lineSeparator() +
                        "duplicate jar on classpath: " + path);
            }
            output.accept("examining jar: " + path);
            try (JarFile file = new JarFile(path.toString())) {
                Manifest manifest = file.getManifest();
                if (manifest != null) {
                    // check MANIFEST.MF: version numbers, etc.
                    checkManifest(manifest, path);
                }
                // inspect entries
                Enumeration<JarEntry> elements = file.entries();
                while (elements.hasMoreElements()) {
                    String entry = elements.nextElement().getName();
                    if (entry.endsWith(".class")) {
                        // for jar format, the separator is defined as /
                        entry = entry.replace('/', '.').substring(0, entry.length() - 6);
                        checkClass(clazzes, entry, path);
                    }
                }
            }
        } else {
            output.accept("examining directory: " + path);
            // case for tests: where we have class files in the classpath
            final Path root = PathUtils.get(url.toURI());
            final String sep = root.getFileSystem().getSeparator();

            // don't try and walk class or resource directories that don't exist
            // gradle will add these to the classpath even if they never get created
            if (Files.exists(root)) {
                Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
                    @Override
                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                        String entry = root.relativize(file).toString();
                        if (entry.endsWith(".class")) {
                            // normalize with the os separator, remove '.class'
                            entry = entry.replace(sep, ".").substring(0, entry.length() - ".class".length());
                            checkClass(clazzes, entry, path);
                        }
                        return super.visitFile(file, attrs);
                    }
                });
            }
        }
    }
}

// per-class check
private static void checkClass(Map<String, Path> clazzes, String clazz, Path jarpath) {
    if (clazz.equals("module-info") || clazz.endsWith(".module-info")) {
        // Ignore jigsaw module descriptions
        return;
    }
    Path previous = clazzes.put(clazz, jarpath);
    if (previous != null) {
        if (previous.equals(jarpath)) {
            if (clazz.startsWith("org.apache.xmlbeans")) {
                return; // https://issues.apache.org/jira/browse/XMLBEANS-499
            }
            // throw a better exception in this ridiculous case.
            // unfortunately the zip file format allows this buggy possibility
            // UweSays: It can, but should be considered as bug :-)
            throw new IllegalStateException("jar hell!" + System.lineSeparator() +
                    "class: " + clazz + System.lineSeparator() +
                    "exists multiple times in jar: " + jarpath + " !!!!!!!!!");
        } else {
            throw new IllegalStateException("jar hell!" + System.lineSeparator() +
                    "class: " + clazz + System.lineSeparator() +
                    "jar1: " + previous + System.lineSeparator() +
                    "jar2: " + jarpath);
        }
    }
}
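To make the checkClass logic easier to follow, here is a minimal in-memory sketch. It assumes each jar is represented only by its name and the list of entry names it contains, so no real JarFile or classpath is read. JarHellSketch, toClassName and findConflict are hypothetical names; the sketch keeps two rules from the code above: an entry becomes a class name by replacing / with . and dropping .class, and a class seen a second time is a conflict, except for module-info.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory sketch of JarHell's duplicate-class check.
// Each "jar" is just a name mapped to the entry names it contains; nothing is read from disk.
class JarHellSketch {

    /** Turns "org/foo/Bar.class" into "org.foo.Bar", like the jar branch of checkJarHell. */
    static String toClassName(String entry) {
        return entry.replace('/', '.').substring(0, entry.length() - ".class".length());
    }

    /** Returns a description of the first duplicate class, or null if every class appears once. */
    static String findConflict(Map<String, List<String>> jars) {
        Map<String, String> seen = new HashMap<>(); // class name -> jar it was first seen in
        for (Map.Entry<String, List<String>> jar : jars.entrySet()) {
            for (String entry : jar.getValue()) {
                if (!entry.endsWith(".class")) {
                    continue; // manifests and resources are not classes
                }
                String clazz = toClassName(entry);
                if (clazz.equals("module-info") || clazz.endsWith(".module-info")) {
                    continue; // module descriptors are skipped, as in checkClass
                }
                String previous = seen.put(clazz, jar.getKey());
                if (previous != null) {
                    return "class: " + clazz + ", jar1: " + previous + ", jar2: " + jar.getKey();
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, List<String>> jars = new LinkedHashMap<>(); // keeps the classpath order
        jars.put("a.jar", List.of("org/foo/Bar.class", "META-INF/MANIFEST.MF"));
        jars.put("b.jar", List.of("org/foo/Baz.class", "org/foo/Bar.class"));
        System.out.println(findConflict(jars)); // class: org.foo.Bar, jar1: a.jar, jar2: b.jar
    }
}
```

Keying a HashMap by class name makes each check a single put: the value put returns is exactly the jar registered earlier, which is how checkClass detects a duplicate without a separate lookup.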
The SecurityManager is set up as follows:
// org.elasticsearch.bootstrap.Security#configure
/**
 * Initializes SecurityManager for the environment
 * Can only happen once!
 * @param environment configuration for generating dynamic permissions
 * @param filterBadDefaults true if we should filter out bad java defaults in the system policy.
 */
static void configure(Environment environment, boolean filterBadDefaults) throws IOException, NoSuchAlgorithmException {

    // enable security policy: union of template and environment-based paths, and possibly plugin permissions
    Map<String, URL> codebases = PolicyUtil.getCodebaseJarMap(JarHell.parseClassPath());
    Policy.setPolicy(new ESPolicy(codebases, createPermissions(environment),
        getPluginAndModulePermissions(environment), filterBadDefaults, createRecursiveDataPathPermission(environment)));

    // enable security manager
    final String[] classesThatCanExit =
            new String[]{
                    // SecureSM matches class names as regular expressions so we escape the $ that arises from the nested class name
                    ElasticsearchUncaughtExceptionHandler.PrivilegedHaltAction.class.getName().replace("$", "\\$"),
                    Command.class.getName()};
    System.setSecurityManager(new SecureSM(classesThatCanExit));

    // do some basic tests
    selfTest();
}

/** Simple checks that everything is ok */
@SuppressForbidden(reason = "accesses jvm default tempdir as a self-test")
static void selfTest() throws IOException {
    // check we can manipulate temporary files
    try {
        // create and delete a temp file to verify the SecurityManager still permits it
        Path p = Files.createTempFile(null, null);
        try {
            Files.delete(p);
        } catch (IOException ignored) {
            // potentially virus scanner
        }
    } catch (SecurityException problem) {
        throw new SecurityException("Security misconfiguration: cannot access java.io.tmpdir", problem);
    }
}
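The comment in configure() about escaping $ is easy to gloss over. The sketch below shows why it matters, assuming the allowed class names are applied as whole-string regular expressions (modelled with String#matches); SecureSM's own matching code is not shown in this article, and org.example.Outer$Inner is a made-up nested class name.

```java
// Sketch: why Security#configure escapes '$' in the class names passed to SecureSM.
// Assumption: names are used as full-match regular expressions, modelled here with String#matches.
// "org.example.Outer$Inner" is a made-up binary name of a nested class.
class DollarEscapeSketch {

    /** True if className matches pattern as a whole-string regular expression. */
    static boolean matchesAsRegex(String pattern, String className) {
        return className.matches(pattern);
    }

    public static void main(String[] args) {
        String nested = "org.example.Outer$Inner";
        // Unescaped: '$' is an end-of-input anchor, so "Inner" can never follow it.
        System.out.println(matchesAsRegex(nested, nested)); // false
        // Escaped the way configure() does it: "$" -> "\\$", i.e. a literal dollar sign.
        System.out.println(matchesAsRegex(nested.replace("$", "\\$"), nested)); // true
    }
}
```

The unescaped dots are harmless here because `.` matches any character, including a literal dot; only `$` changes the meaning of the pattern, which is why configure() escapes just that character.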
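That is the whole ES setup process: native environment checks, dependency jar checks, shutdown hook installation, SecurityManager installation, and so on. The most important step, creating the node, is covered in the next section.
2.4. Node creation and validation
Every ES server actually runs as one node of a cluster, and its core work is carried out through the Node class, so node creation deserves a section of its own.
The node is instantiated in Bootstrap#setup, not as a plain Node but as an anonymous subclass, mainly so that validateNodeBeforeAcceptingRequests() can be overridden: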
// org.elasticsearch.bootstrap.Bootstrap#setup
private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
    ...
    node = new Node(environment) {
        @Override
        protected void validateNodeBeforeAcceptingRequests(
            final BootstrapContext context,
            final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
            BootstrapChecks.check(context, boundTransportAddress, checks);
        }
    };
}
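The override above is an ordinary template-method hook: the base class calls a protected method at a fixed point, and the anonymous subclass created in setup fills it in. Below is a minimal sketch of the same shape with hypothetical names (MiniNode, start, validateBeforeAcceptingRequests); plain strings stand in for BootstrapCheck objects, and in this sketch a check whose name starts with FAIL is treated as failing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the hook used in Bootstrap#setup. MiniNode, start() and
// validateBeforeAcceptingRequests() are illustrative names, not Elasticsearch APIs;
// plain strings stand in for BootstrapCheck objects.
class MiniNode {
    final List<String> events = new ArrayList<>();

    /** Hook called before the node accepts requests; a no-op in this base class. */
    protected void validateBeforeAcceptingRequests(List<String> checks) {
    }

    void start(List<String> checks) {
        events.add("bound transport");
        validateBeforeAcceptingRequests(checks); // a failure here stops start-up
        events.add("accepting requests");
    }

    /** Mirrors setup(): an anonymous subclass that overrides only the hook. */
    static MiniNode withBootstrapChecks() {
        return new MiniNode() {
            @Override
            protected void validateBeforeAcceptingRequests(List<String> checks) {
                for (String check : checks) {
                    if (check.startsWith("FAIL")) {
                        throw new IllegalStateException("bootstrap check failed: " + check);
                    }
                    events.add("passed " + check);
                }
            }
        };
    }

    public static void main(String[] args) {
        MiniNode node = withBootstrapChecks();
        node.start(List.of("heap size", "file descriptors"));
        System.out.println(node.events);
        // [bound transport, passed heap size, passed file descriptors, accepting requests]
    }
}
```

In this sketch, code that builds a plain MiniNode gets the no-op default, while only the subclass created on the bootstrap path runs the checks, so the base class never needs to know about them.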
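So apart from the validation, which the Bootstrap subclass delegates to BootstrapChecks, all the other work happens in Node's own implementation, namely its constructor: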
// org.elasticsearch.node.Node#Node
public Node(Environment environment) {
    this(environment, Collections.emptyList(), true);
}

/**
 * Constructs a node
 *
 * @param initialEnvironment         the initial environment for this node, which will be added to by plugins
 * @param classpathPlugins           the plugins to be loaded from the classpath
 * @param forbidPrivateIndexSettings whether or not private index settings are forbidden when creating an index; this is used in the
 *                                   test framework for tests that rely on being able to set private settings
 */
protected Node(final Environment initialEnvironment,
               Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
    final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
    boolean success = false;
    try {
        Settings tmpSettings = Settings.builder().put(initialEnvironment.settings())
            .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();

        final JvmInfo jvmInfo = JvmInfo.jvmInfo();
        logger.info(
            "version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
            Build.CURRENT.getQualifiedVersion(),
            jvmInfo.pid(),
            Build.CURRENT.flavor().displayName(),
            Build.CURRENT.type().displayName(),
            Build.CURRENT.hash(),
            Build.CURRENT.date(),
            Constants.OS_NAME,
            Constants.OS_VERSION,
            Constants.OS_ARCH,
            Constants.JVM_VENDOR,
            Constants.JVM_NAME,
            Constants.JAVA_VERSION,
            Constants.JVM_VERSION);
        if (jvmInfo.getBundledJdk()) {
            logger.info("JVM home [{}], using bundled JDK [{}]", System.getProperty("java.home"), jvmInfo.getUsingBundledJdk());
        } else {
            logger.info("JVM home [{}]", System.getProperty("java.home"));
            deprecationLogger.deprecate(
                DeprecationCategory.OTHER,
                "no-jdk",
                "no-jdk distributions that do not bundle a JDK are deprecated and will be removed in a future release");
        }
        logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
        if (Build.CURRENT.isProductionRelease() == false) {
            logger.warn(
                "version [{}] is a pre-release version of Elasticsearch and is not suitable for production",
                Build.CURRENT.getQualifiedVersion());
        }

        if (logger.isDebugEnabled()) {
            logger.debug("using config [{}], data [{}], logs [{}], plugins [{}]",
                initialEnvironment.configFile(), Arrays.toString(initialEnvironment.dataFiles()),
                initialEnvironment.logsFile(), initialEnvironment.pluginsFile());
        }

        // 1. instantiate the plugin service
        this.pluginsService = new PluginsService(tmpSettings, initialEnvironment.configFile(), initialEnvironment.modulesFile(),
            initialEnvironment.pluginsFile(), classpathPlugins);
        final Settings settings = pluginsService.updatedSettings();

        final Set<DiscoveryNodeRole> additionalRoles = pluginsService.filterPlugins(Plugin.class)
            .stream()
            .map(Plugin::getRoles)
            .flatMap(Set::stream)
            .collect(Collectors.toSet());
        // register the extra roles contributed by plugins
        DiscoveryNode.setAdditionalRoles(additionalRoles);

        /*
         * Create the environment based on the finalized view of the settings. This is to ensure that components get the same setting
         * values, no matter they ask for them from.
         */
        this.environment = new Environment(settings, initialEnvironment.configFile());
        Environment.assertEquivalent(initialEnvironment, this.environment);
        nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
        logger.info("node name [{}], node ID [{}], cluster name [{}], roles {}",
            NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId(), ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value(),
            DiscoveryNode.getRolesFromSettings(settings).stream()
                .map(DiscoveryNodeRole::roleName)
                .collect(Collectors.toCollection(LinkedHashSet::new)));
        resourcesToClose.add(nodeEnvironment);
        localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());

        // 2. create the executor thread pools
        final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);

        // this ThreadPool holds the pools for many kinds of requests, e.g. get/search/write...
        final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
        resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
        final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
        resourcesToClose.add(resourceWatcherService);
        // adds the context to the DeprecationLogger so that it does not need to be injected everywhere
        HeaderWarning.setThreadContext(threadPool.getThreadContext());
        resourcesToClose.add(() -> HeaderWarning.removeThreadContext(threadPool.getThreadContext()));

        final List<Setting<?>> additionalSettings = new ArrayList<>();
        // register the node.data, node.ingest, node.master, node.remote_cluster_client settings here so we can mark them private
        additionalSettings.add(NODE_DATA_SETTING);
        additionalSettings.add(NODE_INGEST_SETTING);
        additionalSettings.add(NODE_MASTER_SETTING);
        additionalSettings.add(NODE_REMOTE_CLUSTER_CLIENT);
        additionalSettings.addAll(pluginsService.getPluginSettings());
        final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
        for (final ExecutorBuilder<?> builder : threadPool.builders()) {
            additionalSettings.addAll(builder.getRegisteredSettings());
        }
        // create the NodeClient instance
        client = new NodeClient(settings, threadPool);

        // load ScriptPlugin instances
        final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
        final ScriptService scriptService = newScriptService(settings, scriptModule.engines, scriptModule.contexts);
        // load AnalysisPlugin instances
        AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
        // this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
        // so we might be late here already

        final Set<SettingUpgrader<?>> settingsUpgraders = pluginsService.filterPlugins(Plugin.class)
            .stream()
            .map(Plugin::getSettingUpgraders)
            .flatMap(List::stream)
            .collect(Collectors.toSet());

        final SettingsModule settingsModule =
            new SettingsModule(settings, additionalSettings, additionalSettingsFilter, settingsUpgraders);
        scriptModule.registerClusterSettingsListeners(scriptService, settingsModule.getClusterSettings());
        // load DiscoveryPlugin instances
        final NetworkService networkService = new NetworkService(
            getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));

        // initialize the cluster service
        List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
        final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
        clusterService.addStateApplier(scriptService);
        resourcesToClose.add(clusterService);
        final Set<Setting<?>> consistentSettings = settingsModule.getConsistentSettings();
        if (consistentSettings.isEmpty() == false) {
            clusterService.addLocalNodeMasterListener(
                new ConsistentSettingsService(settings, clusterService, consistentSettings).newHashPublisher());
        }
        // initialize IngestService
        final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
            scriptService, analysisModule.getAnalysisRegistry(),
            pluginsService.filterPlugins(IngestPlugin.class), client);
        final SetOnce<RepositoriesService> repositoriesServiceReference = new SetOnce<>();
        final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
        final UsageService usageService = new UsageService();

        ModulesBuilder modules = new ModulesBuilder();
        // initialize the monitoring service
        final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
        // initialize the file-system health check service
        final FsHealthService fsHealthService = new FsHealthService(settings, clusterService.getClusterSettings(), threadPool,
            nodeEnvironment);
        final SetOnce<RerouteService> rerouteServiceReference = new SetOnce<>();
        // initialize snapshotsInfoService
        final InternalSnapshotsInfoService snapshotsInfoService = new InternalSnapshotsInfoService(settings, clusterService,
            repositoriesServiceReference::get, rerouteServiceReference::get);
        final ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService,
            snapshotsInfoService, threadPool.getThreadContext());
        modules.add(clusterModule);
        // initialize the indices module
        IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
        modules.add(indicesModule);

        // initialize the search module
        SearchModule searchModule = new SearchModule(settings, pluginsService.filterPlugins(SearchPlugin.class));
        // load CircuitBreakerPlugin instances
        List<BreakerSettings> pluginCircuitBreakers = pluginsService.filterPlugins(CircuitBreakerPlugin.class)
            .stream()
            .map(plugin -> plugin.getCircuitBreaker(settings))
            .collect(Collectors.toList());
        final CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
            pluginCircuitBreakers,
            settingsModule.getClusterSettings());
        pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> {
            CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName());
            plugin.setCircuitBreaker(breaker);
        });
        resourcesToClose.add(circuitBreakerService);
        // add the GatewayModule
        modules.add(new GatewayModule());

        PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
        BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
        modules.add(settingsModule);
        List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
            NetworkModule.getNamedWriteables().stream(),
            IndicesModule.getNamedWriteables().stream(),
            searchModule.getNamedWriteables().stream(),
            pluginsService.filterPlugins(Plugin.class).stream()
                .flatMap(p -> p.getNamedWriteables().stream()),
            ClusterModule.getNamedWriteables().stream())
            .flatMap(Function.identity()).collect(Collectors.toList());
        final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
        NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(
            NetworkModule.getNamedXContents().stream(),
            IndicesModule.getNamedXContents().stream(),
            searchModule.getNamedXContents().stream(),
            pluginsService.filterPlugins(Plugin.class).stream()
                .flatMap(p -> p.getNamedXContent().stream()),
            ClusterModule.getNamedXWriteables().stream())
            .flatMap(Function.identity()).collect(toList()));
        // initialize metaStateService
        final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry);
        // instantiate the various service factories
        final PersistedClusterStateService lucenePersistedStateFactory
            = new PersistedClusterStateService(nodeEnvironment, xContentRegistry, bigArrays, clusterService.getClusterSettings(),
            threadPool::relativeTimeInMillis);

        // collect engine factory providers from plugins
        final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);
        final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders =
            enginePlugins.stream().map(plugin -> (Function<IndexSettings, Optional<EngineFactory>>)plugin::getEngineFactory)
                .collect(Collectors.toList());

        final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories =
            pluginsService.filterPlugins(IndexStorePlugin.class)
                .stream()
                .map(IndexStorePlugin::getDirectoryFactories)
                .flatMap(m -> m.entrySet().stream())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories =
            pluginsService.filterPlugins(IndexStorePlugin.class)
                .stream()
                .map(IndexStorePlugin::getRecoveryStateFactories)
                .flatMap(m -> m.entrySet().stream())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        final List<IndexStorePlugin.IndexFoldersDeletionListener> indexFoldersDeletionListeners =
            pluginsService.filterPlugins(IndexStorePlugin.class)
                .stream()
                .map(IndexStorePlugin::getIndexFoldersDeletionListeners)
                .flatMap(List::stream)
                .collect(Collectors.toList());

        final Map<String, IndexStorePlugin.SnapshotCommitSupplier> snapshotCommitSuppliers =
            pluginsService.filterPlugins(IndexStorePlugin.class)
                .stream()
                .map(IndexStorePlugin::getSnapshotCommitSuppliers)
                .flatMap(m -> m.entrySet().stream())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        final Map<String, Collection<SystemIndexDescriptor>> systemIndexDescriptorMap = pluginsService
            .filterPlugins(SystemIndexPlugin.class)
            .stream()
            .collect(Collectors.toUnmodifiableMap(
                plugin -> plugin.getClass().getSimpleName(),
                plugin -> plugin.getSystemIndexDescriptors(settings)));
        final SystemIndices systemIndices = new SystemIndices(systemIndexDescriptorMap);

        final SystemIndexManager systemIndexManager = new SystemIndexManager(systemIndices, client);
        clusterService.addListener(systemIndexManager);

        final RerouteService rerouteService
            = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
        rerouteServiceReference.set(rerouteService);
        clusterService.setRerouteService(rerouteService);

        // instantiate IndicesService, passing in many of the components built above
        final IndicesService indicesService =
            new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),
                clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
                threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptService,
                clusterService, client, metaStateService, engineFactoryProviders, indexStoreFactories,
                searchModule.getValuesSourceRegistry(), recoveryStateFactories, indexFoldersDeletionListeners,
                snapshotCommitSuppliers);

        final AliasValidator aliasValidator = new AliasValidator();

        final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService);
        final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService(
            settings,
            clusterService,
            indicesService,
            clusterModule.getAllocationService(),
            aliasValidator,
            shardLimitValidator,
            environment,
            settingsModule.getIndexScopedSettings(),
            threadPool,
            xContentRegistry,
            systemIndices,
            forbidPrivateIndexSettings);
        pluginsService.filterPlugins(Plugin.class)
            .forEach(p -> p.getAdditionalIndexSettingProviders()
                .forEach(metadataCreateIndexService::addAdditionalIndexSettingProvider));

        final MetadataCreateDataStreamService metadataCreateDataStreamService =
            new MetadataCreateDataStreamService(threadPool, clusterService, metadataCreateIndexService);

        Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
            .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
                scriptService, xContentRegistry, environment, nodeEnvironment,
                namedWriteableRegistry, clusterModule.getIndexNameExpressionResolver(),
                repositoriesServiceReference::get).stream())
            .collect(Collectors.toList());

        ActionModule actionModule = new ActionModule(settings, clusterModule.getIndexNameExpressionResolver(),
            settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
            threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService, systemIndices);
        modules.add(actionModule);

        // wire in the RestController
        final RestController restController = actionModule.getRestController();
        final NetworkModule networkModule = new NetworkModule(settings, pluginsService.filterPlugins(NetworkPlugin.class),
            threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
            networkService, restController, clusterService.getClusterSettings());
        Collection<UnaryOperator<Map<String, IndexTemplateMetadata>>> indexTemplateMetadataUpgraders =
            pluginsService.filterPlugins(Plugin.class).stream()
                .map(Plugin::getIndexTemplateMetadataUpgrader)
                .collect(Collectors.toList());
        final MetadataUpgrader metadataUpgrader = new MetadataUpgrader(indexTemplateMetadataUpgraders);
        final MetadataIndexUpgradeService metadataIndexUpgradeService = new MetadataIndexUpgradeService(settings, xContentRegistry,
            indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), scriptService);
        if (DiscoveryNode.isMasterNode(settings)) {
            clusterService.addListener(new SystemIndexMetadataUpgradeService(systemIndices, clusterService));
        }
        new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetadataUpgraders);
        final Transport transport = networkModule.getTransportSupplier().get();
        Set<String> taskHeaders = Stream.concat(
            pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
            Stream.of(Task.X_OPAQUE_ID)
        ).collect(Collectors.toSet());
        final TransportService transportService = newTransportService(settings, transport, threadPool,
            networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
        final GatewayMetaState gatewayMetaState = new GatewayMetaState();
        final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
        final SearchTransportService searchTransportService = new SearchTransportService(transportService, client,
            SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
        final HttpServerTransport httpServerTransport = newHttpTransport(networkModule);
        final IndexingPressure indexingLimits = new IndexingPressure(settings);

        final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
        RepositoriesModule repositoriesModule = new RepositoriesModule(this.environment,
            pluginsService.filterPlugins(RepositoryPlugin.class), transportService, clusterService, bigArrays, xContentRegistry,
            recoverySettings);
        RepositoriesService repositoryService = repositoriesModule.getRepositoryService();
        repositoriesServiceReference.set(repositoryService);
        SnapshotsService snapshotsService = new SnapshotsService(settings, clusterService,
            clusterModule.getIndexNameExpressionResolver(), repositoryService, transportService, actionModule.getActionFilters());
        SnapshotShardsService snapshotShardsService = new SnapshotShardsService(settings, clusterService, repositoryService,
            transportService, indicesService);
        // initialize restoreService
        RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),
            metadataCreateIndexService, metadataIndexUpgradeService, clusterService.getClusterSettings(), shardLimitValidator);
        final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
            clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, rerouteService);
        clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);

        // initialize the discovery module
        final DiscoveryModule discoveryModule = new DiscoveryModule(settings, transportService, namedWriteableRegistry,
            networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
            clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
            clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, rerouteService,
            fsHealthService);
        this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
            transportService, indicesService, pluginsService, circuitBreakerService, scriptService,
            httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
            searchTransportService, indexingLimits, searchModule.getValuesSourceRegistry().getUsageService());

        final SearchService searchService = newSearchService(clusterService, indicesService,
            threadPool, scriptService, bigArrays, searchModule.getFetchPhase(),
            responseCollectorService, circuitBreakerService);

        final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
            .filterPlugins(PersistentTaskPlugin.class).stream()
            .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule,
                clusterModule.getIndexNameExpressionResolver()))
            .flatMap(List::stream)
            .collect(toList());

        final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(tasksExecutors);
        final PersistentTasksClusterService persistentTasksClusterService =
            new PersistentTasksClusterService(settings, registry, clusterService, threadPool);
        resourcesToClose.add(persistentTasksClusterService);
        final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);

        // EDSL-style dependency injection: bind each service type to its instance
        modules.add(b -> {
                b.bind(Node.class).toInstance(this);
                b.bind(NodeService.class).toInstance(nodeService);
                b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
                b.bind(PluginsService.class).toInstance(pluginsService);
                b.bind(Client.class).toInstance(client);
                b.bind(NodeClient.class).toInstance(client);
                b.bind(Environment.class).toInstance(this.environment);
                b.bind(ThreadPool.class).toInstance(threadPool);
                b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
                b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
                b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
                b.bind(BigArrays.class).toInstance(bigArrays);
                b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
                b.bind(ScriptService.class).toInstance(scriptService);
                b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
                b.bind(IngestService.class).toInstance(ingestService);
                b.bind(IndexingPressure.class).toInstance(indexingLimits);
                b.bind(UsageService.class).toInstance(usageService);
                b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
                b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
                b.bind(MetadataUpgrader.class).toInstance(metadataUpgrader);
                b.bind(MetaStateService.class).toInstance(metaStateService);
                b.bind(PersistedClusterStateService.class).toInstance(lucenePersistedStateFactory);
                b.bind(IndicesService.class).toInstance(indicesService);
                b.bind(AliasValidator.class).toInstance(aliasValidator);
                b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService);
                b.bind(MetadataCreateDataStreamService.class).toInstance(metadataCreateDataStreamService);
                b.bind(SearchService.class).toInstance(searchService);
                b.bind(SearchTransportService.class).toInstance(searchTransportService);
                b.bind(SearchPhaseController.class).toInstance(
                    new SearchPhaseController(namedWriteableRegistry, searchService::aggReduceContextBuilder));
                b.bind(Transport.class).toInstance(transport);
                b.bind(TransportService.class).toInstance(transportService);
                b.bind(NetworkService.class).toInstance(networkService);
                b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptService));
                b.bind(MetadataIndexUpgradeService.class).toInstance(metadataIndexUpgradeService);
                b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
                b.bind(SnapshotsInfoService.class).toInstance(snapshotsInfoService);
                b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
                b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
                {
                    processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
                    b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(transportService,
                        indicesService, recoverySettings));
                    b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(threadPool,
                        transportService, recoverySettings, clusterService));
                }
                b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
                pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
                b.bind(PersistentTasksService.class).toInstance(persistentTasksService);
                b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);
                b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
                b.bind(RepositoriesService.class).toInstance(repositoryService);
                b.bind(SnapshotsService.class).toInstance(snapshotsService);
                b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService);
                b.bind(RestoreService.class).toInstance(restoreService);
                b.bind(RerouteService.class).toInstance(rerouteService);
                b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
                b.bind(FsHealthService.class).toInstance(fsHealthService);
                b.bind(SystemIndices.class).toInstance(systemIndices);
            }
        );
        injector = modules.createInjector();

        // We allocate copies of existing shards by looking for a viable copy of the shard in the cluster and assigning the shard there.
        // The search for viable copies is triggered by an allocation attempt (i.e. a reroute) and is performed asynchronously. When it
        // completes we trigger another reroute to try the allocation again. This means there is a circular dependency: the allocation
        // service needs access to the existing shards allocators (e.g. the GatewayAllocator) which need to be able to trigger a
        // reroute, which needs to call into the allocation service. We close the loop here:
        clusterModule.setExistingShardsAllocators(injector.getInstance(GatewayAllocator.class));

        List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
            .filter(p -> p instanceof LifecycleComponent)
            .map(p -> (LifecycleComponent) p).collect(Collectors.toList());
        resourcesToClose.addAll(pluginLifecycleComponents);
        resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));
        this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
        client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {}),
            transportService.getTaskManager(),
            () -> clusterService.localNode().getId(),
            transportService.getLocalNodeConnection(),
            transportService.getRemoteClusterService(),
            namedWriteableRegistry);
        this.namedWriteableRegistry = namedWriteableRegistry;

        logger.debug("initializing HTTP handlers ...");
        actionModule.initRestHandlers(() -> clusterService.state().nodes());
        logger.info("initialized");

        success = true;
    } catch (IOException ex) {
        throw new ElasticsearchException("failed to bind service", ex);
    } finally {
        if (success == false) {
            IOUtils.closeWhileHandlingException(resourcesToClose);
        }
    }
}
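One pattern in the constructor is worth isolating: each Closeable is added to resourcesToClose right after it is created, success is set to true only at the very end, and the finally block closes everything if construction never got that far. Below is a minimal sketch of that pattern, assuming fake resources that only record their names when closed. ResourceCleanupSketch, openAll and closeQuietly are hypothetical; closeQuietly merely plays the role of IOUtils.closeWhileHandlingException, and a step named "boom" simulates a failing component.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch of the resourcesToClose/success pattern from the Node constructor. ResourceCleanupSketch,
// openAll and closeQuietly are hypothetical; each fake resource just logs its name when closed,
// and a step named "boom" simulates a failing component.
class ResourceCleanupSketch {

    static List<Closeable> openAll(List<String> steps, List<String> closedLog) {
        final List<Closeable> resourcesToClose = new ArrayList<>();
        boolean success = false;
        try {
            for (String step : steps) {
                if (step.equals("boom")) {
                    throw new IllegalStateException("failed at " + step);
                }
                resourcesToClose.add(() -> closedLog.add(step)); // register right after "creating" it
            }
            success = true; // last statement of the happy path
            return resourcesToClose;
        } finally {
            if (success == false) {
                closeQuietly(resourcesToClose); // plays the role of IOUtils.closeWhileHandlingException
            }
        }
    }

    /** Closes every resource, ignoring individual failures so one bad close cannot hide the rest. */
    static void closeQuietly(List<Closeable> resources) {
        for (Closeable resource : resources) {
            try {
                resource.close();
            } catch (IOException | RuntimeException e) {
                // ignored: the exception that aborted construction is the one that propagates
            }
        }
    }

    public static void main(String[] args) {
        List<String> closedLog = new ArrayList<>();
        try {
            openAll(List.of("nodeEnvironment", "threadPool", "boom"), closedLog);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + "; closed " + closedLog);
            // failed at boom; closed [nodeEnvironment, threadPool]
        }
    }
}
```

Using a success flag in finally, rather than a catch block, lets the original exception propagate unchanged without having to list which exception types can occur.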
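Well? Complicated at first glance, right? It is, otherwise it wouldn't be called the core. That said, a good share of it is checks and warning logs. We can also see that pluginsService is an important entry-point service: through filterPlugins it picks out the plugins that implement a given extension interface, and many features are wired up that way. ModulesBuilder acts as the owner that aggregates the services, gathering the related ones together and finally exposing them through dependency injection, which lays the groundwork for conveniently obtaining each service later. The thread pool is another key ES component: a single ThreadPool object wraps every thread pool that is used and hands them out to the rest of the code. Finally, the client is initialized and the REST handlers are bound, which completes node creation.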
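To make the filterPlugins idea concrete, here is a minimal sketch of type-based plugin filtering. It is not ES code: the role interfaces `ActionRole` and `ClusterRole` and the classes `DemoPlugin`, `MetricsPlugin` and `PluginFilter` are hypothetical, and the plugins sit in a plain list instead of the tuples the real method unwraps with `v2()`. The point is that one plugin object can implement several extension interfaces, and each caller asks only for the role it cares about.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical plugin-role interfaces, standing in for extension points such as ActionPlugin or ClusterPlugin.
interface ActionRole { String actionName(); }
interface ClusterRole { void onNodeStarted(); }

// A plugin may implement several roles at once.
class DemoPlugin implements ActionRole, ClusterRole {
    public String actionName() { return "demo"; }
    public void onNodeStarted() { }
}

class MetricsPlugin implements ClusterRole {
    public void onNodeStarted() { }
}

// Hypothetical holder of loaded plugins, simplified from the idea behind PluginsService#filterPlugins.
class PluginFilter {
    private final List<Object> plugins;

    PluginFilter(List<Object> plugins) {
        this.plugins = List.copyOf(plugins);
    }

    // Keep only plugins whose runtime class is assignable to `type`.
    // Class#cast replaces the unchecked (T) cast used in the original method.
    <T> List<T> filterPlugins(Class<T> type) {
        return plugins.stream()
            .filter(p -> type.isAssignableFrom(p.getClass()))
            .map(type::cast)
            .collect(Collectors.toList());
    }
}
```

With one `DemoPlugin` and one `MetricsPlugin` loaded, `filterPlugins(ClusterRole.class)` returns both, while `filterPlugins(ActionRole.class)` returns only `DemoPlugin`. The same pattern shows up at the end of `Node#start`, where `ClusterPlugin`s are filtered out and notified.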
// Plugin filtering
@SuppressWarnings("unchecked")
public <T> List<T> filterPlugins(Class<T> type) {
    return plugins.stream().filter(x -> type.isAssignableFrom(x.v2().getClass()))
        .map(p -> ((T)p.v2())).collect(Collectors.toList());
}

// Thread pool creation
// org.elasticsearch.threadpool.ThreadPool#ThreadPool
@SuppressWarnings({"rawtypes", "unchecked"})
public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
    assert Node.NODE_NAME_SETTING.exists(settings);

    final Map<String, ExecutorBuilder> builders = new HashMap<>();
    final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
    final int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors);
    final int halfProcMaxAt10 = halfAllocatedProcessorsMaxTen(allocatedProcessors);
    final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512);
    builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
    builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 10000, false));
    builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000, false));
    builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16, false));
    builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(allocatedProcessors), 1000, true));
    builders.put(Names.SEARCH_THROTTLED, new FixedExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100, true));
    builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
    builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
    builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    builders.put(Names.FETCH_SHARD_STARTED,
        new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5)));
    builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1, false));
    builders.put(Names.FETCH_SHARD_STORE,
        new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5)));
    builders.put(Names.SYSTEM_READ, new FixedExecutorBuilder(settings, Names.SYSTEM_READ, halfProcMaxAt5, 2000, false));
    builders.put(Names.SYSTEM_WRITE, new FixedExecutorBuilder(settings, Names.SYSTEM_WRITE, halfProcMaxAt5, 1000, false));
    for (final ExecutorBuilder<?> builder : customBuilders) {
        if (builders.containsKey(builder.name())) {
            throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");
        }
        builders.put(builder.name(), builder);
    }
    this.builders = Collections.unmodifiableMap(builders);

    threadContext = new ThreadContext(settings);

    final Map<String, ExecutorHolder> executors = new HashMap<>();
    for (final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
        final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
        final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
        if (executors.containsKey(executorHolder.info.getName())) {
            throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered");
        }
        logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));
        // Store each pool instance under its own name so it can be looked up later
        executors.put(entry.getKey(), executorHolder);
    }

    executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
    this.executors = unmodifiableMap(executors);

    final List<Info> infos =
        executors
            .values()
            .stream()
            .filter(holder -> holder.info.getName().equals("same") == false)
            .map(holder -> holder.info)
            .collect(Collectors.toList());
    this.threadPoolInfo = new ThreadPoolInfo(infos);
    this.scheduler = Scheduler.initScheduler(settings);
    TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
    this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
    this.cachedTimeThread.start();
}

// Guice dependency-injection setup
// org.elasticsearch.common.inject.ModulesBuilder#createInjector
public Injector createInjector() {
    Injector injector = Guice.createInjector(modules);
    ((InjectorImpl) injector).clearCache();
    // in ES, we always create all instances as if they are eager singletons
    // this allows for considerable memory savings (no need to store construction info) as well as cycles
    ((InjectorImpl) injector).readOnlyAllSingletons();
    return injector;
}

// org.elasticsearch.common.inject.Guice#createInjector
/**
 * Creates an injector for the given set of modules.
 *
 * @throws CreationException if one or more errors occur during Injector
 *                           creation
 */
public static Injector createInjector(Iterable<? extends Module> modules) {
    return createInjector(Stage.DEVELOPMENT, modules);
}
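Every pool size in the constructor above is derived from the processor count. The helper below reproduces that arithmetic so you can work out the numbers for your own machine. Only the GENERIC formula, `boundedBy(4 * allocatedProcessors, 128, 512)`, is visible in the code above; the bodies of `halfAllocatedProcessorsMaxFive`, `halfAllocatedProcessorsMaxTen` and `searchThreadPoolSize` are written from memory of the 7.x source and should be treated as assumptions, as should the class name `ThreadPoolSizing`.

```java
// Hypothetical re-implementation of the sizing helpers referenced in ThreadPool's constructor.
// Assumption: the formulas follow the ES 7.x source; verify them against the version you build.
class ThreadPoolSizing {
    // Clamp value into [min, max].
    static int boundedBy(int value, int min, int max) {
        return Math.min(max, Math.max(min, value));
    }

    // Half the processors, rounded up, clamped to [1, 5]; used for FLUSH, WARMER, SNAPSHOT, SYSTEM_READ/WRITE.
    static int halfAllocatedProcessorsMaxFive(int allocatedProcessors) {
        return boundedBy((allocatedProcessors + 1) / 2, 1, 5);
    }

    // Half the processors, rounded up, clamped to [1, 10]; used for REFRESH.
    static int halfAllocatedProcessorsMaxTen(int allocatedProcessors) {
        return boundedBy((allocatedProcessors + 1) / 2, 1, 10);
    }

    // Fixed SEARCH pool size: 1.5 x processors + 1, in integer arithmetic.
    static int searchThreadPoolSize(int allocatedProcessors) {
        return ((allocatedProcessors * 3) / 2) + 1;
    }

    // Maximum size of the scaling GENERIC pool: 4 x processors, clamped to [128, 512].
    static int genericThreadPoolMax(int allocatedProcessors) {
        return boundedBy(4 * allocatedProcessors, 128, 512);
    }
}
```

Under these assumptions, 8 processors give SEARCH = 13, WRITE = 8, GENERIC max = 128 (32 clamped up), FLUSH max = 4 and REFRESH max = 4; with 16 processors FLUSH is capped at 5 while REFRESH grows to 8.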
Next comes the registration of the REST handlers:
// org.elasticsearch.action.ActionModule#initRestHandlers
public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
    List<AbstractCatAction> catActions = new ArrayList<>();
    Consumer<RestHandler> registerHandler = handler -> {
        if (handler instanceof AbstractCatAction) {
            catActions.add((AbstractCatAction) handler);
        }
        restController.registerHandler(handler);
    };
    // Register each handler. The list shows the broad categories of operations ES supports; each Action holds the concrete implementation
    registerHandler.accept(new RestAddVotingConfigExclusionAction());
    registerHandler.accept(new RestClearVotingConfigExclusionsAction());
    registerHandler.accept(new RestMainAction());
    registerHandler.accept(new RestNodesInfoAction(settingsFilter));
    registerHandler.accept(new RestRemoteClusterInfoAction());
    registerHandler.accept(new RestNodesStatsAction());
    registerHandler.accept(new RestNodesUsageAction());
    registerHandler.accept(new RestNodesHotThreadsAction());
    registerHandler.accept(new RestClusterAllocationExplainAction());
    registerHandler.accept(new RestClusterStatsAction());
    registerHandler.accept(new RestClusterStateAction(settingsFilter, threadPool));
    registerHandler.accept(new RestClusterHealthAction());
    registerHandler.accept(new RestClusterUpdateSettingsAction());
    registerHandler.accept(new RestClusterGetSettingsAction(settings, clusterSettings, settingsFilter));
    registerHandler.accept(new RestClusterRerouteAction(settingsFilter));
    registerHandler.accept(new RestClusterSearchShardsAction());
    registerHandler.accept(new RestPendingClusterTasksAction());
    registerHandler.accept(new RestPutRepositoryAction());
    registerHandler.accept(new RestGetRepositoriesAction(settingsFilter));
    registerHandler.accept(new RestDeleteRepositoryAction());
    registerHandler.accept(new RestVerifyRepositoryAction());
    registerHandler.accept(new RestCleanupRepositoryAction());
    registerHandler.accept(new RestGetSnapshotsAction());
    registerHandler.accept(new RestCreateSnapshotAction());
    registerHandler.accept(new RestCloneSnapshotAction());
    registerHandler.accept(new RestRestoreSnapshotAction());
    registerHandler.accept(new RestDeleteSnapshotAction());
    registerHandler.accept(new RestSnapshotsStatusAction());
    registerHandler.accept(new RestGetIndicesAction());
    registerHandler.accept(new RestIndicesStatsAction());
    registerHandler.accept(new RestIndicesSegmentsAction());
    registerHandler.accept(new RestIndicesShardStoresAction());
    registerHandler.accept(new RestGetAliasesAction());
    registerHandler.accept(new RestIndexDeleteAliasesAction());
    registerHandler.accept(new RestIndexPutAliasAction());
    registerHandler.accept(new RestIndicesAliasesAction());
    registerHandler.accept(new RestCreateIndexAction());
    registerHandler.accept(new RestResizeHandler.RestShrinkIndexAction());
    registerHandler.accept(new RestResizeHandler.RestSplitIndexAction());
    registerHandler.accept(new RestResizeHandler.RestCloneIndexAction());
    registerHandler.accept(new RestRolloverIndexAction());
    registerHandler.accept(new RestDeleteIndexAction());
    registerHandler.accept(new RestCloseIndexAction());
    registerHandler.accept(new RestOpenIndexAction());
    registerHandler.accept(new RestAddIndexBlockAction());
    registerHandler.accept(new RestUpdateSettingsAction());
    registerHandler.accept(new RestGetSettingsAction());
    registerHandler.accept(new RestAnalyzeAction());
    registerHandler.accept(new RestGetIndexTemplateAction());
    registerHandler.accept(new RestPutIndexTemplateAction());
    registerHandler.accept(new RestDeleteIndexTemplateAction());
    registerHandler.accept(new RestPutComponentTemplateAction());
    registerHandler.accept(new RestGetComponentTemplateAction());
    registerHandler.accept(new RestDeleteComponentTemplateAction());
    registerHandler.accept(new RestPutComposableIndexTemplateAction());
    registerHandler.accept(new RestGetComposableIndexTemplateAction());
    registerHandler.accept(new RestDeleteComposableIndexTemplateAction());
    registerHandler.accept(new RestSimulateIndexTemplateAction());
    registerHandler.accept(new RestSimulateTemplateAction());
    registerHandler.accept(new RestPutMappingAction());
    registerHandler.accept(new RestGetMappingAction(threadPool));
    registerHandler.accept(new RestGetFieldMappingAction());
    registerHandler.accept(new RestRefreshAction());
    registerHandler.accept(new RestFlushAction());
    registerHandler.accept(new RestSyncedFlushAction());
    registerHandler.accept(new RestForceMergeAction());
    registerHandler.accept(new RestClearIndicesCacheAction());
    registerHandler.accept(new RestResolveIndexAction());
    registerHandler.accept(new RestIndexAction());
    registerHandler.accept(new CreateHandler());
    registerHandler.accept(new AutoIdHandler(nodesInCluster));
    registerHandler.accept(new RestGetAction());
    registerHandler.accept(new RestGetSourceAction());
    registerHandler.accept(new RestMultiGetAction(settings));
    registerHandler.accept(new RestDeleteAction());
    registerHandler.accept(new RestCountAction());
    registerHandler.accept(new RestTermVectorsAction());
    registerHandler.accept(new RestMultiTermVectorsAction());
    registerHandler.accept(new RestBulkAction(settings));
    registerHandler.accept(new RestUpdateAction());
    registerHandler.accept(new RestSearchAction());
    registerHandler.accept(new RestSearchScrollAction());
    registerHandler.accept(new RestClearScrollAction());
    registerHandler.accept(new RestMultiSearchAction(settings));
    registerHandler.accept(new RestValidateQueryAction());
    registerHandler.accept(new RestExplainAction());
    registerHandler.accept(new RestRecoveryAction());
    registerHandler.accept(new RestReloadSecureSettingsAction());

    // Scripts API
    registerHandler.accept(new RestGetStoredScriptAction());
    registerHandler.accept(new RestPutStoredScriptAction());
    registerHandler.accept(new RestDeleteStoredScriptAction());
    registerHandler.accept(new RestGetScriptContextAction());
    registerHandler.accept(new RestGetScriptLanguageAction());

    registerHandler.accept(new RestFieldCapabilitiesAction());

    // Tasks API
    registerHandler.accept(new RestListTasksAction(nodesInCluster));
    registerHandler.accept(new RestGetTaskAction());
    registerHandler.accept(new RestCancelTasksAction(nodesInCluster));

    // Ingest API
    registerHandler.accept(new RestPutPipelineAction());
    registerHandler.accept(new RestGetPipelineAction());
    registerHandler.accept(new RestDeletePipelineAction());
    registerHandler.accept(new RestSimulatePipelineAction());

    // Dangling indices API
    registerHandler.accept(new RestListDanglingIndicesAction());
    registerHandler.accept(new RestImportDanglingIndexAction());
    registerHandler.accept(new RestDeleteDanglingIndexAction());

    // CAT API
    registerHandler.accept(new RestAllocationAction());
    registerHandler.accept(new RestShardsAction());
    registerHandler.accept(new RestMasterAction());
    registerHandler.accept(new RestNodesAction());
    registerHandler.accept(new RestTasksAction(nodesInCluster));
    registerHandler.accept(new RestIndicesAction());
    registerHandler.accept(new RestSegmentsAction());
    // Fully qualified to prevent interference with rest.action.count.RestCountAction
    registerHandler.accept(new org.elasticsearch.rest.action.cat.RestCountAction());
    // Fully qualified to prevent interference with rest.action.indices.RestRecoveryAction
    registerHandler.accept(new RestCatRecoveryAction());
    registerHandler.accept(new RestHealthAction());
    registerHandler.accept(new org.elasticsearch.rest.action.cat.RestPendingClusterTasksAction());
    registerHandler.accept(new RestAliasAction());
    registerHandler.accept(new RestThreadPoolAction());
    registerHandler.accept(new RestPluginsAction());
    registerHandler.accept(new RestFielddataAction());
    registerHandler.accept(new RestNodeAttrsAction());
    registerHandler.accept(new RestRepositoriesAction());
    registerHandler.accept(new RestSnapshotAction());
    registerHandler.accept(new RestTemplatesAction());
    for (ActionPlugin plugin : actionPlugins) {
        for (RestHandler handler : plugin.getRestHandlers(settings, restController, clusterSettings, indexScopedSettings,
                settingsFilter, indexNameExpressionResolver, nodesInCluster)) {
            registerHandler.accept(handler);
        }
    }
    registerHandler.accept(new RestCatAction(catActions));
}

// org.elasticsearch.rest.RestController#registerHandler(org.elasticsearch.rest.RestHandler)
/**
 * Registers a REST handler with the controller. The REST handler declares the {@code method}
 * and {@code path} combinations.
 */
public void registerHandler(final RestHandler restHandler) {
    restHandler.routes().forEach(route -> registerHandler(route.getMethod(), route.getPath(), restHandler));
    restHandler.deprecatedRoutes().forEach(route ->
        registerAsDeprecatedHandler(route.getMethod(), route.getPath(), restHandler, route.getDeprecationMessage()));
    restHandler.replacedRoutes().forEach(route -> registerWithDeprecatedHandler(route.getMethod(), route.getPath(),
        restHandler, route.getDeprecatedMethod(), route.getDeprecatedPath()));
}
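`RestController#registerHandler` fans each handler out over the routes it declares, so one handler can serve several method/path pairs. Below is a minimal sketch of that idea with hypothetical types `MiniRestHandler`, `MiniRoute` and `MiniRestController`. It keys routes by method plus exact path, whereas real ES matches paths containing `{placeholder}` segments and also handles deprecated and replaced routes; rejecting a duplicate method/path pair is this sketch's own choice, not a claim about ES behaviour.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for RestHandler: declares its routes and produces a response body.
interface MiniRestHandler {
    List<MiniRoute> routes();
    String handle(String method, String path);
}

// Hypothetical stand-in for a route: an HTTP method plus a path.
final class MiniRoute {
    final String method;
    final String path;

    MiniRoute(String method, String path) {
        this.method = method;
        this.path = path;
    }
}

// Hypothetical stand-in for RestController: indexes handlers by "METHOD path".
final class MiniRestController {
    private final Map<String, MiniRestHandler> handlers = new HashMap<>();

    // Fan out over every route the handler declares, as registerHandler(RestHandler) does.
    void registerHandler(MiniRestHandler handler) {
        for (MiniRoute route : handler.routes()) {
            String key = route.method + " " + route.path;
            // Sketch-only policy: a second handler for the same method and path is rejected.
            if (handlers.putIfAbsent(key, handler) != null) {
                throw new IllegalArgumentException("handler already registered for [" + key + "]");
            }
        }
    }

    // Exact-match dispatch; returns a 404-style message when nothing is registered.
    String dispatch(String method, String path) {
        MiniRestHandler handler = handlers.get(method + " " + path);
        return handler == null
            ? "404 no handler for [" + method + " " + path + "]"
            : handler.handle(method, path);
    }
}
```

A handler that declares two routes becomes reachable under both keys after a single `registerHandler` call, which is why `initRestHandlers` can register everything with one `accept` per handler.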
2.5. Starting Bootstrap
With all that preparation done, the next step is to actually start ES, i.e. the INSTANCE.start() call we saw earlier.
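In `Bootstrap#start` below, a `keepAliveThread` is started right after the node, and its comment says the thread only waits on a lock so the ES process does not exit. The underlying rule is that a JVM exits once only daemon threads are left; one non-daemon thread blocked on a latch prevents that. Here is a minimal sketch of such a thread. `KeepAlive` is a hypothetical class, and releasing it from a shutdown hook is an assumption about the wiring, not a quote from `Bootstrap`.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical keep-alive thread in the spirit of Bootstrap's keepAliveThread.
final class KeepAlive {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final Thread thread;

    KeepAlive(String name) {
        thread = new Thread(() -> {
            try {
                latch.await(); // block until release() is called
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and let the thread end
            }
        }, name);
        thread.setDaemon(false); // must be non-daemon, otherwise it would not keep the JVM alive
    }

    void start() {
        thread.start();
    }

    // Let the thread finish. In a server this would typically run in a shutdown hook, e.g.
    // Runtime.getRuntime().addShutdownHook(new Thread(keepAlive::release));
    void release() {
        latch.countDown();
    }

    boolean isAlive() {
        return thread.isAlive();
    }

    // Wait for the thread to end, without surfacing a checked exception to the caller.
    void awaitExit() {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```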
// org.elasticsearch.bootstrap.Bootstrap#start
private void start() throws NodeValidationException {
    // node does the real work of running the ES node; keepAliveThread only waits on a lock so the ES process does not exit.
    node.start();
    keepAliveThread.start();
}

// org.elasticsearch.node.Node#start
/**
 * Start the node. If the node is already started, this method is no-op.
 */
public Node start() throws NodeValidationException {
    if (lifecycle.moveToStarted() == false) {
        return this;
    }

    logger.info("starting ...");
    // Tell every plugin lifecycle component to start()
    pluginLifecycleComponents.forEach(LifecycleComponent::start);

    // Start the core services
    injector.getInstance(MappingUpdatedAction.class).setClient(client);
    injector.getInstance(IndicesService.class).start();
    injector.getInstance(IndicesClusterStateService.class).start();
    injector.getInstance(SnapshotsService.class).start();
    injector.getInstance(SnapshotShardsService.class).start();
    injector.getInstance(RepositoriesService.class).start();
    injector.getInstance(SearchService.class).start();
    injector.getInstance(FsHealthService.class).start();
    nodeService.getMonitorService().start();

    final ClusterService clusterService = injector.getInstance(ClusterService.class);

    final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
    nodeConnectionsService.start();
    clusterService.setNodeConnectionsService(nodeConnectionsService);

    injector.getInstance(GatewayService.class).start();
    Discovery discovery = injector.getInstance(Discovery.class);
    clusterService.getMasterService().setClusterStatePublisher(discovery::publish);

    // Start the transport service now so the publish address will be added to the local disco node in ClusterService
    TransportService transportService = injector.getInstance(TransportService.class);
    transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
    transportService.getTaskManager().setTaskCancellationService(new TaskCancellationService(transportService));
    transportService.start();
    assert localNodeFactory.getNode() != null;
    assert transportService.getLocalNode().equals(localNodeFactory.getNode())
        : "transportService has a different local node than the factory provided";
    injector.getInstance(PeerRecoverySourceService.class).start();

    // Load (and maybe upgrade) the metadata stored on disk
    final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
    gatewayMetaState.start(settings(), transportService, clusterService, injector.getInstance(MetaStateService.class),
        injector.getInstance(MetadataIndexUpgradeService.class), injector.getInstance(MetadataUpgrader.class),
        injector.getInstance(PersistedClusterStateService.class));
    if (Assertions.ENABLED) {
        try {
            assert injector.getInstance(MetaStateService.class).loadFullState().v1().isEmpty();
            final NodeMetadata nodeMetadata = NodeMetadata.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY,
                nodeEnvironment.nodeDataPaths());
            assert nodeMetadata != null;
            assert nodeMetadata.nodeVersion().equals(Version.CURRENT);
            assert nodeMetadata.nodeId().equals(localNodeFactory.getNode().getId());
        } catch (IOException e) {
            assert false : e;
        }
    }
    // we load the global state here (the persistent part of the cluster state stored on disk) to
    // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
    final Metadata onDiskMetadata = gatewayMetaState.getPersistedState().getLastAcceptedState().metadata();
    assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
    validateNodeBeforeAcceptingRequests(new BootstrapContext(environment, onDiskMetadata), transportService.boundAddress(),
        pluginsService.filterPlugins(Plugin.class).stream()
            .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));

    clusterService.addStateApplier(transportService.getTaskManager());
    // start after transport service so the local disco is known
    discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
    clusterService.start();
    assert clusterService.localNode().equals(localNodeFactory.getNode())
        : "clusterService has a different local node than the factory provided";
    transportService.acceptIncomingRequests();
    discovery.startInitialJoin();
    final TimeValue initialStateTimeout = INITIAL_STATE_TIMEOUT_SETTING.get(settings());
    configureNodeAndClusterIdStateListener(clusterService);

    if (initialStateTimeout.millis() > 0) {
        final ThreadPool thread = injector.getInstance(ThreadPool.class);
        ClusterState clusterState = clusterService.state();
        ClusterStateObserver observer =
            new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());

        if (clusterState.nodes().getMasterNodeId() == null) {
            logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
            final CountDownLatch latch = new CountDownLatch(1);
            observer.waitForNextChange(new ClusterStateObserver.Listener() {
                @Override
                public void onNewClusterState(ClusterState state) { latch.countDown(); }

                @Override
                public void onClusterServiceClose() {
                    latch.countDown();
                }

                @Override
                public void onTimeout(TimeValue timeout) {
                    logger.warn("timed out while waiting for initial discovery state - timeout: {}",
                        initialStateTimeout);
                    latch.countDown();
                }
            }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);

            try {
                latch.await();
            } catch (InterruptedException e) {
                throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
            }
        }
    }

    // Start the HTTP service: open the socket port and begin accepting REST requests
    injector.getInstance(HttpServerTransport.class).start();

    if (WRITE_PORTS_FILE_SETTING.get(settings())) {
        TransportService transport = injector.getInstance(TransportService.class);
        writePortsFile("transport", transport.boundAddress());
        HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
        writePortsFile("http", http.boundAddress());
    }

    logger.info("started");

    // Notify ClusterPlugin components that the node has finished starting
    pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);

    return this;
}
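Near the end of `Node#start`, the node waits for the first elected master with a `CountDownLatch` that the listener counts down on a new state, on close, or on timeout; a timeout only logs a warning, and startup continues either way. The sketch below keeps that shape in plain Java. `MiniClusterState` and `InitialJoinWaiter` are hypothetical, a `String` master id stands in for the cluster state, and the timeout is applied through `CountDownLatch#await(long, TimeUnit)` rather than through `ClusterStateObserver`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for ClusterService: holds a master node id and notifies listeners when it changes.
final class MiniClusterState {
    private volatile String masterNodeId; // null until a master is elected
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    String masterNodeId() {
        return masterNodeId;
    }

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void electMaster(String id) {
        masterNodeId = id;
        listeners.forEach(l -> l.accept(id));
    }
}

final class InitialJoinWaiter {
    // Block until `ready` holds for the master id or the timeout elapses.
    // Returns true if the condition was seen, false on timeout (the caller may just log and carry on).
    static boolean waitFor(MiniClusterState state, Predicate<String> ready, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        // Register first, then check, so a change racing with registration is not missed.
        // (The listener is never removed in this sketch.)
        state.addListener(id -> {
            if (ready.test(id)) {
                latch.countDown();
            }
        });
        if (ready.test(state.masterNodeId())) {
            return true;
        }
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for initial cluster state", e);
        }
    }
}
```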
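OK, that is the ES node startup process. The node acts as a coordinator, a framework really, that starts each service in turn so that ES as a whole comes up in an orderly way. That is reasonable, and there is hardly any other way to do it. What the framework does give us is a view of exactly which services are started and in what order: the transport service first, then discovery and the cluster service, then the wait for an elected master, and the HTTP service last, which is the point where REST requests start being accepted. That is enough to clear up the mystery for now.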
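`Node#start` opens with `if (lifecycle.moveToStarted() == false) return this;`, which turns a second call into a no-op, and then starts its services in a fixed, hand-written order. A simplified sketch of both ideas follows. `MiniLifecycle` and `MiniNode` are hypothetical; the four states are loosely modeled on ES's `Lifecycle`, but its exact transition rules are not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified lifecycle guard: moveToStarted() reports whether this call did the transition.
final class MiniLifecycle {
    enum State { INITIALIZED, STARTED, STOPPED, CLOSED }

    private State state = State.INITIALIZED;

    synchronized boolean moveToStarted() {
        switch (state) {
            case INITIALIZED:
            case STOPPED:
                state = State.STARTED;
                return true;
            case STARTED:
                return false; // already started: caller should do nothing
            default:
                throw new IllegalStateException("cannot start from state " + state);
        }
    }

    synchronized State state() {
        return state;
    }
}

// Hypothetical node that starts its services strictly in registration order.
final class MiniNode {
    private final MiniLifecycle lifecycle = new MiniLifecycle();
    private final List<Runnable> servicesInOrder = new ArrayList<>();

    MiniNode register(Runnable startAction) {
        servicesInOrder.add(startAction);
        return this;
    }

    MiniNode start() {
        if (lifecycle.moveToStarted() == false) {
            return this; // second start(): no-op, like the guard at the top of Node#start
        }
        servicesInOrder.forEach(Runnable::run);
        return this;
    }
}
```

Registering start actions labelled transport, discovery, cluster and http, then calling `start()` twice, runs each action exactly once and in that order.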
3. A few closing remarks
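This article looked at how ES starts up. In general, a system starts roughly like this: enter main(), validate the command-line arguments, create the necessary data structures, load the required modules and configuration, open ports, and start the service loop. For non-distributed applications this is usually not very complicated, and systems without any persistence have even less to worry about. Applications that must recover data and coordinate a cluster are usually much harder.
And the truly core or detailed parts are exactly the ones we tend to skip on a quick read; understanding them takes a lot more time and effort.
Although we only skimmed the startup, we did get to see the various components that play a part in it, which will certainly help when we study each component on its own later.
To be continued in the next installment.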

Author: 等你归去来
Source: https://www.cnblogs.com/yougewe/p/14495459.html
