bboyjing's blog

CrazyIM学习笔记四【客户端登录流程】

  这一章节将实现客户端登录的相关逻辑,客户端所涉及的模块大致如下:

  • ClientCommand模块:控制台命令模块,程序是通过控制台来触发的,根据用户输入的命令来调用响应的Command实现类。
  • ProtoBufBuilder模块:主要用来组装ProtoBuf消息。
  • Sender模块:数据包发送模块。
  • Handler:处理器模块,主要是服务端响应处理器。

首先创建chatclient模块,客户端的代码大多在该模块中实现。

ClientCommand模块

  这里我们将要实现4个基础的命令,这几个命令的作用主要是用来收集用户的输入:

  1. 菜单命令类(ClientCommandMenu):主要用于列出支持的命令菜单,以及响应用户输入的命令类型
  2. 登录命令(LoginConsoleCommand)

这些Command类都会实现一个BaseCommand:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface BaseCommand {
/**
* 命令的执行
* @param scanner
*/
void exec(Scanner scanner);
/**
* 用于识别每个命令的key
* @return
*/
String getKey();
/**
* 命令的提示信息
* @return
*/
String getTip();
}

ClientCommandMenu

  该命令的作用主要用于列出支持的命令菜单,以及响应用户输入的命令类型。首先添加spring-boot-starter、chatocommon模块以及lombok的依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<dependency>
<artifactId>chatcommon</artifactId>
<groupId>cn.didadu</groupId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>

实现ClientCommandMenu类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Service
@Data
public class ClientCommandMenu implements BaseCommand{
public static final String KEY = "0";
private String allCommandsShow;
private String commandInput;
@Override
public void exec(Scanner scanner) {
// 使用err,立即输出
System.err.println("请输入某个操作指令:");
System.err.println(allCommandsShow);
// 获取第一个指令(next读取到空白符就结束了)
commandInput = scanner.next();
}
@Override
public String getKey() {
return KEY;
}
@Override
public String getTip() {
return "show 所有命令";
}
}

这里没有给allCommandsShow赋值,后面再看。

LoginConsoleCommand

  该命令负责从Scanner控制台实例收集客户端登录的用户ID和密码,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Service
@Data
public class LoginConsoleCommand implements BaseCommand{
public static final String KEY = "1";
private String userName;
private String password;
@Override
public void exec(Scanner scanner) {
System.out.println("请输入用户信息(id:password) ");
String[] info = null;
while (true) {
String input = scanner.next();
info = input.split(":");
if (info.length != 2) {
System.out.println("请按照格式输入(id:password):");
} else {
break;
}
}
userName = info[0];
password = info[1];
}
@Override
public String getKey() {
return KEY;
}
@Override
public String getTip() {
return "登录";
}
}

ProtoBufBuilder模块

  该模块存放的就是各种消息Bean,没有什么实际的逻辑。

BaseBuilder

  Builder消息都存储在chatClient模块的protoBuilder包下,它们继承自BaseBuilder:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class BaseBuilder {
protected ProtoMsg.HeadType type;
private long seqId;
private ClientSession session;
public BaseBuilder(ProtoMsg.HeadType type, ClientSession session) {
this.type = type;
this.session = session;
}
/**
* 构建消息 基础部分
*/
public ProtoMsg.Message buildCommon(long seqId) {
this.seqId = seqId;
ProtoMsg.Message.Builder mb =
ProtoMsg.Message
.newBuilder()
.setType(type)
.setSessionId(session.getSessionId())
.setSequence(seqId);
return mb.buildPartial();
}
}

其中用到了ClientSession,该类在chatClient模块的client包下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Data
@Slf4j
public class ClientSession {
public static final AttributeKey<ClientSession> SESSION_KEY = AttributeKey.valueOf("SESSION_KEY");
/**
* 用户实现客户端会话管理的核心
*/
private Channel channel;
/**
* 用户信息
*/
private User user;
/**
* 保存登录后的服务端sessionId
*/
private String sessionId;
/**
* 是否已建立连接
*/
private boolean connected = false;
/**
* 是否已登录
*/
private boolean login = false;
/**
* session中存储的session 变量属性值
*/
private Map<String, Object> map = new HashMap<String, Object>();
/**
* 绑定通道
*
* @param channel
*/
public ClientSession(Channel channel) {
this.channel = channel;
this.sessionId = String.valueOf(-1);
channel.attr(ClientSession.SESSION_KEY).set(this);
}
}

LoginMsgBuilder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class LoginMsgBuilder extends BaseBuilder {
private final User user;
public LoginMsgBuilder(User user, ClientSession session) {
super(ProtoMsg.HeadType.LOGIN_REQUEST, session);
this.user = user;
}
public ProtoMsg.Message build() {
ProtoMsg.Message message = buildCommon(-1);
ProtoMsg.LoginRequest.Builder lb =
ProtoMsg.LoginRequest.newBuilder()
.setDeviceId(user.getDevId())
.setPlatform(user.getPlatform().getCode())
.setToken(user.getToken())
.setUid(user.getUid());
return message.toBuilder().setLoginRequest(lb).build();
}
public static ProtoMsg.Message buildLoginMsg(
User user, ClientSession session) {
LoginMsgBuilder builder = new LoginMsgBuilder(user, session);
return builder.build();
}
}

HeartBeatMsgBuilder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class HeartBeatMsgBuilder extends BaseBuilder {
private final User user;
public HeartBeatMsgBuilder(User user, ClientSession session) {
super(ProtoMsg.HeadType.KEEPALIVE_REQUEST, session);
this.user = user;
}
public ProtoMsg.Message buildMsg() {
ProtoMsg.Message message = buildCommon(-1);
ProtoMsg.MessageHeartBeat.Builder lb = ProtoMsg.MessageHeartBeat.newBuilder()
.setSeq(0)
.setJson("{\"from\":\"client\"}")
.setUid(user.getUid());
return message.toBuilder().setHeartBeat(lb).build();
}
}

Sender模块

  该模块的作用是用来发送数据包的。

BaseSender

  所有Sender的基类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Slf4j
@Data
public abstract class BaseSender {
private User user;
private ClientSession session;
/**
* 是否连接中
*
* @return
*/
public boolean isConnected() {
if (null == session) {
log.info("session is null");
return false;
}
return session.isConnected();
}
/**
* 是否登录中
*
* @return
*/
public boolean isLogin() {
if (null == session) {
log.info("session is null");
return false;
}
return session.isLogin();
}
public void sendMsg(ProtoMsg.Message message) {
if (null == getSession() || !isConnected()) {
log.info("连接还没成功");
return;
}
Channel channel = session.getChannel();
ChannelFuture f = channel.writeAndFlush(message);
// ChannelFuture有结果了会在addListener中执行operationComplete方法
f.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
// 回调
if (f.isSuccess()) {
sendSucced(message);
} else {
sendfailed(message);
}
}
});
try {
f.sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
protected void sendSucced(ProtoMsg.Message message) {
log.info("发送成功");
}
protected void sendfailed(ProtoMsg.Message message) {
log.info("发送失败");
}
}

LoginSender

  发送登录消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j
@Service
public class LoginSender extends BaseSender {
public void sendLoginMsg() {
if (!isConnected()) {
log.info("还没有建立连接!");
return;
}
log.info("构造登录消息");
ProtoMsg.Message message = LoginMsgBuilder.buildLoginMsg(getUser(), getSession());
log.info("发送登录消息");
super.sendMsg(message);
}
}

Handler模块

  处理器模块,主要是服务端响应处理器,都是入站处理器。

HeartBeatClientHandler

  用来处理心跳消息,首先在client模块的ClientSession添加获取Sessiont的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Data
@Slf4j
public class ClientSession {
......
/**
* 从Channel中获取ClientSession
*
* @param ctx
* @return
*/
public static ClientSession getSession(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
return channel.attr(ClientSession.SESSION_KEY).get();
}
}

HeartBeatClientHandler实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Slf4j
@ChannelHandler.Sharable
@Service
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
// 心跳的时间间隔,单位为s
private static final int HEARTBEAT_INTERVAL = 100;
/**
* 在Handler被加入到Pipeline时,开始发送心跳
* 然后每100秒递归发送信条消息
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ClientSession session = ClientSession.getSession(ctx);
User user = session.getUser();
HeartBeatMsgBuilder builder = new HeartBeatMsgBuilder(user, session);
ProtoMsg.Message message = builder.buildMsg();
// 发送心跳
heartBeat(ctx, message);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判断消息实例
if (null == msg || !(msg instanceof ProtoMsg.Message)) {
super.channelRead(ctx, msg);
return;
}
// 判断类型
ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
ProtoMsg.HeadType headType = pkg.getType();
if (headType.equals(ProtoMsg.HeadType.KEEPALIVE_RESPONSE)) {
log.info(" 收到回写的 HEART_BEAT 消息 from server");
return;
} else {
super.channelRead(ctx, msg);
}
}
/**
* 使用定时器,发送心跳报文
*
* @param ctx
* @param heartbeatMsg
*/
public void heartBeat(ChannelHandlerContext ctx, ProtoMsg.Message heartbeatMsg) {
ctx.executor().schedule(() -> {
if (ctx.channel().isActive()) {
log.info(" 发送 HEART_BEAT 消息 to server");
ctx.writeAndFlush(heartbeatMsg);
// 递归调用,发送下一次的心跳
heartBeat(ctx, heartbeatMsg);
}
}, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
}
}

LoginResponceHandler

  用来处理登录消息的返回逻辑,首先在common模块中添加常量信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ProtoInstant {
......
@AllArgsConstructor
public enum ResultCodeEnum {
SUCCESS(0, "登录成功"),
AUTH_FAILED(1, "登录失败"),
NO_TOKEN(2, "没有授权码"),
UNKNOWN_ERROR(3, "未知错误");
@Getter
private Integer code;
@Getter
private String desc;
}
}

client模块的ClientSession添加登录方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Data
@Slf4j
public class ClientSession {
......
/**
* 登录成功之后,设置sessionId
*
* @param ctx
* @param pkg
*/
public static void loginSuccess(ChannelHandlerContext ctx, ProtoMsg.Message pkg) {
Channel channel = ctx.channel();
ClientSession session = channel.attr(ClientSession.SESSION_KEY).get();
session.setSessionId(pkg.getSessionId());
session.setLogin(true);
log.info("登录成功");
}
}

LoginResponseHandler实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Slf4j
@ChannelHandler.Sharable
@Service()
public class LoginResponseHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判断消息实例
if (null == msg || !(msg instanceof ProtoMsg.Message)) {
// 传递消息到下一个Handler
super.channelRead(ctx, msg);
return;
}
// 判断类型
ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
ProtoMsg.HeadType headType = ((ProtoMsg.Message) msg).getType();
if (!headType.equals(ProtoMsg.HeadType.LOGIN_RESPONSE)) {
// 如果不是LOGIN_RESPONSE,传递到下一个Handler
super.channelRead(ctx, msg);
return;
}
//判断返回是否成功
ProtoMsg.LoginResponse info = pkg.getLoginResponse();
ProtoInstant.ResultCodeEnum result = ProtoInstant.ResultCodeEnum.values()[info.getCode()];
if (!result.equals(ProtoInstant.ResultCodeEnum.SUCCESS)) {
//登录失败
log.info(result.getDesc());
} else {
// 登录成功,设置session id
ClientSession.loginSuccess(ctx, pkg);
ChannelPipeline p = ctx.pipeline();
// 移除登录响应处理器(从该连接的pipeline中移除LoginResponseHandler)
p.remove(this);
// 在编码器后面,动态插入心跳处理器
p.addAfter("encoder", "heartbeat", new HeartBeatClientHandler());
}
}
}

ExceptionHandler

  用来处理Handler中抛出的异常,首先在ClientSession中添加close方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Data
@Slf4j
public class ClientSession {
......
/**
* 关闭通道
*/
public void close() {
connected = false;
ChannelFuture future = channel.close();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
log.error("连接顺利断开");
}
}
});
}
}

ExceptionHandler实现,其中重连操作后面再完善:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
@ChannelHandler.Sharable
@Service
public class ExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof InvalidFrameException) {
// 协议异常,直接关闭
log.error(cause.getMessage());
ClientSession.getSession(ctx).close();
} else {
// 其它异常,捕捉异常信息
log.error(cause.getMessage());
ctx.close();
// TODO 重连
}
}
/**
* 通道 Read 读取 Complete 完成
* 将缓冲区中的数据刷新到对端
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}

Concurrent模块

  在客户端进程中需要用到多线程,在common模块的cocurrent包中添加工具类。

ExecuteTask

  任务接口:

1
2
3
public interface ExecuteTask {
void execute();
}

FutureTaskScheduler

  线程任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@Slf4j
public class FutureTaskScheduler extends Thread {
/**
* 任务队列
*/
private ConcurrentLinkedQueue<ExecuteTask> executeTaskQueue = new ConcurrentLinkedQueue<ExecuteTask>();
/**
* 线程休眠时间
*/
private long sleepTime = 200;
/**
* 固定10个的线程池
* 用来从队列中获取需要执行的CallbackTask
*/
private ExecutorService pool = Executors.newFixedThreadPool(10);
private static FutureTaskScheduler inst = new FutureTaskScheduler();
/**
* 私有构造函数,直接启动线程
*/
private FutureTaskScheduler() {
this.start();
}
/**
* 添加任务
*
* @param executeTask
*/
public static void add(ExecuteTask executeTask) {
inst.executeTaskQueue.add(executeTask);
}
@Override
public void run() {
while (true) {
// 处理任务
handleTask();
// 休眠
threadSleep(sleepTime);
}
}
/**
* 线程休眠
*
* @param time
*/
private void threadSleep(long time) {
try {
sleep(time);
} catch (InterruptedException e) {
log.error(e.getLocalizedMessage());
}
}
/**
* 处理任务队列,检查其中是否有任务
*/
private void handleTask() {
try {
ExecuteTask executeTask;
while (executeTaskQueue.peek() != null) {
executeTask = executeTaskQueue.poll();
pool.execute(new ExecuteRunnable(executeTask));
}
} catch (Exception e) {
log.error(e.getLocalizedMessage());
}
}
class ExecuteRunnable implements Runnable {
ExecuteTask executeTask;
ExecuteRunnable(ExecuteTask executeTask) {
this.executeTask = executeTask;
}
@Override
public void run() {
executeTask.execute();
}
}
}

组装各个模块

NettyClient

  Netty客户端实现,首先添加配置项:

1
2
3
server:
ip: localhost
port: 8080
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Slf4j
@Service
public class NettyClient {
/**
* 服务器ip地址
*/
@Value("${server.ip}")
private String host;
/**
* 服务器端口
*/
@Value("${server.port}")
private int port;
@Autowired
private LoginResponseHandler loginResponseHandler;
@Autowired
private ExceptionHandler exceptionHandler;
EventLoopGroup eventLoopGroup;
@Setter
private GenericFutureListener<ChannelFuture> connectedListener;
public NettyClient() {
eventLoopGroup = new NioEventLoopGroup();
}
/**
* 重连
*/
public void doConnect() {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.remoteAddress(host, port);
// 设置通道初始化
bootstrap.handler(
new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast("decoder", new ProtoBufDecoder());
ch.pipeline().addLast("encoder", new ProtoBufEncoder());
ch.pipeline().addLast(loginResponseHandler);
ch.pipeline().addLast(exceptionHandler);
}
}
);
log.info("客户端开始连接 [疯狂创客圈IM]");
ChannelFuture f = bootstrap.connect();
f.addListener(connectedListener);
} catch (Exception e) {
log.info("客户端连接失败!" + e.getMessage());
}
}
public void close() {
eventLoopGroup.shutdownGracefully();
}
}

CommandController

  负责收集用户在控制台输入的命令,根据响应的命令类型调用响应的命令处理器合收集相关的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
@Slf4j
@Service
public class CommandController {
/**
* 登录命令收集类
*/
@Autowired
private LoginConsoleCommand loginConsoleCommand;
/**
* 菜单命令收集类
*/
@Autowired
private ClientCommandMenu clientCommandMenu;
@Autowired
private NettyClient nettyClient;
@Autowired
private LoginSender loginSender;
/**
* 与服务端连接的通道
*/
private Channel channel;
/**
* 连接状态标记
*/
private boolean connectFlag = false;
/**
* 客户端会话
*/
private ClientSession session;
/**
* 客户端命令Map
*/
private Map<String, BaseCommand> commandMap;
/**
* 显式在中断的字符串(展示出所有支持的命令行)
*/
private String menuString;
private User user;
/**
* 初始化命令Map
*/
public void initCommandMap() {
commandMap = new HashMap<>();
commandMap.put(clientCommandMenu.getKey(), clientCommandMenu);
commandMap.put(loginConsoleCommand.getKey(), loginConsoleCommand);
Set<Map.Entry<String, BaseCommand>> entrys = commandMap.entrySet();
Iterator<Map.Entry<String, BaseCommand>> iterator = entrys.iterator();
StringBuilder menus = new StringBuilder();
menus.append("[menu] ");
while (iterator.hasNext()) {
BaseCommand next = iterator.next().getValue();
menus.append(next.getKey())
.append("->")
.append(next.getTip())
.append(" | ");
}
menuString = menus.toString();
// 在这里设置了ClientCommandMenu.allCommandsShow
clientCommandMenu.setAllCommandsShow(menuString);
}
GenericFutureListener<ChannelFuture> closeListener = (ChannelFuture f) -> {
log.info(new Date() + ": 连接已经断开……");
channel = f.channel();
// 创建会话
ClientSession session = channel.attr(ClientSession.SESSION_KEY).get();
session.close();
// 唤醒用户线程
notifyCommandThread();
};
/**
* 连接通道监听
*/
GenericFutureListener<ChannelFuture> connectedListener = (ChannelFuture f) -> {
final EventLoop eventLoop = f.channel().eventLoop();
if (!f.isSuccess()) {
log.info("连接失败!在10s之后准备尝试重连!");
eventLoop.schedule(
() -> nettyClient.doConnect(),
10,
TimeUnit.SECONDS);
connectFlag = false;
} else {
connectFlag = true;
log.info("疯狂创客圈 IM 服务器 连接成功!");
channel = f.channel();
// 创建会话
session = new ClientSession(channel);
session.setConnected(true);
// 连接上之后添加通道关闭监听
channel.closeFuture().addListener(closeListener);
// 唤醒用户线程
notifyCommandThread();
}
};
public synchronized void notifyCommandThread() {
// 唤醒,命令收集程
this.notify();
}
public synchronized void waitCommandThread() {
// 休眠,命令收集线程
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 启动连接
*/
public void startConnectServer() {
FutureTaskScheduler.add(() -> {
nettyClient.setConnectedListener(connectedListener);
nettyClient.doConnect();
});
}
/**
* 启动Command线程
*
* @throws InterruptedException
*/
public void startCommandThread() throws InterruptedException {
Thread.currentThread().setName("命令线程");
while (true) {
// 建立连接
while (!connectFlag) {
// 开始连接
startConnectServer();
// 暂停命令线程
waitCommandThread();
}
// 处理命令
while (null != session) {
// 获取命令行输入的命令key,通过key找到命令
Scanner scanner = new Scanner(System.in);
clientCommandMenu.exec(scanner);
String key = clientCommandMenu.getCommandInput();
BaseCommand command = commandMap.get(key);
if (null == command) {
System.err.println("无法识别[" + command + "]指令,请重新输入!");
continue;
}
switch (key) {
case LoginConsoleCommand.KEY:
command.exec(scanner);
startLogin((LoginConsoleCommand) command);
break;
}
}
}
}
private void startLogin(LoginConsoleCommand command) {
// 登录
if (!connectFlag) {
log.info("连接异常,请重新建立连接");
return;
}
User user = User.builder()
.uid(command.getUserName())
.token(command.getPassword())
.platform(User.PlatForm.WEB)
.devId("1111").build();
this.user = user;
session.setUser(user);
loginSender.setUser(user);
loginSender.setSession(session);
loginSender.sendLoginMsg();
}
}

ClientApplication

  SpringBoot启动类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SpringBootApplication
public class ClientApplication {
public static void main(String[] args) {
// 启动并初始化 Spring 环境及其各 Spring 组件
ApplicationContext context = SpringApplication.run(ClientApplication.class, args);
CommandController commandClient = context.getBean(CommandController.class);
commandClient.initCommandMap();
try {
commandClient.startCommandThread();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

调试

  首先启动好服务端,然后启动客户端,在命令行操作。

客户端的输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2021-03-22 11:52:51.039 INFO 52704 --- [ main] cn.didadu.chatclient.ClientApplication : Started ClientApplication in 1.942 seconds (JVM running for 3.334)
2021-03-22 11:52:51.276 INFO 52704 --- [pool-1-thread-1] cn.didadu.chatclient.client.NettyClient : 客户端开始连接 [疯狂创客圈IM]
2021-03-22 11:52:51.364 INFO 52704 --- [ntLoopGroup-3-1] c.d.chatclient.client.CommandController : 疯狂创客圈 IM 服务器 连接成功!
请输入某个操作指令:
[menu] 0->show 所有命令 | 1->登录 |
1
请输入用户信息(id:password)
bboyjing:123
2021-03-22 11:53:00.615 INFO 52704 --- [ 命令线程] cn.didadu.chatclient.sender.LoginSender : 构造登录消息
2021-03-22 11:53:00.712 INFO 52704 --- [ 命令线程] cn.didadu.chatclient.sender.LoginSender : 发送登录消息
2021-03-22 11:53:00.770 INFO 52704 --- [ntLoopGroup-3-1] cn.didadu.chatclient.sender.BaseSender : 发送成功
请输入某个操作指令:
[menu] 0->show 所有命令 | 1->登录 |
2021-03-22 11:53:00.830 INFO 52704 --- [ntLoopGroup-3-1] c.d.chatclient.client.ClientSession : 登录成功
2021-03-22 11:54:40.841 INFO 52704 --- [ntLoopGroup-3-1] c.d.c.handler.HeartBeatClientHandler : 发送 HEART_BEAT 消息 to server
2021-03-22 11:54:40.846 INFO 52704 --- [ntLoopGroup-3-1] c.d.c.handler.HeartBeatClientHandler : 收到回写的 HEART_BEAT 消息 from server
......

服务端的输出如下:

1
2
3
4
5
6
7
8
2021-03-22 11:52:38.501 INFO 52702 --- [ main] cn.didadu.chatserver.ServerApplication : Started ServerApplication in 1.268 seconds (JVM running for 2.025)
2021-03-22 11:52:38.681 INFO 52702 --- [ main] cn.didadu.chatserver.server.ChatServer : 疯狂创客圈 CrazyIM 服务启动, 端口 /0:0:0:0:0:0:0:0:8080
2021-03-22 11:53:00.804 INFO 52702 --- [pool-1-thread-1] cn.didadu.chatcommon.bean.User : 登录中: User(uid=bboyjing, devId=1111, token=123, nickName=null, platform=null, intPlatFrom=5, sessionId=null)
2021-03-22 11:53:00.806 INFO 52702 --- [pool-1-thread-1] c.d.chatserver.server.ServerSession : ServerSession 绑定会话 /127.0.0.1:55130
2021-03-22 11:53:00.806 INFO 52702 --- [pool-1-thread-1] cn.didadu.chatserver.server.SessionMap : 用户登录:id= bboyjing 在线总数: 1
2021-03-22 11:53:00.823 INFO 52702 --- [pool-1-thread-1] c.d.c.handler.LoginRequestHandler : 登录成功:User(uid=bboyjing, devId=1111, token=123, nickName=null, platform=null, intPlatFrom=5, sessionId=50eb2bbb34f54b0f96e3147bd88058ed)
2021-03-22 11:54:40.845 INFO 52702 --- [pool-2-thread-1] c.d.c.handler.HeartBeatServerHandler : 收到 HEART_BEAT 消息 from client
......

从log可以看出,到目前为止完成了简单的登录功能,下一章节来补充聊天功能。