Fork me on GitHub

Zookeeper应用之配置中心

前言

根据Zookeeper的一些特点,它是可以作为配置中心使用的。

何为配置中心?

我们在项目开发维护过程中会有很多公共变量或资源,需要统一管理,以前我们把它们写在程序公共类或者配置文件中,可是这样以后有变动,程序就需要重新部署,很是不方便,而且分布式、微服务等技术出现,修改维护多个项目管理也变得复杂。

为了解决以上问题,实现一次打包多地部署需求,减少项目管理及安全风险,我们需要将可变变量外移并通过页面统一可视化管理,基于此,我们统一建设了配置中心。

引入Zookeeper后,我们把数据存放在Zookeeper节点Znode上,可以选择主动轮询查询或者等待Zookeeper通知。当数据发生变化时,我们可以直接通过通知去执行某些业务操作。
一般为了数据准确性,我们会主动轮询查询或者通知+轮询的方式。

PS:当然使用数据库保存这些数据也是可以的,采用定期查询的方式,而且有的配置中心就与之类似,我们在这儿不做更广泛讨论。

分析

注册中心可以分为服务端和客户端两部分。

服务端一般用于存储配置数据,提供数据管理等等服务。客户端一般为业务端调用数据提供API服务等。

当然现在有一些开源的配置中心,如spring-cloud-config,diamond,disconf,apollo 等,我们以后有接触在具体介绍研究它们。

今天我们基于Zookeeper实现自己的一个简单的注册中心。

如下的配置中心流程图也就比较好理解了。

upload successful

正文

先来实现基于Zookeeper的配置中心客户端吧。

PS:了解这篇文章之前可以先看看 Zookeeper Java客户端Curator

先来了解下curator-recipes 包下的一个类PathChildrenCache。

该类是从本地缓存ZK路径的所有子路径中保存所有数据的一个工具类,它将监视ZK路径、响应更新/创建/删除事件、下拉数据等等,此类不能保证事务处理时的强同步。

这个类有一个全参构造方法:

1
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData){...}

Client是我们的ZKClient需要创建,path指要监控的路径,cacheData指是否缓存数据。

同时我们可以为其添加Listener,当节点/子节点数据有变化时,可以进行通知等。

使用该方法:

1
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);

我们想实现自己的配置中心客户端,与SpringBoot进行集成,其目录结构如下创建:

upload successful

ConfigCenterAutoConfig:SpringBoot自动配置类,会提供相应的Bean。
ConfigCenterConfiguration:自动配置类,从properties文件中获取配置属性。
ConfigCenterException:异常处理类。
ConfigCenterListener:配置中心监听listener。
ConfigCenterListenerAdapter:考虑到监听可以有多个,这个类用来处理它们。
LocalCacheService:主要用来定时轮询Zookeeper的配置。
ZKConfigService:主要用来创建Zookeeper连接及添加监听等。
ConfigUtil:工具类。
CacheNodeVo:节点Vo。
ConfigCenterClient:配置中心客户端。

先从配置类说起吧,连接配置文件properties的类ConfigCenterConfiguration。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@ConfigurationProperties("spring.zookeeper.config-center")
public class ConfigCenterConfiguration {
//zk地址
private String zkAddress;
//业务名称
private String sysName;
//连接超时时间
private Integer connectTimeoutMs = 60000;
//session过期时间
private Integer sessionTimeoutMs = 60000;
//重试间隔
private Integer retryInterval = 1000;
......
}

这个类不过多介绍了,就是Zookeeper的配置信息,连接Zookeeper时使用。

我们引入之前封装的framework-zookeeper包,通过自动配置拿到client。

1
2
3
4
5
<dependency>
<groupId>com.zwt</groupId>
<artifactId>framework-zookeeper</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>

ConfigCenterAutoConfig部分代码如下,也比较好理解,就是Spring启动后自动配置CuratorClient。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Configuration
@ConditionalOnClass(CuratorZKClient.class)
@EnableConfigurationProperties(ConfigCenterConfiguration.class)
public class ConfigCenterAutoConfig implements ApplicationContextAware {
private static final Logger log= LoggerFactory.getLogger(ConfigCenterAutoConfig.class);
@Autowired
private ConfigCenterConfiguration properties;
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "spring.zookeeper.config-center", name = "enabled", havingValue = "true")
CuratorZKClient curatorZKClient (){
ZKConfig config = new ZKConfig();
config.setConnectString(properties.getZkAddress());
config.setNameSpace(properties.getSysName());
config.setSessionTimeoutMs(properties.getSessionTimeoutMs());
config.setConnectTimeoutMs(properties.getConnectTimeoutMs());
config.setRetryInterval(properties.getRetryInterval());
CuratorZKClient zkClient = new CuratorZKClient(config);
zkClient.addConnectionListener((state) -> {
log.debug("ZKConfigService connectionListener state:" + state);
if (state == ZKConstants.State.CONNECTED || state == ZKConstants.State.RECONNECTED) {
log.info("ZKConfigService zookeeper is connected...");
}
});
zkClient.start();
return zkClient;
}
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
//根据class类型返回bean
public static <T> T getBean(Class<T> requireType){
return applicationContext.getBean(requireType);
}
}

有了CuratorClient,我们创建ZKConfigService,主要为指定节点添加希望的监听。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class ZKConfigService{
private static final Logger log= LoggerFactory.getLogger(ZKConfigService.class);
private static CuratorZKClient zkClient = null;
final static String configRootPath = ConfigUtil.getConfigCenterPath();
/**
* ZKService初始化
*/
public static void init() {
//拿到zkClient
zkClient = ConfigCenterAutoConfig.getBean(CuratorZKClient.class);
try {
PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient.getCuratorFramework(), configRootPath, true);
PathChildrenCacheListener pathChildrenCacheListener = (client,event) -> {
log.debug("pathChildrenCacheListener eventType:" + event.getType());
ChildData data = event.getData();
if(data!=null){
String dataStr = new String(data.getData(), "UTF-8");
String key = StringUtils.substringAfterLast(data.getPath(), ConfigUtil.SEP_STRING);
switch (event.getType()) {
case CHILD_ADDED:
LocalCacheService.put(key,dataStr);
break;
case CHILD_UPDATED:
LocalCacheService.put(key,dataStr);
ConfigCenterListenerAdapter.onChange(key,dataStr);
break;
case CHILD_REMOVED:
LocalCacheService.remove(key);
break;
default:
break;
}
}
};
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
pathChildrenCache.start();
} catch (Exception e) {
e.printStackTrace();
}
log.info("spring-boot-config ZKConfigService init success.");
}
/**
* 根据key获取值
* @param key
* @return
*/
public static String getKey(String key) {
return zkClient.getStringData(ConfigUtil.joinPath(configRootPath, key));
}
}

可以看到用到了我们刚才说的PathChildrenCache类,启动后,添加一个Listener,监听节点变化,一方面,我们需要一个类,对节点变化进行通知;另一方面,我们应把数据缓存在本地,如果数据变化后ZK没有通知或者其它情况,我们可以轮询查询后与本地缓存比较,有变化后继续进行我们节点变化的通知。

这就是我们的ConfigCenterListenerAdapter监听处理类和LocalCacheService本地缓存服务。

先说ConfigCenterListenerAdapter吧,可以看到上面代码节点有变化时触发了onChange事件。

由于我们业务可能需要多个监听类,故,我们提供一个监听接口,相关业务类实现这个接口,在注册一下监听即可使用岂不美哉。

考虑到此,我们存储监听类的集合应是静态的。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class ConfigCenterListenerAdapter {
private static Logger log= LoggerFactory.getLogger(ConfigCenterListenerAdapter.class);
//key和要监听此节点的监听者列表
private static ConcurrentHashMap<String, List<ConfigCenterListener>> someKeyListenerMap = new ConcurrentHashMap<>();
private static List<ConfigCenterListener> allKeyListeners = new CopyOnWriteArrayList<>();
public static boolean addListener(String key, ConfigCenterListener configCenterListener) {
if (configCenterListener == null) {
return false;
}
if (key == null || key.trim().length() == 0) {
allKeyListeners.add(configCenterListener);
return true;
} else {
List<ConfigCenterListener> listeners = someKeyListenerMap.get(key);
if (listeners == null) {
listeners = new ArrayList<>();
someKeyListenerMap.put(key, listeners);
}
listeners.add(configCenterListener);
return true;
}
}
public static void onChange(String key, String value) {
if (key == null || key.trim().length() == 0) {
return;
}
List<ConfigCenterListener> keyListeners = someKeyListenerMap.get(key);
if (keyListeners != null && keyListeners.size() > 0) {
for(ConfigCenterListener listener : keyListeners) {
try {
listener.onChange(key, value);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
if (allKeyListeners.size() > 0) {
for(ConfigCenterListener confListener : allKeyListeners) {
try {
confListener.onChange(key, value);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
}
}
1
2
3
4
5
6
7
8
public interface ConfigCenterListener {
/**
* 配置的key有变化触发事件
* @param key
* @param value
*/
void onChange(String key, String value) ;
}

我们再来看看我们的主动轮询服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class LocalCacheService {
private static final Logger log= LoggerFactory.getLogger(LocalCacheService.class);
//配置缓存信息map
private static final ConcurrentHashMap<String, CacheNodeVo> LOCAL_CONFIG_CACHE_MAP = new ConcurrentHashMap<>();
//刷新线程状态
private static boolean refreshThreadStop = false;
//一个单线程的线程池(刷新缓存使用)
private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("spring-boot-config-%d").setDaemon(true).build();
private static ExecutorService singleThreadPool = Executors.newFixedThreadPool(1, namedThreadFactory);
//一分钟刷新一次
public static void init() {
// refresh thread
singleThreadPool.submit(()->{
while (!refreshThreadStop) {
try {
TimeUnit.SECONDS.sleep(60);
reloadAll();
log.debug("spring-boot-config, refresh thread reloadAll success.");
} catch (Exception e) {
log.error("spring-boot-config, refresh thread error.");
log.error(e.getMessage(), e);
}
}
log.info("spring-boot-config, refresh thread stopped.");
});
log.info("spring-boot-config LocalCacheService init success.");
}

/**
* 通过key获取值
* @param key
* @return
*/
public static String get(String key) {
CacheNodeVo cacheNodeVo = LOCAL_CONFIG_CACHE_MAP.get(key);
if (cacheNodeVo != null) {
return cacheNodeVo.getValue();
}
return null;
}
/**
* 放入值
* @param key
* @param value
*/
public static void put(String key, String value) {
LOCAL_CONFIG_CACHE_MAP.put(key, new CacheNodeVo(key, value));
}
/**
* 移除某个值
* @param key
*/
public static void remove(String key) {
LOCAL_CONFIG_CACHE_MAP.remove(key);
}
/**
* 重新加载全部
*/
private static void reloadAll() {
Set<String> keySet = LOCAL_CONFIG_CACHE_MAP.keySet();
if (keySet.size() > 1) {
for(String key : keySet) {
String zkValue = ZKConfigService.getKey(key);
CacheNodeVo cacheNodeVo = LOCAL_CONFIG_CACHE_MAP.get(key);
if (cacheNodeVo != null && cacheNodeVo.getValue() != null && cacheNodeVo.getValue().equals(zkValue)) {
log.debug("refresh key:{} no changed ", key);
} else {
LOCAL_CONFIG_CACHE_MAP.put(key, new CacheNodeVo(key, zkValue));
ConfigCenterListenerAdapter.onChange(key, zkValue);
}
}
}
}
}

可以看到借助了一个定长线程池,每隔60s重载一下数据,有变化会对监听者进行通知。它是通过一个静态的ConcurrentHashMap 来保存数据的。

这儿看到刚才的也会主动通知监听者,这儿也通知监听者,它们会通知两次吗?

我们可以看到主动通知的时候,也会先把ConcurrentHashMap的值先改变在进行通知,要是出现通知两次的情况,会是概率极低的。要是要求只能通知一次,且业务监听无法重复处理两次数据变化请求,可以在向ConcurrentHashMap里放值时,再检查一下它的当前值,或使用其它方法处理。

两个服务ZK通知和主动轮询处理完成后,提供一个配置中心Client,用于获取值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class ConfigCenterClient {
private static final Logger log= LoggerFactory.getLogger(ConfigCenterClient.class);
static {
LocalCacheService.init();
ZKConfigService.init();
}
public static String getString(String key, String defaultValue) {
//查询本地缓存
String value = LocalCacheService.get(key);
if (value != null) {
log.debug("get config {} from cache",key);
return value;
}
//没有命中,查询zk中的值,并加入到缓存中,并加watcher
value = ZKConfigService.getKey(key);
if (value != null) {
log.debug("get config {} from zookeeper",key);
LocalCacheService.put(key,value);
return value;
}
return defaultValue;
}
public static String getString(String key) {
return getString(key, null);
}
private static void checkNull(String key,String value) {
if (value == null) {
throw new ConfigCenterException(String.format("config key [%s] does not exist",key));
}
}
public static long getLong(String key) {
String value = getString(key, null);
checkNull(key, value);
return Long.valueOf(value);
}
public static int getInt(String key) {
String value = getString(key, null);
checkNull(key, value);
return Integer.valueOf(value);
}
public static boolean getBoolean(String key) {
String value = getString(key, null);
checkNull(key, value);
return Boolean.valueOf(value);
}
public static boolean addListener(String key, ConfigCenterListener configCenterListener){
return ConfigCenterListenerAdapter.addListener(key, configCenterListener);
}
}

首先应加载LocalCacheService和ZKConfigService,然后实现主要的方法getString,直接去缓存里取,拿不到去Zookeeper里取并放到缓存里,在提供一个addListener方法,可以让用户自己定义想监听的节点。

至此,我们一个简单的配置中心的客户端就完成了,我们把它打包引入一个demo项目测试一下。

创建一个demo项目,引入我们的jar包。

1
2
3
4
5
<dependency>
<groupId>com.zwt</groupId>
<artifactId>config-spring-boot-starter</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>

我们创建一个Listener实现。

1
2
3
4
5
6
7
public class MyListener implements ConfigCenterListener {
private static final Logger log= LoggerFactory.getLogger(MyListener.class);
@Override
public void onChange(String key, String value) {
log.info(key+ " changed "+ value);
}
}

创建测试类,我们循环10次改变节点的值,测试一下我们的程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {
@Autowired
CuratorZKClient client;
@Test
public void contextLoads() throws Exception{
//client.createNode("/config/test","Hello World");
ConfigCenterClient.addListener("test",new MyListener());
for(int i = 0;i<10;i++){
Thread.sleep(5000);
String str = ConfigCenterClient.getString("test");
System.out.println(str);
client.setData("/config/test","Hello World"+i);
}
}
}

可以看到执行结果。

upload successful

我们还要实现一个配置中心的服务端。

服务端基本上是对Zookeeper数据节点的增删改查这几个逻辑,其核心是Zookeeper保存在节点上的数据。

为了方便对Zookeeper数据进行操作,我们一般创建一个可视化后台管理系统。如下:

upload successful

upload successful

upload successful

这个系统是比较好实现的,引入我们的framework-zookeeper包,里面封装了Zookeeper的增删改查,当然需要创建一个web项目。

这一块就不再过多介绍了,当明白了Zookeeper的增删改查节点数据后,实现起来是比较容易的。

总结

今天我们通过Zookeeper实现了一个配置中心,简单了解了它的原理,也对Zookeeper有了一些更深刻的理解。

现在很多开源的配置中心也相当的不错,也是可以学习和理解的,我后面可能也会讲解一些关于这方面的知识。

framework-zookeeperconfig-spring-boot-starter 的相关代码已上传GitHub,大家如果有兴趣在实践中遇到问题可以过去参考下代码,如有疑问也欢迎与我交流探讨。

配置中心服务端(web项目)由于个人原因和时间原因,只写了个大概,也没有提交Github,后续应该会补上。




-------------文章结束啦 ~\(≧▽≦)/~ 感谢您的阅读-------------

SakuraTears wechat
扫一扫关注我的公众号
您的支持就是我创作的动力!
0%