Fork me on GitHub

Zookeeper应用之分布式锁

前言

Zookeeper是可以用来实现分布式锁的。

要了解它,我们先简单说下分布式锁吧。

我们知道,在系统中,当我们访问公共资源并对资源进行一些操作时,为防止出现问题,需要对公共资源依次访问,如常见的多线程售票模型等。由于在一个系统中,我们可以使用锁(如ReentrantLock)或者synchronized关键字等Java方法处理。

但是,当系统逐渐由单系统转换为分布式系统、微服务时,情况就变得复杂了,比如有某共享资源,比如有1个奖品,一个应用查询到有奖品并尝试发给用户A,另一个应用也查询到有奖品并尝试发给用户B,这样A,B均显示有奖品,实际上我们的奖品数量是不足的。如常见的秒杀系统,抽奖系统等。

这时候就需要一种全局的互斥机制来控制应用对共享资源的访问,这就是所谓的分布式锁。

PS:分布式锁的实现也可以基于缓存(如Redis)实现,亦可以通过数据库(乐观锁等)实现,实际中要确实使用到分布式锁,基于缓存的实现还是要偏多一些的。

根据上面所述,下面的图是比较好理解的。

upload successful

我们再来说下分布式锁应具备的一些特点。

  1. 同一时间只允许一台机器(服务)的一个线程执行。
  2. 为整个系统必要业务提供服务,应当是高可用的。
  3. 性能应得以保证,不能在获取锁和释放锁过程中浪费太多资源或时间。
  4. 分布式锁应当具备失效机制,避免死锁发生。
  5. 应当具有可重入特性。
  6. 应当有非阻塞的特点,某个服务没有获取到锁,应返回获取失败,不能阻塞。

下面我们来用Zookeeper实现我们的分布式锁。

正文

我们在之前封装framework-zookeeper时,用到了下面这个依赖。

1
2
3
4
5
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>

这里面有一个类InterProcessMutex,这是分布式锁使用的关键类。

PS:其实它已经实现了分布式锁,我们来使用下它吧。

我们创建一个Test,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {
ExecutorService executorService = Executors.newCachedThreadPool();
static int TEST = 5;
@Test
public void test1() throws Exception{
for(int i=0;i<10;i++){
executorService.execute(()->{
try{
if(TEST>0){
doSomething();
}
}catch (Exception e){
e.printStackTrace();
}
});
}
Thread.sleep(100000);
}
public void doSomething() throws Exception{
Thread.sleep(1000);
TEST--;
System.out.println(Thread.currentThread().getId()+"--"+Thread.currentThread().getName()+"执行,TEST="+TEST);
}
}

考虑到多应用分布式比较麻烦,我这里的Test使用多线程模拟分布式请求,用了线程池管理了10个线程,假设TEST静态变量为共享资源,如果TEST数量大于0的时候,我们执行doSomething方法,假设这个方法执行需要一秒,执行后TEST减一。

我们运行Test后,可以看到输出结果。

upload successful

可以看到出现了负数。

我们引入InterProcessMutex,在判断TEST之前对其加分布式锁,锁的zk基路径我们定为/zwt/lock。调用acquire获取锁,完成业务逻辑后调用release方法释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Autowired
CuratorZKClient client;
@Test
public void test1() throws Exception{
for(int i=0;i<10;i++){
executorService.execute(()->{
try{
InterProcessMutex interProcessMutex = new InterProcessMutex(client.getCuratorFramework(),"/zwt/lock");
interProcessMutex.acquire();
if(TEST>0){
doSomething();
}
interProcessMutex.release();
}catch (Exception e){
e.printStackTrace();
}
});
}
Thread.sleep(100000);
}

运行Test查看结果。

upload successful

可以看到结果有序的减一最后到0结束。说明了InterProcessMutex实现了我们的分布式锁的功能。

它是如何实现的呢?

我们在上面的doSomething代码里加一些输出。如下:

1
2
3
4
5
6
7
8
9
10
public void doSomething() throws Exception{
Thread.sleep(1000);
List<String> list=client.getNodes("/zwt/lock");
list.forEach((e)->{
System.out.print(e+" ");
});
System.out.println();
TEST--;
System.out.println(Thread.currentThread().getId()+"--"+Thread.currentThread().getName()+"执行,TEST="+TEST);
}

继续测试,如下结果:

upload successful

数据大致如下:

upload successful

可以看到,Zookeeper创建了10个临时顺序节点,每次会找到最小的节点并删除。其实这就是InterProcessMutex这个类的实现分布式锁的原理。

我们可以看下它的相关代码。

我们从acquire方法看起,调用了internalLock方法,而后调用了
attemptLock方法,这个方法会通过createsTheLock去创建锁。

upload successful

upload successful

upload successful

可以看到createsTheLock方法里创建了临时有序节点。

upload successful

再看下internalLockLoop这个方法,有些大。

upload successful

upload successful

upload successful

可以看到会拿到有序的子节点,getSortedChildren。

然后尝试去获取锁(从最小的节点开始),getsTheLock会先获取比自己小的节点,要是自己是最小的节点就会获得锁。

拿到后就设置haveTheLock为true,没有拿到,就添加watcher,监听比自己小的节点。
然后根据设置的等待时间判断是否超时从而进行等待或者退出。

最后,如果到了时间或者出现异常,doDelete为true,就会删除节点。

upload successful

再来看下release方法,里面的主要方法releaseLock,可以看到它调用了上面的deleteOurPath方法删除创建的临时节点。

upload successful

upload successful

在锁的获取和释放方法里可以看到下面这些地方,它可以保证我们的分布式锁具有可重入的性质。其通过lockCount(AtomicInteger )实现的,统计重入次数。

1
final AtomicInteger lockCount = new AtomicInteger(1);

upload successful

upload successful

Zookeeper分布式锁的基本内容就是这些了,我们来总结下Zookeeper分布式锁的步骤:

  1. 指定一个存放锁目录(这儿我们指定的/zwt/lock)。
  2. 线程A想要获取锁,就需要在该目录下创建临时有序节点。
  3. 获取该目录下的所有子节点,然后获取比自己小的兄弟节点,如果不存在,说明自己是最小节点,那么就去获得锁。
  4. 线程B同线程A,创建好节点后获取目录下所有子节点,判断自己不是最小的,就会对获得锁的节点添加监听。
  5. 线程A处理完后释放锁,删除自己的节点,并通知,线程B监听后判断自己是不是最小节点,是的话会获取锁,不是的话在添加对当前获得锁的线程的监听。

通过上面我们可以看到Zookeeper分布式锁的一些优点,如高可用性(由Zookeeper保证)、可重入性不会出现死锁(临时节点程序出现异常断开连接后会被删除也就失去了锁)等。

和一些缺点,如需要创建临时节点、删除临时节点,性能上肯定有一些影响

其它

我们可以在对其进行简单封装形成自己的分布式锁工具类。

相关代码如下:

提供一个分布式锁的接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface DistributedReentrantLock {
/**
* 获得锁
* @return
* @throws InterruptedException
*/
boolean tryLock() throws InterruptedException;
/**
* 获得锁
* @param timeout
* @param unit
* @return
* @throws InterruptedException
*/
boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException;
/**
* 解除锁
*/
void unlock();
}

同时我们使用Zookeeper的InterProcessMutex去完成相关实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
public class ZKDistributedReentrantLock implements DistributedReentrantLock {
private static final Logger logger = LoggerFactory.getLogger(ZKDistributedReentrantLock.class);
/**
* 线程池
*/
private static final ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(
10,
new BasicThreadFactory.Builder().namingPattern("scheduled-pool-%d").build()
);
/**
* 所有锁的根节点
*/
public static final String ROOT_PATH = "/LOCK/";
/**
* 每次延迟清理PERSISTENT节点的时间 毫秒
*/
private static final long DELAY_TIME_FOR_CLEAN = 1000;
/**
* zk 共享锁实现
*/
private InterProcessMutex interProcessMutex;

/**
* 锁的ID,对应zk一个PERSISTENT节点,下挂EPHEMERAL节点.
*/
private String path;
/**
* zk的客户端
*/
private CuratorFramework client;
private volatile boolean isLockSuccess;
public ZKDistributedReentrantLock(CuratorFramework client, String lockId) {
this.client = client;
this.path = ROOT_PATH + lockId;
interProcessMutex = new InterProcessMutex(client, this.path);
}
public ZKDistributedReentrantLock(CuratorZKClient zkClient, String lockId) {
this.client = zkClient.getCuratorFramework();
this.path = ROOT_PATH + lockId;
interProcessMutex = new InterProcessMutex(client, this.path);
}
/**
* 获取锁
* @return
* @throws InterruptedException
*/
@Override
public boolean tryLock() throws InterruptedException {
return tryLock(-1, null);
}
/**
* 获取锁
* @param timeout
* @param unit
* @return
* @throws InterruptedException
*/
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
try {
isLockSuccess = interProcessMutex.acquire(timeout, unit);
logger.debug("{} lock result:{}",this.path,isLockSuccess);
return isLockSuccess;
} catch (InterruptedException e) {
throw e;
} catch (Exception e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
}
}
/**
* 释放锁
*/
@Override
public void unlock() {
if(isLockSuccess) {
try {
isLockSuccess = false;
interProcessMutex.release();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
} finally {
executorService.schedule(new Cleaner(client, path), DELAY_TIME_FOR_CLEAN, TimeUnit.MILLISECONDS);
}
logger.debug("{} success unlock.",this.path);
}
}
static class Cleaner implements Runnable {
private CuratorFramework client;
private String path;
public Cleaner(CuratorFramework client, String path) {
this.client = client;
this.path = path;
}
@Override
public void run() {
try {
List list = client.getChildren().forPath(path);
if (list == null || list.isEmpty()) {
client.delete().forPath(path);
}
} catch (KeeperException.NoNodeException | KeeperException.NotEmptyException e1) {
//nothing
} catch (Exception e) {
//准备删除时,正好有线程创建锁
logger.error(e.getMessage(), e);
}
}
}
}

实现还是比较简单的,tryLock方法主要就是使用了interProcessMutex的acquire方法,成功后记录isLockSuccess状态,失败后除了调用release方法、把isLockSuccess变为false外,还应尝试清除刚才已经创建的业务lockId节点(线程池)。

测试相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
ZKConfig zkConfig = new ZKConfig();
//....
CuratorZKClient zkClient=new CuratorZKClient(zkConfig);
DistributedReentrantLock lock = new ZKDistributedReentrantLock(zkClient,"test");
try{
if(lock.tryLock()){
//doSomething
}
}catch (Exception e){
//...
}finally {
lock.unlock();
}

总结

通过对Zookeeper实现分布式锁的学习理解,我们又看到了Zookeeper的另外一个用途,对Zookeeper有了更深入的理解,也是蛮不错的一次学习体验。

有时间我会在研究下其它方式实现的分布式锁及其一些特点。




-------------文章结束啦 ~\(≧▽≦)/~ 感谢您的阅读-------------

SakuraTears wechat
扫一扫关注我的公众号
您的支持就是我创作的动力!
0%