ZooKeeper (Part 1): Standalone Mode Startup Logic
ZooKeeper is used in so many places that sooner or later every one of them will expect you to understand how it works!
To follow along with the source, set up as follows:
1. Open the ZooKeeper git repository: https://github.com/apache/zookeeper — one look is enough: this is the one you're looking for!
2. Clone the source, and download ant if you don't have it yet: http://ant.apache.org/
3. Run the ant target that generates the project files: ant eclipse (this can take a few minutes).
4. Open the source in IDEA and import the required dependencies.
5. Run the main() method to start the ZooKeeper server; remember to pass a runtime configuration file (a sample follows this list)!
6. Read the source and start learning!
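For step 5, the server expects the path of a config file as its program argument. A minimal standalone zoo.cfg might look like this (the paths are placeholders; adjust them for your machine):

    # minimal standalone config (illustrative)
    tickTime=2000
    dataDir=/tmp/zookeeper
    clientPort=2181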
First, start from the entry point, the launch script zkServer.sh:
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# If this scripted is run out of /usr/bin or some other system bin directory
# it should be linked to and not copied. Things like java jar files are found
# relative to the canonical path of this script.
#

# use POSIX interface, symlink is followed automatically
ZOOBIN="${BASH_SOURCE-$0}"
ZOOBIN="$(dirname "${ZOOBIN}")"
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"

if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
  . "$ZOOBINDIR/../libexec/zkEnv.sh"
else
  . "$ZOOBINDIR/zkEnv.sh"
fi

# See the following page for extensive details on setting
# up the JVM to accept JMX remote management:
# http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html
# by default we allow local JMX connections
if [ "x$JMXLOCALONLY" = "x" ]
then
    JMXLOCALONLY=false
fi

if [ "x$JMXDISABLE" = "x" ] || [ "$JMXDISABLE" = 'false' ]
then
  echo "ZooKeeper JMX enabled by default" >&2
  if [ "x$JMXPORT" = "x" ]
  then
    # for some reason these two options are necessary on jdk6 on Ubuntu
    #   accord to the docs they are not necessary, but otw jconsole cannot
    #   do a local attach
    ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
  else
    if [ "x$JMXAUTH" = "x" ]
    then
      JMXAUTH=false
    fi
    if [ "x$JMXSSL" = "x" ]
    then
      JMXSSL=false
    fi
    if [ "x$JMXLOG4J" = "x" ]
    then
      JMXLOG4J=true
    fi
    echo "ZooKeeper remote JMX Port set to $JMXPORT" >&2
    echo "ZooKeeper remote JMX authenticate set to $JMXAUTH" >&2
    echo "ZooKeeper remote JMX ssl set to $JMXSSL" >&2
    echo "ZooKeeper remote JMX log4j set to $JMXLOG4J" >&2
    ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain"
  fi
else
    echo "JMX disabled by user request" >&2
    ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
fi

if [ "x$SERVER_JVMFLAGS" != "x" ]
then
    JVMFLAGS="$SERVER_JVMFLAGS $JVMFLAGS"
fi

if [ "x$2" != "x" ]
then
    ZOOCFG="$ZOOCFGDIR/$2"
fi

# if we give a more complicated path to the config, don't screw around in $ZOOCFGDIR
if [ "x$(dirname "$ZOOCFG")" != "x$ZOOCFGDIR" ]
then
    ZOOCFG="$2"
fi

if $cygwin
then
    ZOOCFG=`cygpath -wp "$ZOOCFG"`
    # cygwin has a "kill" in the shell itself, gets confused
    KILL=/bin/kill
else
    KILL=kill
fi

echo "Using config: $ZOOCFG" >&2

case "$OSTYPE" in
*solaris*)
  GREP=/usr/xpg4/bin/grep
  ;;
*)
  GREP=grep
  ;;
esac

if [ -z "$ZOOPIDFILE" ]; then
  ZOO_DATADIR="$($GREP "^[[:space:]]*dataDir" "$ZOOCFG" | sed -e 's/.*=//')"
  if [ ! -d "$ZOO_DATADIR" ]; then
    mkdir -p "$ZOO_DATADIR"
  fi
  ZOOPIDFILE="$ZOO_DATADIR/zookeeper_server.pid"
else
  # ensure it exists, otw stop will fail
  mkdir -p "$(dirname "$ZOOPIDFILE")"
fi

if [ ! -w "$ZOO_LOG_DIR" ] ; then
  mkdir -p "$ZOO_LOG_DIR"
fi

_ZOO_DAEMON_OUT="$ZOO_LOG_DIR/zookeeper.out"

case $1 in
start)
    echo -n "Starting zookeeper ... "
    if [ -f "$ZOOPIDFILE" ]; then
      if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then
         echo $command already running as process `cat "$ZOOPIDFILE"`.
         exit 0
      fi
    fi
    nohup "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
    -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &
    if [ $? -eq 0 ]
    then
      case "$OSTYPE" in
      *solaris*)
        /bin/echo "${!}\\c" > "$ZOOPIDFILE"
        ;;
      *)
        /bin/echo -n $! > "$ZOOPIDFILE"
        ;;
      esac
      if [ $? -eq 0 ];
      then
        sleep 1
        echo STARTED
      else
        echo FAILED TO WRITE PID
        exit 1
      fi
    else
      echo SERVER DID NOT START
      exit 1
    fi
    ;;
start-foreground)
    ZOO_CMD=(exec "$JAVA")
    if [ "${ZOO_NOEXEC}" != "" ]; then
      ZOO_CMD=("$JAVA")
    fi
    "${ZOO_CMD[@]}" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
    -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG"
    ;;
print-cmd)
    echo "\"$JAVA\" -Dzookeeper.log.dir=\"${ZOO_LOG_DIR}\" -Dzookeeper.root.logger=\"${ZOO_LOG4J_PROP}\" -cp \"$CLASSPATH\" $JVMFLAGS $ZOOMAIN \"$ZOOCFG\" > \"$_ZOO_DAEMON_OUT\" 2>&1 < /dev/null"
    ;;
stop)
    echo -n "Stopping zookeeper ... "
    if [ ! -f "$ZOOPIDFILE" ]
    then
      echo "no zookeeper to stop (could not find file $ZOOPIDFILE)"
    else
      $KILL -9 $(cat "$ZOOPIDFILE")
      rm "$ZOOPIDFILE"
      echo STOPPED
    fi
    exit 0
    ;;
upgrade)
    shift
    echo "upgrading the servers to 3.*"
    "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
    -cp "$CLASSPATH" $JVMFLAGS org.apache.zookeeper.server.upgrade.UpgradeMain ${@}
    echo "Upgrading ... "
    ;;
restart)
    shift
    "$0" stop ${@}
    sleep 3
    "$0" start ${@}
    ;;
status)
    # -q is necessary on some versions of linux where nc returns too quickly, and no stat result is output
    clientPortAddress=`$GREP "^[[:space:]]*clientPortAddress[^[:alpha:]]" "$ZOOCFG" | sed -e 's/.*=//'`
    if ! [ $clientPortAddress ]
    then
      clientPortAddress="localhost"
    fi
    clientPort=`$GREP "^[[:space:]]*clientPort[^[:alpha:]]" "$ZOOCFG" | sed -e 's/.*=//'`
    STAT=`"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
      -cp "$CLASSPATH" $JVMFLAGS org.apache.zookeeper.client.FourLetterWordMain \
      $clientPortAddress $clientPort srvr 2> /dev/null \
      | $GREP Mode`
    if [ "x$STAT" = "x" ]
    then
      echo "Error contacting service. It is probably not running."
      exit 1
    else
      echo $STAT
      exit 0
    fi
    ;;
*)
    echo "Usage: $0 {start|start-foreground|stop|restart|status|upgrade|print-cmd}" >&2
esac
The part that matters is the command it ultimately runs:
nohup java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE \
  -cp /etc/zookeeper-3.4.13/bin/../build/classes:/etc/zookeeper-3.4.13/bin/../build/lib/*.jar:/etc/zookeeper-3.4.13/bin/../lib/slf4j-log4j12-1.7.25.jar:/etc/zookeeper-3.4.13/bin/../lib/slf4j-api-1.7.25.jar:/etc/zookeeper-3.4.13/bin/../lib/netty-3.10.6.Final.jar:/etc/zookeeper-3.4.13/bin/../lib/log4j-1.2.17.jar:/etc/zookeeper-3.4.13/bin/../lib/jline-0.9.94.jar:/etc/zookeeper-3.4.13/bin/../lib/audience-annotations-0.5.0.jar:/etc/zookeeper-3.4.13/bin/../zookeeper-3.4.13.jar:/etc/zookeeper-3.4.13/bin/../src/java/lib/*.jar:/etc/zookeeper-3.4.13/bin/../conf: \
  -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false \
  org.apache.zookeeper.server.quorum.QuorumPeerMain /opt/zookeeper/zoo1.cfg > ./zookeeper.out 2>&1 < /dev/null &

As you can see, org.apache.zookeeper.server.quorum.QuorumPeerMain is the startup class, so let's find its main method:
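Incidentally, the status branch of the script above checks liveness by sending the srvr four-letter word to the client port. You can do the same by hand; here is a minimal sketch (host and port are assumptions for a local server):

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class SrvrCheck {
    public static void main(String[] args) throws Exception {
        // connect to the client port and send the "srvr" four-letter word
        try (Socket s = new Socket("localhost", 2181)) {
            OutputStream out = s.getOutputStream();
            out.write("srvr".getBytes("UTF-8"));
            out.flush();
            s.shutdownOutput();
            // print the reply, which contains a "Mode: standalone" line
            InputStream in = s.getInputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) > 0) {
                System.out.write(buf, 0, n);
            }
        }
    }
}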
/**
 * To start the replicated server specify the configuration file name on
 * the command line.
 * @param args path to the configfile
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        LOG.error("Invalid arguments, exiting abnormally", e);
        LOG.info(USAGE);
        System.err.println(USAGE);
        System.exit(ExitCode.INVALID_INVOCATION.getValue());
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.err.println("Invalid config, exiting abnormally");
        System.exit(ExitCode.INVALID_INVOCATION.getValue());
    } catch (DatadirException e) {
        LOG.error("Unable to access datadir, exiting abnormally", e);
        System.err.println("Unable to access datadir, exiting abnormally");
        System.exit(ExitCode.UNABLE_TO_ACCESS_DATADIR.getValue());
    } catch (AdminServerException e) {
        LOG.error("Unable to start AdminServer, exiting abnormally", e);
        System.err.println("Unable to start AdminServer, exiting abnormally");
        System.exit(ExitCode.ERROR_STARTING_ADMIN_SERVER.getValue());
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
    }
    LOG.info("Exiting normally");
    System.exit(ExitCode.EXECUTION_FINISHED.getValue());
}

protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException
{
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // Start and schedule the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    // start the background cleanup thread
    // org.apache.zookeeper.server.DatadirCleanupManager
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        // cluster mode goes this way
        runFromConfig(config);
    } else {
        // standalone mode; we look at standalone first and dig into cluster mode later
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}
The method above mostly performs initialization and then catches every category of failure — bad arguments, bad configuration, an inaccessible data dir, AdminServer errors — mapping each to its own exit code.
For standalone mode, the work is handed straight to ZooKeeperServerMain.main(). We'll look at that next, and then at roughly how the configuration file gets parsed.
So running in standalone mode is simply a matter of invoking ZooKeeperServerMain:
// org.apache.zookeeper.server.ZooKeeperServerMain
/*
 * Start up the ZooKeeper server.
 *
 * @param args the configfile or the port datadir [ticktime]
 */
public static void main(String[] args) {
    ZooKeeperServerMain main = new ZooKeeperServerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        LOG.error("Invalid arguments, exiting abnormally", e);
        LOG.info(USAGE);
        System.err.println(USAGE);
        System.exit(ExitCode.INVALID_INVOCATION.getValue());
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.err.println("Invalid config, exiting abnormally");
        System.exit(ExitCode.INVALID_INVOCATION.getValue());
    } catch (DatadirException e) {
        LOG.error("Unable to access datadir, exiting abnormally", e);
        System.err.println("Unable to access datadir, exiting abnormally");
        System.exit(ExitCode.UNABLE_TO_ACCESS_DATADIR.getValue());
    } catch (AdminServerException e) {
        LOG.error("Unable to start AdminServer, exiting abnormally", e);
        System.err.println("Unable to start AdminServer, exiting abnormally");
        System.exit(ExitCode.ERROR_STARTING_ADMIN_SERVER.getValue());
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
    }
    LOG.info("Exiting normally");
    System.exit(ExitCode.EXECUTION_FINISHED.getValue());
}

// standalone-mode initialization
protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException
{
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    ServerConfig config = new ServerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    } else {
        config.parse(args);
    }

    // the interesting part is this run method
    runFromConfig(config);
}
Comparing the standalone and cluster startup skeletons above, the flow is the same: parse the configuration file first, then run the corresponding server logic. Also note that the arguments already parsed on the cluster path get parsed a second time by the standalone path, to keep the two launch modes compatible.
Cluster mode parses with QuorumPeerConfig, while standalone mode uses ServerConfig.
However, the standalone parser still delegates to the cluster-mode parser, as ServerConfig.parse() shows:
// org.apache.zookeeper.server.ServerConfig
/**
 * Parse a ZooKeeper configuration file
 * @param path the patch of the configuration file
 * @return ServerConfig configured wrt arguments
 * @throws ConfigException error processing configuration
 */
public void parse(String path) throws ConfigException {
    // hand the parsing straight to QuorumPeerConfig
    QuorumPeerConfig config = new QuorumPeerConfig();
    config.parse(path);

    // let qpconfig parse the file and then pull the stuff we are
    // interested in
    // then read back only the parameters we need; this is exactly the
    // difference between the standalone and cluster configurations
    readFrom(config);
}

/**
 * Read attributes from a QuorumPeerConfig.
 * @param config
 */
public void readFrom(QuorumPeerConfig config) {
    clientPortAddress = config.getClientPortAddress();
    secureClientPortAddress = config.getSecureClientPortAddress();
    dataDir = config.getDataDir();
    dataLogDir = config.getDataLogDir();
    tickTime = config.getTickTime();
    maxClientCnxns = config.getMaxClientCnxns();
    minSessionTimeout = config.getMinSessionTimeout();
    maxSessionTimeout = config.getMaxSessionTimeout();
    metricsProviderClassName = config.getMetricsProviderClassName();
    metricsProviderConfiguration = config.getMetricsProviderConfiguration();
}
So let's take a quick look at how the cluster-mode configuration file is actually parsed.
// org.apache.zookeeper.server.quorum.QuorumPeerConfig
/**
 * Parse a ZooKeeper configuration file
 * @param path the patch of the configuration file
 * @throws ConfigException error processing configuration
 */
public void parse(String path) throws ConfigException {
    LOG.info("Reading configuration from: " + path);

    try {
        // build the config file object through a builder
        File configFile = (new VerifyingFileFactory.Builder(LOG)
            .warnForRelativePath()
            .failForNonExistingPath()
            .build()).create(path);

        Properties cfg = new Properties();
        FileInputStream in = new FileInputStream(configFile);
        try {
            cfg.load(in);
            configFileStr = path;
        } finally {
            in.close();
        }
        // parse the properties into the config fields; this method also shows
        // every configuration option zk supports, see below
        parseProperties(cfg);
    } catch (IOException e) {
        throw new ConfigException("Error processing " + path, e);
    } catch (IllegalArgumentException e) {
        throw new ConfigException("Error processing " + path, e);
    }

    if (dynamicConfigFileStr != null) {
        try {
            // cluster mode: initialize the dynamic quorum configuration
            Properties dynamicCfg = new Properties();
            FileInputStream inConfig = new FileInputStream(dynamicConfigFileStr);
            try {
                dynamicCfg.load(inConfig);
                if (dynamicCfg.getProperty("version") != null) {
                    throw new ConfigException("dynamic file shouldn't have version inside");
                }

                String version = getVersionFromFilename(dynamicConfigFileStr);
                // If there isn't any version associated with the filename,
                // the default version is 0.
                if (version != null) {
                    dynamicCfg.setProperty("version", version);
                }
            } finally {
                inConfig.close();
            }
            setupQuorumPeerConfig(dynamicCfg, false);
        } catch (IOException e) {
            throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
        } catch (IllegalArgumentException e) {
            throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
        }
        File nextDynamicConfigFile = new File(configFileStr + nextDynamicConfigFileSuffix);
        if (nextDynamicConfigFile.exists()) {
            try {
                Properties dynamicConfigNextCfg = new Properties();
                FileInputStream inConfigNext = new FileInputStream(nextDynamicConfigFile);
                try {
                    dynamicConfigNextCfg.load(inConfigNext);
                } finally {
                    inConfigNext.close();
                }
                boolean isHierarchical = false;
                for (Entry<Object, Object> entry : dynamicConfigNextCfg.entrySet()) {
                    String key = entry.getKey().toString().trim();
                    if (key.startsWith("group") || key.startsWith("weight")) {
                        isHierarchical = true;
                        break;
                    }
                }
                lastSeenQuorumVerifier = createQuorumVerifier(dynamicConfigNextCfg, isHierarchical);
            } catch (IOException e) {
                LOG.warn("NextQuorumVerifier is initiated to null");
            }
        }
    }
}

// the per-property parsing; from here we can see exactly which options zk supports
/**
 * Parse config from a Properties.
 * @param zkProp Properties to parse from.
 * @throws IOException
 * @throws ConfigException
 */
public void parseProperties(Properties zkProp)
        throws IOException, ConfigException {
    int clientPort = 0;
    int secureClientPort = 0;
    String clientPortAddress = null;
    String secureClientPortAddress = null;
    VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
    for (Entry<Object, Object> entry : zkProp.entrySet()) {
        String key = entry.getKey().toString().trim();
        String value = entry.getValue().toString().trim();
        if (key.equals("dataDir")) {
            dataDir = vff.create(value);
        } else if (key.equals("dataLogDir")) {
            dataLogDir = vff.create(value);
        } else if (key.equals("clientPort")) {
            clientPort = Integer.parseInt(value);
        } else if (key.equals("localSessionsEnabled")) {
            localSessionsEnabled = Boolean.parseBoolean(value);
        } else if (key.equals("localSessionsUpgradingEnabled")) {
            localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
        } else if (key.equals("clientPortAddress")) {
            clientPortAddress = value.trim();
        } else if (key.equals("secureClientPort")) {
            secureClientPort = Integer.parseInt(value);
        } else if (key.equals("secureClientPortAddress")) {
            secureClientPortAddress = value.trim();
        } else if (key.equals("tickTime")) {
            tickTime = Integer.parseInt(value);
        } else if (key.equals("maxClientCnxns")) {
            maxClientCnxns = Integer.parseInt(value);
        } else if (key.equals("minSessionTimeout")) {
            minSessionTimeout = Integer.parseInt(value);
        } else if (key.equals("maxSessionTimeout")) {
            maxSessionTimeout = Integer.parseInt(value);
        } else if (key.equals("initLimit")) {
            initLimit = Integer.parseInt(value);
        } else if (key.equals("syncLimit")) {
            syncLimit = Integer.parseInt(value);
        } else if (key.equals("electionAlg")) {
            electionAlg = Integer.parseInt(value);
            if (electionAlg != 1 && electionAlg != 2 && electionAlg != 3) {
                throw new ConfigException("Invalid electionAlg value. Only 1, 2, 3 are supported.");
            }
        } else if (key.equals("quorumListenOnAllIPs")) {
            quorumListenOnAllIPs = Boolean.parseBoolean(value);
        } else if (key.equals("peerType")) {
            if (value.toLowerCase().equals("observer")) {
                peerType = LearnerType.OBSERVER;
            } else if (value.toLowerCase().equals("participant")) {
                peerType = LearnerType.PARTICIPANT;
            } else {
                throw new ConfigException("Unrecognised peertype: " + value);
            }
        } else if (key.equals("syncEnabled")) {
            syncEnabled = Boolean.parseBoolean(value);
        } else if (key.equals("dynamicConfigFile")) {
            dynamicConfigFileStr = value;
        } else if (key.equals("autopurge.snapRetainCount")) {
            snapRetainCount = Integer.parseInt(value);
        } else if (key.equals("autopurge.purgeInterval")) {
            purgeInterval = Integer.parseInt(value);
        } else if (key.equals("standaloneEnabled")) {
            if (value.toLowerCase().equals("true")) {
                setStandaloneEnabled(true);
            } else if (value.toLowerCase().equals("false")) {
                setStandaloneEnabled(false);
            } else {
                throw new ConfigException("Invalid option " + value + " for standalone mode. Choose 'true' or 'false.'");
            }
        } else if (key.equals("reconfigEnabled")) {
            if (value.toLowerCase().equals("true")) {
                setReconfigEnabled(true);
            } else if (value.toLowerCase().equals("false")) {
                setReconfigEnabled(false);
            } else {
                throw new ConfigException("Invalid option " + value + " for reconfigEnabled flag. Choose 'true' or 'false.'");
            }
        } else if (key.equals("sslQuorum")) {
            sslQuorum = Boolean.parseBoolean(value);
        // TODO: UnifiedServerSocket is currently buggy, will be fixed when @ivmaykov's PRs are merged. Disable port unification until then.
        // } else if (key.equals("portUnification")){
        //     shouldUsePortUnification = Boolean.parseBoolean(value);
        } else if ((key.startsWith("server.") || key.startsWith("group") || key.startsWith("weight"))
                && zkProp.containsKey("dynamicConfigFile")) {
            throw new ConfigException("parameter: " + key + " must be in a separate dynamic config file");
        } else if (key.equals(QuorumAuth.QUORUM_SASL_AUTH_ENABLED)) {
            quorumEnableSasl = Boolean.parseBoolean(value);
        } else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED)) {
            quorumServerRequireSasl = Boolean.parseBoolean(value);
        } else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED)) {
            quorumLearnerRequireSasl = Boolean.parseBoolean(value);
        } else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT)) {
            quorumLearnerLoginContext = value;
        } else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT)) {
            quorumServerLoginContext = value;
        } else if (key.equals(QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL)) {
            quorumServicePrincipal = value;
        } else if (key.equals("quorum.cnxn.threads.size")) {
            quorumCnxnThreadsSize = Integer.parseInt(value);
        } else if (key.equals("metricsProvider.className")) {
            metricsProviderClassName = value;
        } else if (key.startsWith("metricsProvider.")) {
            String keyForMetricsProvider = key.substring(16);
            metricsProviderConfiguration.put(keyForMetricsProvider, value);
        } else {
            System.setProperty("zookeeper." + key, value);
        }
    }

    if (!quorumEnableSasl && quorumServerRequireSasl) {
        throw new IllegalArgumentException(
                QuorumAuth.QUORUM_SASL_AUTH_ENABLED
                + " is disabled, so cannot enable "
                + QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED);
    }
    if (!quorumEnableSasl && quorumLearnerRequireSasl) {
        throw new IllegalArgumentException(
                QuorumAuth.QUORUM_SASL_AUTH_ENABLED
                + " is disabled, so cannot enable "
                + QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED);
    }
    // If quorumpeer learner is not auth enabled then self won't be able to
    // join quorum. So this condition is ensuring that the quorumpeer learner
    // is also auth enabled while enabling quorum server require sasl.
    if (!quorumLearnerRequireSasl && quorumServerRequireSasl) {
        throw new IllegalArgumentException(
                QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED
                + " is disabled, so cannot enable "
                + QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED);
    }

    // Reset to MIN_SNAP_RETAIN_COUNT if invalid (less than 3)
    // PurgeTxnLog.purge(File, File, int) will not allow to purge less
    // than 3.
    if (snapRetainCount < MIN_SNAP_RETAIN_COUNT) {
        LOG.warn("Invalid autopurge.snapRetainCount: " + snapRetainCount
                + ". Defaulting to " + MIN_SNAP_RETAIN_COUNT);
        snapRetainCount = MIN_SNAP_RETAIN_COUNT;
    }

    if (dataDir == null) {
        throw new IllegalArgumentException("dataDir is not set");
    }
    if (dataLogDir == null) {
        dataLogDir = dataDir;
    }

    if (clientPort == 0) {
        LOG.info("clientPort is not set");
        if (clientPortAddress != null) {
            throw new IllegalArgumentException("clientPortAddress is set but clientPort is not set");
        }
    } else if (clientPortAddress != null) {
        this.clientPortAddress = new InetSocketAddress(
                InetAddress.getByName(clientPortAddress), clientPort);
        LOG.info("clientPortAddress is {}", formatInetAddr(this.clientPortAddress));
    } else {
        this.clientPortAddress = new InetSocketAddress(clientPort);
        LOG.info("clientPortAddress is {}", formatInetAddr(this.clientPortAddress));
    }

    if (secureClientPort == 0) {
        LOG.info("secureClientPort is not set");
        if (secureClientPortAddress != null) {
            throw new IllegalArgumentException("secureClientPortAddress is set but secureClientPort is not set");
        }
    } else if (secureClientPortAddress != null) {
        this.secureClientPortAddress = new InetSocketAddress(
                InetAddress.getByName(secureClientPortAddress), secureClientPort);
        LOG.info("secureClientPortAddress is {}", formatInetAddr(this.secureClientPortAddress));
    } else {
        this.secureClientPortAddress = new InetSocketAddress(secureClientPort);
        LOG.info("secureClientPortAddress is {}", formatInetAddr(this.secureClientPortAddress));
    }
    if (this.secureClientPortAddress != null) {
        configureSSLAuth();
    }

    if (tickTime == 0) {
        throw new IllegalArgumentException("tickTime is not set");
    }

    minSessionTimeout = minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
    maxSessionTimeout = maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout;

    if (minSessionTimeout > maxSessionTimeout) {
        throw new IllegalArgumentException("minSessionTimeout must not be larger than maxSessionTimeout");
    }

    LOG.info("metricsProvider.className is {}", metricsProviderClassName);
    try {
        Class.forName(metricsProviderClassName, false, Thread.currentThread().getContextClassLoader());
    } catch (ClassNotFoundException error) {
        throw new IllegalArgumentException("metrics provider class was not found", error);
    }

    // backward compatibility - dynamic configuration in the same file as
    // static configuration params see writeDynamicConfig()
    if (dynamicConfigFileStr == null) {
        // initialize the quorum config; entries must follow the form
        // server.1=172.19.2.2:2181:3181, otherwise an exception is thrown;
        // the first port carries quorum (follower <-> leader) traffic and
        // the second one is used for leader election
        setupQuorumPeerConfig(zkProp, true);
        if (isDistributed() && isReconfigEnabled()) {
            // we don't backup static config for standalone mode.
            // we also don't backup if reconfig feature is disabled.
            backupOldConfig();
        }
    }
}

/**
 * Parse dynamic configuration file and return
 * quorumVerifier for new configuration.
 * @param dynamicConfigProp Properties to parse from.
 * @throws IOException
 * @throws ConfigException
 */
public static QuorumVerifier parseDynamicConfig(Properties dynamicConfigProp, int eAlg, boolean warnings,
        boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
    boolean isHierarchical = false;
    for (Entry<Object, Object> entry : dynamicConfigProp.entrySet()) {
        String key = entry.getKey().toString().trim();
        if (key.startsWith("group") || key.startsWith("weight")) {
            isHierarchical = true;
        } else if (!configBackwardCompatibilityMode && !key.startsWith("server.") && !key.equals("version")) {
            LOG.info(dynamicConfigProp.toString());
            throw new ConfigException("Unrecognised parameter: " + key);
        }
    }

    QuorumVerifier qv = createQuorumVerifier(dynamicConfigProp, isHierarchical);

    int numParticipators = qv.getVotingMembers().size();
    int numObservers = qv.getObservingMembers().size();
    if (numParticipators == 0) {
        if (!standaloneEnabled) {
            throw new IllegalArgumentException("standaloneEnabled = false then " +
                    "number of participants should be >0");
        }
        if (numObservers > 0) {
            throw new IllegalArgumentException("Observers w/o participants is an invalid configuration");
        }
    } else if (numParticipators == 1 && standaloneEnabled) {
        // HBase currently adds a single server line to the config, for
        // b/w compatibility reasons we need to keep this here. If standaloneEnabled
        // is true, the QuorumPeerMain script will create a standalone server instead
        // of a quorum configuration
        // a single server that still asks for quorum/election mode is a misconfiguration
        LOG.error("Invalid configuration, only one server specified (ignoring)");
        if (numObservers > 0) {
            throw new IllegalArgumentException("Observers w/o quorum is an invalid configuration");
        }
    } else {
        if (warnings) {
            if (numParticipators <= 2) {
                LOG.warn("No server failure will be tolerated. " +
                        "You need at least 3 servers.");
            } else if (numParticipators % 2 == 0) {
                LOG.warn("Non-optimial configuration, consider an odd number of servers.");
            }
        }
        for (QuorumServer s : qv.getVotingMembers().values()) {
            if (s.electionAddr == null)
                throw new IllegalArgumentException("Missing election port for server: " + s.id);
        }
    }
    return qv;
}

private static QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, boolean isHierarchical) throws ConfigException {
    if (isHierarchical) {
        return new QuorumHierarchical(dynamicConfigProp);
    } else {
        /*
         * The default QuorumVerifier is QuorumMaj
         */
        //LOG.info("Defaulting to majority quorums");
        return new QuorumMaj(dynamicConfigProp);
    }
}

public QuorumMaj(Properties props) throws ConfigException {
    for (Entry<Object, Object> entry : props.entrySet()) {
        String key = entry.getKey().toString();
        String value = entry.getValue().toString();
        if (key.startsWith("server.")) {
            int dot = key.indexOf('.');
            long sid = Long.parseLong(key.substring(dot + 1));
            QuorumServer qs = new QuorumServer(sid, value);
            allMembers.put(Long.valueOf(sid), qs);
            if (qs.type == LearnerType.PARTICIPANT)
                // put this server on the voting-member list
                votingMembers.put(Long.valueOf(sid), qs);
            else {
                observingMembers.put(Long.valueOf(sid), qs);
            }
        } else if (key.equals("version")) {
            version = Long.parseLong(value, 16);
        }
    }
    // finally, compute half the number of voters; once more than half agree,
    // an election result stands
    half = votingMembers.size() / 2;
}

// org.apache.zookeeper.server.quorum.QuorumPeer$QuorumServer
public QuorumServer(long sid, String addressStr) throws ConfigException {
    // LOG.warn("sid = " + sid + " addressStr = " + addressStr);
    this.id = sid;
    String serverClientParts[] = addressStr.split(";");
    String serverParts[] = ConfigUtils.getHostAndPort(serverClientParts[0]);
    if ((serverClientParts.length > 2) || (serverParts.length < 3)
            || (serverParts.length > 4)) {
        throw new ConfigException(addressStr + wrongFormat);
    }

    if (serverClientParts.length == 2) {
        //LOG.warn("ClientParts: " + serverClientParts[1]);
        String clientParts[] = ConfigUtils.getHostAndPort(serverClientParts[1]);
        if (clientParts.length > 2) {
            throw new ConfigException(addressStr + wrongFormat);
        }

        // is client_config a host:port or just a port
        hostname = (clientParts.length == 2) ? clientParts[0] : "0.0.0.0";
        try {
            clientAddr = new InetSocketAddress(hostname,
                    Integer.parseInt(clientParts[clientParts.length - 1]));
            //LOG.warn("Set clientAddr to " + clientAddr);
        } catch (NumberFormatException e) {
            throw new ConfigException("Address unresolved: " + hostname + ":" + clientParts[clientParts.length - 1]);
        }
    }

    // server_config should be either host:port:port or host:port:port:type
    try {
        addr = new InetSocketAddress(serverParts[0],
                Integer.parseInt(serverParts[1]));
    } catch (NumberFormatException e) {
        throw new ConfigException("Address unresolved: " + serverParts[0] + ":" + serverParts[1]);
    }
    try {
        electionAddr = new InetSocketAddress(serverParts[0],
                Integer.parseInt(serverParts[2]));
    } catch (NumberFormatException e) {
        throw new ConfigException("Address unresolved: " + serverParts[0] + ":" + serverParts[2]);
    }

    if (addr.getPort() == electionAddr.getPort()) {
        throw new ConfigException("Client and election port must be different! Please update the configuration file on server." + sid);
    }

    if (serverParts.length == 4) {
        setType(serverParts[3]);
    }

    this.hostname = serverParts[0];

    setMyAddrs();
}

private void setMyAddrs() {
    this.myAddrs = new ArrayList<InetSocketAddress>();
    this.myAddrs.add(this.addr);
    this.myAddrs.add(this.clientAddr);
    this.myAddrs.add(this.electionAddr);
    // filter out special addresses such as 127.0.0.1
    this.myAddrs = excludedSpecialAddresses(this.myAddrs);
}

// org.apache.zookeeper.server.quorum.QuorumPeerConfig
void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
        throws IOException, ConfigException {
    quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
    // check that the myid file exists; fail if it does not
    setupMyId();
    // set up the client port
    setupClientPort();
    // set up the peer type: PARTICIPANT or OBSERVER
    setupPeerType();
    checkValidity();
}
With the configuration parsed, the server logic itself can run. We'll walk through the standalone case.
// org.apache.zookeeper.server.ZooKeeperServerMain
/**
 * Run from a ServerConfig.
 * @param config ServerConfig to use.
 * @throws IOException
 * @throws AdminServerException
 */
public void runFromConfig(ServerConfig config)
        throws IOException, AdminServerException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        try {
            // start the metrics provider first
            metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
                    config.getMetricsProviderClassName(),
                    config.getMetricsProviderConfiguration());
        } catch (MetricsProviderLifeCycleException error) {
            throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
        }
        // Note that this thread isn't going to be doing anything else,
        // so rather than spawning another thread, we will just call
        // run() in this thread.
        // create a file logger url from the command line args
        // create the txn log / snapshot handles and validate the directories
        txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
        // create the ZooKeeperServer; zk startup proper begins, with zkDb passed as null
        final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog,
                config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);
        zkServer.setRootMetricsContext(metricsProvider.getRootContext());
        txnLog.setServerStats(zkServer.serverStats());

        // Registers shutdown handler which will be used to know the
        // server error or shutdown state changes.
        // a latch for shutdown; register the shutdown handler
        final CountDownLatch shutdownLatch = new CountDownLatch(1);
        zkServer.registerServerShutdownHandler(
                new ZooKeeperServerShutdownHandler(shutdownLatch));

        // Start Admin server
        // create a Jetty-based server instance: the admin management console
        adminServer = AdminServerFactory.createAdminServer();
        adminServer.setZooKeeperServer(zkServer);
        adminServer.start();

        boolean needStartZKServer = true;
        if (config.getClientPortAddress() != null) {
            // create the connection factory, NIOServerCnxnFactory by default
            cnxnFactory = ServerCnxnFactory.createFactory();
            // configure the server; this also sets up authentication where required
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
            cnxnFactory.startup(zkServer);
            // zkServer has been started. So we don't need to start it again in secureCnxnFactory.
            needStartZKServer = false;
        }
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true);
            secureCnxnFactory.startup(zkServer, needStartZKServer);
        }

        containerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.firstProcessor,
                Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
                Integer.getInteger("znode.container.maxPerMinute", 10000));
        containerManager.start();

        // Watch status of ZooKeeper server. It will do a graceful shutdown
        // if the server is not running or hits an internal error.
        // block here waiting for the shutdown signal; at this point the
        // server is fully started
        shutdownLatch.await();

        // graceful shutdown
        shutdown();

        if (cnxnFactory != null) {
            cnxnFactory.join();
        }
        if (secureCnxnFactory != null) {
            secureCnxnFactory.join();
        }
        if (zkServer.canShutdown()) {
            zkServer.shutdown(true);
        }
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Server interrupted", e);
    } finally {
        if (txnLog != null) {
            txnLog.close();
        }
        if (metricsProvider != null) {
            try {
                metricsProvider.stop();
            } catch (Throwable error) {
                LOG.warn("Error while stopping metrics", error);
            }
        }
    }
}
As shown above, startup breaks down into a few steps:
1. start the metrics provider for monitoring;
2. create the FileTxnSnapLog used to restore data from disk (or elsewhere) at startup;
3. create the ZooKeeperServer instance for later use;
4. register the shutdown handler, ZooKeeperServerShutdownHandler;
5. start the admin console via AdminServerFactory;
6. start the zkServer itself;
7. start the ContainerManager;
8. the service is up; block waiting for the shutdown signal.
This is more or less the standard skeleton of a server program.
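The whole lifecycle hinges on that CountDownLatch: the main thread parks on await(), and whatever component wants the process to stop simply counts the latch down. Here is a minimal sketch of the same pattern outside ZooKeeper (the names are mine, not ZooKeeper's):

import java.util.concurrent.CountDownLatch;

public class LatchLifecycleDemo {
    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch shutdownLatch = new CountDownLatch(1);

        // stand-in for ZooKeeperServerShutdownHandler: some component signals
        // a fatal state change by counting the latch down
        Thread server = new Thread(() -> {
            try {
                Thread.sleep(1000); // pretend to serve for a while
            } catch (InterruptedException ignored) {
            }
            shutdownLatch.countDown(); // "server hit ERROR/SHUTDOWN"
        });
        server.start();

        shutdownLatch.await(); // main thread blocks here, like runFromConfig
        System.out.println("latch released, running graceful shutdown");
    }
}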
As an aside, here is how the server decides whether it is running as a cluster:

// check whether we are in distributed (cluster) mode
public boolean isDistributed() {
    return quorumVerifier != null && (!standaloneEnabled || quorumVerifier.getVotingMembers().size() > 1);
}
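Reading the predicate: quorumVerifier is only non-null once server.* lines have been parsed, so a plain standalone config short-circuits to false. And since 3.5, setting standaloneEnabled=false pushes even a one-server ensemble down the distributed path. For instance, with an (illustrative) config of

    standaloneEnabled=false
    server.1=127.0.0.1:2888:3888

there is exactly one voting member, yet isDistributed() returns true because !standaloneEnabled already satisfies the disjunction.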
Next, let's look at how the ZkServer instance is initialized.
// org.apache.zookeeper.server.ZooKeeperServer
/**
 * Creates a ZooKeeperServer instance. It sets everything up, but doesn't
 * actually start listening for clients until run() is invoked.
 *
 * @param dataDir the directory to put the data
 */
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
        int minSessionTimeout, int maxSessionTimeout, ZKDatabase zkDb) {
    serverStats = new ServerStats(this);
    this.txnLogFactory = txnLogFactory;
    this.txnLogFactory.setServerStats(this.serverStats);
    // zkDb is the core in-memory database holding all the data, but it is still null here
    this.zkDb = zkDb;
    this.tickTime = tickTime;
    // with the default tickTime of 2000, these default to min 4000 / max 40000
    setMinSessionTimeout(minSessionTimeout);
    setMaxSessionTimeout(maxSessionTimeout);
    // listener through which internal threads report fatal errors back to the server
    listener = new ZooKeeperServerListenerImpl(this);
    LOG.info("Created server with tickTime " + tickTime
            + " minSessionTimeout " + getMinSessionTimeout()
            + " maxSessionTimeout " + getMaxSessionTimeout()
            + " datadir " + txnLogFactory.getDataDir()
            + " snapdir " + txnLogFactory.getSnapDir());
}

// the server has four states; each one marks a stage of its lifecycle
protected enum State {
    INITIAL, RUNNING, SHUTDOWN, ERROR
}

// org.apache.zookeeper.server.NIOServerCnxnFactory, configuring the server
@Override
public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
    if (secure) {
        throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
    }
    // performs SASL login configuration if required; a misconfigured setup
    // sometimes surfaces as "SASL no permission" errors on connect
    configureSaslLogin();

    // max connections per client, 60 by default
    maxClientCnxns = maxcc;
    sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
    // We also use the sessionlessCnxnTimeout as expiring interval for
    // cnxnExpiryQueue. These don't need to be the same, but the expiring
    // interval passed into the ExpiryQueue() constructor below should be
    // less than or equal to the timeout.
    cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
    // the expiry queue is drained by a dedicated thread, examined in detail later
    expirerThread = new ConnectionExpirerThread();

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
            ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
            Math.max((int) Math.sqrt((float) numCores / 2), 1));
    if (numSelectorThreads < 1) {
        throw new IOException("numSelectorThreads must be at least 1");
    }

    // number of worker threads, twice the CPU count by default
    numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

    LOG.info("Configuring NIO connection handler with "
            + (sessionlessCnxnTimeout / 1000) + "s sessionless connection"
            + " timeout, " + numSelectorThreads + " selector thread(s), "
            + (numWorkerThreads > 0 ? numWorkerThreads : "no")
            + " worker threads, and "
            + (directBufferBytes == 0 ? "gathered writes." :
               ("" + (directBufferBytes / 1024) + " kB direct buffers.")));
    // add the configured number of selector threads to the set, so they can
    // later be started along with the acceptThread
    for (int i = 0; i < numSelectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(i));
    }

    // open the NIO server socket and bind the port; from this point external
    // connections can come in, but nothing is handling them yet
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port " + addr);
    ss.socket().bind(addr);
    ss.configureBlocking(false);
    // hand the listen socket to the inner AcceptThread; all subsequent
    // accepts are handled directly by the AcceptThread
    acceptThread = new AcceptThread(ss, addr, selectorThreads);
}

// org.apache.zookeeper.server.NIOServerCnxnFactory$AcceptThread
public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr,
        Set<SelectorThread> selectorThreads) throws IOException {
    super("NIOServerCxnFactory.AcceptThread:" + addr);
    this.acceptSocket = ss;
    this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
    this.selectorThreads = Collections.unmodifiableList(
            new ArrayList<SelectorThread>(selectorThreads));
    selectorIterator = this.selectorThreads.iterator();
}

// org.apache.zookeeper.server.ServerCnxnFactory
/**
 * Initialize the server SASL if specified.
 *
 * If the user has specified a "ZooKeeperServer.LOGIN_CONTEXT_NAME_KEY"
 * or a jaas.conf using "java.security.auth.login.config"
 * the authentication is required and an exception is raised.
 * Otherwise no authentication is configured and no exception is raised.
 *
 * @throws IOException if jaas.conf is missing or there's an error in it.
 */
protected void configureSaslLogin() throws IOException {
    String serverSection = System.getProperty(ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY,
            ZooKeeperSaslServer.DEFAULT_LOGIN_CONTEXT_NAME);

    // Note that 'Configuration' here refers to javax.security.auth.login.Configuration.
    AppConfigurationEntry entries[] = null;
    SecurityException securityException = null;
    try {
        entries = Configuration.getConfiguration().getAppConfigurationEntry(serverSection);
    } catch (SecurityException e) {
        // handle below: might be harmless if the user doesn't intend to use JAAS authentication.
        securityException = e;
    }

    // No entries in jaas.conf
    // If there's a configuration exception fetching the jaas section and
    // the user has required sasl by specifying a LOGIN_CONTEXT_NAME_KEY or a jaas file
    // we throw an exception otherwise we continue without authentication.
    if (entries == null) {
        String jaasFile = System.getProperty(Environment.JAAS_CONF_KEY);
        String loginContextName = System.getProperty(ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY);
        if (securityException != null && (loginContextName != null || jaasFile != null)) {
            String errorMessage = "No JAAS configuration section named '" + serverSection + "' was found";
            if (jaasFile != null) {
                errorMessage += "in '" + jaasFile + "'.";
            }
            if (loginContextName != null) {
                errorMessage += " But " + ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY + " was set.";
            }
            LOG.error(errorMessage);
            throw new IOException(errorMessage);
        }
        return;
    }

    // jaas.conf entry available
    try {
        saslServerCallbackHandler = new SaslServerCallbackHandler(Configuration.getConfiguration());
        login = new Login(serverSection, saslServerCallbackHandler, new ZKConfig());
        login.startThreadIfNeeded();
    } catch (LoginException e) {
        throw new IOException("Could not configure server because SASL configuration did not allow the "
                + " ZooKeeper server to authenticate itself properly: " + e);
    }
}
Next comes starting the connection layer: cnxnFactory.startup(zkServer);
// org.apache.zookeeper.server.ServerCnxnFactory
public void startup(ZooKeeperServer zkServer) throws IOException, InterruptedException {
    startup(zkServer, true);
}

@Override
public void startup(ZooKeeperServer zks, boolean startServer)
        throws IOException, InterruptedException {
    // call start() on the NIO factory
    start();
    // attach the server to the factory
    setZooKeeperServer(zks);
    if (startServer) {
        // load the initial data from disk
        zks.startdata();
        // start the zkServer
        zks.startup();
    }
}

// org.apache.zookeeper.server.NIOServerCnxnFactory
// start all the threads: acceptor, selectors, expirerThread...
@Override
public void start() {
    stopped = false;
    if (workerPool == null) {
        // a custom worker pool; underneath it also uses the Executors factory
        // to create ThreadPoolExecutors, and it starts itself on construction
        workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
    }
    for (SelectorThread thread : selectorThreads) {
        // start every thread not yet started; internally each SelectorThread
        // is driven by two key queues
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
    // ensure thread is started once and only once
    if (acceptThread.getState() == Thread.State.NEW) {
        acceptThread.start();
    }
    if (expirerThread.getState() == Thread.State.NEW) {
        expirerThread.start();
    }
}

// org.apache.zookeeper.server.WorkerService -- the worker pool is built on fixed thread pools
public WorkerService(String name, int numThreads, boolean useAssignableThreads) {
    this.threadNamePrefix = (name == null ? "" : name) + "Thread";
    this.numWorkerThreads = numThreads;
    this.threadsAreAssignable = useAssignableThreads;
    // start right after construction, i.e. initialize the pools
    start();
}

public void start() {
    if (numWorkerThreads > 0) {
        if (threadsAreAssignable) {
            for (int i = 1; i <= numWorkerThreads; ++i) {
                // each assignable worker is a single-thread fixed pool
                workers.add(Executors.newFixedThreadPool(1, new DaemonThreadFactory(threadNamePrefix, i)));
            }
        } else {
            workers.add(Executors.newFixedThreadPool(numWorkerThreads, new DaemonThreadFactory(threadNamePrefix)));
        }
    }
    stopped = false;
}

// SelectorThread is an inner class of org.apache.zookeeper.server.NIOServerCnxnFactory;
// it maintains the two queues acceptedQueue and updateQueue
public SelectorThread(int id) throws IOException {
    super("NIOServerCxnFactory.SelectorThread-" + id);
    this.id = id;
    acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
    updateQueue = new LinkedBlockingQueue<SelectionKey>();
}
So the zkServer start() phase is essentially the startup of a collection of threads and thread pools, and those threads are what actually serve requests. The main flow is:
1. the accept thread takes new connections and hands each one to a selector thread's acceptedQueue (see the sketch after this list);
2. a selector thread wraps the ready connection in an IOWorkRequest and schedules it on the workerPool for later processing;
3. the workerPool dispatches IOWorkRequest.doWork() to do the actual handling;
4. ....
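The shape here is a classic staged event-driven pipeline: one accept stage, N selector stages, and a worker pool, glued together by queues. Stripped of the NIO details, the handoff looks like this (a toy sketch with my own names, not ZooKeeper's classes):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class PipelineSketch {
    public static void main(String[] args) throws InterruptedException {
        // stage 2 input: "accepted connections" (plain ints stand in for sockets)
        BlockingQueue<Integer> acceptedQueue = new LinkedBlockingQueue<>();
        // stage 3: the worker pool, playing the role of WorkerService
        ExecutorService workerPool = Executors.newFixedThreadPool(4);

        // stage 2: a selector thread drains acceptedQueue and schedules the work,
        // the way a SelectorThread turns readiness into an IOWorkRequest
        Thread selector = new Thread(() -> {
            try {
                while (true) {
                    int conn = acceptedQueue.take();
                    workerPool.submit(() -> // roughly IOWorkRequest.doWork()
                            System.out.println("worker handling connection " + conn));
                }
            } catch (InterruptedException ignored) {
                // exit on interrupt
            }
        });
        selector.setDaemon(true);
        selector.start();

        // stage 1: the accept thread pushes freshly accepted "connections" onto the queue
        for (int i = 1; i <= 3; i++) {
            acceptedQueue.add(i);
        }

        // give the pipeline a moment, then shut the pool down so the JVM can exit
        Thread.sleep(300);
        workerPool.shutdown();
    }
}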
Once zkServer is running, it first synchronizes its initial data from disk (or elsewhere).
zks.startdata() loads the on-disk data; that is, it initializes zkDb, the structure in which the entire runtime dataset is kept!
// Let's first look at the FileTxnSnapLog constructor; it actually does quite a
// bit of work (checking directory permissions, creating directories, ...) that
// lays the groundwork for the later data recovery.
// org.apache.zookeeper.server.persistence.FileTxnSnapLog
/**
 * the constructor which takes the datadir and
 * snapdir.
 * @param dataDir the transaction directory
 * @param snapDir the snapshot directory
 */
public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
    LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir);

    this.dataDir = new File(dataDir, version + VERSION);
    this.snapDir = new File(snapDir, version + VERSION);

    // by default create snap/log dirs, but otherwise complain instead
    // See ZOOKEEPER-1161 for more details
    boolean enableAutocreate = Boolean.valueOf(
            System.getProperty(ZOOKEEPER_DATADIR_AUTOCREATE, ZOOKEEPER_DATADIR_AUTOCREATE_DEFAULT));

    if (!this.dataDir.exists()) {
        if (!enableAutocreate) {
            throw new DatadirException("Missing data directory " + this.dataDir
                    + ", automatic data directory creation is disabled ("
                    + ZOOKEEPER_DATADIR_AUTOCREATE
                    + " is false). Please create this directory manually.");
        }
        if (!this.dataDir.mkdirs()) {
            throw new DatadirException("Unable to create data directory " + this.dataDir);
        }
    }
    if (!this.dataDir.canWrite()) {
        throw new DatadirException("Cannot write to data directory " + this.dataDir);
    }

    if (!this.snapDir.exists()) {
        // by default create this directory, but otherwise complain instead
        // See ZOOKEEPER-1161 for more details
        if (!enableAutocreate) {
            throw new DatadirException("Missing snap directory " + this.snapDir
                    + ", automatic data directory creation is disabled ("
                    + ZOOKEEPER_DATADIR_AUTOCREATE
                    + " is false). Please create this directory manually.");
        }
        if (!this.snapDir.mkdirs()) {
            throw new DatadirException("Unable to create snap directory " + this.snapDir);
        }
    }
    if (!this.snapDir.canWrite()) {
        throw new DatadirException("Cannot write to snap directory " + this.snapDir);
    }

    // check content of transaction log and snapshot dirs if they are two different directories
    // See ZOOKEEPER-2967 for more details
    if (!this.dataDir.getPath().equals(this.snapDir.getPath())) {
        checkLogDir();
        checkSnapDir();
    }

    txnLog = new FileTxnLog(this.dataDir);
    snapLog = new FileSnap(this.snapDir);

    autoCreateDB = Boolean.parseBoolean(
            System.getProperty(ZOOKEEPER_DB_AUTOCREATE, ZOOKEEPER_DB_AUTOCREATE_DEFAULT));
}

// org.apache.zookeeper.server.ZooKeeperServer -- here the zkDb initialization starts
public void startdata() throws IOException, InterruptedException {
    //check to see if zkDb is not null
    // zkDb keeps its data in a DataTree, which in turn stores the nodes in a
    // ConcurrentHashMap -- worth a closer look
    if (zkDb == null) {
        zkDb = new ZKDatabase(this.txnLogFactory);
    }
    // if zkDb has not been initialized yet, load the data from disk once
    if (!zkDb.isInitialized()) {
        loadData();
    }
}

// org.apache.zookeeper.server.ZKDatabase, one of the most important data structures
/**
 * the filetxnsnaplog that this zk database
 * maps to. There is a one to one relationship
 * between a filetxnsnaplog and zkdatabase.
 * @param snapLog the FileTxnSnapLog mapping this zkdatabase
 */
public ZKDatabase(FileTxnSnapLog snapLog) {
    // backed by a DataTree()
    dataTree = createDataTree();
    // sessions with a timeout live in a ConcurrentHashMap: sessionId -> timeout
    sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
    this.snapLog = snapLog;

    try {
        snapshotSizeFactor = Double.parseDouble(
                System.getProperty(SNAPSHOT_SIZE_FACTOR, Double.toString(DEFAULT_SNAPSHOT_SIZE_FACTOR)));
        if (snapshotSizeFactor > 1) {
            snapshotSizeFactor = DEFAULT_SNAPSHOT_SIZE_FACTOR;
            LOG.warn("The configured {} is invalid, going to use the default {}",
                    SNAPSHOT_SIZE_FACTOR, DEFAULT_SNAPSHOT_SIZE_FACTOR);
        }
    } catch (NumberFormatException e) {
        LOG.error("Error parsing {}, using default value {}",
                SNAPSHOT_SIZE_FACTOR, DEFAULT_SNAPSHOT_SIZE_FACTOR);
        snapshotSizeFactor = DEFAULT_SNAPSHOT_SIZE_FACTOR;
    }

    LOG.info("{} = {}", SNAPSHOT_SIZE_FACTOR, snapshotSizeFactor);
}

// the org.apache.zookeeper.server.DataTree constructor looks like this
public DataTree() {
    /* Rather than fight it, let root have an alias */
    nodes.put("", root);
    nodes.put(rootZookeeper, root);

    /** add the proc node and quota node */
    root.addChild(procChildZookeeper);
    nodes.put(procZookeeper, procDataNode);

    procDataNode.addChild(quotaChildZookeeper);
    nodes.put(quotaZookeeper, quotaDataNode);

    addConfigNode();

    nodeDataSize.set(approximateDataSize());
    try {
        // create the watch managers, which make watches possible
        dataWatches = WatchManagerFactory.createWatchManager();
        childWatches = WatchManagerFactory.createWatchManager();
    } catch (Exception e) {
        LOG.error("Unexpected exception when creating WatchManager, exiting abnormally", e);
        System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
    }
}

// org.apache.zookeeper.server.watch.WatchManagerFactory: pluggable via a
// system property where possible, defaulting to WatchManager
public static IWatchManager createWatchManager() throws IOException {
    String watchManagerName = System.getProperty(ZOOKEEPER_WATCH_MANAGER_NAME);
    if (watchManagerName == null) {
        watchManagerName = WatchManager.class.getName();
    }
    try {
        IWatchManager watchManager = (IWatchManager) Class.forName(watchManagerName).newInstance();
        LOG.info("Using {} as watch manager", watchManagerName);
        return watchManager;
    } catch (Exception e) {
        IOException ioe = new IOException("Couldn't instantiate " + watchManagerName);
        ioe.initCause(e);
        throw ioe;
    }
}

// org.apache.zookeeper.server.ZooKeeperServer
// loading the initial data from disk
/**
 * Restore sessions and data
 */
public void loadData() throws IOException, InterruptedException {
    /*
     * When a new leader starts executing Leader#lead, it
     * invokes this method. The database, however, has been
     * initialized before running leader election so that
     * the server could pick its zxid for its initial vote.
     * It does it by invoking QuorumPeer#getLastLoggedZxid.
     * Consequently, we don't need to initialize it once more
     * and avoid the penalty of loading it a second time. Not
     * reloading it is particularly important for applications
     * that host a large database.
     *
     * The following if block checks whether the database has
     * been initialized or not. Note that this method is
     * invoked by at least one other method:
     * ZooKeeperServer#startdata.
     *
     * See ZOOKEEPER-1642 for more detail.
     */
    if (zkDb.isInitialized()) {
        setZxid(zkDb.getDataTreeLastProcessedZxid());
    } else {
        // first load of the database: run loadDataBase() and pick up the highest zxid
        setZxid(zkDb.loadDataBase());
    }

    // Clean up dead sessions
    List<Long> deadSessions = new LinkedList<Long>();
    for (Long session : zkDb.getSessions()) {
        if (zkDb.getSessionWithTimeOuts().get(session) == null) {
            deadSessions.add(session);
        }
    }

    for (long session : deadSessions) {
        // XXX: Is lastProcessedZxid really the best thing to use?
        killSession(session, zkDb.getDataTreeLastProcessedZxid());
    }

    // Make a clean snapshot
    takeSnapshot();
}

// org.apache.zookeeper.server.ZKDatabase
/**
 * load the database from the disk onto memory and also add
 * the transactions to the committedlog in memory.
 * @return the last valid zxid on disk
 * @throws IOException
 */
public long loadDataBase() throws IOException {
    long startTime = Time.currentElapsedTime();
    // snapLog does the actual restore work
    long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;
    long loadTime = Time.currentElapsedTime() - startTime;
    ServerMetrics.DB_INIT_TIME.add(loadTime);
    LOG.info("Snapshot loaded in " + loadTime + " ms");
    return zxid;
}

// org.apache.zookeeper.server.persistence.FileTxnSnapLog
/**
 * this function restores the server
 * database after reading from the
 * snapshots and transaction logs
 * @param dt the datatree to be restored
 * @param sessions the sessions to be restored
 * @param listener the playback listener to run on the
 * database restoration
 * @return the highest zxid restored
 * @throws IOException
 */
public long restore(DataTree dt, Map<Long, Integer> sessions,
        PlayBackListener listener) throws IOException {
    long deserializeResult = snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    boolean trustEmptyDB;
    // if the "initialize" marker file exists, delete it and trust an empty database
    File initFile = new File(dataDir.getParent(), "initialize");
    if (Files.deleteIfExists(initFile.toPath())) {
        LOG.info("Initialize file found, an empty database will not block voting participation");
        trustEmptyDB = true;
    } else {
        trustEmptyDB = autoCreateDB;
    }
    if (-1L == deserializeResult) {
        /* this means that we couldn't find any snapshot, so we need to
         * initialize an empty database (reported in ZOOKEEPER-2325) */
        if (txnLog.getLastLoggedZxid() != -1) {
            throw new IOException("No snapshot found, but there are log entries. " +
                    "Something is broken!");
        }

        if (trustEmptyDB) {
            /* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
             *       or use Map on save() */
            save(dt, (ConcurrentHashMap<Long, Integer>)sessions, false);

            /* return a zxid of 0, since we know the database is empty */
            return 0L;
        } else {
            /* return a zxid of -1, since we are possibly missing data */
            LOG.warn("Unexpected empty data tree, setting zxid to -1");
            dt.lastProcessedZxid = -1L;
            return -1L;
        }
    }
    // replay the transaction logs on top of the restored snapshot
    return fastForwardFromEdits(dt, sessions, listener);
}

// org.apache.zookeeper.server.persistence.FileSnap
/**
 * deserialize a data tree from the most recent snapshot
 * @return the zxid of the snapshot
 */
public long deserialize(DataTree dt, Map<Long, Integer> sessions)
        throws IOException {
    // we run through 100 snapshots (not all of them)
    // if we cannot get it running within 100 snapshots
    // we should give up
    List<File> snapList = findNValidSnapshots(100);
    if (snapList.size() == 0) {
        return -1L;
    }
    File snap = null;
    boolean foundValid = false;
    for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
        snap = snapList.get(i);
        LOG.info("Reading snapshot " + snap);
        try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
             CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
            InputArchive ia = BinaryInputArchive.getArchive(crcIn);
            deserialize(dt, sessions, ia);
            long checkSum = crcIn.getChecksum().getValue();
            long val = ia.readLong("val");
            // verify the checksum to detect corrupted snapshots
            if (val != checkSum) {
                throw new IOException("CRC corruption in snapshot : " + snap);
            }
            foundValid = true;
            break;
        } catch (IOException e) {
            LOG.warn("problem reading snap file " + snap, e);
        }
    }
    if (!foundValid) {
        throw new IOException("Not able to find valid snapshots in " + snapDir);
    }
    // snapshot file names carry the zxid, so the last processed zxid can be
    // parsed straight out of the name
    dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
    return dt.lastProcessedZxid;
}

// org.apache.zookeeper.server.persistence.FileTxnSnapLog
/**
 * This function will fast forward the server database to have the latest
 * transactions in it.  This is the same as restore, but only reads from
 * the transaction logs and not restores from a snapshot.
 * @param dt the datatree to write transactions to.
 * @param sessions the sessions to be restored.
 * @param listener the playback listener to run on the
 * database transactions.
 * @return the highest zxid restored.
 * @throws IOException
 */
public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
        PlayBackListener listener) throws IOException {
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid + 1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    try {
        while (true) {
            // iterator points to
            // the first valid txn when initialized
            hdr = itr.getHeader();
            if (hdr == null) {
                //empty logs
                return dt.lastProcessedZxid;
            }
            // only the transactions after the snapshot's zxid are replayed here
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                LOG.error("{}(highestZxid) > {}(next log) for type {}",
                        highestZxid, hdr.getZxid(), hdr.getType());
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                processTransaction(hdr, dt, sessions, itr.getTxn());
            } catch (KeeperException.NoNodeException e) {
                throw new IOException("Failed to process transaction type: " +
                        hdr.getType() + " error: " + e.getMessage(), e);
            }
            listener.onTxnLoaded(hdr, itr.getTxn());
            if (!itr.next())
                break;
        }
    } finally {
        if (itr != null) {
            itr.close();
        }
    }
    return highestZxid;
}

// org.apache.zookeeper.server.DataTree
// deserializing the snapshot ends up calling DataTree.deserialize()
public void deserialize(InputArchive ia, String tag) throws IOException {
    aclCache.deserialize(ia);
    nodes.clear();
    pTrie.clear();
    nodeDataSize.set(0);
    String path = ia.readString("path");
    while (!"/".equals(path)) {
        // read the nodes in one record at a time
        DataNode node = new DataNode();
        ia.readRecord(node, "node");
        nodes.put(path, node);
        synchronized (node) {
            aclCache.addUsage(node.acl);
        }
        int lastSlash = path.lastIndexOf('/');
        if (lastSlash == -1) {
            root = node;
        } else {
            String parentPath = path.substring(0, lastSlash);
            DataNode parent = nodes.get(parentPath);
            if (parent == null) {
                throw new IOException("Invalid Datatree, unable to find " +
                        "parent " + parentPath + " of path " + path);
            }
            parent.addChild(path.substring(lastSlash + 1));
            long eowner = node.stat.getEphemeralOwner();
            EphemeralType ephemeralType = EphemeralType.get(eowner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.add(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.add(path);
            } else if (eowner != 0) {
                HashSet<String> list = ephemerals.get(eowner);
                if (list == null) {
                    list = new HashSet<String>();
                    ephemerals.put(eowner, list);
                }
                list.add(path);
            }
        }
        path = ia.readString("path");
    }
    // finally, put the root node in at /
    nodes.put("/", root);

    nodeDataSize.set(approximateDataSize());

    // we are done with deserializing the
    // the datatree
    // update the quotas - create path trie
    // and also update the stat nodes
    // set up the quota path trie and stats, if needed
    setupQuota();

    // purge unused entries, i.e. ACL cache entries whose reference count
    // dropped to zero or below
    aclCache.purgeUnused();
}

/**
 * this method sets up the path trie and sets up stats for quota nodes
 */
private void setupQuota() {
    String quotaPath = Quotas.quotaZookeeper;
    DataNode node = getNode(quotaPath);
    if (node == null) {
        return;
    }
    traverseNode(quotaPath);
}

// right after loading the data from disk, a fresh snapshot is taken
// org.apache.zookeeper.server.ZooKeeperServer
public void takeSnapshot() {
    takeSnapshot(false);
}

public void takeSnapshot(boolean syncSnap) {
    long start = Time.currentElapsedTime();
    try {
        txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
    } catch (IOException e) {
        LOG.error("Severe unrecoverable error, exiting", e);
        // This is a severe error that we cannot recover from,
        // so we need to exit
        System.exit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
    }
    long elapsed = Time.currentElapsedTime() - start;
    LOG.info("Snapshot taken in " + elapsed + " ms");
    ServerMetrics.SNAPSHOT_TIME.add(elapsed);
}

// org.apache.zookeeper.server.persistence.FileTxnSnapLog
/**
 * save the datatree and the sessions into a snapshot
 * @param dataTree the datatree to be serialized onto disk
 * @param sessionsWithTimeouts the session timeouts to be
 * serialized onto disk
 * @param syncSnap sync the snapshot immediately after write
 * @throws IOException
 */
public void save(DataTree dataTree,
        ConcurrentHashMap<Long, Integer> sessionsWithTimeouts, boolean syncSnap)
        throws IOException {
    long lastZxid = dataTree.lastProcessedZxid;
    // the snapshot name is simply prefix + zxid:
    // FileSnap.SNAPSHOT_FILE_PREFIX + "." + Long.toHexString(zxid)
    File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
    LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile);
    try {
        // serialize under a fixed layout and store; reading just reverses the
        // operation; note this call is synchronous
        snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
    } catch (IOException e) {
        if (snapshotFile.length() == 0) {
            /* This may be caused by a full disk. In such a case, the server
             * will get stuck in a loop where it tries to write a snapshot
             * out to disk, and ends up creating an empty file instead.
             * Doing so will eventually result in valid snapshots being
             * removed during cleanup. */
            if (snapshotFile.delete()) {
                LOG.info("Deleted empty snapshot file: " + snapshotFile.getAbsolutePath());
            } else {
                LOG.warn("Could not delete empty snapshot file: " + snapshotFile.getAbsolutePath());
            }
        } else {
            /* Something else went wrong when writing the snapshot out to
             * disk. If this snapshot file is invalid, when restarting,
             * ZooKeeper will skip it, and find the last known good snapshot
             * instead. */
        }
        throw e;
    }
}

// org.apache.zookeeper.server.persistence.FileSnap
/**
 * serialize the datatree and session into the file snapshot
 * @param dt the datatree to be serialized
 * @param sessions the sessions to be serialized
 * @param snapShot the file to store snapshot into
 * @param fsync sync the file immediately after write
 */
public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions,
        File snapShot, boolean fsync) throws IOException {
    if (!close) {
        try (CheckedOutputStream crcOut =
                 new CheckedOutputStream(new BufferedOutputStream(fsync ?
                         new AtomicFileOutputStream(snapShot) :
                         new FileOutputStream(snapShot)),
                         new Adler32())) {
            //CheckedOutputStream cout = new CheckedOutputStream()
            OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
            FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
            serialize(dt, sessions, oa, header);
            long val = crcOut.getChecksum().getValue();
            oa.writeLong(val, "val");
            oa.writeString("/", "path");
            crcOut.flush();
        }
    }
}
With that, loadData() is complete! Next comes the real startup:
zks.startup();
// org.apache.zookeeper.server.ZooKeeperServer
public synchronized void startup() {
    if (sessionTracker == null) {
        // first create a sessionTracker: an asynchronous thread whose main
        // job is expiring timed-out sessions
        createSessionTracker();
    }
    // start the tracker thread
    startSessionTracker();
    // set up the request processor chain -- this step is crucial
    setupRequestProcessors();
    // register with JMX
    registerJMX();
    // finally mark startup as done: the server is RUNNING
    setState(State.RUNNING);
    // wake up everything blocked on this server
    notifyAll();
}

// org.apache.zookeeper.server.SessionTrackerImpl
public SessionTrackerImpl(SessionExpirer expirer,
        ConcurrentMap<Long, Integer> sessionsWithTimeout, int tickTime,
        long serverId, ZooKeeperServerListener listener) {
    super("SessionTracker", listener);
    this.expirer = expirer;
    this.sessionExpiryQueue = new ExpiryQueue<SessionImpl>(tickTime);
    this.sessionsWithTimeout = sessionsWithTimeout;
    this.nextSessionId.set(initializeNextSession(serverId));
    for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
        trackSession(e.getKey(), e.getValue());
    }
    EphemeralType.validateServerId(serverId);
}

@Override
public void run() {
    try {
        while (running) {
            long waitTime = sessionExpiryQueue.getWaitTime();
            if (waitTime > 0) {
                Thread.sleep(waitTime);
                continue;
            }
            // the main task: close every session that has expired
            for (SessionImpl s : sessionExpiryQueue.poll()) {
                setSessionClosing(s.sessionId);
                expirer.expire(s);
            }
        }
    } catch (InterruptedException e) {
        handleException(this.getName(), e);
    }
    LOG.info("SessionTrackerImpl exited loop!");
}

// org.apache.zookeeper.server.ZooKeeperServer
// build the processor chain, which determines how incoming requests are handled
protected void setupRequestProcessors() {
    // several processors are wrapped with a chain-of-responsibility pattern:
    // PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
    // i.e. first prepare the request, then persist it, and finally apply it
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    // SyncRequestProcessor is an asynchronous thread that flushes request data to disk
    ((SyncRequestProcessor) syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor) firstProcessor).start();
}

// org.apache.zookeeper.server.PrepRequestProcessor, the first request-handling thread
@Override
public void run() {
    try {
        while (true) {
            // submittedRequests is a LinkedBlockingQueue, so take() blocks
            Request request = submittedRequests.take();
            long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
            if (request.type == OpCode.ping) {
                traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
            }
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
            }
            if (Request.requestOfDeath == request) {
                break;
            }
            // handle the request
            pRequest(request);
        }
    } catch (RequestProcessorException e) {
        if (e.getCause() instanceof XidRolloverException) {
            LOG.info(e.getCause().getMessage());
        }
        handleException(this.getName(), e);
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("PrepRequestProcessor exited loop!");
}

// org.apache.zookeeper.server.SyncRequestProcessor
// when a request needs to be persisted, the caller merely inserts it into
// queuedRequests; this background thread flushes it to disk in due course
@Override
public void run() {
    try {
        int logCount = 0;
        // we do this in an attempt to ensure that not all of the servers
        // in the ensemble take a snapshot at the same time
        int randRoll = r.nextInt(snapCount / 2);
        while (true) {
            Request si = null;
            // block on the queue when there is nothing left to flush
            if (toFlush.isEmpty()) {
                si = queuedRequests.take();
            } else {
                si = queuedRequests.poll();
                if (si == null) {
                    flush(toFlush);
                    continue;
                }
            }
            // on a shutdown request, Request.requestOfDeath, exit directly
            if (si == requestOfDeath) {
                break;
            }
            if (si != null) {
                // track the number of records written to the log
                if (zks.getZKDatabase().append(si)) {
                    logCount++;
                    if (logCount > (snapCount / 2 + randRoll)) {
                        randRoll = r.nextInt(snapCount / 2);
                        // roll the log
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (snapInProcess != null && snapInProcess.isAlive()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                public void run() {
                                    try {
                                        zks.takeSnapshot();
                                    } catch (Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    }
                                }
                            };
                            snapInProcess.start();
                        }
                        logCount = 0;
                    }
                } else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read, and there are no pending
                    // flushes (writes), then just pass this to the next processor
                    if (nextProcessor != null) {
                        // as this is an independent thread, it has to invoke
                        // the next processor itself
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable) nextProcessor).flush();
                        }
                    }
                    continue;
                }
                toFlush.add(si);
                if (toFlush.size() > 1000) {
                    flush(toFlush);
                }
            }
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
    } finally {
        running = false;
    }
    LOG.info("SyncRequestProcessor exited!");
}

// dispatch each kind of request to its specific handling
/**
 * This method will be called inside the ProcessRequestThread, which is a
 * singleton, so there will be a single thread calling this code.
 *
 * @param request
 */
protected void pRequest(Request request) throws RequestProcessorException {
    // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
    // request.type + " id = 0x" + Long.toHexString(request.sessionId));
    request.setHdr(null);
    request.setTxn(null);
    try {
        switch (request.type) {
        case OpCode.createContainer:
        case OpCode.create:
        case OpCode.create2:
            CreateRequest create2Request = new CreateRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
            break;
        case OpCode.createTTL:
            CreateTTLRequest createTtlRequest = new CreateTTLRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
            break;
        case OpCode.deleteContainer:
        case OpCode.delete:
            DeleteRequest deleteRequest = new DeleteRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
            break;
        case OpCode.setData:
            SetDataRequest setDataRequest = new SetDataRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
            break;
        case OpCode.reconfig:
            ReconfigRequest reconfigRequest = new ReconfigRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
            pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
            break;
        case OpCode.setACL:
            SetACLRequest setAclRequest = new SetACLRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
            break;
        case OpCode.check:
            CheckVersionRequest checkRequest = new CheckVersionRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
            break;
        case OpCode.multi:
            MultiTransactionRecord multiRequest = new MultiTransactionRecord();
            try {
                ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
            } catch (IOException e) {
                request.setHdr(new TxnHeader(request.sessionId, request.cxid,
                        zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
                throw e;
            }
            List<Txn> txns = new ArrayList<Txn>();
            // Each op in a multi-op must have the same zxid!
            long zxid = zks.getNextZxid();
            KeeperException ke = null;
            // Store off current pending change records in case we need to rollback
            Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
            for (Op op : multiRequest) {
                Record subrequest = op.toRequestRecord();
                int type;
                Record txn;
                /* If we've already failed one of the ops, don't bother
                 * trying the rest as we know it's going to fail and it
                 * would be confusing in the logfiles.
                 */
                if (ke != null) {
                    type = OpCode.error;
                    txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
                }
                /* Prep the request and convert to a Txn */
                else {
                    try {
                        pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                        type = request.getHdr().getType();
                        txn = request.getTxn();
                    } catch (KeeperException e) {
                        ke = e;
                        type = OpCode.error;
                        txn = new ErrorTxn(e.code().intValue());
                        if (e.code().intValue() > Code.APIERROR.intValue()) {
                            LOG.info("Got user-level KeeperException when processing {} aborting" +
                                    " remaining multi ops. Error Path:{} Error:{}",
                                    request.toString(), e.getPath(), e.getMessage());
                        }
                        request.setException(e);
                        /* Rollback change records from failed multi-op */
                        rollbackPendingChanges(zxid, pendingChanges);
                    }
                }
                // FIXME: I don't want to have to serialize it here and then
                // immediately deserialize in next processor. But I'm
                // not sure how else to get the txn stored into our list.
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                txn.serialize(boa, "request");
                ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
                txns.add(new Txn(type, bb.array()));
            }
            request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                    Time.currentWallTime(), request.type));
            request.setTxn(new MultiTxn(txns));
            break;
        // create/close session don't require request record
        case OpCode.createSession:
        case OpCode.closeSession:
            if (!request.isLocalSession()) {
                pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
            }
            break;
        // All the rest don't need to create a Txn - just verify session
        case OpCode.sync:
        case OpCode.exists:
        case OpCode.getData:
        case OpCode.getACL:
        case OpCode.getChildren:
        case OpCode.getChildren2:
        case OpCode.ping:
        case OpCode.setWatches:
        case OpCode.checkWatches:
        case OpCode.removeWatches:
            zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
            break;
        default:
            LOG.warn("unknown type " + request.type);
            break;
        }
    } catch (KeeperException e) {
        if (request.getHdr() != null) {
            request.getHdr().setType(OpCode.error);
            request.setTxn(new ErrorTxn(e.code().intValue()));
        }
        if (e.code().intValue() > Code.APIERROR.intValue()) {
            LOG.info("Got user-level KeeperException when processing {} Error Path:{} Error:{}",
                    request.toString(), e.getPath(), e.getMessage());
        }
        request.setException(e);
    } catch (Exception e) {
        // log at error level as we are returning a marshalling error to the user
        LOG.error("Failed to process " + request, e);
        StringBuilder sb = new StringBuilder();
        ByteBuffer bb = request.request;
        if (bb != null) {
            bb.rewind();
            while (bb.hasRemaining()) {
                sb.append(Integer.toHexString(bb.get() & 0xff));
            }
        } else {
            sb.append("request buffer is null");
        }
        LOG.error("Dumping request buffer: 0x" + sb.toString());
        if (request.getHdr() != null) {
            request.getHdr().setType(OpCode.error);
            request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
        }
    }
    request.zxid = zks.getZxid();
    // as the first processor in the chain, this thread must hand the request
    // on to the next processor, SyncRequestProcessor, and then FinalRequestProcessor;
    // for SyncRequestProcessor, "processing" merely means queuedRequests.add(request);
    nextProcessor.processRequest(request);
}
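One detail worth isolating from SyncRequestProcessor.run() above is the randomized roll threshold: each server rolls its log and snapshots only after snapCount/2 plus a random offset of appended transactions, so the servers of an ensemble are unlikely to all pause for a snapshot at the same moment. A hypothetical standalone sketch of just that counter logic (SnapRollDemo is not a ZooKeeper class; snapCount corresponds to the zookeeper.snapCount property, default 100000):

import java.util.Random;

// NOT ZooKeeper source: simulates the snapCount/2 + randRoll trigger in isolation
public class SnapRollDemo {
    public static void main(String[] args) {
        int snapCount = 100_000; // zookeeper.snapCount default
        Random r = new Random();
        int randRoll = r.nextInt(snapCount / 2);
        int logCount = 0;
        for (int txn = 0; txn < 500_000; txn++) {
            logCount++; // one transaction appended to the log
            if (logCount > (snapCount / 2 + randRoll)) {
                System.out.println("roll log + snapshot at txn " + txn);
                randRoll = r.nextInt(snapCount / 2); // re-randomize for the next round
                logCount = 0;
            }
        }
    }
}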
Once the request processors are up and running, JMX registration comes next:
// org.apache.zookeeper.server.ZooKeeperServer
protected void registerJMX() {
    // register with JMX
    try {
        jmxServerBean = new ZooKeeperServerBean(this);
        MBeanRegistry.getInstance().register(jmxServerBean, null);
        try {
            jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree());
            MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            jmxDataTreeBean = null;
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        jmxServerBean = null;
    }
}
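Once registered, these beans live in the JVM's platform MBeanServer. A rough sketch of listing them with the standard JMX API (run inside the server JVM, e.g. from a test; it assumes the beans sit under the org.apache.ZooKeeperService domain, so treat that string as an assumption to verify with jconsole):

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// NOT ZooKeeper source: lists whatever the server registered via MBeanRegistry
public class JmxPeek {
    public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        for (ObjectName name : mbs.queryNames(
                new ObjectName("org.apache.ZooKeeperService:*"), null)) {
            System.out.println(name);
        }
    }
}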
Finally, zk blocks at shutdownLatch.await(), waiting for a shutdown signal so it can shut down gracefully!
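A minimal sketch of that latch pattern, assuming a CountDownLatch-style shutdownLatch like the one the main thread parks on; the ServerDemo class and its three-second trigger are purely illustrative:

import java.util.concurrent.CountDownLatch;

// NOT ZooKeeper source: the main thread parks on a latch until someone
// requests shutdown, then runs cleanup and exits
public class ServerDemo {
    private static final CountDownLatch shutdownLatch = new CountDownLatch(1);

    // called by whatever decides the server should stop
    static void shutdown() {
        shutdownLatch.countDown();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("server running...");
        // simulate a shutdown request arriving three seconds later
        new Thread(() -> {
            try { Thread.sleep(3000); } catch (InterruptedException ignored) {}
            shutdown();
        }).start();
        shutdownLatch.await(); // the main thread parks here, just like zkServer
        System.out.println("latch released, cleaning up and exiting");
    }
}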
With that, zkServer startup is complete!
To sum up the startup logic:
1. Cluster and standalone modes share a single startup entry point;
2. Only after the configuration file has been parsed does the server discover it is in standalone mode, at which point it re-invokes the standalone startup path;
3. The startup phase mainly consists of creating threads such as the selector and the workerPool;
4. Startup performs one round of data initialization or data recovery;
5. ZKDatabase is the key storage structure that runs through all of zk's data handling;
6. zkServer ends up blocked, waiting for the shutdown signal;
7. Requests are processed step by step through a chain of responsibility (see the sketch after this list);
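Here is the minimal chain-of-responsibility sketch promised in point 7, mirroring the Prep -> Sync -> Final order of setupRequestProcessors(); the Processor interface and the three classes are simplified stand-ins, not the real ZooKeeper types:

// NOT ZooKeeper source: a bare-bones chain of responsibility
interface Processor {
    void processRequest(String request);
}

class FinalProcessor implements Processor {
    public void processRequest(String request) {
        System.out.println("final: apply " + request + " and reply");
    }
}

class SyncProcessor implements Processor {
    private final Processor next;
    SyncProcessor(Processor next) { this.next = next; }
    public void processRequest(String request) {
        System.out.println("sync: log " + request + " to disk");
        next.processRequest(request);
    }
}

class PrepProcessor implements Processor {
    private final Processor next;
    PrepProcessor(Processor next) { this.next = next; }
    public void processRequest(String request) {
        System.out.println("prep: validate " + request + ", build txn");
        next.processRequest(request);
    }
}

public class ChainDemo {
    public static void main(String[] args) {
        // same order as setupRequestProcessors(): Prep -> Sync -> Final
        Processor first = new PrepProcessor(new SyncProcessor(new FinalProcessor()));
        first.processRequest("create /foo");
    }
}

In the real server, PrepRequestProcessor and SyncRequestProcessor are additionally threads with their own queues, so each hop is a queue handoff rather than a direct call.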
This walkthrough skimmed the startup process, but the actual request-handling business logic has not been covered yet. (Which may be the proverbial: "and yet none of this is of any use!")
To find out what happens next, stay tuned for the next installment!

Author: 等你归去来
Source: https://www.cnblogs.com/yougewe/p/10575094.html
