数据存储
基于java API初探zookeeper的使用
深入分析Watcher机制的实现原理
Curator客户端的使用,简单高效
数据存储
基于znode,基于文件系统风格的,树形结构的文件模型,和内存数据库差不多,基于增删改查的命令去操作数据库,整个数据库包括整个树形结构的内容,比如说我们的节点目录,节点路径和权限信息,而且zookeeper它会定时去吧这些信息数据存储到磁盘上。
使用DataTree管理整个数据存储,主要用来存储节点路径和数据的内容。底层是一个典型的基于ConcurrentHashMap的一个存储。
事务日志
创造一个节点或者进行事务操作的时候,都会记录一个事务日志,
zoo.cfg 文件中 , datadir
快照日志
它会记录zookeeper的某个时刻的快照,记录整个zookeeper全量数据的内容,类似于我们数据备份的一个概念,并且他会把这些文件放在指定的目录下去做一个存储,也是基于datadir这样一个路径去做一个存储
运行时日志
bin/zookeeper.out
我们可以通过它来看zookeeper运行时的一些情况。
实际应用过程中,我们会把事务日志单独挂载到一个磁盘上,我们的每一个请求,事务请求都会做一个输入日志的记录,那这个磁盘的读写的性能,将影响到zookeeper本身的操作的性能。
基于Java API初探zookeeper的使用
sh zkCli.sh 基于控制台
<dependency>
<groupId>com.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
<dependency>
<groupId>com.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
2
3
4
5
引入包以后就可以使用相应的API去做相应的操作,
代码
public class ConnectionDemo {
public static void main(String[] args) {
try {
ZooKeeper zooKeeper =
new ZooKeeper("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181",
4000,null);
System.out.println(zooKeeper.getState());
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ConnectionDemo {
public static void main(String[] args) {
try {
ZooKeeper zooKeeper =
new ZooKeeper("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181",
4000,null);
System.out.println(zooKeeper.getState());
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
控制台
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
CONNECTING
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
CONNECTING
2
3
4
连接以后
显示连接状态为 connecting 等待以后connected
引入watcher机制,new watcher() 直接输出链接状态 connected
连接,
代码
public class ConnectionDemo {
public static void main(String[] args) {
try {
ZooKeeper zooKeeper =
new ZooKeeper("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181",
4000,null);
System.out.println(zooKeeper.getState());
Thread.sleep(1000);
System.out.println(zooKeeper.getState());
zooKeeper.close();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ConnectionDemo {
public static void main(String[] args) {
try {
ZooKeeper zooKeeper =
new ZooKeeper("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181",
4000,null);
System.out.println(zooKeeper.getState());
Thread.sleep(1000);
System.out.println(zooKeeper.getState());
zooKeeper.close();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
控制台
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
CONNECTING
CONNECTED
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
CONNECTING
CONNECTED
2
3
4
5
Stat 对应控制台的各项属性
Watcher实现保证连接成功
public class ConnectionDemo {
public static void main(String[] args) {
try {
final CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper =
new ZooKeeper("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181",
4000, new Watcher() {
// 为了保证连接的成功状态
@Override
public void process(WatchedEvent watchedEvent) {
// 连接成功以后触发watcher事件
if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
// 如果收到了服务端的响应事件,连接成功
// 连接成功以后做一个递减。
countDownLatch.countDown();
}
}
});
countDownLatch.await();
System.out.println(zooKeeper.getState());//CONNECTED
zooKeeper.close();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ConnectionDemo {
public static void main(String[] args) {
try {
final CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper =
new ZooKeeper("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181",
4000, new Watcher() {
// 为了保证连接的成功状态
@Override
public void process(WatchedEvent watchedEvent) {
// 连接成功以后触发watcher事件
if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
// 如果收到了服务端的响应事件,连接成功
// 连接成功以后做一个递减。
countDownLatch.countDown();
}
}
});
countDownLatch.await();
System.out.println(zooKeeper.getState());//CONNECTED
zooKeeper.close();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
控制台
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
CONNECTED
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
CONNECTED
2
3
4
参数都需要穿进去version
Stat.getVersion()
增删改查
public class ConnectionDemo {
public static void main(String[] args) {
try {
final CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper =
new ZooKeeper("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181",
4000, new Watcher() {
// 为了保证连接的成功状态
@Override
public void process(WatchedEvent watchedEvent) {
// 连接成功以后触发watcher事件
if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
// 如果收到了服务端的响应事件,连接成功
// 连接成功以后做一个递减。
countDownLatch.countDown();
}
}
});
countDownLatch.await();
System.out.println(zooKeeper.getState());//CONNECTED
// 添加节点
zooKeeper.create("/java-con-darian","232".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Stat stat = new Stat();
//Stat和我们在控制台看到的属性是一样的。
// 得到节点的值
byte[] dataBytes = zooKeeper.getData("/java-con-darian", null, stat);
System.out.println(new String(dataBytes));
Thread.sleep(1000);
// 修改节点的值,乐观锁的概念
zooKeeper.setData("/java-con-darian", "55".getBytes(),stat.getVersion());
byte[] updateBytes = zooKeeper.getData("/java-con-darian", null, stat);
System.out.println(new String(updateBytes));
zooKeeper.delete("/java-con-darian", stat.getVersion());
zooKeeper.close();
// 当前线程进行阻塞
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
public class ConnectionDemo {
public static void main(String[] args) {
try {
final CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper =
new ZooKeeper("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181",
4000, new Watcher() {
// 为了保证连接的成功状态
@Override
public void process(WatchedEvent watchedEvent) {
// 连接成功以后触发watcher事件
if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
// 如果收到了服务端的响应事件,连接成功
// 连接成功以后做一个递减。
countDownLatch.countDown();
}
}
});
countDownLatch.await();
System.out.println(zooKeeper.getState());//CONNECTED
// 添加节点
zooKeeper.create("/java-con-darian","232".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Stat stat = new Stat();
//Stat和我们在控制台看到的属性是一样的。
// 得到节点的值
byte[] dataBytes = zooKeeper.getData("/java-con-darian", null, stat);
System.out.println(new String(dataBytes));
Thread.sleep(1000);
// 修改节点的值,乐观锁的概念
zooKeeper.setData("/java-con-darian", "55".getBytes(),stat.getVersion());
byte[] updateBytes = zooKeeper.getData("/java-con-darian", null, stat);
System.out.println(new String(updateBytes));
zooKeeper.delete("/java-con-darian", stat.getVersion());
zooKeeper.close();
// 当前线程进行阻塞
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
控制台
CONNECTED
232
55
CONNECTED
232
55
2
3
事件机制
我们对一个节点发起订阅以后,那么对这个节点发起的任何变化,它都会有一个触发,发给触发前绑定的客户端,相当于一个发布订阅的功能。
通过watcher机制,保证刚开始的连接状态是connected.
Watcher机制是zookeeper里边非常重要的一个特性,那么我们在zookeeper上创建节点的时候,我们可以绑定监听事件,比如说,我们可以监听节点的变更,删除,子节点变更的一些事件,通过这些事件,可以实现zookeeper的分布式锁 ,集群管理的一些功能。
Watcher特性:当数据发生变化的时候,zookeeper会产生一个watcher事件,并且发送到客户端,但客户端只会收到一次通知,如果后续这个节点再次发生变化,那么之前设置的watcher的客户端不会再次收到消息(watcher机制是一次性操作)。可以通过循环监听达到永久的监听效果。
如何注册事件机制
通过三个操作来绑定事件
getData、Exists、getChildren
getData:通过一个结点获得一个节点的数据
Exists:判断这个节点是否存在
getChildren:可以拿到当前节点的所有子节点
通过设置节点的路径,相当于给节点绑定了事件
如何触发事件?
凡是事务类型的操作,都会触发监听操作 create/delete/setData
zookeeper.exists(event.getPath().true);
true :默认使用全局的默认事件。
代码
public class WatcherDemo {
public static void main(String[] args) {
try {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final ZooKeeper zooKeeper =
new ZooKeeper("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181",
4000, new Watcher() {
// 为了保证连接的成功状态
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("全局的默认的watcher事件:"
+ watchedEvent.getType()
+ "----->>>>>>"
+ watchedEvent.getPath());
// 连接成功以后触发watcher事件
if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
// 如果收到了服务端的响应事件,连接成功
// 连接成功以后做一个递减。
countDownLatch.countDown();
}
}
});
countDownLatch.await();
zooKeeper.create("/zk-tmp-darian", "1".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// exists getdata getchildren
// 绑定事件
// 我们在创建zookeeper的时候创建了一个默认的匿名内部类,
// 如果说设置成 true 的话,所有的事件都会触发到内部类里边
// zooKeeper.exists("/zi-tmp-darian",true);
// 绑定自己的watcher事件
// 通过 exists 绑定事件
Stat stat = zooKeeper.exists("/zk-tmp-darian", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println(watchedEvent.getType() + "--->>>>" + watchedEvent.getPath());
try {
// 通过再一次绑定事件去实现永久监听的一个效果
zooKeeper.exists(watchedEvent.getPath(), true);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 通过修改的食物类型操作来触发监听事件
stat = zooKeeper.setData("/zk-tmp-darian", "222".getBytes(), stat.getVersion());
Thread.sleep(1000);
zooKeeper.delete("/zk-tmp-darian", stat.getVersion());
System.in.read();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
public class WatcherDemo {
public static void main(String[] args) {
try {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final ZooKeeper zooKeeper =
new ZooKeeper("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181",
4000, new Watcher() {
// 为了保证连接的成功状态
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("全局的默认的watcher事件:"
+ watchedEvent.getType()
+ "----->>>>>>"
+ watchedEvent.getPath());
// 连接成功以后触发watcher事件
if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
// 如果收到了服务端的响应事件,连接成功
// 连接成功以后做一个递减。
countDownLatch.countDown();
}
}
});
countDownLatch.await();
zooKeeper.create("/zk-tmp-darian", "1".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// exists getdata getchildren
// 绑定事件
// 我们在创建zookeeper的时候创建了一个默认的匿名内部类,
// 如果说设置成 true 的话,所有的事件都会触发到内部类里边
// zooKeeper.exists("/zi-tmp-darian",true);
// 绑定自己的watcher事件
// 通过 exists 绑定事件
Stat stat = zooKeeper.exists("/zk-tmp-darian", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println(watchedEvent.getType() + "--->>>>" + watchedEvent.getPath());
try {
// 通过再一次绑定事件去实现永久监听的一个效果
zooKeeper.exists(watchedEvent.getPath(), true);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 通过修改的食物类型操作来触发监听事件
stat = zooKeeper.setData("/zk-tmp-darian", "222".getBytes(), stat.getVersion());
Thread.sleep(1000);
zooKeeper.delete("/zk-tmp-darian", stat.getVersion());
System.in.read();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
控制台
全局的默认的watcher事件:None----->>>>>>null
NodeDataChanged--->>>>/zk-tmp-darian
全局的默认的watcher事件:NodeDeleted----->>>>>>/zk-tmp-darian
全局的默认的watcher事件:None----->>>>>>null
NodeDataChanged--->>>>/zk-tmp-darian
全局的默认的watcher事件:NodeDeleted----->>>>>>/zk-tmp-darian
2
3
Watcher事件机制:
时间类型
None(-1) 客户端链接状态发生变化的时候,会受到none的事件
NodeCreated(1) 创建节点的事件。比如说zk-persis-mic
NodeDeleted(2) 删除节点的事件。
NodeDataChanged(3) 节点数据发生变更
NodeChildrenChanged(4) 子节点被创建、被删除、会发生事件触发
触发条件
zk-persis-mic(监听事件) | zk-persis-mic/child(监听事件) | |
---|---|---|
create(/zk-persis-mic) | NodeCreated(exists/getData) | 无 |
delete(/zk-persis-mic) | NodeDelete(exists/getData) | 无 |
setData(/zk-persis-mic) | NodeDataChanged(exists/getData) | 无 |
create(/zk-persis-mic/children) | NodeChildrenChanged(getChildren) | NodeCreated |
delete(/zk-persis-mic/children) | NodeChildrenChanged(getChildren) | NodeDelete |
setData(/zk-persis-mic/children) | NodeDataChanged |
事件的实现原理
通过exists方法去绑定的事件
入口就是exists
Zookeeper类中的exists方法
public Stat exists(final String path, Watcher watcher)
throws KeeperException, InterruptedException{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ExistsWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.exists);
ExistsRequest request = new ExistsRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
SetDataResponse response = new SetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
if (r.getErr() == KeeperException.Code.NONODE.intValue()) {
return null;
}
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
return response.getStat().getCzxid() == -1 ? null : response.getStat();
}
public Stat exists(final String path, Watcher watcher)
throws KeeperException, InterruptedException{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ExistsWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.exists);
ExistsRequest request = new ExistsRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
SetDataResponse response = new SetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
if (r.getErr() == KeeperException.Code.NONODE.intValue()) {
return null;
}
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
return response.getStat().getCzxid() == -1 ? null : response.getStat();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
我们client在zookeeper server上注册一个事件
ZkWatcher Manager 上保存客户端的事件注册
源码地址:Zookeeper核心类中exists(
分析原理:
在客户端调用里边就是实例化,
调用增删改查方法
一定有一个机制区队这个队列去做一个数据的处理
OutgoingQueue
有一个入队肯定有一个出对
数据不是放大队列中就完事了,肯定有机制去对队列做一个出队列
判断是引导流程走向的一个方式!
Ctrl + alt + B 点到方法上,可以看到有几个实现
NettyServerCnxn
服务端处理请求的类
receiveMessage()方法
通过链式的process对请求去做一个链式的处理,把不同的业务去做一个分离
预处理 > 同步处理 > finally处理
PIPE(管道模式)
Linux 的管道的处理,一个请求过来,进入不同的管道,做不同的处理,最终返回一个结果。
类似于链式风格的一种设计。
Zookeeper中大部分都在用多线程的方式异步化流程。
一定有一个线程去处理队列 submittRequests。
Hashmap<watcher, String path>
Curator的操作
主要是对我们数据节点操作的一个封装
原生操作会有很多不便之处,虽然说灵活性比较好,但是我们要做一些事情,比如说,要去绑定事件很复杂,我们要去创建一个连接也很复杂,那怎么去简化?
Netflix公司开源了一个zookeeper客户端,他跟原生的客户端相比,它是一个更高层次的抽象,他不光是API层次的一个抽象,同时还封装了一套基于应用场景的API,也就是说zookeeper它本身能够实现分布式锁,leader选举这些功能,那么curator对这些功能做了一些封装,我们直接调用API就好了。就可以完成leader选举的这样一个机制,那么我们怎么实现这样一个功能。
基于fluent风格的一个机制
CuratorFramework curatorFamework = CuratorFrameworkFactory
.builder()
.connectString()
.sessionTimeoutMS(4000)
.retryPolicy(new ExponetialBackoffRetry(1000,3))
.namespace(“curator”)
.build()
// 结果: /curator/mic/node1
// 原生api中,必须是逐层创建,也就是父节点必须存在,子节点才能创建
Curator的增删改查
package com.gupao.study.vip;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
/**
* <br>类说明 :
* <br>属性说明:
* <br>作者:Darian
**/
public class CuratorDemo {
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory
.builder()
.connectString("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181")
.sessionTimeoutMs(4000)
// 一个重试机制,递减的重试机制
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
// 指定命名空间,我们会把某个业务放在一个大的命名空间下面
// 因为我们要区分每个场景下的业务划分
.namespace("/curator")
.build();
curatorFramework.start();
// 创建结果: /curator/darian/node1
// 原生的 API 中,必须是逐层创建,也就是父节点必须存在,子节点才能创建
curatorFramework.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/darian/node1", "2".getBytes());
// 查询节点,得到节点的 Stat 信息
Stat stat = new Stat();
curatorFramework.getData()
.storingStatIn(stat)
.forPath("/darian/node1");
// 更新操作
curatorFramework.setData()
.withVersion(stat.getVersion())
.forPath("/darian/node1", "232".getBytes());
// 去删除某个节点
curatorFramework.delete()
.deletingChildrenIfNeeded()
.forPath("/darian/node1");
}
}
package com.gupao.study.vip;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
/**
* <br>类说明 :
* <br>属性说明:
* <br>作者:Darian
**/
public class CuratorDemo {
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory
.builder()
.connectString("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181")
.sessionTimeoutMs(4000)
// 一个重试机制,递减的重试机制
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
// 指定命名空间,我们会把某个业务放在一个大的命名空间下面
// 因为我们要区分每个场景下的业务划分
.namespace("/curator")
.build();
curatorFramework.start();
// 创建结果: /curator/darian/node1
// 原生的 API 中,必须是逐层创建,也就是父节点必须存在,子节点才能创建
curatorFramework.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/darian/node1", "2".getBytes());
// 查询节点,得到节点的 Stat 信息
Stat stat = new Stat();
curatorFramework.getData()
.storingStatIn(stat)
.forPath("/darian/node1");
// 更新操作
curatorFramework.setData()
.withVersion(stat.getVersion())
.forPath("/darian/node1", "232".getBytes());
// 去删除某个节点
curatorFramework.delete()
.deletingChildrenIfNeeded()
.forPath("/darian/node1");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
CuratorFramework.create()
.creatingPatentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(“/mic/node1”,”1”.getBytes());
//删除
curatorFramework
.delete()
.deletingChildrenIfNeeded()
.forPath(“/mic/node1”);
Stat stat = new Stat();
curatorFramework
.getData()
.storingStatIn(stat).forPath(“/mic/node1”)
curatorFramework.setData().
withVersion(sta.getVersion())
.forPath(“/mic/node1”,”xx”.getBytes);
curatorFramework.close(); //关闭连接
curator 事件的高度封装,
一定有一个节点特性
PathChildCache 监听一个节点下子节点的创建、删除、更新
(CHILD-ADDED,CHILD_UPDATE,CHILD_REMOVED)
NodeCache 监听一个结点的更新和创建时间
(Receive Event:/mic)
TreeCache zonghe1PathChildCache和NodeCache的特性
Curator的watcher机制
package com.gupao.study.vip;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
* <br>类说明 : 事件的高度封装
* <p>
* curator封装了三种事件的机制
* PathChildCache 监听一个节点下节点的创建、删除、更新
* NodeCache 监听一个节点的更新和创建事件
* TreeCache 综合 PatchChildCache 和 NodeCache 的特性
*
* <br>属性说明:
* <br>作者:Darian
**/
public class CuratorWatcherDemo {
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory
.builder()
.connectString("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181")
.sessionTimeoutMs(4000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("curator")
.build();
curatorFramework.start();
// 当前节点的创建、删除事件监听
// addListenerWithNodeCache(curatorFramework, "/darian");
// 子节点的增加、修改、删除的事件监听
// addlistenerWithPathChildCache(curatorFramework,"/darian");
// 综合节点的监听事件
addListerWithTreeCache(curatorFramework, "/darian");
System.in.read();
}
/**
* NodeCache + PathChildrenCache
**/
public static void addListerWithTreeCache(CuratorFramework curatorFramework, String path) throws Exception {
final TreeCache treeCache = new TreeCache(curatorFramework,path);
TreeCacheListener treeCacheListener = new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println(treeCacheEvent.getType()+"-->>"+treeCacheEvent.getData());
}
};
treeCache.getListenable().addListener(treeCacheListener);
treeCache.start();
/*
INITIALIZED-->>null
NODE_ADDED-->>ChildData{path='/darian', stat=51539607621,51539607621,1531222950294,1531222950294,0,0,0,0,1,0,51539607621
, data=[49]}
NODE_ADDED-->>ChildData{path='/darian/node1', stat=51539607622,51539607622,1531222969992,1531222969992,0,0,0,0,1,0,51539607622
, data=[49]}
NODE_ADDED-->>ChildData{path='/darian/node1/node1', stat=51539607623,51539607623,1531222989856,1531222989856,0,0,0,0,1,0,51539607623
, data=[49]}
NODE_REMOVED-->>ChildData{path='/darian/node1/node1', stat=51539607623,51539607623,1531222989856,1531222989856,0,0,0,0,1,0,51539607623
, data=[49]}
NODE_UPDATED-->>ChildData{path='/darian/node1', stat=51539607622,51539607625,1531222969992,1531223116477,1,2,0,0,1,0,51539607624
, data=[50]}
NODE_REMOVED-->>ChildData{path='/darian/node1', stat=51539607622,51539607625,1531222969992,1531223116477,1,2,0,0,1,0,51539607624
, data=[50]}
NODE_REMOVED-->>ChildData{path='/darian', stat=51539607621,51539607621,1531222950294,1531222950294,0,2,0,0,1,0,51539607626
, data=[49]}
*/
}
/**
* 监听对应节点下的子节点的变化
* 创建
* 删除
* 更新
*/
public static void addlistenerWithPathChildCache(CuratorFramework curatorFramework, String path) throws Exception {
final PathChildrenCache pathChildrenCache =
new PathChildrenCache(curatorFramework, path, true);
PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("receive Event:"
+ pathChildrenCacheEvent.getType()
+ "-->>"
+ pathChildrenCacheEvent.getData());
}
};
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
/*
receive Event:CONNECTION_RECONNECTED-->>null
receive Event:CHILD_ADDED-->>ChildData{path='/darian/node1', stat=51539607611,51539607611,1531222192297,1531222192297,0,0,0,0,1,0,51539607611
, data=[49]}
receive Event:CHILD_REMOVED-->>ChildData{path='/darian/node1', stat=51539607611,51539607611,1531222192297,1531222192297,0,0,0,0,1,0,51539607611
, data=[49]}
*/
}
/**
* 监听这个节点的变化
* 创建
* 更新
**/
public static void addListenerWithNodeCache(CuratorFramework curatorFramework, String path) throws Exception {
final NodeCache nodeCache = new NodeCache(curatorFramework, path, false);
NodeCacheListener nodeCacheListener = new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("Receive Event:" + nodeCache.getCurrentData().getPath());
}
};
nodeCache.getListenable().addListener(nodeCacheListener);
nodeCache.start();
}
}
package com.gupao.study.vip;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
* <br>类说明 : 事件的高度封装
* <p>
* curator封装了三种事件的机制
* PathChildCache 监听一个节点下节点的创建、删除、更新
* NodeCache 监听一个节点的更新和创建事件
* TreeCache 综合 PatchChildCache 和 NodeCache 的特性
*
* <br>属性说明:
* <br>作者:Darian
**/
public class CuratorWatcherDemo {
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory
.builder()
.connectString("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181")
.sessionTimeoutMs(4000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("curator")
.build();
curatorFramework.start();
// 当前节点的创建、删除事件监听
// addListenerWithNodeCache(curatorFramework, "/darian");
// 子节点的增加、修改、删除的事件监听
// addlistenerWithPathChildCache(curatorFramework,"/darian");
// 综合节点的监听事件
addListerWithTreeCache(curatorFramework, "/darian");
System.in.read();
}
/**
* NodeCache + PathChildrenCache
**/
public static void addListerWithTreeCache(CuratorFramework curatorFramework, String path) throws Exception {
final TreeCache treeCache = new TreeCache(curatorFramework,path);
TreeCacheListener treeCacheListener = new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println(treeCacheEvent.getType()+"-->>"+treeCacheEvent.getData());
}
};
treeCache.getListenable().addListener(treeCacheListener);
treeCache.start();
/*
INITIALIZED-->>null
NODE_ADDED-->>ChildData{path='/darian', stat=51539607621,51539607621,1531222950294,1531222950294,0,0,0,0,1,0,51539607621
, data=[49]}
NODE_ADDED-->>ChildData{path='/darian/node1', stat=51539607622,51539607622,1531222969992,1531222969992,0,0,0,0,1,0,51539607622
, data=[49]}
NODE_ADDED-->>ChildData{path='/darian/node1/node1', stat=51539607623,51539607623,1531222989856,1531222989856,0,0,0,0,1,0,51539607623
, data=[49]}
NODE_REMOVED-->>ChildData{path='/darian/node1/node1', stat=51539607623,51539607623,1531222989856,1531222989856,0,0,0,0,1,0,51539607623
, data=[49]}
NODE_UPDATED-->>ChildData{path='/darian/node1', stat=51539607622,51539607625,1531222969992,1531223116477,1,2,0,0,1,0,51539607624
, data=[50]}
NODE_REMOVED-->>ChildData{path='/darian/node1', stat=51539607622,51539607625,1531222969992,1531223116477,1,2,0,0,1,0,51539607624
, data=[50]}
NODE_REMOVED-->>ChildData{path='/darian', stat=51539607621,51539607621,1531222950294,1531222950294,0,2,0,0,1,0,51539607626
, data=[49]}
*/
}
/**
* 监听对应节点下的子节点的变化
* 创建
* 删除
* 更新
*/
public static void addlistenerWithPathChildCache(CuratorFramework curatorFramework, String path) throws Exception {
final PathChildrenCache pathChildrenCache =
new PathChildrenCache(curatorFramework, path, true);
PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("receive Event:"
+ pathChildrenCacheEvent.getType()
+ "-->>"
+ pathChildrenCacheEvent.getData());
}
};
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
/*
receive Event:CONNECTION_RECONNECTED-->>null
receive Event:CHILD_ADDED-->>ChildData{path='/darian/node1', stat=51539607611,51539607611,1531222192297,1531222192297,0,0,0,0,1,0,51539607611
, data=[49]}
receive Event:CHILD_REMOVED-->>ChildData{path='/darian/node1', stat=51539607611,51539607611,1531222192297,1531222192297,0,0,0,0,1,0,51539607611
, data=[49]}
*/
}
/**
* 监听这个节点的变化
* 创建
* 更新
**/
public static void addListenerWithNodeCache(CuratorFramework curatorFramework, String path) throws Exception {
final NodeCache nodeCache = new NodeCache(curatorFramework, path, false);
NodeCacheListener nodeCacheListener = new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("Receive Event:" + nodeCache.getCurrentData().getPath());
}
};
nodeCache.getListenable().addListener(nodeCacheListener);
nodeCache.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131