bboyjing's blog

CrazyIM学习笔记三【服务端登录流程】

  登录的流程,从端到端的角度来说,包括如下环节:

  1. 客户端发送登录数据包。
  2. 服务端进行用户信息验证。
  3. 服务器端创建Session会话。
  4. 服务器端返回登录结果的信息给客户端,包括成功标志、Session ID等。

本章节将实现服务端登录的相关逻辑,其中涉及的主要模块如下:

  • Handler模块:客户端请求的处理
  • Processor模块:以异步的方式完成请求的业务逻辑处理
  • Session模块:管理用户与通道的绑定关系

首先创建chatserver模块,服务端代码大多在该模块中实现。

ProtoBufBuilder模块

  该模块用于存放消息的Bean。

User

  User类是服务端和客户端共用的类,放到chatcommon模块中,先把成员变量写上,后续使用到具体方法再添加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Builder
@Data
@Slf4j
public class User implements Serializable {
String uid;
String devId;
String token;
String nickName;
transient PlatForm platform;
int intPlatFrom;
private String sessionId;
public static User fromMsg(ProtoMsg.LoginRequest info) {
User user = User.builder()
.uid(info.getUid())
.devId(info.getDeviceId())
.token(info.getToken())
.intPlatFrom(info.getPlatform()).build();
log.info("登录中: {}", user.toString());
return user;
}
@AllArgsConstructor
public enum PlatForm {
WINDOWS(1, "windows"),
MAC(2, "mac"),
ANDROID(3, "android"),
IOS(4, "ios"),
WEB(5, "web"),
UNKNOWN(6, "unknown");
@Getter
private int code;
@Getter
private String msg;
public static String getMsg(Byte code) {
for (PlatForm item : values()) {
if (item.getCode() == code) {
return item.getMsg();
}
}
return "";
}
}
}

LoginResponseBuilder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Service
public class LoginResponseBuilder {
public ProtoMsg.Message loginResponse(ProtoInstant.ResultCodeEnum en, long seqId, String sessionId) {
ProtoMsg.Message.Builder mb = ProtoMsg.Message.newBuilder()
.setType(ProtoMsg.HeadType.LOGIN_RESPONSE) // 设置消息类型
.setSequence(seqId)
.setSessionId(sessionId); // 设置应答流水,与请求对应
ProtoMsg.LoginResponse.Builder b = ProtoMsg.LoginResponse.newBuilder()
.setCode(en.getCode())
.setInfo(en.getDesc())
.setExpose(1);
mb.setLoginResponse(b.build());
return mb.build();
}
}

ChatMsgBuilder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Service
public class ChatMsgBuilder {
public ProtoMsg.Message chatResponse(long seqId, ProtoInstant.ResultCodeEnum en) {
ProtoMsg.Message.Builder mb = ProtoMsg.Message.newBuilder()
.setType(ProtoMsg.HeadType.MESSAGE_RESPONSE) //设置消息类型
.setSequence(seqId); //设置应答流水,与请求对应
ProtoMsg.MessageResponse.Builder rb =
ProtoMsg.MessageResponse.newBuilder()
.setCode(en.getCode())
.setInfo(en.getDesc())
.setExpose(1);
mb.setMessageResponse(rb.build());
return mb.build();
}
}

Session模块

  Session模块是基础,先构建出来,其中有两个重要的类。

SessionMap

  用来保存所有的ServerSession:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Slf4j
public class SessionMap {
private SessionMap() {
}
private static SessionMap singleInstance = new SessionMap();
public static SessionMap inst() {
return singleInstance;
}
// 会话集合
private ConcurrentHashMap<String, ServerSession> map = new ConcurrentHashMap<>();
/**
* 增加session对象
*/
public void addSession(String sessionId, ServerSession s) {
map.put(sessionId, s);
log.info("用户登录:id= " + s.getUser().getUid() + " 在线总数: " + map.size());
}
/**
* 获取session对象
*/
public ServerSession getSession(String sessionId) {
return map.getOrDefault(sessionId, null);
}
/**
* 根据用户id,获取session对象
*/
public List<ServerSession> getSessionsBy(String userId) {
List<ServerSession> list = map.values()
.stream()
.filter(s -> s.getUser().getUid().equals(userId))
.collect(Collectors.toList());
return list;
}
/**
* 删除session
*/
public void removeSession(String sessionId) {
if (!map.containsKey(sessionId)) {
return;
}
ServerSession s = map.get(sessionId);
map.remove(sessionId);
log.info("用户下线:id= " + s.getUser().getUid() + " 在线总数: " + map.size());
}
/**
* 判断用户是否登录
*
* @param user
* @return
*/
public boolean hasLogin(User user) {
Iterator<Map.Entry<String, ServerSession>> it = map.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, ServerSession> next = it.next();
User u = next.getValue().getUser();
if (u.getUid().equals(user.getUid()) && u.getPlatform().equals(user.getPlatform())) {
return true;
}
}
return false;
}
}

ServerSession

  为每个登录的用户维护一个ServerSession:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
@Slf4j
public class ServerSession {
public static final AttributeKey<String> KEY_USER_ID = AttributeKey.valueOf("key_user_id");
public static final AttributeKey<ServerSession> SESSION_KEY = AttributeKey.valueOf("SESSION_KEY");
/**
* 用户实现服务端会话管理的核心
* 通道
*/
private Channel channel;
/**
* 用户
*/
@Getter
private User user;
/**
* session唯一标示
*/
@Getter
private final String sessionId;
/**
* 登录状态
*/
@Getter
private boolean login = false;
/**
* session中存储的session 变量属性值
*/
private Map<String, Object> map = new HashMap<String, Object>();
public ServerSession(Channel channel) {
this.channel = channel;
this.sessionId = this.buildNewSessionId();
}
/**
* 从通道中获取session
*
* @param ctx
* @return
*/
public static ServerSession getSession(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
return channel.attr(ServerSession.SESSION_KEY).get();
}
private String buildNewSessionId() {
String uuid = UUID.randomUUID().toString();
return uuid.replaceAll("-", "");
}
// 写ProtoBuf数据帧
public synchronized void writeAndFlush(Object pkg) {
channel.writeAndFlush(pkg);
}
public void setUser(User user) {
this.user = user;
user.setSessionId(sessionId);
}
/**
* 和channel 通道实现双向绑定
*
* @return
*/
public ServerSession bind() {
log.info(" ServerSession 绑定会话 " + channel.remoteAddress());
channel.attr(ServerSession.SESSION_KEY).set(this);
SessionMap.inst().addSession(getSessionId(), this);
login = true;
return this;
}
// 关闭连接
public static void closeSession(ChannelHandlerContext ctx) {
ServerSession session = ctx.channel().attr(ServerSession.SESSION_KEY).get();
if (null != session && session.isValid()) {
session.close();
SessionMap.inst().removeSession(session.getSessionId());
}
}
//关闭连接
public synchronized void close() {
ChannelFuture future = channel.close();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
log.error("CHANNEL_CLOSED error ");
}
}
});
}
public boolean isValid() {
return getUser() != null ? true : false;
}
}

Processor模块

  processor模块有一个基础接口和一个抽象类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface ServerProcessor {
/**
* 获取消息类型
*
* @return
*/
ProtoMsg.HeadType type();
/**
* 定义Processor的行为
*
* @param session
* @param proto
* @return
*/
boolean action(ServerSession session, ProtoMsg.Message proto);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public abstract class AbstractServerProcessor implements ServerProcessor {
/**
* 获取通道的KEY_USER_ID属性值
*
* @param ch
* @return
*/
protected String getKey(Channel ch) {
return ch.attr(ServerSession.KEY_USER_ID).get();
}
/**
* 给通道的KEY_USER_ID属性设值
*
* @param ch
* @param key
*/
protected void setKey(Channel ch, String key) {
ch.attr(ServerSession.KEY_USER_ID).set(key);
}
/**
* 判断是否
*
* @param ch
* @throws Exception
*/
protected void checkAuth(Channel ch) throws Exception {
if (null == getKey(ch)) {
throw new Exception("此用户,没有登录成功");
}
}
}

LoginProcessor

  登录处理逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Slf4j
@Service
public class LoginProcessor extends AbstractServerProcessor {
@Autowired
private LoginResponseBuilder loginResponseBuilder;
@Override
public ProtoMsg.HeadType type() {
return ProtoMsg.HeadType.LOGIN_REQUEST;
}
@Override
public boolean action(ServerSession session, ProtoMsg.Message proto) {
// 取出token验证
ProtoMsg.LoginRequest info = proto.getLoginRequest();
long seqNo = proto.getSequence();
User user = User.fromMsg(info);
if (!checkUser(user)) {
// 已经处于登录状态
ProtoInstant.ResultCodeEnum resultcode = ProtoInstant.ResultCodeEnum.NO_TOKEN;
// 构造登录失败的报文
ProtoMsg.Message response = loginResponseBuilder.loginResponse(resultcode, seqNo, "-1");
// 发送登录失败的报文
session.writeAndFlush(response);
return false;
}
session.setUser(user);
session.bind();
// 登录成功
ProtoInstant.ResultCodeEnum resultcode = ProtoInstant.ResultCodeEnum.SUCCESS;
//构造登录成功的报文
ProtoMsg.Message response = loginResponseBuilder.loginResponse(resultcode, seqNo, session.getSessionId());
//发送登录成功的报文
session.writeAndFlush(response);
return true;
}
private boolean checkUser(User user) {
if (SessionMap.inst().hasLogin(user)) {
return false;
}
/**
* 校验用户,比较耗时的操作,需要100 ms以上的时间
* 方法1:调用远程用户restfull 校验服务
* 方法2:调用数据库接口校验
*/
return true;
}
}

Concurrent模块

  因为Handler中的业务Processor都是异步处理的,先在comon模块中添加一些多线程相关的工具类。首先在common模块中添加sl4j依赖:

1
2
3
4
5
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>

CallbackTask

  支持回调的任务接口:

1
2
3
4
5
6
7
public interface CallbackTask<R> {
R execute() throws Exception;
void onSuccess(R r);
void onFailure(Throwable t);
}

CallbackTaskScheduler

  支持回调的异步任务,首先引入guava依赖:

1
2
3
4
5
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>17.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
@Slf4j
public class CallbackTaskScheduler extends Thread {
/**
* 任务队列
*/
private ConcurrentLinkedQueue<CallbackTask> executeTaskQueue = new ConcurrentLinkedQueue<CallbackTask>();
/**
* 线程休眠时间
*/
private long sleepTime = 200;
/**
* 固定10个的线程池
* 用来从队列中获取需要执行的CallbackTask
*/
private ExecutorService jPool = Executors.newFixedThreadPool(10);
/**
* 真正执行任务的线程池
*/
ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);
private static CallbackTaskScheduler inst = new CallbackTaskScheduler();
/**
* 私有构造函数,直接启动线程
*/
private CallbackTaskScheduler() {
this.start();
}
@Override
public void run() {
while (true) {
// 处理任务
handleTask();
// 当队列中任务处理完,休眠指定时间
threadSleep(sleepTime);
}
}
/**
* 添加任务
*
* @param executeTask
*/
public static <R> void add(CallbackTask<R> executeTask) {
inst.executeTaskQueue.add(executeTask);
}
/**
* 线程休眠
*
* @param time
*/
private void threadSleep(long time) {
try {
sleep(time);
} catch (InterruptedException e) {
log.error(e.getLocalizedMessage());
}
}
/**
* 处理任务队列,检查其中是否有任务
* 如果有任务交给ListeningExecutorService执行
*/
private void handleTask() {
try {
CallbackTask executeTask = null;
while (executeTaskQueue.peek() != null) {
executeTask = executeTaskQueue.poll();
handleTask(executeTask);
}
} catch (Exception e) {
log.error(e.getLocalizedMessage());
}
}
/**
* 执行任务操作
*
* @param executeTask
*/
private <R> void handleTask(CallbackTask<R> executeTask) {
// 提交任务,返回Future
ListenableFuture<R> future = gPool.submit(() -> executeTask.execute());
// Future添加回调,当有结果是调用FutureCallback实现的函数
Futures.addCallback(future, new FutureCallback<R>() {
@Override
public void onSuccess(R r) {
executeTask.onSuccess(r);
}
@Override
public void onFailure(Throwable t) {
executeTask.onFailure(t);
}
});
}
}

Handler模块

 该模块处理Netty入站消息。

LoginRequestHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Slf4j
@Service
@ChannelHandler.Sharable
public class LoginRequestHandler extends ChannelInboundHandlerAdapter {
@Autowired
private LoginProcessor loginProcessor;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (null == msg || !(msg instanceof ProtoMsg.Message)) {
super.channelRead(ctx, msg);
return;
}
ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
// 取得请求类型
ProtoMsg.HeadType headType = pkg.getType();
if (!headType.equals(loginProcessor.type())) {
super.channelRead(ctx, msg);
return;
}
// 生成ServerSession
ServerSession session = new ServerSession(ctx.channel());
//异步任务,处理登录的逻辑
CallbackTaskScheduler.add(new CallbackTask<Boolean>() {
@Override
public Boolean execute() throws Exception {
// 调用登录逻辑
return loginProcessor.action(session, pkg);
}
//异步任务返回
@Override
public void onSuccess(Boolean r) {
if (r) {
// 登录成功,移除LoginRequestHandler
ctx.pipeline().remove(LoginRequestHandler.this);
log.info("登录成功:" + session.getUser());
} else {
// 登录失败,关闭连接、移除ServerSession
ServerSession.closeSession(ctx);
log.info("登录失败:" + session.getUser());
}
}
//异步任务异常
@Override
public void onFailure(Throwable t) {
// 登录异常,关闭连接、移除ServerSession
ServerSession.closeSession(ctx);
log.info("登录失败:" + session.getUser());
}
});
}
}

ServerExceptionHandler

  处理异常的Handler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
@ChannelHandler.Sharable
@Service
public class ServerExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 如果是编、解码异常则关闭连接
if (cause instanceof InvalidFrameException) {
log.error(cause.getMessage());
ServerSession.closeSession(ctx);
} else {
// 捕捉异常信息
log.error(cause.getMessage());
ctx.close();
}
}
/**
* 通道 Read 读取 Complete 完成
* 做刷新操作 ctx.flush()
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ServerSession.closeSession(ctx);
}
}

HeartBeatServerHandler

  用来处理心跳消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Slf4j
public class HeartBeatServerHandler extends IdleStateHandler {
private static final int READ_IDLE_GAP = 150;
public HeartBeatServerHandler() {
super(READ_IDLE_GAP, 0, 0, TimeUnit.SECONDS);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判断消息实例
if (null == msg || !(msg instanceof ProtoMsg.Message)) {
super.channelRead(ctx, msg);
return;
}
ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
// 判断消息类型
ProtoMsg.HeadType headType = pkg.getType();
if (headType.equals(ProtoMsg.HeadType.KEEPALIVE_REQUEST)) {
// 异步处理,将心跳包改成KEEPALIVE_RESPONSE的消息类型 ,回复给客户端
FutureTaskScheduler.add(() -> {
if (ctx.channel().isActive()) {
log.info("收到 HEART_BEAT 消息 from client");
ProtoMsg.Message keepaliveResponse = HeartBeatMsgBuilder.buildKeepAliveResponse(pkg.getSequence(), pkg.getHeartBeat());
ctx.writeAndFlush(keepaliveResponse);
}
});
}
// 如果不调用,IdleStateHandler的入站空闲检测将会调用不到
super.channelRead(ctx, msg);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
System.out.println(READ_IDLE_GAP + "秒内未读到数据,关闭连接");
ServerSession.closeSession(ctx);
}
}

组装流程

  基础模块完成的差不多了,现在将它们通过Netty、以及SpringBoot给组装起来。

ChatServer

  Netty服务端实现,首先添加配置文件application.yml:

1
2
server:
port: 8080
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@Slf4j
@Service("ChatServer")
public class ChatServer {
// 服务器端口
@Value("${server.port}")
private int port;
@Autowired
private LoginRequestHandler loginRequestHandler;
@Autowired
private ServerExceptionHandler serverExceptionHandler;
public void run() {
// 启动引导器
ServerBootstrap bootstrap = new ServerBootstrap();
// 采用NioEventLoopGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 1 设置reactor 线程组
bootstrap.group(bossGroup, workGroup);
// 2 设置nio类型的channel
bootstrap.channel(NioServerSocketChannel.class);
// 3 设置监听端口
bootstrap.localAddress(new InetSocketAddress(port));
// 4 设置通道选项
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
// 5 装配流水线
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
// 有连接到达时会创建一个channel
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 管理pipeline中的Handler
ch.pipeline().addLast(new ProtoBufDecoder());
ch.pipeline().addLast(new ProtoBufEncoder());
ch.pipeline().addLast(new HeartBeatServerHandler());
// 在流水线中添加handler来处理登录,登录后删除
ch.pipeline().addLast(loginRequestHandler);
ch.pipeline().addLast(serverExceptionHandler);
}
});
/*
* 6 开始绑定server
* 通过调用sync同步方法阻塞直到绑定成功
*/
ChannelFuture channelFuture = bootstrap.bind().sync();
log.info("疯狂创客圈 CrazyIM 服务启动, 端口 " + channelFuture.channel().localAddress());
/*
* 7 监听通道关闭事件
* 应用程序会一直等待,直到channel关闭
*/
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
/*
* 8 优雅关闭EventLoopGroup
* 释放掉所有资源包括创建的线程
*/
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}

ServerApplication

SpringBoot启动类:

1
2
3
4
5
6
7
8
9
@SpringBootApplication
public class ServerApplication {
public static void main(String[] args) {
// 启动并初始化 Spring 环境及其各 Spring 组件
ApplicationContext context = SpringApplication.run(ServerApplication.class, args);
ChatServer nettyServer = context.getBean(ChatServer.class);
nettyServer.run();
}
}

运行

  至此服务端的登录功能完成了,启动ServerApplication,成功日志如下:

1
2
3
4
2021-03-20 16:08:27.981 INFO 48628 --- [ main] cn.didadu.chatserver.ServerApplication : Starting ServerApplication using Java 1.8.0_202 on zhangjingdeMacBook-Pro.local with PID 48628 (/Users/zhangjing/IdeaProjects/crazyIM/chatserver/target/classes started by zhangjing in /Users/zhangjing/IdeaProjects/crazyIM)
2021-03-20 16:08:27.987 INFO 48628 --- [ main] cn.didadu.chatserver.ServerApplication : No active profile set, falling back to default profiles: default
2021-03-20 16:08:29.186 INFO 48628 --- [ main] cn.didadu.chatserver.ServerApplication : Started ServerApplication in 2.142 seconds (JVM running for 3.03)
2021-03-20 16:08:29.550 INFO 48628 --- [ main] cn.didadu.chatserver.server.ChatServer : 疯狂创客圈 CrazyIM 服务启动, 端口 /0:0:0:0:0:0:0:0:8080

下面将实现客户端的登录功能,届时再进行调试。