本章节将对Name Server进行分析。
Name Server作用
官方文档对Name Server有一个概括:
Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
也就是说就是RocketMQ的注册中心,是专门为RocketMQ设计的轻量级命名服务,可集群横向扩展、无状态等特点,而且代码不到1000行。
Name Server源码分析
Name Server功能的核心代码在namesrv子项目中,浏览了下,确实没有多少代码,真是屌。可以看下Quick Start中的Name Server的启动脚本:
脚本最后一行指定了启动类,我们就从这个类开始看起。
NamesrvStartup类分析
该类有两个成员变量:
CommandLine是apache CLI提供的功能,这个在第一章已经打下基础了。该类所有的逻辑都在main0()方法中,下面把代码和注释都贴出来:
NamesrvConfig类
该类存放Name Server一些属性配置:
NettyServerConfig类
Name Server Netty服务配置,RocketMQ底层通信采用使用netty4。如果要看懂源码,NIO和Netty的知识也是必不可少的,可以参考《Netty权威指南》,当然只是给个建议。下面把NettyServerConfig的属性列举下,这里也只是了解下属性的作用,更深层次的理解请参考Netty。
NamesrvController类
先看下NamesrvController的构造函数:
涉及到6个类的初始化,前两个已经看过了,剩下的应该是Name Server的核心内容了。NamesrvController类中还有其它方法,但是得先看下构造函数中剩下的4项是什么,再回过来看这个类。
KVConfigManager类
KVConfigManager有三个主要成员变量:
关于ReentrantReadWriteLock,该类中只用到了lockInterruptibly()方法,调用该方法会一直阻塞到获得锁,但是接受中断信号。
RouteInfoManager类
RouteInfoManager是管理路由信息的类,该类有7个主要成员变量:
QueueData类
该类用于存放Topic对应的队列信息:
BrokerData类
该类用于存放Broker信息:
BrokerLiveInfo类
该类是RouteInfoManager的内部类,用于存放存活的Broker信息:
BrokerHousekeepingService类
该类负责Broker连接事件的处理,实现了ChannelEventListener,主要用来管理RouteInfoManager的brokerLiveTable。实现了三个方法,如下:
从方法名字可以看出,都是和Channel相关,这里的Channel就是BrokerLiveInfo的channel变量,是Netty的原生类。总的来说,CHannel是Netty抽象出来的网络I/O读写相关的接口。上面三个方法都调用了RouteInfoManager的onChannelDestroy()方法,下面来分析下该方法:
Configuration类
Configuration类的构造函数接受变元参数,类型为Object,主要看下其registerConfig()方法:
Name Server初始化
之前NamesrvController类我们只看了它的构造函数,Name Server在启动时候调用了3个核心的方法,分行为initialize()、start()、shutdown(),这三个方法均在NamesrvController类中,涉及到Name Server的初始化、启动和关闭。先来看下初始化:
NettyRemotingServer类
在NamesrvController的initialize()方法中初始化了该类的一个实例,很容易看出该类与Netty息息相关,我们慢慢看,一旦遇到不明白的,就发散开学习。该类的构造函数中首先调用了父类的构造函数,其父类为NettyRemotingAbstract类,先看下父类的构造函数。
上面用到了Semaphore,简单解释下,Java中Semaphore可以控制某个资源可被同时访问的个数,其默认值分别为256和64。下面看下自己的构造函数:
NamesrvController类的initialize()方法中调用了私有的registerProcessor()方法,该方法的作用是向NettyRemotingServer注册处理逻辑:
registerDefaultProcessor()方法给NettyRemotingServer的成员变量defaultEventExecutorGroup赋值,NettyRemotingServer启动的时候注册了NettyServerHandler,NettyServerHandler重写了channelRead0()方法,当服务端接收到请求时调用该方法,其中最终调用了NettyRemotingAbstract类的processMessageReceived()方法,该方法中使用到了defaultRequestProcessor对象。这些方法后面会分析到,这里重点看下上述代码中的两个参数。
- DefaultRequestProcessor类:默认的请求处理类
- remotingExecutor:workerThread线程池
DefaultRequestProcessor类
该类实现自NettyRequestProcessor接口,重写了接口的两个方法,processRequest()和rejectRequest(),rejectRequest()方法比较简单,函数体直接返回false,此处表示始终不拒绝请求。processRequest()方法中根据请求的类型执行不同的操作,请求类型的定义在RequestCode中,阿里云上对该类有注释。processRequest()的描述如下:
该方法的内容暂不细看了,其中包含了很多针对性的动作操作,比如注册Broker等。有一点需要扩展开的是,该方法的入参和返回值有一个共同的类RemotingCommand,该类封装了RocketMQ的网络通信协议,当然,涉及到通信协议相关的不止这一个类,下面就把这一块来理一理。
RocketMQ网络通信协议
无论是发消息、拉取消息、发送心跳、修改配置等所有网络通讯层协议都使用同一套协议。并且无论请求还是响应,协议是一样的。协议格式如下:
<length> <header length> <header data> <body data>
- length:4个字节的int型数据,用来存储header length、header data、body data的总和
- header length:4个字节的int型数据,用来存储header data的长度
- header data:存储报文头部的数据
- body data:存储报文体的数据
下面看下Header相关字段:
相关encode和decode方法也在RemotingCommand类中,默认序列化成json格式,序列化框架采用自家的fastjson,详细代码就不看了。
Name Server启动
Name Server的启动调用了NamesrvController类的start()方法,其方法简单,直接调用了成员变量remotingServer的start()方法,该变量就是NettyRemotingServer类,下面就来看下最终的start()方法:
Name Server关闭
Name Server的关闭就没啥好说的了,调用的是NamesrvController的shutdown()方法,按次序把所有线程池关关掉就可以了。
Name Server的源码就看到这里,有些地方还没有完全理解透彻,随着源码的深入,应该能了解到RocketMQ的全貌。目前感触比较深的是,Netty一定要熟悉,要不然那些线程之间的协作就没法理解清楚。本人对Netty也是一知半解,后期这个知识点将是重中之重。