bboyjing's blog

Redis学习笔记十四【使用Redis构建应用程序组件-计数信号量】

计数信号量也是一种锁,它可以让用户限制一项资源最多能够同时被多少个进程访问,计数信号量和锁的区别在于,当客户端获取锁失败时,通常会选择等待;而当客户端获取计数信号量失败时,通常会选择立即返回失败结果。下面模拟一下,最多同时只允许5个进程同时访问一个资源。

构建基本的计数信号量

构建计数信号量时要考虑的事情和构建其他类型的锁时要考虑的事情大部分都是相同的,比如判断时哪个客户端取得了锁,如何处理客户端在获得锁之后奔溃的情况,以及如何处理锁超时的问题。为了使多个信号量持有者的信息都存储到同一个结构里面,将使用有序集合来构建计数信号量。就是为每个尝试获取信号量的进程生成一个唯一标识符,将这个标识符用作有序集合的成员,对应的分值是尝试获取信号量时的Unix时间戳。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//获取信号量
public String acquireSemaphore(String semname, int limit){
//信号量多久后超时
long timeout = 100000;
String identifier = UUID.randomUUID().toString();
long now = System.currentTimeMillis();
SessionCallback<List<Object>> sessionCallback = new SessionCallback<List<Object>>() {
public List<Object> execute(RedisOperations operations) throws DataAccessException {
operations.multi();
/**
* 移除有序集合中score介于min和max之间的成员
* 清理有序集合中时间戳大于超时数字的标识符
*/
operations.opsForZSet().removeRangeByScore(semname, Double.MIN_VALUE, now - timeout);
//尝试获取信号量
operations.opsForZSet().add(semname, identifier, now);
operations.opsForZSet().rank(semname, identifier);
return operations.exec();
}
};
//检查是否成功获取了信号量
if((Long) stringRedisTemplate.execute(sessionCallback).get(2) < limit){
return identifier;
}
//获取信号量失败,删除之前添加的标示
stringRedisTemplate.opsForZSet().remove(semname, identifier);
return null;
}
//释放信号量
public void releaseSemaphore(String semname, String identifier){
stringRedisTemplate.opsForZSet().remove(semname, identifier);
}

这个基本版本的信号量实现非常好用,不仅简单,而且运行速度也很快,但是存在不公平的问题。举个例子,假设有A、B两台机器,A的系统时间比B的系统时间快10毫秒,那么当A取得最后一个信号量的时候,B只需要在10毫秒内尝试获取信号量,就有可能偷走A的信号量,或者使获取的信号量超过limit值。每当锁或者信号量因为系统时钟的细微不同而导致的获取结果出现剧烈变化时,这个锁或者信号量就是不公平的,下面将尝试解决这个问题。

公平的信号量

公平信号量的目的是使得只要各个系统的系统时间相差不超过1秒,就不会引起信号量被偷。为了尽可能地减少系统时间不一致带来的问题,我们需要给信号量添加一个计数器和一个有序集合。数据结构如下:

  • key
    • semaphore:remote
  • zset value
    • member : f29b654d-e173-4ada-987e-7ef7a231d705 | score : 1481876968118
    • member : ××× | score : ×××
  • key
    • semaphore:remote:owner
  • zset value
    • member : f29b654d-e173-4ada-987e-7ef7a231d705 | score : 7350
    • member : ××× | score : ×××
  • key
    • semaphore:remote:counter
  • string value
    • 7350
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public String acquireFairSemaphore(String semname, int limit){
//信号量多久后超时
long timeout = 100000;
String identifier = UUID.randomUUID().toString();
String czset = semname + ":owner";
String ctr = semname + ":counter";
long now = System.currentTimeMillis();
RedisCallback<List<Object>> redisCallback = redisConnection -> {
redisConnection.multi();
//清理有序集合中时间戳大于超时数字的标识符
redisConnection.zRemRangeByScore(semname.getBytes(), Double.MIN_VALUE, now - timeout);
/**
* czset、semname求交集,并将结果集覆盖到czset中,目的也是清理czset中的过期信号量
* 由于RedisOperations中的intersectAndStore方法没有weights参数,所以用这里是使用RedisConnection
* czset * 1 与 semname * 0求交集,这样就只留下了czset中的数据
*/
redisConnection.zInterStore(czset.getBytes(),
RedisZSetCommands.Aggregate.SUM,
new int[]{1,0}, czset.getBytes(),
semname.getBytes());
//信号量计数+1
redisConnection.incr(ctr.getBytes());
return redisConnection.exec();
};
//获取计数器
long counter = (Long)stringRedisTemplate.execute(redisCallback).get(2);
/**
* 尝试获取信号量
*/
redisCallback = redisConnection -> {
redisConnection.multi();
redisConnection.zAdd(semname.getBytes(), now, identifier.getBytes());
redisConnection.zAdd(czset.getBytes(), counter, identifier.getBytes());
redisConnection.zRank(czset.getBytes(), identifier.getBytes());
return redisConnection.exec();
};
if((Long)stringRedisTemplate.execute(redisCallback).get(2) < limit){
return identifier;
}
/**
* 获取信号量失败,删除记录
*/
redisCallback = redisConnection -> {
redisConnection.multi();
redisConnection.zRem(semname.getBytes(), identifier.getBytes());
redisConnection.zRem(czset.getBytes(), identifier.getBytes());
return redisConnection.exec();
};
stringRedisTemplate.execute(redisCallback);
return null;
}
//释放信号量
public boolean releaseFairSemaphore(String semname, String identifier){
SessionCallback<List<Object>> sessionCallback = new SessionCallback<List<Object>>() {
public List<Object> execute(RedisOperations operations) throws DataAccessException {
operations.multi();
operations.opsForZSet().remove(semname, identifier);
operations.opsForZSet().remove(semname + ":owner", identifier);
return operations.exec();
}
};
return (Long)stringRedisTemplate.execute(sessionCallback).get(1) == 1;
}

刷新信号量

上面的信号量实现是在给定时间后超时,现在扩展下新功能,假如需要刷新信号量,防止其过期。因为公平信号量已经区分开了超时有序集合和信号量拥有者有序集合,所以程序只需要对超时有序集合进行更新,就可以立即刷新信号量的超时时间了。

1
2
3
4
5
6
7
8
9
10
public boolean refreshFareSemaphore(String semname, String identifier){
/**
* zadd返回被成功添加的新成员的数量,不包括那些被更新的、已经存在的成员。
* add返回true表示添加了新成员,而不是更新,所以需要释放掉刚添加的数据
*/S
if(stringRedisTemplate.opsForZSet().add(semname, identifier, System.currentTimeMillis())){
releaseFairSemaphore(semname, identifier);
}
return false;
}

消除竞争条件

上面的信号量实现依然会带有可能会导致操作不正确的竞争条件出现。比如,当两个进程A和B都在尝试获取剩余最后一个信号量,假设A先对计数器执行了自增操作,此时只要B能够抢先地将自己的标识符添加到有序集合中,并且检查了标识符在有序集合中的排名,那么B就可以成功地取得信号量。之后A也将自己的标识符添加到有序结合里,并检查标识符在有序集合中的排名时,A也能获取信号量。
为了消除信号量中所有可能出现的竞争条件,构建一个正确的计数器信号量实现,我们需要用到前面构建的带有超时功能的分布式锁。总的来说,当程序想要获取信号量的时候,先尝试获取一个带有短暂超时功能的分布式锁。

1
2
3
4
5
6
7
8
9
10
11
public String acquireSemaphoreWithLock(String semname, int limit){
String identifier = timeoutLockService.acquireLockWithTimeout(semname);
if(!StringUtils.isEmpty(identifier)){
try{
return acquireFairSemaphore(semname, limit);
}finally {
firstLockService.releaseLock(semname, identifier);
}
}
return null;
}

信号量的实现就到此位置,列举下三种实现的优缺点:

  • 如果对使用系统时钟没有意见,也不需要对信号量进行刷新,并且能够接受信号量的数量欧尔超过限制,那么可以使用第一种实现。
  • 如果只信任差距在一两秒之间的系统时钟,但仍然能够接受信号量的数量偶尔超过限制,那么可以使用第二种实现。
  • 如果希望信号量一直都具有正确的行为,那么可以使用带锁的信号量实现来保证正确性。