bboyjing's blog

Redis学习笔记十五【使用Redis构建应用程序组件-任务队列】

本章节将学习如何使用Redis来构建任务队列,实现类似RabbitMQ之类的消息队列的功能。

先进先出队列

依然使用之前虚构的例子,现在来实现通过电子邮件来订阅商品交易市场中已售出的商品的相关信息。因为对外发电子邮件可能会有非常高的延迟,甚至可能会出现发送失败的情况,所以将使用任务队列来记录邮件的收信人以及发送邮件的原因,并构建一个可以在邮件发送服务器运行变得缓慢的时候,以并行方式一次发送多封邮件的工作进程。我们要编写的队列将以“先到先服务”的方式发送邮件,并且无论发送是否成功,程序都会把结果记录到日志中。
邮件队列采用List构成,存储JSON格式的字符串,数据结构如下:

  • key
    • queue:email
  • list value
    • “{“seller_id”:1,”item_id”:”Item_M”,”price”:97,”buyer_id”:27,”time”:1482399576744}”
    • ××××××
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//将信息推入邮件列表
public void sendSoldEmailViaQueue(int seller, String item, int price, int buyer){
JSONObject jsonObject = new JSONObject();
jsonObject.put("seller_id", seller);
jsonObject.put("item_id", item);
jsonObject.put("price", price);
jsonObject.put("buyer_id", buyer);
jsonObject.put("time", System.currentTimeMillis());
stringRedisTemplate.opsForList().rightPush("queue:email", jsonObject.toJSONString());
}
//处理待发邮件
public void processEmailQueue(){
while (true) {
/**
* 给了超时参数的leftPop方法调用了bLPop命令
* 当给定列表内没有任何元素可供弹出的时候,连接将被 BLPOP 命令阻塞,直到等待超时或发现可弹出元素为止。
*/
String packed = stringRedisTemplate.opsForList().leftPop("queue:email", 30l, TimeUnit.SECONDS);
JSONObject jsonObject = JSONObject.parseObject(packed);
System.out.println(jsonObject.getString("item_id"));
}
}

上面已经实现了简单的队列,下面再看下如果要执行的任务不止一种,该怎么办?

多个可执行任务

将采用注册回调函数的方式来实现执行指定的任务,队列中存储的格式为[FUNCTION_NAME,[ARG1,ARG2,ARG3…]],书上的例子是python写的,动态语言实现这样的功能非常方便,但是用Java来写的话,就有点恶心了,还是简单来实现下吧。数据格式如下:

  • key
    • queue:task
  • list value
    • “[SendEmailTask,[1,Item_M,97,99]]”
    • ××××××
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//将任务推入队列
public void sendTaskViaQueue(String queueName, String functionName, String ... args){
StringBuilder task = new StringBuilder();
task.append("[").append(functionName);
if(args.length > 0){
task.append(",[");
for (String arg : args) {
task.append(arg).append(",");
}
task.deleteCharAt(task.length() - 1);
task.append("]");
}
task.append("]");
stringRedisTemplate.opsForList().rightPush(queueName, task.toString());
}
//处理队列中的任务
public void processTaskQueue(String queueName) throws Exception{
while (true) {
String packed = stringRedisTemplate.opsForList().leftPop(queueName, 30l, TimeUnit.SECONDS);
String functionName;
String args;
int index = packed.indexOf(",");
if(index > 0){
functionName = packed.substring(1, index);
args = packed.substring(index + 2, packed.length() - 2);
}else{
functionName = packed.substring(1, packed.length() - 1);
args = "";
}
//此处可以用Spring管理Bean
Class taskClass = Class.forName("cn.didadu.queue." + functionName);
ITask task = (ITask) taskClass.newInstance();
task.execute(args.split(","));
}
}
//处理任务接口
public interface ITask {
void execute(String ... args);
}
//处理任务实现
public class SendEmailTask implements ITask{
@Override
public void execute(String... args) {
for (String arg: args) {
System.out.println("send email process running: " + arg);
}
}
}

任务优先级

假如多个任务之间存在优先级,上面一个例子实现了发送邮件任务,现在有另外任务需要发送消息,发送邮件的优先级比发送消息高。这个问题也很容易解决。BLPOP天然支持,因为BLPOP可以接收多个队列,当给定多个key参数时,按参数key的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。只需要稍微修改下上述处理任务的方法即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void processTaskQueue(String[] queueNames) throws Exception{
while (true) {
//leftPop参数不支持多个key,需要自己实现下
byte[][] keys = new byte[queueNames.length][];
int i = 0;
for(String queueName : queueNames){
keys[i++] = queueName.getBytes();
}
RedisCallback<List<byte[]>> redisCallback = connection -> connection.bLPop(30, keys);
String packed = new String(stringRedisTemplate.execute(redisCallback).get(1));
String functionName;
String args;
int index = packed.indexOf(",");
if(index > 0){
functionName = packed.substring(1, index);
args = packed.substring(index + 2, packed.length() - 2);
}else{
functionName = packed.substring(1, packed.length() - 1);
args = "";
}
//此处可以用Spring管理Bean
Class taskClass = Class.forName("cn.didadu.queue." + functionName);
ITask task = (ITask) taskClass.newInstance();
task.execute(args.split(","));
}
}

延迟任务

本节尝试构造一个具有延迟执行任务的队列,将所有需要在未来执行的任务都添加到有序集合中,并将任务的执行时间设置分值,另外再使用一个进程来查找有序集合中是否存在可以立即被执行的任务,如果有的话,就从有序集合中移除那个任务,并将任务重新添加到适当的任务队列中,来实现延迟特性。
有序集合队列存储的每个被延迟执行的任务是一个包含4个值的JSON串,分别为:唯一标识符、处理任务的队列的名字、处理任务的回调函数的名字、传给回调函数的参数。格式如下:

  • key
    • delayed:
  • list value
    • “{“task_id”:aa-bb-cc,”queue_name”:”queue:email”,”function_name”:”SendEmailTask”,”args”:[“1”, “Item_M”, “97”, “99”]}”
    • ××××××
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
//添加到延迟任务列表
public String sendTaskViaQueue(String queueName, String functionName, long delay,String ... args){
String task_id = UUID.randomUUID().toString();
JSONObject jsonObject = new JSONObject();
jsonObject.put("task_id", task_id);
jsonObject.put("queue_name", queueName);
jsonObject.put("function_name", functionName);
jsonObject.put("args", args);
stringRedisTemplate.opsForZSet().add(
"delayed:",
jsonObject.toJSONString(),
System.currentTimeMillis() + delay);
return task_id;
}
//处理延迟任务队列
public void processTaskQueue() throws Exception{
while (true) {
//获取延迟任务列表中的第一个任务
Set<ZSetOperations.TypedTuple<String>> delayTaskSet =
stringRedisTemplate.opsForZSet().rangeWithScores("delayed:", 0, 0);
//Redis没有直接提供阻塞有序集合的方法,需要自己检测
if(delayTaskSet.size() == 0){
Thread.sleep(1000);
continue;
}
//获取任务信息
String delayTaskStr = "";
long delay = 0;
for(ZSetOperations.TypedTuple typedTuple : delayTaskSet){
delayTaskStr = (String) typedTuple.getValue();
delay = typedTuple.getScore().longValue();
break;
}
//若还未到执行时间,等待一会儿继续loop
if(delay > System.currentTimeMillis()){
Thread.sleep(1000);
continue;
}
//将到执行时间的任务推入适当的任务队列中,并删除记录
JSONObject jsonObject = JSONObject.parseObject(delayTaskStr);
JSONArray jsonArray = jsonObject.getJSONArray("args");
int argsSize = jsonArray == null ? 0 : jsonArray.size();
String[] args = new String[argsSize];
for (int i = 0; i <jsonArray.size(); i++){
args[i] = jsonArray.getString(i);
}
multiTaskQueueService.sendTaskViaQueue(
jsonObject.getString("queue_name"),
jsonObject.getString("function_name"),
args);
stringRedisTemplate.opsForZSet().remove("delayed:", delayTaskStr);
}
}

Redis构建队列的学习就到此结束,示例代码可能无法直接使用到实际项目中,但是其实现思路以及对Redis数据结构的使用还是值得学习的。