bboyjing's blog

Redis学习笔记十【Redis事物】

前面章节已经涉及过基本的Redis事物,知晓了Redis的事物和传统的关系型数据库的事物并不相同。Redis虽然有简单的方法可以处理一连串相互一致的读操作和写操作,但是没有办法根据读取到的数据为依据来进行相关操作,这种无法以一致性的形式读取数据的行为将导致某一类型的问题变得难以解决。

模拟场景演示Redis事物

下面以一个商品买卖市场为场景,来演示Redis事物的使用,在该场景下,人们可以在市场里自由销售和购买商品。

定义用户信息和用户包裹

用户信息存储在一个散列中,散列的各个键值分别记录了用户的姓名、拥有的钱数等属性:

  • key
    • user:××××××
  • hash value
    • subKey : name | subValue : ××××××
    • subKey : funds | subValue : ××××××

用户包裹使用一个集合来表示,它记录了包裹里面每件商品的唯一编号:

  • key
    • inventory:××××××
  • set value
    • ××××××
    • ××××××

商品买卖市场的需求非常简单:一个用户可以将自己的商品按照给定的价格放到市场上进行销售,当另一个用户购买这个商品时,卖家就会收到钱。
为了将被销售的商品的全部信息都存储到市场里面,我会将商品的ID和卖家的ID拼接起来,并将拼接的结果用作成员存储到市场有序集合中,而商品的售价则用作分值,这样市场的数据结构也出来了:

  • key
    • market:
  • zset value
    • member : ItemA_userA | score : 35
    • member : ItemB_userB | score : 48

将商品放到市场上销售

在将一件商品放到市场上进行销售的时候,程序需要将被销售的商品添加到记录市场正在销售商品的有序集合中,并且在添加操作执行的过程中,监视卖家的包裹以确保被销售的商品缺失存在于卖家的包裹当中。代码示例位于redis-sample项目的market模块,下面列举出核心功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public boolean listItem(String itemId, String sellerId, long price) {
String inventory = "inventory:" + sellerId;
String item = itemId + '_' + sellerId;
//事物
SessionCallback<List<Object>> sessionCallback = new SessionCallback<List<Object>>() {
public List<Object> execute(RedisOperations operations) throws DataAccessException {
/**
* WATCH 命令用于在事务开始之前监视任意数量的键:
* 当调用 EXEC 命令执行事务时, 如果任意一个被监视的键已经被其他客户端修改了
* 那么整个事务不再执行, 直接返回失败
* 此处用来监视包裹是否发生变化
*/
operations.watch(inventory);
if(!operations.opsForSet().isMember(inventory, itemId)){
operations.unwatch();
return null;
}
operations.multi();
operations.opsForZSet().add("market:", item, price);
operations.opsForSet().remove(inventory, itemId);
return operations.exec();
}
};
return redisTemplate.execute(sessionCallback) == null ? false : true;
}

构造测试数据,假设有一名叫Frank的卖家有100元,包裹里有ItemL、ItemM、ItemN,他想要以97元的价格销售ItemM:

1
2
3
4
127.0.0.1:6379> hmset user:001 name Frank funds 10000
OK
127.0.0.1:6379> sadd inventory:001 ItemL ItemM ItemN
(integer) 3

跑完测试用例后检查下Redis中的数据:

1
2
3
4
5
6
7
8
9
//多了market:
127.0.0.1:6379> keys *
1) "market:"
2) "user:001"
3) "inventory:001"
//ItemM被从包裹中成功移除
127.0.0.1:6379> SMEMBERS inventory:001
1) "ItemL"
2) "ItemN"

另外,试了下断点在watch之后,手动随便删除inventory:001的一个member,测试结果根预想的一样会失败。

购买商品

购买商品的具体流程:首先使用watch对市场以及卖家的个人信息进行监视,然后获取买家拥有的钱数以及商品的售价,并检查买家是否有足够的钱来购买该商品,如果买家买有足够的钱,那么程序会取消事物;相反地,如果买家的钱足够,那么首先会将买家支付的钱转移给卖家,然后将售出的商品移动至买家的包裹,并将该商品从市场中移除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public boolean purchaseItem(String buyerId, String itemId, String sellerId) {
String buyer = "user:" + buyerId;
String seller = "user:" + sellerId;
String item = itemId + '_' + sellerId;
String inventory = "inventory:" + buyerId;
//事物
SessionCallback<List<Object>> sessionCallback = new SessionCallback<List<Object>>() {
public List<Object> execute(RedisOperations operations) throws DataAccessException {
//监视市场和卖家信息
operations.watch(Lists.newArrayList("market:", buyer));
//获取商品售价
long sellPrice = operations.opsForZSet().score("market:", item).longValue();
//获取买家钱数
long funds = Long.valueOf((String) operations.opsForHash().get(buyer, "funds"));
if (sellPrice > funds){
operations.unwatch();
return null;
}
operations.multi();
operations.opsForHash().increment(seller, "funds", sellPrice);
operations.opsForHash().increment(buyer, "funds", -sellPrice);
operations.opsForSet().add(inventory, itemId);
operations.opsForZSet().remove("market:", item);
return operations.exec();
}
};
return redisTemplate.execute(sessionCallback) == null ? false : true;
}

构造测试数据,鸡舍有一名叫Bil的卖家正好有97元,想要购买Frank出售的ItemM:

1
2
127.0.0.1:6379> hmset user:002 name Bill funds 9700
OK

跑完测试用例后检查下Redis中的数据:

1
2
3
4
5
6
7
8
127.0.0.1:6379> hget user:001 funds
"19700"
127.0.0.1:6379> hget user:002 funds
"0"
127.0.0.1:6379> smembers inventory:002
1) "ItemM"
127.0.0.1:6379> zrange market: 0 -1
(empty list or set)

综上所述,Redis并没有在watch的时候锁住数据,而是在数据已经被其它客户端抢先修改了的情况下,通知执行了watch命令的客户端,这种乐观锁的实现方式尽可能地减少了客户端的等待时间。

非事物型流水线

在需要执行大量命令的情况下,即使命令实际上并不需要放在事物里面执行,但是为了通过一次发送所有的命令来减少通信次数,我们也可以将命令包裹在MULTI和EXEC里面执行。但是,MULTI和EXEC并不是免费的,它们会消耗资源,并且可能会导致其他重要的命令被延迟执行。此时,我们可以选择使用pipeline来一次性提交多个不同的命令。下面改写前面基本的Redis事物中的例子来演示下如何使用pipeline:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void incrByPipeline() throws InterruptedException{
final byte[] rawKey = stringRedisTemplate.getStringSerializer().serialize("trans:");
RedisCallback<Object> pipelineCallback = redisConnection -> {
redisConnection.incr(rawKey);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
redisConnection.decr(rawKey);
return null;
};
System.out.println(stringRedisTemplate.executePipelined(pipelineCallback).get(0));
}

测试下来输出三个1,可以看出使用pipeline依然可以有效地防止数据错误发生,因为Redis会执行完一个管道内的所有命令再去处理下一个管道。

性能相关事项

上面我们知道了如何在不适用事物的情况下,通过使用pipeline来提升Redi的性能,这一部分我们再来看看还有没有其他可以提升Redis性能的常规方法。
Redis提供了一个性能测试程序redis-benchmark来展示Redis在自己服务器上的各种性能特征:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
redis-3.2.4 src/redis-benchmark -c 1 -q
PING_INLINE: 67980.97 requests per second
PING_BULK: 69930.07 requests per second
SET: 66489.37 requests per second
GET: 69444.45 requests per second
INCR: 68352.70 requests per second
LPUSH: 65876.16 requests per second
RPUSH: 65876.16 requests per second
LPOP: 66533.60 requests per second
RPOP: 67294.75 requests per second
SADD: 68823.12 requests per second
SPOP: 68027.21 requests per second
LPUSH (needed to benchmark LRANGE): 65703.02 requests per second
LRANGE_100 (first 100 elements): 32530.91 requests per second
LRANGE_300 (first 300 elements): 14549.69 requests per second
LRANGE_500 (first 450 elements): 10447.14 requests per second
LRANGE_600 (first 600 elements): 8185.99 requests per second
MSET (10 keys): 56657.22 requests per second

上面时本机使用一个Redis客户端的性能测试结果,这些数据可以参考下,应用程序的实际性能肯定达不到基准测试的数据,如果和基准数据偏差比较大,下面列举一些可能的原因:

性能或者错误 可能的原因 解决方法
单个客户端性能达到redis-benchmark的50%~60% 这是不使用pipeline时的预期性能
单个客户端性能达到redis-benchmark的25%~30% 对于每个命令或者每组命令都创建新的连接 使用连接池,重用已有的Redis连接
客户端返回错误:”Cannot assign request address” 对于每个命令或者每组命令都创建新的连接 使用连接池,重用已有的Redis连接

这里简单了列举一些问题以及解决方法,另外Redis慢的原因也有可能是以不正确的方式使用Redis数据结构导致的,至于其它疑难杂症就靠使用者自行研究了。