bboyjing's blog

CrazyIM学习笔记二【ProtoBuf协议及编解码器】

  系统的通讯协议采用稍复杂的Head-Content协议,head部分为【2字节魔数】+【2字节版本】+【4字节content长度】。其中, 魔数可用来识别数据包是否符合规范;版本号可用来处理协议版本升级的相关问题。下面通过ProtoBuf消息格式的设计、编码器以及解码器来实现协议的相关内容。

ProtoBuf消息格式的设计

  一般来说网络通信涉及的消息大体可以分为3大新消息类型:

  • 请求消息
  • 应答消息
  • 命令消息

每个消息基本上会包含一个序列号和一个类型定义。序列号用来唯一区分一个消息,类型用来决定消息的处理方式。在chatcommon/proto目录下新建ProtoMsg.proto文件,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
syntax = "proto3";
package cn.didadu.chatcommon.bean.msg;
/*消息的枚举类型*/
enum HeadType {
LOGIN_REQUEST = 0; //登录请求
LOGIN_RESPONSE = 1; //登录响应
LOGOUT_REQUEST = 2; //登出请求
LOGOUT_RESPONSE = 3; //登出相响应
KEEPALIVE_REQUEST = 4; //心跳请求
KEEPALIVE_RESPONSE = 5; //心跳响应
MESSAGE_REQUEST = 6; //聊天消息请求
MESSAGE_RESPONSE = 7; // 聊天消息响应
MESSAGE_NOTIFICATION = 8; //服务器通知
}
/*登录信息*/
// LoginRequest对应的HeadType为LOGIN_REQUEST
// 消息名称去掉下划线,更加符合Java 的类名规范
message LoginRequest {
string uid = 1; // 用户唯一id
string deviceId = 2; // 设备ID
string token = 3; // 用户token
uint32 platform = 4; //客户端平台 windows、mac、android、ios、web
string appVersion = 5; // APP版本号
}
/*登录响应*/
message LoginResponse {
bool result = 1; //true表示发送成功,false表示发送失败
uint32 code = 2; //错误码
string info = 3; //错误描述
uint32 expose = 4; //错误描述是否提示给用户:1 提示;0 不提示
}
/*聊天消息*/
message MessageRequest {
uint64 msgId = 1;
string from = 2;
string to = 3;
uint64 time = 4;
uint32 msgType = 5;
string content = 6;
string url = 8;
string property = 9;
string fromNick = 10;
string json = 11;
}
/*聊天响应*/
message MessageResponse {
bool result = 1; //true表示发送成功,false表示发送失败
uint32 code = 2; //错误码
string info = 3; //错误描述
uint32 expose = 4; //错误描述是否提示给用户:1、提示;0、不提示
bool lastBlock = 5; //是否为最后的应答
fixed32 blockIndex = 6; //应答序号
}
/*通知*/
message MessageNotification {
uint32 msgType = 1;
bytes sender = 2;
string json = 3;
string timestamp = 4;
}
/*心跳*/
message MessageHeartBeat {
uint32 seq = 1;
string uid = 2;
string json =3;
}
/*顶层消息*/
//顶层消息是一种嵌套消息,嵌套了各种类型消息
//逻辑上:根据消息类型 type的值,最多只有一个有效
message Message {
HeadType type = 1; //通用字段: 消息类型
uint64 sequence = 2; //通用字段:消息序列号
string sessionId = 3; //通用字段:会话id
LoginRequest loginRequest = 4; //登录请求
LoginResponse loginResponse = 5; //登录响应
MessageRequest messageRequest = 6; //IM消息请求
MessageResponse messageResponse = 7; //IM消息响应
MessageNotification notification = 8; //系统通知
MessageHeartBeat heartBeat = 9; //心跳
}

编译之后将会生成对应的Java类文件。

自定义ProtoBuf编解码器

  面对复杂的Head-Content协议的解析,Netty内置的ProtoBuf系列编解码器就无能为力了,这时候我们需要自定义编解码器,需要自己去解决半包问题。包括如下两个方面:

  • 继承Netty提供的MessageToByteEncoder编码器,完成Head-Content协议的复杂数据包的编码,将ProtobufPOJO编码成Head-Content协议的二进制ByteBuf数据包。
  • 继承Netty提供的ByteToMessageDecoder解码器,完成完成Head-Content协议的复杂数据包的解码,将二进制ByteBuf数据包最终解码出ProtoBufPOJO实例。

自定义ProtoBuf编码器

  编码器将以下内容写入到目标ByteBuf:

  • 写入魔数、版本号
  • 写入ProtoBuf地POJO的字节码长度
  • 写入ProtoBuf的POJO的字节码内容

ProtoBuf编码器的实现比较简单,实现步骤如下:

  1. 添加lombok、netty依赖:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.18</version>
    <scope>provided</scope>
    </dependency>
    <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.59.Final</version>
    </dependency>
  2. 在chatcommon模块中添加一个常量类,用于存储魔数、版本号等信息:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public class ProtoInstant {
    /**
    * 魔数,可以通过配置获取
    */
    public static final short MAGIC_CODE = 0x86;
    /**
    * 版本号
    */
    public static final short VERSION_CODE = 0x01;
    }
  3. 在chatcommon模块中添加ProtoBufEncoder类:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public class ProtoBufEncoder extends MessageToByteEncoder<ProtoMsg.Message> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, ProtoMsg.Message message, ByteBuf byteBuf) throws Exception {
    // 将对象转换为字节码
    byte[] bytes = message.toByteArray();
    // 读取消息的长度
    int length = bytes.length;
    // 1. 写入2个字节魔数
    byteBuf.writeShort(ProtoInstant.MAGIC_CODE);
    // 2. 写入2个字节版本号
    byteBuf.writeShort(ProtoInstant.VERSION_CODE);
    // 3. 写入消息长度
    byteBuf.writeInt(length);
    // 4. 写入消息体
    byteBuf.writeBytes(bytes);
    }
    }

自定义ProtoBuf解码器

  解码器的大致过程如下:

  • 首先读取魔数、版本号,如果长度位数不够,则终止读取
  • 然后读取长度
  • 最后按照净长度读取内容。如果内容的字节数不够,则恢复到之前的起始位置(也就是魔数的位置),然后终止读取

实现步骤如下:

  1. 在chatcommon模块中添加创建自定义异常类InvalidFrameException:

    1
    2
    3
    4
    5
    public class InvalidFrameException extends Exception{
    public InvalidFrameException(String s) {
    super(s);
    }
    }
  2. 在chatcommon模块中添加创建ProtoBufDecoder类:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    public class ProtoBufDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
    // 标记一下当前的readIndex的位置
    byteBuf.markReaderIndex();
    // 判断包头长度
    if (byteBuf.readableBytes() < 8) {
    // 如果不满足8个字节停止解码,不再往之后的handler传递
    return;
    }
    // 读取魔数
    short magic = byteBuf.readShort();
    if (magic != ProtoInstant.MAGIC_CODE) {
    String error = "客户端口令不对:" + channelHandlerContext.channel().remoteAddress();
    throw new InvalidFrameException(error);
    }
    // 读取版本号
    short version = byteBuf.readShort();
    // 读取传送过来的消息的长度
    int length = byteBuf.readInt();
    if (length < 0) {
    // 如果长度小于0,非法数据,关闭连接
    channelHandlerContext.close();
    }
    if (length > byteBuf.readableBytes()) {
    /*
    * 读到的消息体长度如果小于传送过来的消息长度
    * 重置读取位置(魔数所在的位置,byteBuf.markReaderIndex())
    * 停止解码,不再往之后的handler传递
    */
    byteBuf.resetReaderIndex();
    return;
    }
    byte[] array;
    // 通过hasArray()方法来判断是否是堆缓冲
    if (byteBuf.hasArray()) {
    // 堆缓冲
    ByteBuf slice = byteBuf.slice();
    array = slice.array();
    } else {
    // 直接缓冲
    array = new byte[length];
    byteBuf.readBytes(array, 0, length);
    }
    // 字节转换成对象
    ProtoMsg.Message message = ProtoMsg.Message.parseFrom(array);
    if (message != null) {
    // 将消息传递到下一个handler
    list.add(message);
    }
    }
    }

  消息协议的编解码的实现就到这里,下一章节将实现登录流程。