bboyjing's blog

ZooKeeper Study Notes, Part 6: The Open-Source Client Curator

In this chapter we look at another, more powerful client: Curator.

Environment Setup

Create a new module named curator in the zookeeper-sample project, using the following Curator version:

<!-- curator -->
<dependencies>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.11.0</version>
    </dependency>
</dependencies>

Creating a Session

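import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Create a ZooKeeper client with Curator
public class CreateSessionSample {
    public static void main(String[] args) throws Exception {
        // Base sleep time of 1000 ms between retries, at most 3 retries
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(3000)
                .retryPolicy(retryPolicy)
                .namespace("base")
                .build();
        // start() does not block until the connection is up; it is established asynchronously
        client.start();
        System.out.println("ZooKeeper session established.");
    }
}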

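The Curator API adopts a fluent design style. It also offers an isolated namespace feature: you specify a ZooKeeper root path, and every operation performed through the client above then takes place under the /base node.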
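
To make the namespace behaviour concrete, here is a minimal sketch of the path mapping it implies. It is plain Java and not Curator's internal code: the class `NamespaceSketch` and its `resolve` method are hypothetical names introduced only for illustration, and the sketch assumes the namespace is simply prepended to the client-side path as a root.

```java
final class NamespaceSketch {
    /**
     * Maps a client-side path to the path used on the server, assuming the
     * namespace acts as a plain root prefix (an assumption of this sketch).
     */
    static String resolve(String namespace, String path) {
        if (path == null || !path.startsWith("/")) {
            throw new IllegalArgumentException("path must start with '/': " + path);
        }
        if (namespace == null || namespace.isEmpty()) {
            return path; // no namespace configured: the path is used as-is
        }
        String root = "/" + namespace;
        return path.equals("/") ? root : root + path;
    }

    public static void main(String[] args) {
        // With namespace "base", the client-side path /zk-book/c1 lives under /base
        System.out.println(resolve("base", "/zk-book/c1")); // prints /base/zk-book/c1
    }
}
```

The point of the feature is that application code can keep using short paths such as /zk-book while several applications share one ZooKeeper ensemble under different roots.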

Creating a Node

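import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

/**
 * Create a node with Curator.
 * ZooKeeper requires every non-leaf node to be persistent, so in this example
 * c1 is created as an ephemeral node and its parent /zk-book as a persistent node.
 */
public class CreateNodeSample {
    static String path = "/zk-book/c1";
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("localhost:2181")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .namespace("base")
            .build();

    public static void main(String[] args) throws Exception {
        client.start();
        client.create()
                .creatingParentsIfNeeded()       // create missing parents as persistent nodes
                .withMode(CreateMode.EPHEMERAL)  // the leaf itself is ephemeral
                .forPath(path, "init".getBytes());
    }
}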

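This also exercises the namespace feature mentioned in the session section. After the program has finished, two nodes remain in ZooKeeper: /base and /base/zk-book (the ephemeral c1 node is removed once its session ends), as zkCli shows clearly: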

[zk: localhost:2181(CONNECTED) 6] ls /
[zookeeper, base]
[zk: localhost:2181(CONNECTED) 7] ls /base
[zk-book]
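
The listing above follows from two rules: missing parents are created as persistent nodes, and ephemeral nodes disappear with their session. The following in-memory sketch replays that logic without a ZooKeeper server. `CreateParentsSketch` and its methods are hypothetical names for illustration only, and the model assumes a single session that owns every ephemeral node.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class CreateParentsSketch {
    // path -> true if the node is ephemeral, false if persistent
    private final Map<String, Boolean> nodes = new TreeMap<>();

    /** Creates the node at path, first adding any missing ancestors as persistent nodes. */
    void createWithParents(String path, boolean ephemeral) {
        if (nodes.containsKey(path)) {
            throw new IllegalStateException("NodeExists: " + path);
        }
        int idx = path.indexOf('/', 1);
        while (idx != -1) {
            nodes.putIfAbsent(path.substring(0, idx), false); // parents are persistent
            idx = path.indexOf('/', idx + 1);
        }
        nodes.put(path, ephemeral);
    }

    /** Ends the single modelled session: every ephemeral node is removed. */
    void closeSession() {
        nodes.values().removeIf(Boolean::booleanValue);
    }

    Set<String> paths() {
        return new TreeSet<>(nodes.keySet());
    }

    public static void main(String[] args) {
        CreateParentsSketch tree = new CreateParentsSketch();
        tree.createWithParents("/base/zk-book/c1", true);
        System.out.println(tree.paths()); // [/base, /base/zk-book, /base/zk-book/c1]
        tree.closeSession();
        System.out.println(tree.paths()); // [/base, /base/zk-book]
    }
}
```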

Deleting a Node

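```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;

// Delete a node with Curator
public class DelDataSample {
    static String path = "/zk-book";
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("localhost:2181")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .namespace("base")
            .build();

    public static void main(String[] args) throws Exception {
        client.start();
        // Read the node first so that stat holds its current version
        // (this assumes /base/zk-book already exists, e.g. from CreateNodeSample)
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
        // Delete the node and its children, but only if the version still matches
        client.delete().deletingChildrenIfNeeded()
                .withVersion(stat.getVersion()).forPath(path);
    }
}
```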

Reading Data

// Read a node's data with Curator

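import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class GetDataSample {
    static String path = "/zk-book";
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("localhost:2181")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .namespace("base")
            .build();

    public static void main(String[] args) throws Exception {
        client.start();
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path, "init".getBytes());
        // stat is filled with the node's metadata as a side effect of the read
        Stat stat = new Stat();
        System.out.println(new String(client.getData().storingStatIn(stat).forPath(path)));
    }
}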

Updating Data

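import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

// Update a node's data with Curator
public class SetDataSample {
    static String path = "/zk-book";
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("localhost:2181")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .namespace("base")
            .build();

    public static void main(String[] args) throws Exception {
        client.start();
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path, "init".getBytes());
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
        // The first update passes the current version, so it succeeds and bumps the version
        System.out.println("Success set node for : " + path + ", new version: "
                + client.setData().withVersion(stat.getVersion()).forPath(path).getVersion());
        // The second update reuses the now-stale version and is expected to fail
        try {
            client.setData().withVersion(stat.getVersion()).forPath(path);
        } catch (Exception e) {
            System.out.println("Fail set node due to " + e.getMessage());
        }
    }
}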
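
SetDataSample relies on version-checked writes: an update only succeeds if the version passed in matches the node's current version. The sketch below models that compare-and-set rule in memory so that it can be run without a server. `VersionedNodeSketch` is a hypothetical class written for this note, not part of Curator or ZooKeeper, and it signals a mismatch with a plain `IllegalStateException` instead of ZooKeeper's own exception type.

```java
final class VersionedNodeSketch {
    private byte[] data;
    private int version = 0;

    VersionedNodeSketch(byte[] data) {
        this.data = data.clone();
    }

    synchronized int getVersion() {
        return version;
    }

    /**
     * Replaces the data only if expectedVersion matches the current version.
     * As in ZooKeeper, -1 means "skip the version check".
     * Returns the new version.
     */
    synchronized int setData(byte[] newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) {
            throw new IllegalStateException(
                    "BadVersion: expected " + expectedVersion + ", actual " + version);
        }
        data = newData.clone();
        version++;
        return version;
    }

    public static void main(String[] args) {
        VersionedNodeSketch node = new VersionedNodeSketch("init".getBytes());
        int seen = node.getVersion();                       // version read before writing
        System.out.println("new version: " + node.setData("a".getBytes(), seen));
        try {
            node.setData("b".getBytes(), seen);             // stale version: rejected
        } catch (IllegalStateException e) {
            System.out.println("Fail set node due to " + e.getMessage());
        }
    }
}
```

This is the usual optimistic-concurrency pattern: a writer that loses the race learns about it from the failure, re-reads the node, and decides whether to retry.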

Asynchronous API

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

// Use Curator's asynchronous API
public class CreateNodeBackgroundSample {
    static String path = "/zk-book";
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("localhost:2181")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .namespace("base")
            .build();
    static CountDownLatch latch = new CountDownLatch(2);
    static ExecutorService tp = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws Exception {
        client.start();
        System.out.println("Main thread: " + Thread.currentThread().getName());
        // A custom Executor is passed in here,
        // so the callback is handled by the given ExecutorService.
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                .inBackground((curator, event) -> {
                    System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]");
                    System.out.println("Thread of processResult: " + Thread.currentThread().getName());
                    latch.countDown();
                }, tp).forPath(path, "init".getBytes());
        // No custom Executor is passed in here, so the callback runs on ZooKeeper's
        // default EventThread, in this case the thread named main-EventThread.
        // The path is the same as above, so this call typically reports -110 (NodeExists).
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                .inBackground((curator, event) -> {
                    System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]");
                    System.out.println("Thread of processResult: " + Thread.currentThread().getName());
                    latch.countDown();
                }).forPath(path, "init".getBytes());
        latch.await();
        tp.shutdown();
    }
}
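
The two calls above differ only in which thread runs the callback. The following standalone sketch imitates that dispatch decision using nothing but `java.util.concurrent`. It is not how Curator is implemented: `CallbackThreadSketch`, `callbackThread`, and the thread names `sketch-EventThread` and `sketch-pool` are all made up for illustration, with a single-threaded executor standing in for the client's event thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CallbackThreadSketch {
    /** Returns the name of the thread that ends up running the callback. */
    static String callbackThread(Executor executor) {
        // One thread standing in for the client's event thread (hypothetical name)
        ExecutorService eventThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-EventThread"));
        try {
            CompletableFuture<String> ranOn = new CompletableFuture<>();
            Runnable callback = () -> ranOn.complete(Thread.currentThread().getName());
            // The result arrives on the event thread; it is handed off only if an executor was given
            eventThread.execute(() -> {
                if (executor != null) {
                    executor.execute(callback);
                } else {
                    callback.run();
                }
            });
            return ranOn.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            eventThread.shutdown();
        }
    }

    public static void main(String[] args) {
        ExecutorService pool =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "sketch-pool"));
        try {
            System.out.println(callbackThread(pool)); // sketch-pool
            System.out.println(callbackThread(null)); // sketch-EventThread
        } finally {
            pool.shutdown();
        }
    }
}
```

Handing callbacks to a separate pool matters when they do slow work, because a callback that blocks the single event thread delays every notification queued behind it.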

The next few sections use Curator to handle some typical application scenarios.