bboyjing's blog

CrazyIM学习笔记五【点对点单聊】

  单聊的业务比较简单,就像微信的聊天功能。其业务流程具体如下:

  1. 当用户A登录成功后,按照单聊的消息格式,发送消息。这里的消息格式为:userId:content。其中userId就是消息接目标用户B的userId;其中content,标识聊天的内容。
  2. 服务器端收到消息后,根据目标userId进行消息的转发,发送到用户B所在的客户端。
  3. 客户端用户B收到用户A发来的消息,在自己的控制台显式出来。

这里服务端的路由转发不是根据sessionId,而是根据userId。其原因是因为用不B可能登录了多个会话(桌面会话、移动端会话、网页端会话),这时发给用户B的聊天消息必须转发到多个会话,所以需要根据userId进行转发。

服务端实现

  服务端的实现只要添加相关的转发逻辑就可以了。

ChatRedirectProcessor

  转发异步操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
@Service
public class ChatRedirectProcessor extends AbstractServerProcessor {
@Override
public ProtoMsg.HeadType type() {
return ProtoMsg.HeadType.MESSAGE_REQUEST;
}
@Override
public boolean action(ServerSession session, ProtoMsg.Message proto) {
// 聊天处理
ProtoMsg.MessageRequest msg = proto.getMessageRequest();
log.info("chatMsg | from="
+ msg.getFrom()
+ " , to=" + msg.getTo()
+ " , content=" + msg.getContent());
// 获取接收方的chatID
String to = msg.getTo();
// 获取userId对应的所有Session
List<ServerSession> toSessions = SessionMap.inst().getSessionsBy(to);
if (toSessions == null) {
// 接收方离线
log.info("[" + to + "] 不在线,发送失败!");
} else {
toSessions.forEach((v) -> {
// 将IM消息发送到接收方
v.writeAndFlush(proto);
});
}
return true;
}
}

ChatRedirectHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
@Service
@ChannelHandler.Sharable
public class ChatRedirectHandler extends ChannelInboundHandlerAdapter {
@Autowired
private ChatRedirectProcessor chatRedirectProcessor;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判断消息实例
if (null == msg || !(msg instanceof ProtoMsg.Message)) {
super.channelRead(ctx, msg);
return;
}
// 判断消息类型
ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
ProtoMsg.HeadType headType = ((ProtoMsg.Message) msg).getType();
if (!headType.equals(chatRedirectProcessor.type())) {
super.channelRead(ctx, msg);
return;
}
//判断是否登录
ServerSession session = ServerSession.getSession(ctx);
if (null == session || !session.isLogin()) {
log.error("用户尚未登录,不能发送消息");
return;
}
// 异步处理IM消息转发的逻辑
FutureTaskScheduler.add(() -> chatRedirectProcessor.action(session, pkg));
}
}

整合

  将ChatRedirectHandler添加到ChatServer的流水线中:

1
2
3
......
ch.pipeline().addLast(chatRedirectHandler);
......

客户端实现

ChatConsoleCommand

  负责从Scanner控制台实例获取聊天消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Service
@Data
public class ChatConsoleCommand implements BaseCommand{
public static final String KEY = "2";
private String toUserId;
private String message;
@Override
public void exec(Scanner scanner) {
System.out.print("请输入聊天的消息(id:message):");
String[] info = null;
while (true) {
String input = scanner.next();
info = input.split(":");
if (info.length != 2) {
System.out.println("请输入聊天的消息(id:message):");
} else {
break;
}
}
toUserId = info[0];
message = info[1];
}
@Override
public String getKey() {
return KEY;
}
@Override
public String getTip() {
return "聊天";
}
}

ChatMsg

  该类是server和client共用的,放到chatcommon模块中,同时引入commons-lang3

1
2
3
4
5
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Data
public class ChatMsg {
private long msgId;
private String from;
private String to;
private long time;
private MSGTYPE msgType;
private String content;
private String url; //多媒体地址
private String property; //附加属性
private String fromNick; //发送者昵称
private String json; //附加的json串
private User user;
public ChatMsg(User user) {
if (null == user) {
return;
}
this.user = user;
this.setTime(System.currentTimeMillis());
this.setFrom(user.getUid());
this.setFromNick(user.getNickName());
}
/**
* 消息类型 1:纯文本 2:音频 3:视频 4:地理位置 5:其他
*/
public enum MSGTYPE {
TEXT, AUDIO, VIDEO, POS, OTHER;
}
public void fillMsg(ProtoMsg.MessageRequest.Builder cb) {
if (msgId > 0) {
cb.setMsgId(msgId);
}
if (StringUtils.isNotEmpty(from)) {
cb.setFrom(from);
}
if (StringUtils.isNotEmpty(to)) {
cb.setTo(to);
}
if (time > 0) {
cb.setTime(time);
}
if (msgType != null) {
cb.setMsgType(msgType.ordinal());
}
if (StringUtils.isNotEmpty(content)) {
cb.setContent(content);
}
if (StringUtils.isNotEmpty(url)) {
cb.setUrl(url);
}
if (StringUtils.isNotEmpty(property)) {
cb.setProperty(property);
}
if (StringUtils.isNotEmpty(fromNick)) {
cb.setFromNick(fromNick);
}
if (StringUtils.isNotEmpty(json)) {
cb.setJson(json);
}
}
}

ChagMsgBuilder

  构建聊天消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ChatMsgBuilder extends BaseBuilder {
private ChatMsg chatMsg;
private User user;
public ChatMsgBuilder(ChatMsg chatMsg, User user, ClientSession session) {
super(ProtoMsg.HeadType.MESSAGE_REQUEST, session);
this.chatMsg = chatMsg;
this.user = user;
}
public ProtoMsg.Message build() {
ProtoMsg.Message message = buildCommon(-1);
ProtoMsg.MessageRequest.Builder cb = ProtoMsg.MessageRequest.newBuilder();
chatMsg.fillMsg(cb);
return message.toBuilder().setMessageRequest(cb).build();
}
public static ProtoMsg.Message buildChatMsg(ChatMsg chatMsg,
User user,
ClientSession session) {
ChatMsgBuilder builder = new ChatMsgBuilder(chatMsg, user, session);
return builder.build();
}
}

ChatSender

  发送聊天消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
@Service
public class ChatSender extends BaseSender {
public void sendChatMsg(String touid, String content) {
log.info("发送消息 startConnectServer");
ChatMsg chatMsg = new ChatMsg(getUser());
chatMsg.setContent(content);
chatMsg.setMsgType(ChatMsg.MSGTYPE.TEXT);
chatMsg.setTo(touid);
chatMsg.setMsgId(System.currentTimeMillis());
ProtoMsg.Message message = ChatMsgBuilder.buildChatMsg(chatMsg, getUser(), getSession());
super.sendMsg(message);
}
@Override
protected void sendSucced(ProtoMsg.Message message) {
log.info("发送成功:" + message.getMessageRequest().getContent());
}
@Override
protected void sendfailed(ProtoMsg.Message message) {
log.info("发送失败:" + message.getMessageRequest().getContent());
}
}

ChatMsgHandler

  用来接收聊天消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Slf4j
@ChannelHandler.Sharable
@Service
public class ChatMsgHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//判断消息实例
if (null == msg || !(msg instanceof ProtoMsg.Message)) {
super.channelRead(ctx, msg);
return;
}
// 判断类型
ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
ProtoMsg.HeadType headType = pkg.getType();
// 注意,这里接收的消息类型是MESSAGE_REQUEST
if (!headType.equals(ProtoMsg.HeadType.MESSAGE_REQUEST)) {
super.channelRead(ctx, msg);
return;
}
ProtoMsg.MessageRequest req = pkg.getMessageRequest();
String content = req.getContent();
String uid = req.getFrom();
log.info(" 收到消息 from uid:" + uid + " -> " + content);
}
}

CommandControler

  添加单聊逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@Slf4j
@Service
public class CommandController {
......
@Autowired
private ChatConsoleCommand chatConsoleCommand;
@Autowired
private ChatSender chatSender;
/**
* 初始化命令Map
*/
public void initCommandMap() {
commandMap = new HashMap<>();
......
commandMap.put(chatConsoleCommand.getKey(), chatConsoleCommand);
......
}
/**
* 启动Command线程
*
* @throws InterruptedException
*/
public void startCommandThread() throws InterruptedException {
Thread.currentThread().setName("命令线程");
while (true) {
// 建立连接
while (!connectFlag) {
// 开始连接
startConnectServer();
// 暂停命令线程
waitCommandThread();
}
// 处理命令
while (null != session) {
// 获取命令行输入的命令key,通过key找到命令
Scanner scanner = new Scanner(System.in);
clientCommandMenu.exec(scanner);
String key = clientCommandMenu.getCommandInput();
BaseCommand command = commandMap.get(key);
if (null == command) {
System.err.println("无法识别[" + command + "]指令,请重新输入!");
continue;
}
switch (key) {
case LoginConsoleCommand.KEY:
command.exec(scanner);
startLogin((LoginConsoleCommand) command);
break;
case ChatConsoleCommand.KEY:
command.exec(scanner);
startOneChat((ChatConsoleCommand) command);
break;
}
}
}
}
// 发送单聊消息
private void startOneChat(ChatConsoleCommand c) {
// 登录
if (!isLogin()) {
log.info("还没有登录,请先登录");
return;
}
chatSender.setSession(session);
chatSender.setUser(user);
chatSender.sendChatMsg(c.getToUserId(), c.getMessage());
}
public boolean isLogin() {
if (null == session) {
log.info("session is null");
return false;
}
return session.isLogin();
}
}

整合

  将ChatMsgHandler添加到NettyClient的流水线中:

1
ch.pipeline().addLast(chatMsgHandler);

测试

  1. 启动clientserver:

    1
    2
    3
    4
    2021-03-23 14:51:16.527 INFO 66212 --- [ main] cn.didadu.chatserver.ServerApplication : Starting ServerApplication using Java 1.8.0_202 on zhangjingdeMacBook-Pro.local with PID 66212 (/Users/zhangjing/IdeaProjects/crazyIM/chatserver/target/classes started by zhangjing in /Users/zhangjing/IdeaProjects/crazyIM)
    2021-03-23 14:51:16.530 INFO 66212 --- [ main] cn.didadu.chatserver.ServerApplication : No active profile set, falling back to default profiles: default
    2021-03-23 14:51:17.332 INFO 66212 --- [ main] cn.didadu.chatserver.ServerApplication : Started ServerApplication in 1.595 seconds (JVM running for 2.222)
    2021-03-23 14:51:17.482 INFO 66212 --- [ main] cn.didadu.chatserver.server.ChatServer : 疯狂创客圈 CrazyIM 服务启动, 端口 /0:0:0:0:0:0:0:0:8080
  2. 将项目打个包,启动两个客户端,分别进行登录操作:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    2021-03-23 15:04:18.379 INFO 67320 --- [ main] cn.didadu.chatclient.ClientApplication : Starting ClientApplication v1.0.0-SNAPSHOT using Java 1.8.0_202 on zhangjingdeMacBook-Pro.local with PID 67320 (/Users/zhangjing/IdeaProjects/crazyIM/chatclient/target/chatclient-1.0.0-SNAPSHOT.jar started by zhangjing in /Users/zhangjing/IdeaProjects/crazyIM/chatclient/target)
    2021-03-23 15:04:18.382 INFO 67320 --- [ main] cn.didadu.chatclient.ClientApplication : No active profile set, falling back to default profiles: default
    2021-03-23 15:04:19.228 INFO 67320 --- [ main] cn.didadu.chatclient.ClientApplication : Started ClientApplication in 1.581 seconds (JVM running for 2.185)
    2021-03-23 15:04:19.276 INFO 67320 --- [pool-1-thread-1] cn.didadu.chatclient.client.NettyClient : 客户端开始连接 [疯狂创客圈IM]
    2021-03-23 15:04:19.407 INFO 67320 --- [ntLoopGroup-3-1] c.d.chatclient.client.CommandController : 疯狂创客圈 IM 服务器 连接成功!
    请输入某个操作指令:
    [menu] 0->show 所有命令 | 1->登录 | 2->聊天 |
    1
    请输入用户信息(id:password)
    bboyjing:123
    2021-03-23 15:04:39.316 INFO 67320 --- [ 命令线程] cn.didadu.chatclient.sender.LoginSender : 构造登录消息
    2021-03-23 15:04:39.359 INFO 67320 --- [ 命令线程] cn.didadu.chatclient.sender.LoginSender : 发送登录消息
    请输入某个操作指令:
    [menu] 0->show 所有命令 | 1->登录 | 2->聊天 |
    2021-03-23 15:04:39.418 INFO 67320 --- [ntLoopGroup-3-1] cn.didadu.chatclient.sender.BaseSender : 发送成功
    2021-03-23 15:04:39.482 INFO 67320 --- [ntLoopGroup-3-1] c.d.chatclient.client.ClientSession : 登录成功
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    021-03-23 15:04:26.503 INFO 67321 --- [ main] cn.didadu.chatclient.ClientApplication : Starting ClientApplication v1.0.0-SNAPSHOT using Java 1.8.0_202 on zhangjingdeMacBook-Pro.local with PID 67321 (/Users/zhangjing/IdeaProjects/crazyIM/chatclient/target/chatclient-1.0.0-SNAPSHOT.jar started by zhangjing in /Users/zhangjing/IdeaProjects/crazyIM/chatclient/target)
    2021-03-23 15:04:26.506 INFO 67321 --- [ main] cn.didadu.chatclient.ClientApplication : No active profile set, falling back to default profiles: default
    2021-03-23 15:04:27.368 INFO 67321 --- [ main] cn.didadu.chatclient.ClientApplication : Started ClientApplication in 1.732 seconds (JVM running for 2.237)
    2021-03-23 15:04:27.613 INFO 67321 --- [pool-1-thread-1] cn.didadu.chatclient.client.NettyClient : 客户端开始连接 [疯狂创客圈IM]
    2021-03-23 15:04:27.729 INFO 67321 --- [ntLoopGroup-3-1] c.d.chatclient.client.CommandController : 疯狂创客圈 IM 服务器 连接成功!
    请输入某个操作指令:
    [menu] 0->show 所有命令 | 1->登录 | 2->聊天 |
    1
    请输入用户信息(id:password)
    zhangjing:123
    2021-03-23 15:04:46.728 INFO 67321 --- [ 命令线程] cn.didadu.chatclient.sender.LoginSender : 构造登录消息
    2021-03-23 15:04:46.764 INFO 67321 --- [ 命令线程] cn.didadu.chatclient.sender.LoginSender : 发送登录消息
    请输入某个操作指令:
    [menu] 0->show 所有命令 | 1->登录 | 2->聊天 |
    2021-03-23 15:04:46.821 INFO 67321 --- [ntLoopGroup-3-1] cn.didadu.chatclient.sender.BaseSender : 发送成功
    2021-03-23 15:04:46.977 INFO 67321 --- [ntLoopGroup-3-1] c.d.chatclient.client.ClientSession : 登录成功
  3. 发送聊天消息:

    1
    2
    3
    4
    5
    6
    请输入某个操作指令:
    [menu] 0->show 所有命令 | 1->登录 | 2->聊天 |
    2
    请输入聊天的消息(id:message):zhangjing:hello
    2021-03-23 15:05:00.020 INFO 67320 --- [ 命令线程] cn.didadu.chatclient.sender.ChatSender : 发送消息 startConnectServer
    2021-03-23 15:05:00.039 INFO 67320 --- [ntLoopGroup-3-1] cn.didadu.chatclient.sender.ChatSender : 发送成功:hello
    1
    2021-03-23 15:05:00.052 INFO 67321 --- [ntLoopGroup-3-1] c.d.chatclient.handler.ChatMsgHandler : 收到消息 from uid:bboyjing -> hello

至此,简单的登录、单聊流程就完成了。