Redis在项目中的一些应用

前言

今天我们来总结下Redis在项目中的一些应用。

Redis在实际项目中除了可以作为缓存或者持久化数据库外,还能解决项目中遇到的一些棘手的问题。

正文

限流/防高频问题

这基本上属于项目中一个比较经典的问题了,我们以防止用户高频访问来举例,大多数的项目中都是通过Redis来解决高频访问问题的。

我们知道,对于高频访问问题,要有以下3要素:单位时间、单位时间限制访问次数、超频后的限制访问时间长。

我们定义一个Bean,控制这几个参数,当然也可以直接使用配置文件的方式进行配置等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class LimitRule {
/**
* 单位访问时间
*/
private int seconds;
/**
* 单位时间内限制的访问次数
*/
private int limitCount;
/**
* 单位时间超过访问次数后的锁定时间
*/
private int lockTime;
//Get Set略
public boolean enableLimitLock() {
return getLockTime() > 0 && getLimitCount() > 0;
}
}

Redis有一种数据结构,名字为Zset,可以通过方法zadd添加元素,通过zcount统计记录数,我们可以用Zset的有序集的value来存放访问时间,判断超频时,只需要用zcount判断单位时间seconds内Zset里的元素数据是否超过limitCount即可,超过后即为该用户添加一个锁定lockTime的Redis key。

因此,我们代码大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class HighFreqLimit {

//记录用户行为并判断高频访问的zset
private static final String REDIS_VISIT_KEY_FORMAT = "redis.visit:user:zset:%s";
//高频访问用户key
private static final String REDIS_LIMIT_KEY_FORMAT = "redis.limit:user:%s";

//Redis 工具
private RedisUtil redisUtil;

public HighFreqLimit(RedisUtil redisUtil){
this.redisUtil = redisUtil;
}

/**
* 方案一:使用Zset来判断高频访问情况
*/
/**
* 判断用户是否高频访问
* 高频访问抛出异常
* @param userNo 用户唯一的编号
* @param limitRule 访问规则 这个规则可以用配置文件的方式处理
* @return
*/
public void checkLimit1(String userNo,LimitRule limitRule){

//判断是否启用了高频访问
if(!limitRule.enableLimitLock()){
return;
}

String redisKeyUserVisitZset = String.format(REDIS_VISIT_KEY_FORMAT,userNo);
String redisUserFreqLimitKey = String.format(REDIS_LIMIT_KEY_FORMAT,userNo);
//如果存在高频访问key说明已经高频访问了
if (redisUtil.exists(redisUserFreqLimitKey)) {
throw new RuntimeException("您操作的太快了,请稍后访问");
}

long currentTimeMillis=System.currentTimeMillis();
//访问信息,可以根据具体业务定制
String visitInfo = userNo +":"+ System.currentTimeMillis();
//将信息添加到zset里
redisUtil.zadd(redisKeyUserVisitZset,System.currentTimeMillis(), visitInfo);
//设置过期时间为单位时间
redisUtil.expire(redisKeyUserVisitZset, limitRule.getSeconds());
long startTimeMillis = currentTimeMillis - limitRule.getSeconds() * 1000;
//统计单位时间内Zset的元素个数
long visitCount = redisUtil.zcount(redisKeyUserVisitZset, startTimeMillis, currentTimeMillis);
//超过阈值则成为高频用户
if (visitCount > limitRule.getLimitCount()) {
redisUtil.setString(redisUserFreqLimitKey, visitInfo);
redisUtil.expire(redisUserFreqLimitKey, limitRule.getLockTime());
throw new RuntimeException("您操作的太快了,请稍后访问");
}
}
}

Redis在2.6版本后支持Lua表达式,因此我们也可以构建使用Lua表达式来解决上述问题。

构建Lua表达式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private static final String REDIS_VISIT_KEY_FORMAT2 = "redis.visit:user:lua:%s";
/**
* 方案二:使用Lua表达式来判断高频访问情况
*/
/**
* 判断用户是否高频访问
* 高频访问抛出异常
* @param userNo
* @param limitRule
*/
public void checkLimit2(String userNo, LimitRule limitRule) {
String redisKeyUserVisit = String.format(REDIS_VISIT_KEY_FORMAT2,userNo);
long count;
List<String> keys = new ArrayList<String>();
keys.add(redisKeyUserVisit);
List<String> args = new ArrayList<String>();
args.add(limitRule.getLimitCount() + "");
args.add(limitRule.getSeconds() + "");
args.add(limitRule.getLockTime() + "");
count = Long.parseLong(redisUtil.getJedisFactory().getJedisCluster().eval(buildLuaScript(limitRule), keys, args) + "");
if(count > limitRule.getLimitCount()){
throw new RuntimeException("您操作的太快了,请稍后访问");
}
}

private String buildLuaScript(LimitRule limitRule) {
StringBuilder lua = new StringBuilder();
lua.append("\nlocal c");
lua.append("\nc = redis.call('get',KEYS[1])");
lua.append("\nif c and tonumber(c) > tonumber(ARGV[1]) then");
lua.append("\nreturn c;");
lua.append("\nend");
lua.append("\nc = redis.call('incr',KEYS[1])");
lua.append("\nif tonumber(c) == 1 then");
lua.append("\nredis.call('expire',KEYS[1],ARGV[2])");
lua.append("\nend");
if (limitRule.enableLimitLock()) {
lua.append("\nif tonumber(c) > tonumber(ARGV[1]) then");
lua.append("\nredis.call('expire',KEYS[1],ARGV[3])");
lua.append("\nend");
}
lua.append("\nreturn c;");
return lua.toString();
}

对于上述表达式,KEYS[1]即为redisKeyUserVisit,可以看到先进行取值,如果有值并且值比limitCount大就返回了,根据后面count > limitRule.getLimitCount()的判断说明已经超频了,如果不大于该值,则进行自增,如果该值是1,说明单位时间第一次访问,就设置它的单位时间过期,然后如果该值超频后会这是这个Key的过期时间为lockTime。

序列号生成问题

项目中另一种常见的情况就是流水号的生成了,很多业务流水号有如下格式 XXXX2019040100001 等,我最近的一个项目就有类似的复杂需求,这种情况下我们可以使用Redis来生成某一天的自增流水号,大致如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class SequenceUtils {

private RedisUtil redisUtil;

public SequenceUtils(RedisUtil redisUtil){
this.redisUtil = redisUtil;
}

//业务规定序号为00001 ,00002 这种5位格式
private static final int DEFAULT_LENGTH = 5;
//缓存时长
private static final int ONE_DAY_TIME = 24*60*60;

private static final String REDIS_CACHE_KEY = "redis.serialnumber:%s:%s";

/**
* 获取自增数字的字符串形式,包含0前缀
* @param seq
* @return
*/
private String getSequenceWithZeroPrefix(long seq) {
String str = String.valueOf(seq);
int len = str.length();
if (len >= DEFAULT_LENGTH) {
throw new RuntimeException("Sequence generate failed!");
}
int rest = DEFAULT_LENGTH - len;
StringBuilder sb = new StringBuilder();
for (int i = 0; i < rest; i++) {
sb.append('0');
}
sb.append(str);
return sb.toString();
}

/**
* 获取自增数字的字符串形式,不包含0前缀
* @param seq
* @return
*/
private String getSequenceNoZeroPrefix(long seq){
return String.valueOf(seq);
}


/**
* 序列号生成器
* @param bizCode 业务码
* @param needZero 是否需要0前缀
* @return
*/
public String generate(String bizCode,boolean needZero){
String date = DateFormatUtils.format(new Date(),"yyyyMMdd");
//redis key
String key = String.format(REDIS_CACHE_KEY,bizCode,date);
//自增并设置过期时间
long sequence = redisUtil.incr(key);
redisUtil.expire(key,ONE_DAY_TIME);

String seq;
if(needZero){
seq = getSequenceWithZeroPrefix(sequence);
}else{
seq = getSequenceNoZeroPrefix(sequence);
}
StringBuffer sb = new StringBuffer();
sb.append(bizCode).append(date).append(seq);

return sb.toString();
}
}

为保证绝对可靠,还可以进行改善,当Redis拿不到值时可以去数据库初始化今天的起始流水号等,这儿不再过多介绍,可以看到主要就是利用了Redis的自增incr和指定时间过期expire这两个关键方法。

分布式锁

还可以使用Redis做分布式锁,相比较之前说的Zookeeper实现分布式锁,使用Redis实现分布式锁,最明显的优点就是指令为内存操作,速度较快,性能较高;但缺点也比较明显,使用Redis实现分布式锁较为复杂,需要考虑超时、原子性、误删等情形,较为复杂,且由于没有等待锁的队列,等待锁只能依靠客户端自旋,效率较为低下。反观ZK实现的分布式锁,有等待锁的队列,但是添加删除节点性能较低。

我们使用Redis来简单实现一个分布式锁。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
public class RedisLock {

//锁的key前缀
private final static String LOCK_PREFIX="redis.lock:%s";

private RedisUtil redisUtil;

public RedisLock(RedisUtil redisUtil){
this.redisUtil = redisUtil;
}

/**
* 尝试获得锁
* @param key String key
* @param lockTimeOut long 超时时间(毫秒)
* @return 大于 0 获得到锁并,等于0获取锁失败
*/
public long tryLock(String key,long lockTimeOut){
key= String.format(LOCK_PREFIX,key);
long expireTime = 0;
expireTime = System.currentTimeMillis() + lockTimeOut +1;
if(redisUtil.setStringIfNotExists(key, String.valueOf(expireTime))==1){
return expireTime;
}else {
String curLockTimeStr = redisUtil.getString(key);
//判断是否过期
if (StringUtils.isBlank(curLockTimeStr) || System.currentTimeMillis() > Long.valueOf(curLockTimeStr)) {
expireTime = System.currentTimeMillis() + lockTimeOut +1;
curLockTimeStr = redisUtil.getSet(key, String.valueOf(expireTime));
//仍然过期,则得到锁
if (StringUtils.isBlank(curLockTimeStr) || System.currentTimeMillis() > Long.valueOf(curLockTimeStr)){
return expireTime;
}else {
return 0;
}
}else {
return 0;
}
}
}
/**
* 一直等待获得锁
* @param key String key
* @param lockTimeOut long 超时时间(毫秒)
* @param perSleep long 获得锁循环等待休眠时间
* @return 大于 0 获得到锁并,等于0获取锁失败
* @throws InterruptedException
*/
public long lock(String key,long lockTimeOut,long perSleep) throws InterruptedException{
key= String.format(LOCK_PREFIX,key);
long starttime = System.currentTimeMillis();
long sleep = (perSleep==0 ? lockTimeOut/ 10 : perSleep);
//得到锁后设置的过期时间,未得到锁返回0
long expireTime = 0;
for (;;) {
expireTime = System.currentTimeMillis() + lockTimeOut +1;
if (redisUtil.setStringIfNotExists(key, String.valueOf(expireTime)) == 1) {
//得到了锁返回
return expireTime;
}else {
String curLockTimeStr = redisUtil.getString(key);
//判断是否过期
if (StringUtils.isBlank(curLockTimeStr) || System.currentTimeMillis() > Long.valueOf(curLockTimeStr)) {
expireTime = System.currentTimeMillis() + lockTimeOut +1;
curLockTimeStr = redisUtil.getSet(key, String.valueOf(expireTime));
//仍然过期,则得到锁
if (StringUtils.isBlank(curLockTimeStr) || System.currentTimeMillis() > Long.valueOf(curLockTimeStr)){
return expireTime;
}else {
Thread.sleep(sleep);
}
}else {
Thread.sleep(sleep);
}
}
if (lockTimeOut > 0 && ((System.currentTimeMillis() - starttime) >= lockTimeOut)) {
expireTime = 0;
return expireTime;
}
}
}
/**
* 释放锁
* @param key String key
* @param expireTime long 超时时间(毫秒)
*/
public void unlock(String key,long expireTime){
key= String.format(LOCK_PREFIX,key);
if (System.currentTimeMillis()-expireTime>0) {
return ;
}
String curLockTimeStr = redisUtil.getString(key);
if (StringUtils.isNotBlank(curLockTimeStr) && Long.valueOf(curLockTimeStr)>System.currentTimeMillis()) {
redisUtil.delKey(key);
}
}
}

可以看到在等待锁的阶段,如果设置超时时间,则客户端只能自旋等待锁,如果在指定时间内未获得锁,就会超时。

可以看到主要逻辑是,首先使用setStringIfNotExists (对应Redis的set str NX 命令)方法尝试设置key,如果成功说明获得锁,返回超时时间;如果不成功说明已经有程序在使用该锁,需要判断剩余过期时间,如果没有剩余过期时间,再尝试获得锁,否则线程sleep剩余过期时间。

可以看到,我们需要指定锁的使用时间,如果不指定时间,有可能会造成死锁等问题。

配置中心

Redis也可以用来实现配置中心的相关功能。

Redis 做配置中心,需要结合数据库来实现以确保稳定性。

数据库设计一张配置表用于存储配置数据,在Redis可以将数据存入哈希表来进行处理。

当然我们新增修改或者删除数据时需要同时对Redis和数据库进行操作。

并可以添加数据从Redis刷新到数据库和从数据库刷新到Redis等功能。

部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
public class RedisConfigCenter {
/**
* 配置中心key
*/
public static final String CONFIG_CENTER_KEY = "redis.configcenter:hash:key";
//Redis 工具
private RedisUtil redisUtil;
public RedisConfigCenter(RedisUtil redisUtil){
this.redisUtil = redisUtil;
}
/**
* 配置中心添加数据
* 可以添加一个或多个
* @return
*/
public boolean insertData(Map<String,String> insertData){
//放入Redis hash表
boolean exists = redisUtil.exists(CONFIG_CENTER_KEY);
if(exists){
//拿到key对应的Redis hash表数据
Map<String,String> redisMap = redisUtil.hashGetAll(CONFIG_CENTER_KEY);
insertData.putAll(redisMap);
}
//写入Redis hash表
redisUtil.hashMultipleSet(CONFIG_CENTER_KEY,insertData);

//数据库处理部分代码略
return true;
}

/**
* 配置中心更新数据
* @param updateData
* @return
*/
public boolean updateData(Map<String,String> updateData){
boolean exists = redisUtil.exists(CONFIG_CENTER_KEY);
if(!exists){
throw new RuntimeException("请先新增数据!");
}
//拿到key对应的Redis hash表数据
Map<String,String> redisMap = redisUtil.hashGetAll(CONFIG_CENTER_KEY);
redisMap.putAll(updateData);
//写入Redis hash表
redisUtil.hashMultipleSet(CONFIG_CENTER_KEY,updateData);
//数据库处理部分代码略
return true;
}

/**
* 配置中心删除数据
* @param deleteKeys
* @return
*/
public boolean deleteData(List<String> deleteKeys){
boolean exists = redisUtil.exists(CONFIG_CENTER_KEY);
if(!exists){
throw new RuntimeException("请先新增数据!");
}
//拿到key对应的Redis hash表数据
Map<String,String> redisMap = redisUtil.hashGetAll(CONFIG_CENTER_KEY);
deleteKeys.forEach(key->{
redisMap.remove(key);
});
//写入Redis hash表
redisUtil.hashMultipleSet(CONFIG_CENTER_KEY,redisMap);
//数据库处理部分代码略
return true;
}

/**
* 查询数据列表
* @return
*/
public Map<String,String> selectData(){
Map<String,String> map = new HashMap<>();
boolean exists = redisUtil.exists(CONFIG_CENTER_KEY);
if(!exists){
return map;
}
map = redisUtil.hashGetAll(CONFIG_CENTER_KEY);
return map;
}

/**
* 将数据库数据刷新到Redis
* @return
*/
public boolean refreshToRedis(){
//数据库获取到数据集合 略
Map<String,String> map = new HashMap<>();//TODO
redisUtil.hashMultipleSet(CONFIG_CENTER_KEY,map);
return true;
}

/**
* Redis刷新到数据库
* @return
*/
public boolean refreshToDataBase(){
Map<String,String> map = redisUtil.hashGetAll(CONFIG_CENTER_KEY);
//数据库操作,略
return true;
}
}

当然,为保证数据可靠性,可以启动两个线程,指定时间从Redis刷新到数据库或者从数据库刷新到Redis的功能。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 可以改为配置的形式
*/
private static boolean refreshRedisThreadStop = false;
private static boolean refreshDataBaseThreadStop = false;
/**
* 刷新数据的线程
*/
ExecutorService executorService = Executors.newFixedThreadPool(2);

/**
* 初始化
*/
public void init() {
// refresh thread
executorService.submit(() ->{
while (!refreshRedisThreadStop) {
try {
//30 min 刷新一次
TimeUnit.MINUTES.sleep(30);
refreshToRedis();
} catch (Exception e) {
e.printStackTrace();
}
}
});
executorService.submit(() ->{
while (!refreshDataBaseThreadStop) {
try {
//30 min 刷新一次
TimeUnit.MINUTES.sleep(30);
refreshToDataBase();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}

结语

通过以上对Redis在应用中的各个功能的应用,让我们对Redis的应用有了更深入的了解,以及对Redis的应用场景有了更加深刻的认识。

Redis 在项目中基本上已经是很平常的存在了,如何使用好它,解决棘手问题乃是我们的重中之重。




-------------文章结束啦 ~\(≧▽≦)/~ 感谢您的阅读-------------

您的支持就是我创作的动力!

欢迎关注我的其它发布渠道