开发者社区 > 博文 > 10分钟系列-zookeeper入门
分享
  • 打开微信扫码分享

  • 点击前往QQ分享

  • 点击前往微博分享

  • 点击复制链接

10分钟系列-zookeeper入门

  • 京东城市JUST团队
  • 2021-01-18
  • IP归属:未知
  • 27960浏览

10分钟zookeeper总结。

安装配置

  1. 解压
  2. 修改配置:zoo.cfg, 修改dataDir
  3. 启动:bin/zkServer.sh start

默认是standalone

分布式部署

  1. 同步安装包
  2. 在zkData数据目录下创建一个myid文件
  3. 编辑myid,输入唯一标识,比如2,3,4
  4. 配置zoo.cfg :
    # server.A=host1:B:C
    # A: myid编号
    # B:数据通信端口
    # C:选举通信的端口
    server.2=host1:2888:3888
    server.3=host2:2888:3888
    server.4=host3:2888:3888
  5. 启动每台服务器上的zkServer.sh start ,通过zkServer.sh status查看leader还是follower

配置解释

tickTime: 每隔tickTime时间发送一次心跳

initLimit:follower链接leader最多允许的心跳帧次数:
initLimit * tickTime后就会超时

syncLimit: follower和leader同步时最多允许的心跳丢失次数
syncLimit * tickTime后就超时,从服务列表中删除follower

内部原理


基于paxos协议。

  1. 半数机制:一般装奇数台

  2. leader与follower,投票选举出来的leader

详情可以查看选举部分代码

都在 Election接口的实现里:

package org.apache.zookeeper.server.quorum;

public interface Election {

    Vote lookForLeader() throws InterruptedException;
    void shutdown();

}

主要是FastLeaderElection类。

节点类型

  • 持久:客户端和服务端断开连接后节点不删除
    1. 持久化目录节点:目录不变
    2. 持久化顺序编号目录节点:断开后会自动编号
  • 短暂:断掉就删除了
    1. 临时目录节点
    2. 临时目录编号节点

Stat结构体

建议先看shell操作。

  1. czxid: 创建节点的事务zxid:zxid唯一,时间戳,代表顺序
  2. ctime: znode被创建的毫秒数
  3. mzxid:最后更新的zxid
  4. mtime:最后修改的毫秒数
  5. pZxid: 最后更新的子节点的zxid
  6. cversion: znode字节点变化号,znode子节点修改的次数
  7. dataversion:数据变化号
  8. aclVersion:访问控制列表的变化号
  9. ephemeralOwner:如果是临时节点,这个代表znode拥有者的session id,如果不是则为0
  10. dataLength:数据长度
  11. numChildren:子节点数量

监听器原理

  1. 客户端:首先有一个main线程
  2. 客户端:main线程里创建2个线程:一个负责连接通信(connector),一个负责监听(listener)
  3. 客户端:通过connector把注册的监听事件发给zk服务器
  4. 服务器:zk的注册监听器列表(服务器)将注册的监听事件添加到列表中
  5. 服务器:zk服务器监听到有数据路径变化,就会将消息发给listener线程
  6. 客户端:listener内部调用process方法

常见监听类型:

  1. 监听节点数据的变化:get path watch
  2. 监听子节点增减的变化: ls path watch

写数据的流程

client----》发送写请求—》server1

  • 如果server1不是leader,则会把请求转发给leader,leader会把请求广播给各个server,比如server1和server2,各个server写成功了就会通知leader
  • 当leader收到大多数(过半)的server写成功的回复后,就认为数据写成功了
  • 写成功后,leader告诉server1写成功了
  • server1再告诉client写成功了

操作实战

shell使用

zkCli.sh 进入客户端

  1. help: 显示命令

  2. ls /:查看当前目录节点

  3. create /china "china" : 创建节点
    create /china/beijing "beijing": 子节点

  4. get /china/beijing : 获取节点的值

  5. create -e /china/japan "japan": 创建短暂节点

  6. create -s /china/cq "cq": 创建带有序号的节点(多次执行)

  7. set /china/beijing "home": 修改节点的值

  8. get /china watch: 监听节点值的变化(注册一次有用一次)
    ls /china watch: 监听节点的变化

  9. delete /china/beijing: 删除节点
    rmr /china: 递归删除

代码实践

准备maven环境

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.10</version>
        </dependency>
    </dependencies>

创建client

    /**
     * 多个以逗号分隔
     */
    private static String connectString = "localhost:2181";
    /**
     * 超时时间
     */
    private static int sessionTimeout = 2000;
    /**
     * client
     */
    private static ZooKeeper zk;

    @BeforeClass
    public static void init() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {

        });
    }
	
    @AfterClass
    public static void afterClass() throws Exception {
        zk.close();
    }	

代码实现CRUD

    @Test
    public void createNode() throws KeeperException, InterruptedException {
        String path = zk.create("/super", "jimo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        assertEquals("/super", path);
    }

    @Test
    public void getNode() throws KeeperException, InterruptedException {
        List<String> children = zk.getChildren("/", false);
        children.forEach(System.out::println);
    }

    @Test
    public void getNodeWatch() throws KeeperException, InterruptedException {
        zk.getChildren("/", watcher -> {
            try {
                getNode();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        TimeUnit.MINUTES.sleep(1L);
    }

    @Test
    public void nodeExist() throws KeeperException, InterruptedException {
        Stat exists = zk.exists("/super", false);
        assertNotNull(exists);
        Stat existNon = zk.exists("/hehe", false);
        assertNull(existNon);
    }
共0条评论