系统的通讯协议采用稍复杂的Head-Content协议,head部分为【2字节魔数】+【2字节版本】+【4字节content长度】。其中, 魔数可用来识别数据包是否符合规范;版本号可用来处理协议版本升级的相关问题。下面通过ProtoBuf消息格式的设计、编码器以及解码器来实现协议的相关内容。
ProtoBuf消息格式的设计
一般来说网络通信涉及的消息大体可以分为3大新消息类型:
- 请求消息
- 应答消息
- 命令消息
每个消息基本上会包含一个序列号和一个类型定义。序列号用来唯一区分一个消息,类型用来决定消息的处理方式。在chatcommon/proto目录下新建ProtoMsg.proto文件,内容如下:
|
|
编译之后将会生成对应的Java类文件。
自定义ProtoBuf编解码器
面对复杂的Head-Content协议的解析,Netty内置的ProtoBuf系列编解码器就无能为力了,这时候我们需要自定义编解码器,需要自己去解决半包问题。包括如下两个方面:
- 继承Netty提供的MessageToByteEncoder编码器,完成Head-Content协议的复杂数据包的编码,将ProtobufPOJO编码成Head-Content协议的二进制ByteBuf数据包。
- 继承Netty提供的ByteToMessageDecoder解码器,完成完成Head-Content协议的复杂数据包的解码,将二进制ByteBuf数据包最终解码出ProtoBufPOJO实例。
自定义ProtoBuf编码器
编码器将以下内容写入到目标ByteBuf:
- 写入魔数、版本号
- 写入ProtoBuf地POJO的字节码长度
- 写入ProtoBuf的POJO的字节码内容
ProtoBuf编码器的实现比较简单,实现步骤如下:
添加lombok、netty依赖:
123456789101112<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.18</version><scope>provided</scope></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.59.Final</version></dependency>在chatcommon模块中添加一个常量类,用于存储魔数、版本号等信息:
1234567891011public class ProtoInstant {/*** 魔数,可以通过配置获取*/public static final short MAGIC_CODE = 0x86;/*** 版本号*/public static final short VERSION_CODE = 0x01;}在chatcommon模块中添加ProtoBufEncoder类:
123456789101112131415161718public class ProtoBufEncoder extends MessageToByteEncoder<ProtoMsg.Message> {protected void encode(ChannelHandlerContext channelHandlerContext, ProtoMsg.Message message, ByteBuf byteBuf) throws Exception {// 将对象转换为字节码byte[] bytes = message.toByteArray();// 读取消息的长度int length = bytes.length;// 1. 写入2个字节魔数byteBuf.writeShort(ProtoInstant.MAGIC_CODE);// 2. 写入2个字节版本号byteBuf.writeShort(ProtoInstant.VERSION_CODE);// 3. 写入消息长度byteBuf.writeInt(length);// 4. 写入消息体byteBuf.writeBytes(bytes);}}
自定义ProtoBuf解码器
解码器的大致过程如下:
- 首先读取魔数、版本号,如果长度位数不够,则终止读取
- 然后读取长度
- 最后按照净长度读取内容。如果内容的字节数不够,则恢复到之前的起始位置(也就是魔数的位置),然后终止读取
实现步骤如下:
在chatcommon模块中添加创建自定义异常类InvalidFrameException:
12345public class InvalidFrameException extends Exception{public InvalidFrameException(String s) {super(s);}}在chatcommon模块中添加创建ProtoBufDecoder类:
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758public class ProtoBufDecoder extends ByteToMessageDecoder {protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {// 标记一下当前的readIndex的位置byteBuf.markReaderIndex();// 判断包头长度if (byteBuf.readableBytes() < 8) {// 如果不满足8个字节停止解码,不再往之后的handler传递return;}// 读取魔数short magic = byteBuf.readShort();if (magic != ProtoInstant.MAGIC_CODE) {String error = "客户端口令不对:" + channelHandlerContext.channel().remoteAddress();throw new InvalidFrameException(error);}// 读取版本号short version = byteBuf.readShort();// 读取传送过来的消息的长度int length = byteBuf.readInt();if (length < 0) {// 如果长度小于0,非法数据,关闭连接channelHandlerContext.close();}if (length > byteBuf.readableBytes()) {/** 读到的消息体长度如果小于传送过来的消息长度* 重置读取位置(魔数所在的位置,byteBuf.markReaderIndex())* 停止解码,不再往之后的handler传递*/byteBuf.resetReaderIndex();return;}byte[] array;// 通过hasArray()方法来判断是否是堆缓冲if (byteBuf.hasArray()) {// 堆缓冲ByteBuf slice = byteBuf.slice();array = slice.array();} else {// 直接缓冲array = new byte[length];byteBuf.readBytes(array, 0, length);}// 字节转换成对象ProtoMsg.Message message = ProtoMsg.Message.parseFrom(array);if (message != null) {// 将消息传递到下一个handlerlist.add(message);}}}
消息协议的编解码的实现就到这里,下一章节将实现登录流程。