别用ZkClient了,Curator才是ZooKeeper的好伴侣.

分布式朝闻道

共 10033字,需浏览 21分钟

 · 2022-04-01

zookeeper作为分布式系统中重要的协调组件,在后端开发中是难以绕开的一个重要知识领域。

可以说,只要在后端领域,比如说Java开发、大数据开发中待过三年及以上的工程师,或多或少都接触过或者直接使用过zookeeper。

因此笔者开启本系列,作为自己学习zookeeper(后文均称为zk)的记录,如果能够启发读者那就更好不过了。

zookeeper概述

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。

它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

度娘如是说。

zookeeper,直译过来就是动物园管理员,之所以这么说,主要是因为它在大数据技术栈中扮演了重要的协调角色。

在hadoop技术栈中,各种技术的logo都是小动物,而zookeeper的官方logo也是一位园丁模样的男士。这也直观地告诉了使用者,zk的用途和角色。

一言蔽之,zk就是在分布式系统中,对应用提供一致性保证,分布式选主,通过各种机制,对应用进行协调,从而使分布式系统对外提供某种特定的服务。

3c8180b69a99bef6e7ae03b075112675.webp

zookeeper官方形象

关于如何搭建zookeeper,网络上文章有很多,本文就不进行展开,感兴趣的可以自行查找相关资料。

https://zookeeper.apache.org/

curator概述

Apache Curator是Netflix公司开源的一套zookeeper客户端框架,并贡献给了apache社区。

它封装了Zookeeper客户端底层的api,提供了流式风格的api,提供了很多开箱即用的高级特性,如:分布式选主、分布式锁、path监控、node监控、更加易用的节点CRUD操作、分布式队列等。

Patrixck Hunt(Zookeeper的commiter)以一句“Guava is to Java that Curator to Zookeeper”给予Curator高度评价。

目前主流的zookeeper客户端共有三种:

  • 官方zookeeper客户端
  • zkClient
  • curator

其中,官方的客户端提供的api都比较底层,开发者直接拿来用需要进行一定的封装,否则直接使用会显得过于复杂和繁琐;zkclient虽然使用起来比较方便,但是文档较少,社区也不太活跃;而curator则是apache顶级项目,拥有活跃的开源社区,且拥有较多的成熟api和高级特性。

因此在zk的客户端这个领域,curator大受推崇。

http://curator.apache.org/index.html

f742ad55ac1389acd3ed09d54e292d40.webp

curator基础操作

我们通过代码来直接感受一下curator操作的快捷。

首先需要在工程中引入curator的依赖:

    
    
      org.apache.curator
      curator-framework
      4.3.0
    

    
    
      org.apache.curator
      curator-recipes
      4.3.0
    

「注意」 笔者使用的zookeeper服务端版本为3.5.6,因此使用4.0.0以上版本的curator是兼容的。

对于较低版本的zookeeper服务端,如3.4.x,则需要依赖curator2.x版本,如:2.12.0。如果使用高版本的curator,需要将curator自身依赖的ZooKeeper在maven中exclude掉。并引入对应的低版本zookeeper客户端。

关于curator与zookeeper具体的版本依赖,请参考官方的说明 ZooKeeper Version Compatibility

创建客户端实例

使用Curator第一步是要创建一个客户端实例,代码如下:(后续的操作中,第一步均是创建客户端实例。后续讲解过程中只粘贴关键代码,在文章的末尾会粘贴本文中所有的代码案例的完整代码)。

    RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client =
            CuratorFrameworkFactory.newClient("127.0.0.1:2181", retry);
    client.start();
    System.out.println("启动curator客户端");

简单解释下这段代码的含义:

  1. 首先创建一个重试策略实例RetryPolicy,当客户端与zk服务端连接失败或者超时,curator会使用我们指定的 重试策略进行重试。RetryPolicy有多个实现,这里使用ExponentialBackoffRetry策略,重试三次,每次间隔1秒钟。

  2. 通过CuratorFrameworkFactory创建一个CuratorFramework实例,传入zk连接地址以及重试策略。示例代码中为单机方式连接串,如果是多节点方式只需要通过半角逗号分割的方式进行连接即可。

    多节点连接串
    ip0:port0,ip1:port1,ip2:port2
  1. 调用CuratorFramework.start()方法,与zk服务端建立连接。

如此,我们的客户端便能够与zk服务端建立起长连接,从而为各种交互做准备。

节点操作

连接建立好后,我们来学习一下curator如何对zk节点进行操作。通俗地说就是通过curator对zk的node做增删改查操作。

        try {
            // 增加
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath("/snowalker/path""100".getBytes());
                    
            // 读取node的值
            byte[] dataBytes = client.getData().forPath("/snowalker/path");
            
            System.out.println(new String(dataBytes));
            // 修改node对应的值
            client.setData().forPath("/snowalker/path""120".getBytes());
            byte[] dataBytes1 = client.getData().forPath("/snowalker/path");
            
            System.out.println(new String(dataBytes1));
            // 获取子节点
            List children = client.getChildren().forPath("/snowalker");
            System.out.println(children);
            
            // 删除节点
            client.delete().forPath("/snowalker/path");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            client.close();
        }

这段代码基本上就涵盖了curator对zk的node进行增删改查的主流操作了。介绍下代码含义:

  1. 「增」 首先,通过create方法在zk上创建了 ”/snowalker/path“ 这样一个持久化节点。方法 creatingParentsIfNeeded() 表示 「如果有必要则创建父节点」,也就是递归地创建多个节点。
  2. 节点建立后,写入value;value在zk的node上以字节形式进行存储;初始值为100,并进行打印
  3. 「查」 通过CuratorFramework.getData().forPath("/snowalker/path")  可以读取对应节点的value
  4. 「改」 接着我们通过CuratorFramework.setData() 修改”/snowalker/path“ 对应的value为120,并进行打印
  5. 如果想要获取某个子节点,我们可以通过CuratorFramework.getChildren().forPath(path) 方法获取,返回一个list;也就是说,zk的子节点是一对多的(zk文件系统是树形结构)
  6. 「删」 通过执行CuratorFramework.delete().forPath(path) 能够将指定path进行删除

运行代码,观察到日志打印如下:

    启动curator客户端
    100
    120
    [path]

使用zkcli连接zookeeper服务端,ls看一下 /snowalker 目录下的节点:

    [zk: localhost:2181(CONNECTED) 6] ls /
    [snowalker]
    [zk: localhost:2181(CONNECTED) 7] ls /snowalker
    []
    [zk: localhost:2181(CONNECTED) 8]

当前只有 /snowalker 节点存在,子节点已经被删除。

可以看到,通过curator,我们通过几行代码便实现了对zk node的增删改查

curator进阶操作

zk提供了分布式选主、watcher动态监听等机制,能够为分布式系统提供分布式协调,配置实时变更通知等能力。Curator当然也提供了对应的API供我们进行调用。

接下来我们就分别看一下如何使用Curator来使用这些能力。

集群选主

首先看一下如何利用Curator实现集群选主。

Curator提供两种方式进行集群选主,分别为:

  • LeaderLatch方式
  • LeaderElection方式

LeaderLatch方式

首先观察一下leaderLatch选主方式调用方式:

    public class LeaderLatchDemo {
    
        public static void main(String[] args) throws Exception {
            new LeaderLatchDemo().leaderLatch();
        }
    
        public void leaderLatch() throws Exception {

首先我们还是要实例化一个CuratorFramework客户端,与zk服务端建立连接

            RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client =
                    CuratorFrameworkFactory.newClient(
                            "127.0.0.1:2181",
                            5000,
                            3000,
                            retry);
            client.start();
            System.out.println("启动curator客户端");

接着注册一个连接状态监听器,在回调方法中根据返回的连接状态进行对应操作。

当连接状态ConnectionState为LOST时,表明客户端到服务端的连接已经断开,如果当前节点已经是leader,那么我们就需要暂停leader身份下的一切事情。

如果我们查看源码的话,会发现curator内部会将是否为leader的状态设置为false(已经不是leader了)

            client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState newState) {
                    switch (newState) {
                        case LOST: {
                            break;
                        }
                    }
                }
            });

我们声明一个路径作为选主的依据。

通过 「LeaderLatch leaderLatch = new LeaderLatch(client, latchPath);」 实例化一个LeaderLatch实例,通过它进行leader选举操作。

            // latch
            String latchPath = "/snowalker/leader_latch";
            LeaderLatch leaderLatch = new LeaderLatch(client, latchPath);
            // 开启leader选举过程
            leaderLatch.start();
            // 判断当前节点是否为leader
            boolean hasLeadershipBefore = leaderLatch.hasLeadership();
            System.out.println("是否成为leader:" + hasLeadershipBefore);
    
            leaderLatch.await();    

当通过 「leaderLatch.start()」 开启leader选举之后,我们需要调用  「leaderLatch.await()」

如果当前的客户端未成为leader,则会进行等待,(内部源码实现是通过Object.wait()进行阻塞) 直到成为leader后,当前客户端线程会被唤醒,继续执行后续逻辑。

这里说的后续逻辑,实际上就是客户端作为leader节点需要执行的业务逻辑。比如:hdfs中两台机器,当其中一台机器成为主节点就会以主节点的身份对外提供文件相关服务,而另外一台非leader机器则会await在这里,直到它成为leader才会作为主节点提供服务。(也就是说,只有被选为leader的节点才会真正的提供服务,否则它看起来就好像 “假死” 了)

            boolean hasLeadershipAfter = leaderLatch.hasLeadership();
            System.out.println("是否成为leader:" + hasLeadershipAfter);
            Thread.sleep(100000);
        }
    }

最后我们再打印一下节点的状态,看当前节点是否成为leader。

运行效果

我们同时启动两个LeaderLatchDemo主进程,模拟双节点下的leader选举过程。

两个客户端控制台打印如下:

客户端A

启动curator客户端
是否成为leader:false

客户端B

启动curator客户端
是否成为leader:false
是否成为leader:true

上述日志打印,表明开始阶段,两个客户端均非leader。

当经过竞争之后,客户端B成为leader,而客户端A则阻塞。我们尝试关闭客户端B进程,观察客户端A的控制台日志打印:

客户端A

启动curator客户端
是否成为leader:false
是否成为leader:true

我们发现,客户端A成为了leader,从阻塞中唤醒。

这个小demo直观地为我们展现了通过leaderLatch进行leader选举的场景。

LeaderElection方式

curator为我们提供了一种更为简洁的leader选举方式,它就是 「LeaderElection」 方式。(「实际上」,LeaderElection与LeaderLatch在原理上几乎没有差别,他们的原理都是基于分布式锁实现的,只不过LeaderElection方式在使用上更加简洁,开发效率更高)

话不多说,直接上代码:

    public class LeaderElectionDemo {
    
        public static void main(String[] args) throws InterruptedException {

首先还是实例化一个CuratorFramework建立到zk服务端的连接。

            RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client =
                    CuratorFrameworkFactory.newClient(
                            "127.0.0.1:2181",
                            5000,
                            3000,
                            retry);
            client.start();
            System.out.println("启动curator客户端");

接着定义一个leader选举节点,这个操作和LeaderLatch相似。

            String election = "/leader/election";

这里就不同了,我们需要建立一个LeaderSelector实例,它接收CuratorFramework实例、选举节点、以及一个LeaderSelectorListener  Leader选举监听器。

我们需要实现LeaderSelectorListener的回调方法:

  • takeLeadership回调中需要开发者实现当成为leader之后的业务逻辑。当一个客户端成为leader之后,便会回调takeLeadership方法,执行leader角色的业务逻辑

  • stateChanged方法需要开发者实现当连接状态发生变化之后的业务逻辑。比如:我们可以直接抛出异常,阻止leader业务逻辑继续进行。待另外的节点成为leader后执行takeLeadership方法

      LeaderSelector leaderSelector = new LeaderSelector(
              client,
              election,
              new LeaderSelectorListener() {
                  @Override
                  public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                      System.out.println("你已经成为leader");
                      // 在 这里干leader的所有事情,此时方法不能退出
                      Thread.sleep(Integer.MAX_VALUE);
                  }
      
                  @Override
                  public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                      System.out.println("你已经不是leader,链接状态发生变化,connectionState" + connectionState);
                      if (connectionState.equals(ConnectionState.LOST)) {
                          throw new CancelLeadershipException();
                      }
                  }
              });

通过调用LeaderSelector.start之后,多个客户端会在election节点下竞争leader角色。当某个客户端竞争leader成功,就会执行takeLeadership回调方法通知当前应用节点已经成为leader。接着执行leader角色的逻辑即可

            leaderSelector.start();
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
运行代码

我们启动两个LeaderElectionDemo客户端,让他们进行leader角色的选举操作,观察控制台输出:

客户端A打印如下:

    启动curator客户端
    你已经成为leader

客户单B打印如下:

    启动curator客户端

这表明,客户端A竞争leader成功,并成功执行了回调方法takeLeadership。客户端B竞争leader失败,进程阻塞。

我们强制关闭客户端A,此时客户端B控制台输出如下:

    启动curator客户端
    你已经成为leader

这表明,客户端A释放了leader角色,客户端B竞争成功,并开始执行leader角色的方法。

「事实上」 LeaderElection方式内部实现机制几乎与LeaderLatch方式一模一样,它本质上也是通过分布式锁竞争成为leader的。

具体到细节就是,LeaderElection是通过使用curator实现的mutex锁进行leader竞争。如果获取到的锁就是leader。如果竞争leader的时候竞争锁失败,则会阻塞,并为上个节点添加watcher。

当上个节点对应的客户端down机或者长时间断开连接,则顺序临时节点就消失了,此时watcher会通知后一个节点进行加锁。后面的节点加锁成功便会成为leader角色。

我们发现,这其实就是Curator的分布式锁实现机制啊。

后续我们会对LeaderElection具体的代码实现进行展开讲解。敬请期待。

小结

本文到此就告一段落了,我们对curator的基本使用以及重要的leader选举特性进行了全方位的讲解和展示。

这里做个预告,接下来我会带领读者朋友们继续学习curator对zookeeper的watcher机制的封装和增强。


浏览 83
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报