前言
我们今天来讲讲Zookeeper Java客户端的一些使用吧。
之前我们说到Zookeeper的安装及简单使用,要介绍它的一些应用场景,要明白它的应用场景,要先理解它客户端的一些操作方法。
Zookeeper的Java客户端,最常使用的便是Apache Curator了,它是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端ZooKeeper相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量,而且Curator的功能更加强大。
正文
要使用Curator客户端,需要下面的两个依赖。
1 | <dependency> |
curator-recipes包一般能满足我们的需要,要是封装更简便的底层功能的话,curator-framework包必不可少。
创建并启动客户端
使用程序创建一个客户端client并启动(连接到Zookeeper)。
Builder模式创建一个客户端。
1 | CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() |
客户端启动。
1 | client.start(); |
简单说下连接时的参数:
- connectString:服务器列表,逗号隔开(host1:port1,host2:port2……)
- namespace:命名空间,可以用来进行业务区分。
- retryPolicy:重试策略,有以下4种重试策略,也可以自己实现重试策略(实现RetryPolicy接口)。
RetryOneTime:重试一次。 RetryNTimes:重试N次(需要传入重试间隔参数sleepMsBetweenRetries,及尝试次数n),它继承了抽象类SleepingRetry(每休眠一段时间重试一次)。 RetryForever:一直重试(需要传入重试间隔retryIntervalMs参数)。 BoundedExponentialBackoffRetry:重试次数固定,但每次重试的时间间隔会不断变大(如果一直连不上),需要传入初始等待重试时间baseSleepTimeMs,重试次数maxRetries,及最大等待重试时间maxSleepTimeMs 参数,这个类继承ExponentialBackoffRetry(它又继承SleepingRetry)抽象类。
- sessionTimeoutMs:会话超时时间,单位毫秒,默认60000ms。
- connectionTimeoutMs:连接创建超时时间,单位毫秒,默认60000ms。
创建数据节点。
Zookeeper节点有4种,上篇文章已介绍。
创建持久化节点
1 | client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, data); |
使用create方法,creatingParentsIfNeeded这个方法保证如果有父节点也会一起创建,这在原生客户端是无法实现的。
CreateMode 有4种,跟Zookeeper的节点类型对应。
forPath方法可以认为最终操作,path表示节点路径,data表示节点数据。
data是byte数组,其它类型的数据应转换为byte数组。
注:如果不设置withMode方法,默认创建持久化节点,不设置data,节点默认内容为空。
如下:
1 | client.create().forPath(path); |
创建顺序节点
1 | client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path, data); |
创建临时节点
1 | client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, data); |
创建临时顺序节点
1 | client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, data); |
设置节点数据
1 | client.setData().forPath(path, bytes); |
直接调用setData方法即可,返回一个Stat(节点信息类)。
获取节点数据
1 | client.getData().forPath(path); |
使用getData方法,返回byte数组。
获取子节点
1 | client.getChildren().forPath(path); |
使用getChildren方法,返回一个子节点List <String>
列表,数据为各个子节点名称。
删除节点
1 | client.delete().guaranteed().forPath(path); |
使用delete方法,guaranteed方法可以保证一定删除。如果某个节点删除失败,会抛出异常,但是如果使用了guaranteed,它会在后台继续进行删除直到删除成功。
删除节点(包括子节点)
1 | client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path); |
deletingChildrenIfNeeded方法可以保证如果有子节点的话一并删除,原生client是无法实现此功能的(需要我们写方法处理)。
判断节点是否存在
1 | public boolean checkNodeExist(String path) { |
使用checkExists方法,最终返回一个Stat,如果Stat为空就说明不存在。
PS:由此我们可以创建一个 createOrUpdate方法,无节点时创建,有节点时更新内容。
1 | public void createOrUpdateNode(String path, byte[] data) { |
异步处理
上面的操作方法,都可以使用异步进行处理的,主要使用了inBackground方法。
如下:
1 | client.create().inBackground().forPath(path, data); |
该方法全参函数如下,且重载了多个方法。
1 | public T inBackground(BackgroundCallback callback, Object context, Executor executor); |
BackgroundCallback callback:异步回调函数,处理完成后会回调此函数进行某些逻辑。
Object context:上下文对象。
Executor executor:异步处理的线程,不指定的话将使用内部默认线程处理。
我们可以看下BackgroundCallback 方法 会有两个参数。
1 | public interface BackgroundCallback { |
第二个参数CuratorEvent里面包含了此次处理结果的所有信息,包括节点信息等。
1 | public interface CuratorEvent { |
CuratorEventType表示事件类型,表示此次操作的事件类型。可以看到它与CuratorFramework里的方法是一一对应的。
getResultCode返回处理结果码。可以在这个枚举里查看各个状态码。
添加watcher
1 | client.getData().usingWatcher(watcher).forPath(path); |
使用usingWatcher结合getData或者getChildren方法可以为指定节点或者子节点添加watcher。
Watcher可以为CuratorWatcher或者Zookeeper自带的Watcher。它们有一个event参数。
可以拿到Zookeeper的状态 KeeperState和 事件类型 EventType,从而进行某些必要的操作。
KeeperState枚举和EventType枚举如下图。
事务支持
Zookeeper一些操作是支持事务的。
主要用到的方法有inTransaction、and、commit等方法。举例如下:
1 | /** |
检查连接情况
1 | client.getZookeeperClient().isConnected(); |
关闭连接
1 | client.close(); |
提升
对Curator客户端有简单理解后,我们把它进行简单功能的封装。
PS:Curator的强大之处在于其增强功能部分,我们会在后面结合Zookeeper应用讨论。
创建项目framework-zookeeper,搭建如下结构:
接口ZKClient,里面有一些Zookeeper客户端的协议,大致如下:
1 | public interface ZKClient { |
CuratorZKClient是Curator对接口ZKClient的实现,BaseZKClient是原生客户端对接口ZKClient的实现。
我们来写下CuratorZKClient的一些关键代码。
1 | public class CuratorZKClient implements ZKClient { |
提供一个连接监听接口,以便我们可以监听Zookeeper的连接状态并且执行某些操作。
1 | public interface ConnectionListener { |
及节点变化接口,监测节点变化进行某些操作。
1 | public interface NodeListener { |
ZKConfig是Zookeeper客户端连接的配置,属性值可以配置在properties等配置文件里。
1 | public class ZKConfig { |
CuratorZKClient 里实现接口的start方法是,部分代码如下:
1 | ...... |
这样我们暴露了Zookeeper的连接状态监听接口,以后想监听它的连接状态进行某些操作,直接实现接口,并通过addConnectionListener添加进来即可。
1 |
|
其它的方法,比如createNode、deleteNode等,我们拿到client后,按照上面讲述的各个操作便可以写出代码,这里不再赘述。以下是createNode的例子。
1 |
|
再说一下需要实现的NodeListener方法,节点发生变化,主要通过watcher通知。
实现一个watcher。
1 | private class CuratorWatcherImpl implements CuratorWatcher { |
然后实现NodeListener的添加移除。
1 |
|
可以看到主要是是维护一个ConcurrentHashMap,listener为key,watcher为value,节点有变化,通知到listener。
好。到这里基本上一个Zookeeper工具客户端就OK了,BaseZKClient的实现与CuratorZKClient类似,有兴趣的可以自己看看。
测试
我们测试下我们的代码。
1 | public static void main(String[] args) throws Exception{ |
运行可以看到输出:
我们使用命令行也可以看到我们新增的test节点及其属性。
PS:可以看到nameSpace 业务命名空间相当于新增一个根节点以区分不同业务,避免节点冲突等作用。
我们在client启动后添加watcher。
1 | ...... |
继续测试,结果如下。
到这里,我们基本把客户端操作的基本说完了。关于其它一些Zookeeper客户端,这里就不在过多介绍了,有兴趣的可以继续实现ZKClient接口去完成。
PS:BaseZKClient类是我写的一个原生Zookeeper客户端的集成工具,但有些小问题未处理。
总结
通过使用Zookeeper客户端的一些例子,更对Zookeeper有了更深入的了解。
下面的文章我们将结合Curator的一些高级功能及Zookeeper的一些应用来了解Zookeeper的强大之处。