bboyjing's blog

Redis学习笔记十六【使用Redis构建应用程序组件-消息拉取】

多个客户端在互相发送和接收消息的时候,通常会使用以下两种方式来传递消息。第一种时消息推送,也就是由消息发送者来确保所有接受者已经成功接收到了消息,Redis内置了PUBLISH和SUBSCRIBE命令可以实现;第二种时消息拉取,这种方法要求接收者自己去获取存储在某种mailbox里的消息。尽管消息推送非常有用,但是客户端因为某些原因而没办法一直保持在线的时候,采用这一消息传递方法的程序就会出现各种各样的问题。本章节将尝试编写不同的消息拉取方式,来替代PUBLISH和SUBSCRIBE。

单接收者消息的发送与订阅替代品

假设现在打算开发一个移动通信程序,这个应用通过连接服务器来发送和接受雷系短信或彩信的消息,基本上就是一个文字短信和图片彩信的替代品。每条消息都只会被发送至一个客户端,这一点极大地简化了我们要解决的问题。可以为每个移动客户端使用一个列表结构,发送者会把消息放到接受者的列表中,而接受者客户端则通过发送请求来获取最新的消息。数据格式如下:

  • key
    • mailbox:bboyjing
  • list value
    • “{“sendder”:”dora”,”msg”:”Hi I’m dora.”}”
    • ××××××

该示例的代码实现比较简单,之前也已经学过如何对列表进行推入和弹出操作,这里就不再给出代码实现了。

多接受者消息的发送与订阅替代品

单个接受者的消息传递已经满足不了需求,现在要实现一个群组聊天功能,和之前一样,因为应用程序的客户端可能会载人和时候进行连接或者断开连接,所以还是不能使用内置的PUBLISH和SUBSCRIBE。
每个新创建的群组都会有一些初始用户,每个用户都可以按照自己的意愿来参加或者离开群组。群组使用有序集合来记录参加群组的用户,其中有序集合的成员为用户的名字,分值时用户在群组内接收到的最大消息ID。用户也会使用有序集合来记录自己参加的所有群组,其中有序集合的成员为群组ID,分值是用户在群组内接收到的最大消息ID。数据格式如下:

  • key
    • chat:001
  • zset value
    • member : john | score : 5
    • member : jeff | score : 6
  • key
    • chat:002
  • zset value
    • member : michelle | score : 10
    • member : jason | score : 10
    • member : jenny | score : 11
  • key
    • member:jason
  • zset value
    • member : 001 | score : 5
    • member : 002 | score : 6
  • key
    • member:jeff
  • zset value
    • member : 001 | score : 6

以上数据例子表示jason和jeff都参加了001群组,其中用户jason看了6条群组消息中的5条。

创建群组聊天会话

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public String createChat(String sender, Set<String> recipients, String message) {
//通过全局计数器来获取一个新的群组ID
Long chatId = stringRedisTemplate.opsForValue().increment("ids:chat:", 1l);
//将发送者也添加到群组成员中
recipients.add(sender);
stringRedisTemplate.execute(new SessionCallback<List<Object>>() {
public List<Object> execute(RedisOperations operations) throws DataAccessException {
operations.multi();
recipients.forEach(recipient -> {
//将用户添加到群组中,并将这些用户在群组中最大已读消息ID初始化为0
stringRedisTemplate.opsForZSet().add("chat:" + chatId, recipient, 0);
//将群组ID添加到用户已参加群组的有序集合中
stringRedisTemplate.opsForZSet().add("member:" + recipient, String.valueOf(chatId),0);
});
return operations.exec();
}
});
//发送一条初始化消息
return "";
}

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public String sendMessage(String chatId, String sender, String message) {
// 使用锁来消除竞争条件,保证消息的读取和插入的顺序一致
String identifier = timeoutLockService.acquireLockWithTimeout("chat:" + chatId);
if (identifier == null){
throw new RuntimeException("Couldn't get the lock");
}
try{
//获取消息ID
Long messageId = stringRedisTemplate.opsForValue().increment("ids:message:" + chatId, 1l);
//将消息添加到消息有序集合中
JSONObject values = new JSONObject();
values.put("id", messageId);
values.put("ts", System.currentTimeMillis());
values.put("sender", sender);
values.put("message", message);
stringRedisTemplate.opsForZSet().add("msgs:" + chatId, values.toJSONString(), messageId);
}finally {
firstLockService.releaseLock("chat:" + chatId, identifier);
}
return chatId;
}

获取消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void fetchPendingMessages(String recipient) {
// 获取组员的群组ID以及在各组中目前收到的消息的最大ID
Set<ZSetOperations.TypedTuple<String>> memberSet =
stringRedisTemplate.opsForZSet().rangeWithScores("member:" + recipient, 0, -1);
// 获取各聊天组未读消息(分值大于上面获取的最大消息ID)
List<Object> results = stringRedisTemplate.execute(new SessionCallback<List<Object>>() {
public List<Object> execute(RedisOperations operations) throws DataAccessException {
operations.multi();
memberSet.forEach(member -> {
String chatId = member.getValue();
double messageId = member.getScore();
operations.opsForZSet().rangeByScore("msgs:" + chatId, ++messageId, Double.MAX_VALUE);
});
return operations.exec();
}
});
//遍历未读消息
stringRedisTemplate.execute(new SessionCallback<List<Object>>() {
public List<Object> execute(RedisOperations operations) throws DataAccessException {
operations.multi();
int i = 0;
for(ZSetOperations.TypedTuple<String> member : memberSet){
Set<String> messages = (Set<String>) results.get(i++);
System.out.println("聊天组:" + member.getValue() + ",有如下未读消息");
messages.forEach(message ->
System.out.println(JSONObject.parseObject(message).getString("message")));
//修改群组成员读取的最大消息ID
operations.opsForZSet().incrementScore(
"member:" + recipient,
member.getValue(),
messages.size());
//修改群组有序集合中成员读取的最大消息ID
operations.opsForZSet().incrementScore(
"chat:" + member.getValue(),
recipient,
messages.size());
}
return operations.exec();
}
});
}

加入群组和离开群组

加入和离开群组只需要初始化或者删除一些数据,比较简单,就不给出例子了。