Fork me on GitHub

分布式缓存算法原理

前言

我们在使用缓存型数据库时,如Redis,为保证缓存数据库的高可用,提高缓存数据库的读写性能,最简单的方式是我们做缓存数据库的读写分离,组成Master-Master或者Master-Slave的形式,或者搭建缓存数据库集群。

当数据量更大时,和数据库类似,我们可以对缓存数据库进行“分库分表”操作。

在对缓存数据库进行“分库分表”操作中,便会涉及到分布式缓存算法的一些内容。我们来看下。

正文

现在假设我们有一个网站,需要使用缓存数据库Redis存储图片资源,存储的格式为键值对,key值为图片名称,value为该图片所在文件服务器的路径,我们需要根据文件名查找该文件所在文件服务器上的路径,数据量大概有2000W左右,按照我们约定的规则进行分库,规则就是随机分配,我们可以部署8台缓存服务器,每台服务器大概含有500W条数据,并且进行主从复制,示意图如下:

upload successful

由于规则随机,所以我们的数据可能存储在任何一组Redis中,我们如果想找某张图片,比如“a.png”,则需要遍历4组Redis查询,这显然是不能接受的。

我们可以使用数据库“分库分表”的一些规则来,如按照Hash值,按照类别,按照某个字段值来进行分配。

Redis作为通用缓存数据库,我们按照数据key的Hash值来对数据分类较好,因为Hash值不涉及到具体的业务逻辑。

使用Hash

可以知道,我们使用Hash,每一张图片在分库时都可以定位到特定的服务器,如下图:

upload successful

上图中,假设我们查找的是”a.png”,由于有4台服务器(排除从库),因此公式为hash(a.png) % 4 = 2 ,可知定位到了第2号服务器,这样的话就不会遍历所有的服务器,大大提升了性能。

使用Hash的问题

上述的方式虽然提升了性能,我们不再需要对整个Redis服务器进行遍历。但是,使用上述Hash算法进行缓存时,会出现一些缺陷,主要体现在服务器数量变动的时候,所有缓存的位置都要发生改变。

试想一下,如果4台缓存服务器已经不能满足我们的缓存需求,那么我们应该怎么做呢?

很简单,多增加几台缓存服务器不就行了。

假设我们增加了一台缓存服务器,那么缓存服务器的数量就由4台变成了5台。那么原本hash(a.png) % 4 = 2 的公式就变成了 hash(a.png) % 5 =?

假设有20个数据需要存储,在有4个Redis节点的时候如下图:

upload successful

当我们添加1个Redis节点之后,数据的分布如下图所示:

upload successful

上图中蓝色部分代表与4个节点时存储位置一致的数据,其命中率为:4/20=20%。也就是说这种情况带来的结果就是当服务器数量变动时,很多缓存的位置都要发生改变。

也就是当服务器数量发生改变时,所有缓存在一定时间内是失效的,当应用无法从缓存获取数据时,大量请求会调用数据库获取数据,导致数据库瘫痪(缓存雪崩)。

同样的,如果一台服务器出现故障,我们要将其移除,缓存服务器数量从4台变为3台,也会出现上述问题。

我们应该避免这种问题,这种问题是由于Hash算法本身的缘故,使用取模法进行缓存时,这种情况是无法避免的。

为了解决上述问题,一致性Hash算法诞生了。

一致性Hash算法

一致性Hash算法也是使用取模的方法,只是,上述的取模法是对服务器的数量进行取模,而一致性Hash算法是对2^32-1取模。

简单来说,一致性Hash算法将整个Hash值空间组织成一个虚拟的圆环,如假设某Hash函数H的值空间为0-2^32-1(即Hash值是一个32位无符号整形),整个Hash空间环如下:

upload successful

整个空间按顺时针方向组织。0~2^32-1在零点中方向重合。

下一步将各个服务器使用Hash进行处理,具体可以选择服务器的ip或主机名作为关键字进行Hash,这样每台机器就能确定其在Hash环上的位置,这里假设将上文中四台服务器使用ip地址Hash后在环空间的位置如下:

upload successful

接下来使用如下算法定位数据访问到相应服务器:将数据key使用相同的函数Hash计算出哈希值,并确定此数据在环上的位置,从此位置沿环顺时针“行走”,第一台遇到的服务器就是其应该定位到的服务器。

例如我们有Object AObject BObject CObject D四个数据对象,经过哈希计算后,在环空间上的位置如下:

upload successful

根据一致性哈希算法,数据A会被定为到Node A上,B被定为到Node B上,C被定为到Node C上,D被定为到Node D上。

一致性Hash算法的容错性和可扩展性

下面分析一致性哈希算法的容错性和可扩展性。

现假设Node C不幸宕机,可以看到此时对象ABD不会受到影响,只有C对象被重定位到Node D。一般的,在一致性哈希算法中,如果一台服务器不可用,则受影响的数据仅仅是此服务器到其环空间中前一台服务器(即沿着逆时针方向行走遇到的第一台服务器)之间数据,其它不会受到影响。

下面考虑另外一种情况,如果在系统中增加一台服务器Node X,如下图所示:

upload successful

此时对象Object ABD不受影响,只有对象C需要重定位到新的Node X 。一般的,在一致性哈希算法中,如果增加一台服务器,则受影响的数据仅仅是新服务器到其环空间中前一台服务器(即沿着逆时针方向行走遇到的第一台服务器)之间数据,其它数据也不会受到影响。

综上所述,一致性哈希算法对于节点的增减都只需重定位环空间中的一小部分数据,具有较好的容错性和可扩展性。

一致性Hash算法的数据倾斜问题

另外,一致性哈希算法在服务节点太少时,容易因为节点分部不均匀而造成数据倾斜问题。例如系统中只有两台服务器,其环分布如下,

upload successful

此时必然造成大量数据集中到Node A上,而只有极少量会定位到Node B上。

为了解决这种数据倾斜问题,一致性哈希算法引入了虚拟节点机制,即对每一个服务节点计算多个哈希,每个计算结果位置都放置一个此服务节点,称为虚拟节点。具体做法可以在服务器ip或主机名的后面增加编号来实现。

例如上面的情况,可以为每台服务器计算三个虚拟节点,于是可以分别计算 Node A#1Node A#2Node A#3Node B#1Node B#2Node B#3的哈希值,于是形成六个虚拟节点:

upload successful

同时数据定位算法不变,只是多了一步虚拟节点到实际节点的映射,例如定位到Node A#1Node A#2Node A#3三个虚拟节点的数据均定位到Node A上。这样就解决了服务节点少时数据倾斜的问题。

在实际应用中,通常将虚拟节点数设置为32甚至更大,因此即使很少的服务节点也能做到相对均匀的数据分布。

Java简单实现

下面我们通过代码来简单实现下一致性Hash算法的效果。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
public class ConsistHash<K extends Comparable,V> {

/**
* 一致性Hash算法最大值,0 - 2^32 -1
* Integer的范围,- 2^31 --- 2^31 -1
* 我们取模后为方便都以无符号整数来统计
* Java中hashCode范围即Integer范围,取到负数后,我们取其绝对值+Integer.MAX_VALUE 来表示
*/
public static final long MAX_VALUE = 0xffffffffL;

public static final long MIN_VALUE = 0;

public final List<Long> list = Collections.synchronizedList(new ArrayList<>());

/**
* 模拟缓存类
*/
public final Map<Long,RedisServer<K,V>> cache = new HashMap<>();


/**
* 服务器列表
*/
private List<String> serverList;

public ConsistHash(List<String> serverList) {
this.serverList = serverList;
if(CollectionUtils.isEmpty(serverList)){
throw new RuntimeException("服务器列表为空!");
}
initHashLoop(serverList);
}

/**
* 将对象生成hash
* @param object
* @return
*/
private long hash(Object object){
long hash;
int hashCode = object.hashCode();
if(hashCode < 0){
hash = (long)(-hashCode) + (long)Integer.MAX_VALUE;
}else{
hash = hashCode;
}
return hash;
}

/**
* 根据服务器初始化Hash环
* @param serverList
*/
private void initHashLoop(List<String> serverList){
//构建服务节点
for (String s : serverList) {
//每个服务器计算Hash
long h = hash(s);
//快速查找数据节点位置
list.add(h);
//建立映射关系
cache.put(h,new RedisServer<>());
}
//排序
Collections.sort(list);
}

/**
* 放入缓存对象
* @param key
* @param value
*/
public void set(K key,V value){
//计算hash
long h = hash(key);
for (int i = 0; i < list.size(); i++) {
long temp = list.get(i);
//小于第一个hash值归到第一个hash缓存
if(i == 0 && h <= temp){
cache.get(temp).set(key,value);
break;
}
//大于当前hash且小于下一个hash归到下一个hash缓存
if(i+1 < list.size() && h > temp && h <= list.get(i+1)){
cache.get(list.get(i+1)).set(key,value);
break;
}
//比最后的hash还要大,归到第一个hash缓存(顺时针)
if(h > list.get(list.size() -1)){
cache.get(list.get(0)).set(key,value);
break;
}
}
}

/**
* 获取缓存数据
* @param key
* @return
*/
public V get(K key){
//计算hash
long h = hash(key);
for (int i = 0; i < list.size(); i++) {
long temp = list.get(i);
//小于第一个hash值归到第一个hash缓存
if(i == 0 && h <= temp){
return cache.get(temp).get(key);
}
//大于当前hash且小于下一个hash归到下一个hash缓存
if(i+1 < list.size() && h > temp && h <= list.get(i+1)){
return cache.get(list.get(i+1)).get(key);
}
//比最后的hash还要大,归到第一个hash缓存(顺时针)
if(h > list.get(list.size() -1)){
return cache.get(list.get(0)).get(key);
}
}
return null;
}

/**
* 模拟删除一个Redis缓存服务器
* @param server
*/
public void removeServer(String server){
Long h = hash(server);
list.remove(h);
cache.remove(h);
}

/**
* 模拟添加一个Redis缓存服务器
* @param server
*/
public void addServer(String server){
Long h = hash(server);
list.add(h);
Collections.sort(list);
cache.put(h,new RedisServer<>());
}

/**
* 打印缓存情况
* @return
*/
public void print(){
StringJoiner sj1 = new StringJoiner(",","[","]");
list.forEach(e->{
sj1.add(e.toString());
});
System.out.println(sj1);

StringJoiner sj2 = new StringJoiner("\n");
cache.forEach((k,v)->{
StringJoiner sj3 = new StringJoiner(",");
v.map.forEach((a,b)->{
sj3.add("["+a+"->"+b+"]");
});
sj2.add(k+":{"+sj3.toString()+"}");
});
System.out.println(sj2.toString());
}


/**
* 模拟Redis服务
* 当然Redis拿不到就会去数据库取数据,我们这儿简化
* @param <K>
* @param <V>
*/
public class RedisServer<K extends Comparable,V>{
private final Map<K,V> map = new ConcurrentHashMap<>();

public V get(K key){
return map.get(key);
}
public void set(K key,V value){
map.put(key,value);
}
}



public static void main(String[] args) {

List<String> list = new ArrayList<>();
list.add("10.1.1.2");
list.add("10.2.2.3");
list.add("10.3.4.4");
list.add("10.10.2.5");
ConsistHash<String,String> consistHash = new ConsistHash<>(list);

consistHash.set("key1111133132311","key11111");
consistHash.set("qweqrgdfgdgdgdgdf","qweqrgdfgdgdgdgdf");
consistHash.set("12232433sfddsf","12232433sfddsf");
consistHash.set("xsdsf567899","xsdsf567899");
consistHash.set("12qwqs","12qwqs");
consistHash.set("ccc","ccc");
consistHash.set("ffffffff","ffffffff");
System.out.println("宕机前:");
consistHash.print();

// 模拟宕机,查看缓存分别情况
consistHash.removeServer("10.1.1.2");
System.out.println("宕机后:");
consistHash.print();

//重新放入值
System.out.println("数据重新分布:");
consistHash.set("key1111133132311","key11111");
consistHash.set("qweqrgdfgdgdgdgdf","qweqrgdfgdgdgdgdf");
consistHash.set("12232433sfddsf","12232433sfddsf");
consistHash.set("xsdsf567899","xsdsf567899");
consistHash.set("12qwqs","12qwqs");
consistHash.set("ccc","ccc");
consistHash.set("ffffffff","ffffffff");
consistHash.print();

consistHash.addServer("10.1.1.2");
System.out.println("添加一个服务器:");
consistHash.set("key1111133132311","key11111");
consistHash.set("qweqrgdfgdgdgdgdf","qweqrgdfgdgdgdgdf");
consistHash.set("12232433sfddsf","12232433sfddsf");
consistHash.set("xsdsf567899","xsdsf567899");
consistHash.set("12qwqs","12qwqs");
consistHash.set("ccc","ccc");
consistHash.set("ffffffff","ffffffff");
consistHash.print();
}
}

上述代码中,需要注意的点如下:

  • 我们用了一个简易的RedisServer类来模拟Redis服务;
  • Hash算法也非常简单,采用了Java中对象的hashCode方法,如果取到负值,变为正值并加上Integer.MAX_VALUE
  • 其实上面的Hash算法直接用hashCode方法也是可以的,当然Hash数据范围变为Integer.MIN_VALUE - Integer.MAX_VALUE;
  • 我们运行测试类,可以看到其实服务器Hash后分布并不是很均匀的(数据倾斜),我们可以通过上面说的“虚拟节点”来解决,这儿就不上代码了。

总结

本文中,我们了解到了什么是一致性Hash算法,并通过代码加深了对算法的理解。

由于分布式系统中,每个节点都有可能新增或失效,如何保证系统能够正常运行,并对外提供稳定服务,一致性Hash算法即提供了处理这种问题的一个思路。

参考资料

  1. 一致性Hash算法原理
  2. 一致性Hash算法



-------------文章结束啦 ~\(≧▽≦)/~ 感谢您的阅读-------------

SakuraTears wechat
扫一扫关注我的公众号
您的支持就是我创作的动力!
0%