10分钟zookeeper总结。
安装配置
- 解压
- 修改配置:zoo.cfg, 修改dataDir
- 启动:bin/zkServer.sh start
默认是standalone
分布式部署
- 同步安装包
- 在zkData数据目录下创建一个myid文件
- 编辑myid,输入唯一标识,比如2,3,4
- 配置zoo.cfg :
# server.A=host1:B:C # A: myid编号 # B:数据通信端口 # C:选举通信的端口 server.2=host1:2888:3888 server.3=host2:2888:3888 server.4=host3:2888:3888
- 启动每台服务器上的
zkServer.sh start
,通过zkServer.sh status
查看leader还是follower
配置解释
tickTime: 每隔tickTime时间发送一次心跳
initLimit:follower链接leader最多允许的心跳帧次数:
initLimit * tickTime后就会超时
syncLimit: follower和leader同步时最多允许的心跳丢失次数
syncLimit * tickTime后就超时,从服务列表中删除follower
内部原理
基于paxos协议。
半数机制:一般装奇数台
leader与follower,投票选举出来的leader
详情可以查看选举部分代码
都在 Election
接口的实现里:
package org.apache.zookeeper.server.quorum;
public interface Election {
Vote lookForLeader() throws InterruptedException;
void shutdown();
}
主要是FastLeaderElection
类。
节点类型
- 持久:客户端和服务端断开连接后节点不删除
- 持久化目录节点:目录不变
- 持久化顺序编号目录节点:断开后会自动编号
- 短暂:断掉就删除了
- 临时目录节点
- 临时目录编号节点
Stat结构体
建议先看shell操作。
- czxid: 创建节点的事务zxid:zxid唯一,时间戳,代表顺序
- ctime: znode被创建的毫秒数
- mzxid:最后更新的zxid
- mtime:最后修改的毫秒数
- pZxid: 最后更新的子节点的zxid
- cversion: znode字节点变化号,znode子节点修改的次数
- dataversion:数据变化号
- aclVersion:访问控制列表的变化号
- ephemeralOwner:如果是临时节点,这个代表znode拥有者的session id,如果不是则为0
- dataLength:数据长度
- numChildren:子节点数量
监听器原理
- 客户端:首先有一个main线程
- 客户端:main线程里创建2个线程:一个负责连接通信(connector),一个负责监听(listener)
- 客户端:通过connector把注册的监听事件发给zk服务器
- 服务器:zk的注册监听器列表(服务器)将注册的监听事件添加到列表中
- 服务器:zk服务器监听到有数据路径变化,就会将消息发给listener线程
- 客户端:listener内部调用process方法
常见监听类型:
- 监听节点数据的变化:
get path watch
- 监听子节点增减的变化:
ls path watch
写数据的流程
client----》发送写请求—》server1
- 如果server1不是leader,则会把请求转发给leader,leader会把请求广播给各个server,比如server1和server2,各个server写成功了就会通知leader
- 当leader收到大多数(过半)的server写成功的回复后,就认为数据写成功了
- 写成功后,leader告诉server1写成功了
- server1再告诉client写成功了
操作实战
shell使用
zkCli.sh 进入客户端
help
: 显示命令ls /
:查看当前目录节点create /china "china"
: 创建节点create /china/beijing "beijing"
: 子节点get /china/beijing
: 获取节点的值create -e /china/japan "japan"
: 创建短暂节点create -s /china/cq "cq"
: 创建带有序号的节点(多次执行)set /china/beijing "home"
: 修改节点的值get /china watch
: 监听节点值的变化(注册一次有用一次)ls /china watch
: 监听节点的变化delete /china/beijing
: 删除节点rmr /china
: 递归删除
代码实践
准备maven环境
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
</dependencies>
创建client
/**
* 多个以逗号分隔
*/
private static String connectString = "localhost:2181";
/**
* 超时时间
*/
private static int sessionTimeout = 2000;
/**
* client
*/
private static ZooKeeper zk;
@BeforeClass
public static void init() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
});
}
@AfterClass
public static void afterClass() throws Exception {
zk.close();
}
代码实现CRUD
@Test
public void createNode() throws KeeperException, InterruptedException {
String path = zk.create("/super", "jimo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
assertEquals("/super", path);
}
@Test
public void getNode() throws KeeperException, InterruptedException {
List<String> children = zk.getChildren("/", false);
children.forEach(System.out::println);
}
@Test
public void getNodeWatch() throws KeeperException, InterruptedException {
zk.getChildren("/", watcher -> {
try {
getNode();
} catch (Exception e) {
e.printStackTrace();
}
});
TimeUnit.MINUTES.sleep(1L);
}
@Test
public void nodeExist() throws KeeperException, InterruptedException {
Stat exists = zk.exists("/super", false);
assertNotNull(exists);
Stat existNon = zk.exists("/hehe", false);
assertNull(existNon);
}