Fork me on GitHub

Zookeeper Java客户端Curator

前言

我们今天来讲讲Zookeeper Java客户端的一些使用吧。

之前我们说到Zookeeper的安装及简单使用,要介绍它的一些应用场景,要明白它的应用场景,要先理解它客户端的一些操作方法。

Zookeeper的Java客户端,最常使用的便是Apache Curator了,它是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端ZooKeeper相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量,而且Curator的功能更加强大。

正文

要使用Curator客户端,需要下面的两个依赖。

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>

curator-recipes包一般能满足我们的需要,要是封装更简便的底层功能的话,curator-framework包必不可少。

创建并启动客户端

使用程序创建一个客户端client并启动(连接到Zookeeper)。

Builder模式创建一个客户端。

1
2
3
4
5
6
7
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(zkConfig.getConnectString())
.namespace(zkConfig.getNameSpace())
.retryPolicy(new ExponentialBackoffRetry(zkConfig.getRetryInterval(), Integer.MAX_VALUE))
.connectionTimeoutMs(zkConfig.getConnectTimeoutMs())
.sessionTimeoutMs(zkConfig.getSessionTimeoutMs());
client = builder.build();

客户端启动。

1
client.start();

简单说下连接时的参数:

  1. connectString:服务器列表,逗号隔开(host1:port1,host2:port2……)
  2. namespace:命名空间,可以用来进行业务区分。
  3. retryPolicy:重试策略,有以下4种重试策略,也可以自己实现重试策略(实现RetryPolicy接口)。
    RetryOneTime:重试一次。
    RetryNTimes:重试N次(需要传入重试间隔参数sleepMsBetweenRetries,及尝试次数n),它继承了抽象类SleepingRetry(每休眠一段时间重试一次)。
    RetryForever:一直重试(需要传入重试间隔retryIntervalMs参数)。
    BoundedExponentialBackoffRetry:重试次数固定,但每次重试的时间间隔会不断变大(如果一直连不上),需要传入初始等待重试时间baseSleepTimeMs,重试次数maxRetries,及最大等待重试时间maxSleepTimeMs 参数,这个类继承ExponentialBackoffRetry(它又继承SleepingRetry)抽象类。
    
  4. sessionTimeoutMs:会话超时时间,单位毫秒,默认60000ms。
  5. connectionTimeoutMs:连接创建超时时间,单位毫秒,默认60000ms。

创建数据节点。

Zookeeper节点有4种,上篇文章已介绍。

创建持久化节点

1
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, data);

使用create方法,creatingParentsIfNeeded这个方法保证如果有父节点也会一起创建,这在原生客户端是无法实现的。
CreateMode 有4种,跟Zookeeper的节点类型对应。
forPath方法可以认为最终操作,path表示节点路径,data表示节点数据。
data是byte数组,其它类型的数据应转换为byte数组。

注:如果不设置withMode方法,默认创建持久化节点,不设置data,节点默认内容为空。

如下:

1
client.create().forPath(path);

创建顺序节点

1
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path, data);

创建临时节点

1
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, data);

创建临时顺序节点

1
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, data);

设置节点数据

1
client.setData().forPath(path, bytes);

直接调用setData方法即可,返回一个Stat(节点信息类)。

获取节点数据

1
client.getData().forPath(path);

使用getData方法,返回byte数组。

获取子节点

1
client.getChildren().forPath(path);

使用getChildren方法,返回一个子节点List <String> 列表,数据为各个子节点名称。

删除节点

1
client.delete().guaranteed().forPath(path);

使用delete方法,guaranteed方法可以保证一定删除。如果某个节点删除失败,会抛出异常,但是如果使用了guaranteed,它会在后台继续进行删除直到删除成功。

删除节点(包括子节点)

1
client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);

deletingChildrenIfNeeded方法可以保证如果有子节点的话一并删除,原生client是无法实现此功能的(需要我们写方法处理)。

判断节点是否存在

1
2
3
4
5
6
7
8
9
10
11
public boolean checkNodeExist(String path) {
boolean exits = false;
try {
Stat stat = client.checkExists().forPath(path);
if (stat != null) {
exits = true;
}
} catch (Exception e) {
}
return exits;
}

使用checkExists方法,最终返回一个Stat,如果Stat为空就说明不存在。

PS:由此我们可以创建一个 createOrUpdate方法,无节点时创建,有节点时更新内容。

1
2
3
4
5
6
7
8
9
10
11
public void createOrUpdateNode(String path, byte[] data) {
try {
if(checkNodeExist(path)){
setData(path,data);
}else{
createNode(path, data);
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}

异步处理

上面的操作方法,都可以使用异步进行处理的,主要使用了inBackground方法。

如下:

1
client.create().inBackground().forPath(path, data);

该方法全参函数如下,且重载了多个方法。

1
public T inBackground(BackgroundCallback callback, Object context, Executor executor);
BackgroundCallback callback:异步回调函数,处理完成后会回调此函数进行某些逻辑。
Object context:上下文对象。
Executor executor:异步处理的线程,不指定的话将使用内部默认线程处理。

我们可以看下BackgroundCallback 方法 会有两个参数。

1
2
3
public interface BackgroundCallback {
void processResult(CuratorFramework var1, CuratorEvent var2) throws Exception;
}

第二个参数CuratorEvent里面包含了此次处理结果的所有信息,包括节点信息等。

1
2
3
4
5
6
7
8
9
10
11
12
public interface CuratorEvent {
CuratorEventType getType();
int getResultCode();
String getPath();
Object getContext();
Stat getStat();
byte[] getData();
String getName();
List<String> getChildren();
List<ACL> getACLList();
WatchedEvent getWatchedEvent();
}

CuratorEventType表示事件类型,表示此次操作的事件类型。可以看到它与CuratorFramework里的方法是一一对应的。

upload successful

getResultCode返回处理结果码。可以在这个枚举里查看各个状态码。

upload successful

添加watcher

1
2
client.getData().usingWatcher(watcher).forPath(path);
client.getChildren().usingWatcher(watcher).forPath(path);

使用usingWatcher结合getData或者getChildren方法可以为指定节点或者子节点添加watcher。

Watcher可以为CuratorWatcher或者Zookeeper自带的Watcher。它们有一个event参数。

upload successful

可以拿到Zookeeper的状态 KeeperState和 事件类型 EventType,从而进行某些必要的操作。

KeeperState枚举和EventType枚举如下图。

upload successful

upload successful

事务支持

Zookeeper一些操作是支持事务的。

主要用到的方法有inTransaction、and、commit等方法。举例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 开启事务
*/
public CuratorTransaction startTransaction() {
return client.inTransaction();
}
/**
* 事务中添加create操作
*/
public CuratorTransactionFinal addCreateToTransaction(CuratorTransaction transaction, String path) throws Exception {
return transaction.create().forPath(path, new byte[0]).and();
}
/**
* 事务中添加delete操作
*/
public CuratorTransactionFinal addDeleteToTransaction(CuratorTransaction transaction, String path) throws Exception {
return transaction.delete().forPath(path).and();
}
/**
* 事务中添加seData操作
*/
public CuratorTransactionFinal addSetDataToTransaction(CuratorTransaction transaction, String path, byte[] data) throws Exception {
return transaction.setData().forPath(path, data).and();
}
/**
* 提交事务
*/
public Collection<CuratorTransactionResult> commitTransaction(CuratorTransactionFinal transaction) throws Exception {
return transaction.commit();
}

检查连接情况

1
client.getZookeeperClient().isConnected();

关闭连接

1
client.close();

提升

对Curator客户端有简单理解后,我们把它进行简单功能的封装。

PS:Curator的强大之处在于其增强功能部分,我们会在后面结合Zookeeper应用讨论。

创建项目framework-zookeeper,搭建如下结构:

upload successful

接口ZKClient,里面有一些Zookeeper客户端的协议,大致如下:

1
2
3
4
5
6
7
8
9
10
public interface ZKClient {
void start();//启动
boolean isConnected();//连接情况
void close();//关闭
void createNode(String path, byte[] data);//创建永久节点
void createOrUpdateNode(String path, byte[] data); //创建或者更新节点
void createEphemeralNode(String path, byte[] data);
String createSequenceNode(String path) ;
......
}

CuratorZKClient是Curator对接口ZKClient的实现,BaseZKClient是原生客户端对接口ZKClient的实现。

我们来写下CuratorZKClient的一些关键代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class CuratorZKClient implements ZKClient {
//zk客户端
private CuratorFramework client;
//关闭状态
private volatile boolean closed = false;
//zk配置
private ZKConfig zkConfig;
//连接状态监听
private Set<ConnectionListener> connectionListeners = new CopyOnWriteArraySet<>();
//节点变化监听及相关Watcher
private final ConcurrentMap<String, ConcurrentMap<NodeListener, CuratorWatcher>> nodeListeners = new ConcurrentHashMap<>();
......
}

提供一个连接监听接口,以便我们可以监听Zookeeper的连接状态并且执行某些操作。

1
2
3
public interface ConnectionListener {
void stateChanged(ZKConstants.ConnectState state);
}

及节点变化接口,监测节点变化进行某些操作。

1
2
3
4
5
public interface NodeListener {
void nodeChanged(String path, List<String> nodes);//节点改变
void nodeDelete(String path);//节点删除
void dataChanged(String path, byte[] data);//节点数据改变
}

ZKConfig是Zookeeper客户端连接的配置,属性值可以配置在properties等配置文件里。

1
2
3
4
5
6
7
8
public class ZKConfig {
private String connectString;
private String nameSpace;
private int retryInterval = 1000;
private int connectTimeoutMs = 60000;
private int sessionTimeoutMs = 60000;
......
}

CuratorZKClient 里实现接口的start方法是,部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
......
public void start() {
.......
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(zkConfig.getConnectString())
.namespace(zkConfig.getNameSpace())
//重试指定的次数, 且每一次重试之间停顿的时间逐渐增加
.retryPolicy(new ExponentialBackoffRetry(zkConfig.getRetryInterval(), Integer.MAX_VALUE))
.connectionTimeoutMs(zkConfig.getConnectTimeoutMs())
.sessionTimeoutMs(zkConfig.getSessionTimeoutMs());
client = builder.build();
//添加Zookeeper状态监听
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState zkConnectionState) {
ZKConstants.ConnectState state = toConnectionListenerState(zkConnectionState);
if (state != null) {
for(ConnectionListener connectionListener : connectionListeners) {
connectionListener.stateChanged(state);
}
}
}
private ZKConstants.ConnectState toConnectionListenerState(ConnectionState zkConnectionState) {
switch (zkConnectionState) {
case LOST:
return ZKConstants.ConnectState.DISCONNECTED;
case SUSPENDED:
return ZKConstants.ConnectState.DISCONNECTED;
case CONNECTED:
return ZKConstants.ConnectState.CONNECTED;
case RECONNECTED:
return ZKConstants.ConnectState.RECONNECTED;
default:
return null;
}
}
});
client.start();
}
.....

这样我们暴露了Zookeeper的连接状态监听接口,以后想监听它的连接状态进行某些操作,直接实现接口,并通过addConnectionListener添加进来即可。

1
2
3
4
5
6
7
8
@Override
public void addConnectionListener(ConnectionListener listener) {
connectionListeners.add(listener);
}
@Override
public void removeConnectionListener(ConnectionListener listener) {
connectionListeners.remove(listener);
}

其它的方法,比如createNode、deleteNode等,我们拿到client后,按照上面讲述的各个操作便可以写出代码,这里不再赘述。以下是createNode的例子。

1
2
3
4
5
6
7
8
9
10
@Override
public void createNode(String path, byte[] data) {
try {
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, data);
} catch (KeeperException.NodeExistsException e) {
log.warn(String.format("create node is exist:%s", path));
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}

再说一下需要实现的NodeListener方法,节点发生变化,主要通过watcher通知。
实现一个watcher。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
private class CuratorWatcherImpl implements CuratorWatcher {
private volatile NodeListener listener;
public CuratorWatcherImpl(NodeListener listener) {
this.listener = listener;
}
public void unWatch() {
this.listener = null;
}
@Override
public void process(WatchedEvent event) throws Exception {
if (listener != null) {
log.debug(event.getPath() + " with event " + event.getType());
switch (event.getType()) {
//节点数据变化,调用listener指定方法
case NodeDataChanged:
try {
byte[] data = client.getData().usingWatcher(this).forPath(event.getPath());
log.debug(event.getPath() + " data after change: " + new String(data));
listener.dataChanged(event.getPath(), data);
} catch (Exception e) {
log.warn(e.getMessage(), e);
}
break;
//节点删除
case NodeDeleted:
//节点创建
case NodeCreated:
log.error(event.getPath());
//自己点改变
case NodeChildrenChanged:
try {
if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
listener.nodeDelete(event.getPath());
} else {
List<String> nodes = getNodes(event.getPath());
if (nodes != null) {
client.getChildren().usingWatcher(this).forPath(event.getPath());
}
//监控子节点数据变化
//for(String node : nodes) {
// client.getData().usingWatcher(this).forPath( ZKPathMgr.joinPath(event.getPath(),node));
//}
log.debug(event.getPath() + " nodes after change: " + nodes);
listener.nodeChanged(event.getPath(), nodes);
}
} catch (KeeperException.NoNodeException e) {
log.warn(e.getMessage());
} catch (Exception e) {
log.warn(e.getMessage(), e);
}
break;
case None:
default:
break;
}
}
}
}

然后实现NodeListener的添加移除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Override
public void addNodeListener(String path, NodeListener listener) {
ConcurrentMap<NodeListener, CuratorWatcher> listeners = nodeListeners.get(path);
if (listeners == null) {
nodeListeners.putIfAbsent(path, new ConcurrentHashMap<NodeListener, CuratorWatcher>());
listeners = nodeListeners.get(path);
}
CuratorWatcher watcher = listeners.get(listener);
if (watcher == null) {
listeners.putIfAbsent(listener, new CuratorWatcherImpl(listener));
watcher = listeners.get(listener);
}
addChildrenCuratorWatcher(path, watcher);
}

@Override
public void removeNodeListener(String path, NodeListener listener) {
ConcurrentMap<NodeListener, CuratorWatcher> listeners = nodeListeners.get(path);
if (listeners != null) {
CuratorWatcher watcher = listeners.remove(listener);
if (watcher != null) {
((CuratorWatcherImpl) watcher).unWatch();
}
}
}
//添加目录watcher
private void addChildrenCuratorWatcher(final String path, CuratorWatcher watcher) {
try {
client.getData().usingWatcher(watcher).forPath(path);
client.getChildren().usingWatcher(watcher).forPath(path);
} catch (KeeperException.NoNodeException e) {
log.warn(String.format("add watcher node not exist:%s", path));
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}

可以看到主要是是维护一个ConcurrentHashMap,listener为key,watcher为value,节点有变化,通知到listener。

好。到这里基本上一个Zookeeper工具客户端就OK了,BaseZKClient的实现与CuratorZKClient类似,有兴趣的可以自己看看。

测试

我们测试下我们的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) throws Exception{
ZKConfig config = new ZKConfig();
config.setConnectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183");
config.setConnectTimeoutMs(60000);
config.setNameSpace("zwt");
config.setRetryInterval(1000);
config.setSessionTimeoutMs(60000);
ZKClient client = new CuratorZKClient(config);
client.addConnectionListener(new ConnectionListener() {
@Override
public void stateChanged(ZKConstants.ConnectState state) {
System.out.println("ZKState state "+ state.name());
}
});
client.start();
client.createNode("/mytest","Hello World");
System.out.println(new String(client.getData("/mytest"),"UTF-8"));
client.close();
}

运行可以看到输出:

upload successful

我们使用命令行也可以看到我们新增的test节点及其属性。

upload successful

PS:可以看到nameSpace 业务命名空间相当于新增一个根节点以区分不同业务,避免节点冲突等作用。

我们在client启动后添加watcher。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
......
client.start();
client.addNodeListener("/mytest", new NodeListener() {
@Override
public void nodeChanged(String path, List<String> nodes) {
System.out.println(path+" node changed");
}
@Override
public void nodeDelete(String path) {
System.out.println(path+" node delete");
}
@Override
public void dataChanged(String path, byte[] data) {
System.out.println(path+" data changed "+ data);
}
});
System.out.println(new String(client.getData("/mytest"),"UTF-8"));
client.setData("/mytest","World");
System.out.println(new String(client.getData("/mytest"),"UTF-8"));
client.createNode("/mytest/test");
client.deleteNodeWithChildren("/mytest");
......

继续测试,结果如下。

upload successful

到这里,我们基本把客户端操作的基本说完了。关于其它一些Zookeeper客户端,这里就不在过多介绍了,有兴趣的可以继续实现ZKClient接口去完成。

PS:BaseZKClient类是我写的一个原生Zookeeper客户端的集成工具,但有些小问题未处理。

总结

通过使用Zookeeper客户端的一些例子,更对Zookeeper有了更深入的了解。

下面的文章我们将结合Curator的一些高级功能及Zookeeper的一些应用来了解Zookeeper的强大之处。




-------------文章结束啦 ~\(≧▽≦)/~ 感谢您的阅读-------------

SakuraTears wechat
扫一扫关注我的公众号
您的支持就是我创作的动力!
0%