bboyjing's blog

RocketMQ Source Code Analysis: rocketmq-namesrv

This chapter analyzes the Name Server.

What the Name Server Does

The official documentation summarizes the Name Server as follows:

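The Name Server is an almost stateless node. It can be deployed as a cluster, and the nodes do not synchronize any information with each other.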

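In other words, it is RocketMQ's registry: a lightweight naming service designed specifically for RocketMQ. It scales out horizontally as a cluster, it is stateless, and its code is under 1,000 lines.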

Name Server Source Code Analysis

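The Name Server's core code lives in the namesrv sub-project. A quick look confirms there really isn't much code, which is seriously impressive. Here is the Name Server startup script from the Quick Start: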

> cat ./bin/mqnamesrv
...
sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup $@

The last line of the script specifies the startup class, so that is where we begin.

NamesrvStartup Class Analysis

The class has two member variables:

// Stores and reads the configuration file
public static Properties properties = null;
// Parses the command-line arguments
public static CommandLine commandLine = null;

CommandLine comes from Apache Commons CLI, which the first chapter already covered. All of this class's logic is in the main0() method; here is the code with comments:

public static NamesrvController main0(String[] args) {
// Set the version number [rocketmq.remoting.version -> MQVersion.CURRENT_VERSION]
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
/**
* Netty send buffer size
* Sets the ChannelOption.SO_SNDBUF parameter,
* which corresponds to the SO_SNDBUF socket option
*/
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
NettySystemConfig.socketSndbufSize = 4096;
}
/**
* Netty receive buffer size
* Sets the ChannelOption.SO_RCVBUF parameter,
* which corresponds to the SO_RCVBUF socket option
*/
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
NettySystemConfig.socketRcvbufSize = 4096;
}
try {
// FastJson version-conflict detection (commented out)
//PackageConflictDetect.detectFastjson();
/**
* Build org.apache.commons.cli.Options
* and add the -h and -n options
* -h prints help information
* -n specifies the Name Server address
*/
Options options = ServerUtil.buildCommandlineOptions(new Options());
/**
* Initialize the commandLine member variable
* and add the -c and -p options to Options
* -c specifies the Name Server configuration file
* -p prints the configuration items
*/
commandLine =
ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
// Initialize NamesrvConfig and NettyServerConfig
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// Set the Name Server listen port to 9876 (a listenPort entry in the -c file overrides it)
nettyServerConfig.setListenPort(9876);
/**
* If the -c option is present, read the file into the global Properties
* and use reflection to copy its values into NamesrvConfig and NettyServerConfig
*/
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, " + file + "%n");
in.close();
}
}
// If the -p option is present, print the properties of NamesrvConfig and NettyServerConfig, then exit
if (commandLine.hasOption('p')) {
MixAll.printObjectProperties(null, namesrvConfig);
MixAll.printObjectProperties(null, nettyServerConfig);
System.exit(0);
}
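/**
* Parse the command-line options and load them into namesrvConfig
* Debugging shows this line has no effect here:
* commandLine2Properties() converts each option's long name and value into Properties,
* and the supported long names are currently configFile, help, namesrvAddr and printConfigItem,
* but NamesrvConfig has no matching setters, so the purpose of this line is unclear
*/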
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// ROCKETMQ_HOME must be set
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
+ " variable in your environment to match the location of the RocketMQ installation%n");
System.exit(-2);
}
/**
* Initialize the Logback logger context
* RocketMQ uses Logback for logging by default
*/
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
/**
* Create NamesrvController,
* the main controller class of the Name Server
*/
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
/**
* remember all configs to prevent discard
* Copies the global Properties into NamesrvController's Configuration.allConfigs
*/
controller.getConfiguration().registerConfig(properties);
// Initialize NamesrvController
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// Register a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run() {
synchronized (this) {
log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet());
if (!this.hasShutdown) {
this.hasShutdown = true;
long begineTime = System.currentTimeMillis();
controller.shutdown();
long consumingTimeTotal = System.currentTimeMillis() - begineTime;
log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal);
}
}
}
}, "ShutdownHook"));
// Start the Netty server
controller.start();
String tip = "The Name Server boot success. serializeType=" +
RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf(tip + "%n");
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
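Both the -c branch and the commandLine2Properties() line rely on MixAll.properties2Object() to copy Properties values into a config object by reflection, and the comment above notes that options without a matching setter have no effect. Its body isn't shown here, so the following is only a minimal sketch assuming the binding goes through public setXxx(...) methods whose names match the property keys; DemoConfig and PropsBinder are hypothetical names, not RocketMQ code.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical stand-in for NamesrvConfig / NettyServerConfig.
class DemoConfig {
    private int listenPort = 8888;
    private boolean clusterTest = false;
    private String rocketmqHome;

    public int getListenPort() { return listenPort; }
    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    public boolean isClusterTest() { return clusterTest; }
    public void setClusterTest(boolean clusterTest) { this.clusterTest = clusterTest; }
    public String getRocketmqHome() { return rocketmqHome; }
    public void setRocketmqHome(String rocketmqHome) { this.rocketmqHome = rocketmqHome; }
}

// Minimal setter-based binding (assumption: same idea as MixAll.properties2Object, not its code).
class PropsBinder {
    static void bind(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            // setListenPort -> listenPort
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String raw = props.getProperty(key);
            if (raw == null) {
                continue; // no property for this setter: keep the default value
            }
            Class<?> type = m.getParameterTypes()[0];
            Object value;
            if (type == int.class || type == Integer.class) {
                value = Integer.parseInt(raw.trim());
            } else if (type == long.class || type == Long.class) {
                value = Long.parseLong(raw.trim());
            } else if (type == boolean.class || type == Boolean.class) {
                value = Boolean.parseBoolean(raw.trim());
            } else if (type == String.class) {
                value = raw;
            } else {
                continue; // unsupported parameter type: skip
            }
            try {
                m.invoke(target, value);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("failed to set " + key, e);
            }
        }
    }
}
```

This also reproduces the observation above: a key such as namesrvAddr, for which the target has no setter, is simply never looked at.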

NamesrvConfig Class

This class holds the Name Server's configuration properties:

public class NamesrvConfig {
/**
* RocketMQ installation directory
* Defaults to the system property named by MixAll.ROCKETMQ_HOME_PROPERTY if set,
* otherwise to the ROCKETMQ_HOME environment variable (read via System.getenv; it can be exported in ~/.profile)
* It can also be set in the config file with rocketmqHome=***
*/
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
System.getenv(MixAll.ROCKETMQ_HOME_ENV));
/**
* Path where the KV configuration is persisted
* Defaults to System.getProperty("user.home")/namesrv/kvConfig.json
*/
private String kvConfigPath = System.getProperty("user.home") +
File.separator + "namesrv" +
File.separator + "kvConfig.json";
/**
* Path where the configuration is persisted
* Defaults to System.getProperty("user.home")/namesrv/namesrv.properties
*/
private String configStorePath = System.getProperty("user.home") +
File.separator + "namesrv" +
File.separator + "namesrv.properties";
// The purpose of the following three properties is not clear yet
private String productEnvName = "center";
private boolean clusterTest = false;
private boolean orderMessageEnable = false;
...
}

NettyServerConfig Class

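This is the Name Server's Netty server configuration; RocketMQ uses Netty 4 for its underlying communication. To follow the source, knowledge of NIO and Netty is essential. The book 《Netty权威指南》 ("Netty: The Definitive Guide") is one option, though that is only a suggestion. Below are NettyServerConfig's properties; the goal here is just to know what each one does, and a deeper understanding requires studying Netty itself.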

public class NettyServerConfig implements Cloneable {
// Default listen port
private int listenPort = 8888;
// Number of Netty server worker threads
private int serverWorkerThreads = 8;
// Number of threads in the Netty server's public (async callback) executor
private int serverCallbackExecutorThreads = 0;
// Number of Netty selector (I/O) threads
private int serverSelectorThreads = 3;
// Semaphore permits limiting concurrent one-way requests
private int serverOnewaySemaphoreValue = 256;
// Semaphore permits limiting concurrent async requests
private int serverAsyncSemaphoreValue = 64;
// Maximum channel idle time before the channel is closed, in seconds
private int serverChannelMaxIdleTimeSeconds = 120;
// Netty send buffer size
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
// Netty receive buffer size
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
// Whether to use Netty's pooled ByteBuf allocator
private boolean serverPooledByteBufAllocatorEnable = true;
...
}

NamesrvController Class

First, the NamesrvController constructor:

public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
// Name Server configuration
this.namesrvConfig = namesrvConfig;
// Netty configuration
this.nettyServerConfig = nettyServerConfig;
// KV configuration manager
this.kvConfigManager = new KVConfigManager(this);
// Route information manager
this.routeInfoManager = new RouteInfoManager();
// Handles broker connection events
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
// Configuration properties
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
/**
* Set the Configuration object's storePathField
* Here it is set to NamesrvConfig's configStorePath field
*/
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}

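Six classes are initialized here. We have already seen the first two; the remaining four are probably the core of the Name Server. NamesrvController has other methods as well, but first let's see what those four are, and then come back to this class.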

KVConfigManager Class

KVConfigManager has three main member variables:

public class KVConfigManager {
// Main Name Server controller
private final NamesrvController namesrvController;
// Read-write lock
private final ReadWriteLock lock = new ReentrantReadWriteLock();
/**
* Configuration stored per namespace, for example:
* {"configTable":{"ORDER_TOPIC_CONFIG":{"UnitTest":"test"}}}
* Here the namespace is ORDER_TOPIC_CONFIG; its exact meaning is not clear yet
*/
private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
new HashMap<String, HashMap<String, String>>();
...
}

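As for ReentrantReadWriteLock, this class only uses its lockInterruptibly() method, which blocks until the lock is acquired but, unlike lock(), responds to interruption by throwing InterruptedException.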
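The locking pattern is easier to see in isolation: reads take the shared read lock, writes take the exclusive write lock, and both acquire it with lockInterruptibly() before entering a try/finally that unlocks. The class below is a minimal sketch of that pattern; NamespaceKvStore and its methods are hypothetical, and unlike KVConfigManager it does not persist anything to kvConfigPath.

```java
import java.util.HashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical namespace -> (key -> value) store guarded by a read-write lock.
class NamespaceKvStore {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String, HashMap<String, String>> configTable = new HashMap<>();

    // Writers take the exclusive write lock. lockInterruptibly() blocks until the lock
    // is acquired, but throws InterruptedException if the thread is interrupted.
    void put(String namespace, String key, String value) throws InterruptedException {
        lock.writeLock().lockInterruptibly();
        try {
            configTable.computeIfAbsent(namespace, ns -> new HashMap<>()).put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers share the read lock, so concurrent lookups do not block each other.
    String get(String namespace, String key) throws InterruptedException {
        lock.readLock().lockInterruptibly();
        try {
            HashMap<String, String> kv = configTable.get(namespace);
            return kv == null ? null : kv.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Acquiring the lock before the try block matters: if lockInterruptibly() throws, the finally clause must not try to unlock a lock that was never taken.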

RouteInfoManager Class

RouteInfoManager manages routing information. It has seven main member variables:

public class RouteInfoManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
// Broker channel expiry time: 2 minutes, in milliseconds
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
// Read-write lock
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// Topics and their queue information
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// Brokers grouped by broker name
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// Clusters and the broker names that belong to each
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// Addresses of live brokers
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// Filter Server list for each broker
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
...
}

QueueData Class

This class stores the queue information for a Topic:

public class QueueData implements Comparable<QueueData> {
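// Name of the broker the queues belong to
private String brokerName;
// Number of read queues
private int readQueueNums;
// Number of write queues
private int writeQueueNums;
// Topic read/write permission (2 = write, 4 = read, 6 = read-write)
private int perm;
/**
* Corresponds to TopicConfig.topicSysFlag
* Set by the -u/-s options when creating a Topic; its purpose is unclear
* See: https://github.com/alibaba/RocketMQ/issues/206
*/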
private int topicSynFlag;
...
}
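The perm values 2, 4 and 6 are bit flags: write is 2 (binary 010), read is 4 (binary 100), and read-write is their bitwise OR, 6 (binary 110). RocketMQ keeps its own permission constants in a helper class (PermName); the class below is a standalone, hypothetical sketch of the same idea rather than a copy of it.

```java
// Hypothetical helper illustrating QueueData.perm as a bit mask.
final class PermBits {
    static final int PERM_WRITE = 1 << 1; // 2
    static final int PERM_READ = 1 << 2;  // 4

    private PermBits() {}

    static boolean isReadable(int perm) {
        return (perm & PERM_READ) == PERM_READ;
    }

    static boolean isWriteable(int perm) {
        return (perm & PERM_WRITE) == PERM_WRITE;
    }

    // Renders a perm value as "R", "W", "RW" or "-" for logging.
    static String describe(int perm) {
        String s = (isReadable(perm) ? "R" : "") + (isWriteable(perm) ? "W" : "");
        return s.isEmpty() ? "-" : s;
    }
}
```

Because each permission is a separate bit, granting or revoking one (for example making a topic read-only during maintenance) is a single OR or AND-NOT on the stored int.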

BrokerData Class

This class stores broker information:

public class BrokerData implements Comparable<BrokerData> {
// Cluster the broker belongs to
private String cluster;
// Broker name
private String brokerName;
/**
* Broker IDs and their addresses
* A brokerName can have one Master and several Slaves, so brokerAddrs is a map
*/
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
...
}

BrokerLiveInfo Class

This is an inner class of RouteInfoManager that stores information about a live broker:

class BrokerLiveInfo {
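// Time of the last update
private long lastUpdateTimestamp;
// Data version
private DataVersion dataVersion;
// Netty Channel
private Channel channel;
/**
* HA server address of the broker
* The address a Slave connects to when pulling data from the Master, made up of brokerIp2 plus the HA port
*/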
private String haServerAddr;
...
}

BrokerHousekeepingService Class

This class handles broker connection events. It implements ChannelEventListener and mainly maintains RouteInfoManager's brokerLiveTable. It implements the following three methods:

public class BrokerHousekeepingService implements ChannelEventListener {
...
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
...
}

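As the method names suggest, all three relate to a Channel, namely BrokerLiveInfo's channel field, which is a native Netty class. In short, a Channel is Netty's abstraction of network I/O read and write operations. All three methods call RouteInfoManager's onChannelDestroy(), which is analyzed below: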

public class RouteInfoManager {
/**
* Cleans up the information of a broker that went offline
* @param remoteAddr the broker's address
* @param channel the connection between the broker and the Name Server, a NioSocketChannel instance
*/
public void onChannelDestroy(String remoteAddr, Channel channel) {
String brokerAddrFound = null;
if (channel != null) {
try {
try {
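/**
* Use the channel to find the matching broker address in brokerLiveTable
* This only reads, so the read lock is enough
* If the broker has already been removed from the live broker table, remoteAddr is used instead
*/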
this.lock.readLock().lockInterruptibly();
Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
this.brokerLiveTable.entrySet().iterator();
while (itBrokerLiveTable.hasNext()) {
Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
if (entry.getValue().getChannel() == channel) {
brokerAddrFound = entry.getKey();
break;
}
}
} finally {
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
if (null == brokerAddrFound) {
brokerAddrFound = remoteAddr;
} else {
log.info("the broker's channel destroyed, {}, clean it's data structure at once",
brokerAddrFound);
}
if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
try {
try {
this.lock.writeLock().lockInterruptibly();
// Remove the broker from the live broker table
this.brokerLiveTable.remove(brokerAddrFound);
// Remove the broker's Filter Server entry
this.filterServerTable.remove(brokerAddrFound);
/**
* Find the broker in brokerAddrTable by brokerAddrFound and remove it
* (the removed element is in BrokerData.brokerAddrs)
* If BrokerData.brokerAddrs is then empty, remove the broker name's entry as well
* (the removed element is in the brokerAddrTable member)
*/
String brokerNameFound = null;
boolean removeBrokerName = false;
Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
this.brokerAddrTable.entrySet().iterator();
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
BrokerData brokerData = itBrokerAddrTable.next().getValue();
Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> entry = it.next();
Long brokerId = entry.getKey();
String brokerAddr = entry.getValue();
if (brokerAddr.equals(brokerAddrFound)) {
brokerNameFound = brokerData.getBrokerName();
it.remove();
log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
brokerId, brokerAddr);
break;
}
}
if (brokerData.getBrokerAddrs().isEmpty()) {
removeBrokerName = true;
itBrokerAddrTable.remove();
log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
brokerData.getBrokerName());
}
}
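/**
* If no node under this broker name is serving any more (it has been removed from brokerAddrTable),
* remove the broker name from its cluster
* If the cluster then has no brokers left, remove the cluster entry as well
* (the removed elements are in the clusterAddrTable member)
*/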
if (brokerNameFound != null && removeBrokerName) {
Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, Set<String>> entry = it.next();
String clusterName = entry.getKey();
Set<String> brokerNames = entry.getValue();
boolean removed = brokerNames.remove(brokerNameFound);
if (removed) {
log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
brokerNameFound, clusterName);
if (brokerNames.isEmpty()) {
log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
clusterName);
it.remove();
}
break;
}
}
}
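/**
* If no node under this broker name is serving any more (it has been removed from brokerAddrTable),
* remove the broker name's queue data from every topic (elements of topicQueueTable's values)
* If a topic then has no serving broker left, remove the topic itself (an element of topicQueueTable)
*/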
if (removeBrokerName) {
Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
this.topicQueueTable.entrySet().iterator();
while (itTopicQueueTable.hasNext()) {
Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
String topic = entry.getKey();
List<QueueData> queueDataList = entry.getValue();
Iterator<QueueData> itQueueData = queueDataList.iterator();
while (itQueueData.hasNext()) {
QueueData queueData = itQueueData.next();
if (queueData.getBrokerName().equals(brokerNameFound)) {
itQueueData.remove();
log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
topic, queueData);
}
}
if (queueDataList.isEmpty()) {
itTopicQueueTable.remove();
log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
topic);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
}
}
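Stripped of locking and logging, onChannelDestroy() is a cascade: remove the address, then the broker name once it has no addresses left, then the cluster entry and topic queues that referred to that name. The sketch below models the order of those steps with simplified, hypothetical types (queue data reduced to the owning broker name, live info reduced to a timestamp, no Filter Server table); it is an illustration, not the real RouteInfoManager.

```java
import java.util.*;

// Hypothetical, simplified model of RouteInfoManager's tables (no locking).
class MiniRouteTables {
    // topic -> names of the brokers hosting its queues (stand-in for List<QueueData>)
    final Map<String, List<String>> topicQueueTable = new HashMap<>();
    // brokerName -> (brokerId -> address) (stand-in for BrokerData.brokerAddrs)
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    // clusterName -> broker names
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();
    // brokerAddr -> last heartbeat time (stand-in for BrokerLiveInfo)
    final Map<String, Long> brokerLiveTable = new HashMap<>();

    void onBrokerGone(String brokerAddr) {
        brokerLiveTable.remove(brokerAddr);

        // 1. Remove the address from its broker name; drop the name once it has no addresses.
        String brokerNameFound = null;
        boolean removeBrokerName = false;
        Iterator<Map.Entry<String, Map<Long, String>>> it = brokerAddrTable.entrySet().iterator();
        while (it.hasNext() && brokerNameFound == null) {
            Map.Entry<String, Map<Long, String>> entry = it.next();
            if (entry.getValue().values().remove(brokerAddr)) {
                brokerNameFound = entry.getKey();
                if (entry.getValue().isEmpty()) {
                    removeBrokerName = true;
                    it.remove();
                }
            }
        }
        if (!removeBrokerName) {
            return; // other nodes (e.g. a Slave) still serve this broker name
        }
        final String brokerName = brokerNameFound;

        // 2. Remove the broker name from its cluster; drop the cluster once it is empty.
        Iterator<Map.Entry<String, Set<String>>> clusterIt = clusterAddrTable.entrySet().iterator();
        while (clusterIt.hasNext()) {
            Set<String> names = clusterIt.next().getValue();
            if (names.remove(brokerName)) {
                if (names.isEmpty()) {
                    clusterIt.remove();
                }
                break;
            }
        }

        // 3. Remove the broker's queues from every topic; drop topics left with no queues.
        Iterator<Map.Entry<String, List<String>>> topicIt = topicQueueTable.entrySet().iterator();
        while (topicIt.hasNext()) {
            List<String> owners = topicIt.next().getValue();
            owners.removeIf(brokerName::equals);
            if (owners.isEmpty()) {
                topicIt.remove();
            }
        }
    }
}
```

The early return captures the key design point: losing one replica of a broker name leaves routing untouched, and only the last node going away removes the name from clusters and topics.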

Configuration Class

The Configuration constructor takes varargs of type Object (after the logger). The method worth looking at is registerConfig():

public class Configuration {
...
public Configuration registerConfig(Object configObject) {
try {
readWriteLock.writeLock().lockInterruptibly();
try {
/**
* Convert the config object into Properties via reflection
* Each property's key/value is a field name and that field's value
*/
Properties registerProps = MixAll.object2Properties(configObject);
// Merge registerProps into the allConfigs member
merge(registerProps, this.allConfigs);
// Add the config object to the configObjectList member
configObjectList.add(configObject);
} finally {
readWriteLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("registerConfig lock error");
}
return this;
}
...
}
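The bodies of MixAll.object2Properties() and merge() are not shown here. As a rough sketch, assuming object2Properties reads every non-static declared field via reflection and records non-null values as strings, and that merge copies each entry into allConfigs (overwriting existing keys), registerConfig boils down to the following. ConfigRegistry and DemoNettyConfig are hypothetical names, and synchronized stands in for the write lock.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical config object used to exercise the sketch.
class DemoNettyConfig {
    private int listenPort = 9876;
    private int serverWorkerThreads = 8;
    private String unsetValue = null;                       // null values are skipped
    private static final String IGNORED = "static field";   // static fields are skipped
}

// Hypothetical sketch of Configuration.registerConfig's bookkeeping.
class ConfigRegistry {
    private final Properties allConfigs = new Properties();
    private final List<Object> configObjectList = new ArrayList<>();

    // Field name -> field value (as a string), for every non-static, non-null field.
    static Properties object2Properties(Object obj) {
        Properties props = new Properties();
        for (Field field : obj.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers())) {
                continue;
            }
            field.setAccessible(true);
            try {
                Object value = field.get(obj);
                if (value != null) {
                    props.setProperty(field.getName(), value.toString());
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        return props;
    }

    // synchronized plays the role of the write lock used by the real class.
    synchronized ConfigRegistry registerConfig(Object configObject) {
        // merge: a later registration overwrites earlier values for the same key
        allConfigs.putAll(object2Properties(configObject));
        configObjectList.add(configObject);
        return this;
    }

    synchronized String get(String key) {
        return allConfigs.getProperty(key);
    }
}
```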

Name Server Initialization

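So far we have only looked at NamesrvController's constructor. The Name Server relies on three core NamesrvController methods, initialize(), start() and shutdown(), which handle initialization, startup and shutdown respectively. Let's start with initialization: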

public class NamesrvController {
...
/**
* Initialize NamesrvController
* @return true if initialization succeeded
*/
public boolean initialize() {
// Load the file at kvConfigPath into KVConfigManager
this.kvConfigManager.load();
// Create the RemotingServer
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// Worker thread pool, 8 threads by default
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// Register the Netty request processor
this.registerProcessor();
/**
* Scheduled task: starts after 5 seconds and runs every 10 seconds
* Scans for brokers that are no longer alive
*/
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
/**
* Scheduled task: starts after 1 minute and runs every 10 minutes
* Prints the kvConfig configuration
*/
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
return true;
}
...
}
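The first scheduled task calls scanNotActiveBroker(), whose body this article does not show. The sketch below assumes it treats a broker as dead when lastUpdateTimestamp + BROKER_CHANNEL_EXPIRED_TIME (the 2-minute constant in RouteInfoManager) is earlier than the current time, and hands the address to the same cleanup as onChannelDestroy(). LivenessScanner and the explicit nowMillis parameter are hypothetical, added so the logic can be exercised without waiting.

```java
import java.util.*;
import java.util.function.Consumer;

// Hypothetical sketch of an expiry scan in the spirit of scanNotActiveBroker().
class LivenessScanner {
    // Same value as RouteInfoManager.BROKER_CHANNEL_EXPIRED_TIME: 2 minutes in ms
    static final long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;

    // brokerAddr -> lastUpdateTimestamp (ms), refreshed whenever a broker heartbeat arrives
    private final Map<String, Long> lastUpdate = new HashMap<>();
    // Called for each expired broker, e.g. to close its channel and run onChannelDestroy()
    private final Consumer<String> onExpired;

    LivenessScanner(Consumer<String> onExpired) {
        this.onExpired = onExpired;
    }

    void heartbeat(String brokerAddr, long nowMillis) {
        lastUpdate.put(brokerAddr, nowMillis);
    }

    // Removes every broker whose last heartbeat is older than the expiry window.
    List<String> scan(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            String brokerAddr = entry.getKey();
            if (entry.getValue() + BROKER_CHANNEL_EXPIRED_TIME < nowMillis) {
                it.remove();
                expired.add(brokerAddr);
                onExpired.accept(brokerAddr);
            }
        }
        return expired;
    }
}
```

In the controller, a ScheduledExecutorService would drive scan() with System.currentTimeMillis(), using the same 5-second delay and 10-second period as initialize().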

NettyRemotingServer Class

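NamesrvController.initialize() creates an instance of this class, which is clearly closely tied to Netty. We'll go through it slowly and branch out to learn whatever is unfamiliar. Its constructor first calls the constructor of its parent class, NettyRemotingAbstract, so let's look at that first.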

public abstract class NettyRemotingAbstract {
...
/**
* @param permitsOneway semaphore permits for one-way requests
* @param permitsAsync semaphore permits for async requests
*/
public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
// Create fair semaphores
this.semaphoreOneway = new Semaphore(permitsOneway, true);
this.semaphoreAsync = new Semaphore(permitsAsync, true);
}
...
}

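A quick note on Semaphore: in Java, a Semaphore limits how many threads can access a resource at the same time. Here the permit counts come from NettyServerConfig and default to 256 (one-way) and 64 (async). Now the class's own constructor: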
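To show how such a semaphore caps in-flight requests, here is a minimal sketch: a permit is taken before a request is sent and returned when it completes, and a request that cannot get a permit within its timeout is refused. AsyncLimiter and its method names are hypothetical, and the tryAcquire-with-timeout policy is an assumption about how the permits are consumed.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical limiter: at most `permits` async requests may be outstanding.
class AsyncLimiter {
    private final Semaphore semaphore;

    AsyncLimiter(int permits) {
        // fair = true: waiting threads obtain permits in FIFO order,
        // matching the `new Semaphore(permits, true)` calls above
        this.semaphore = new Semaphore(permits, true);
    }

    // Returns false (instead of sending) when no permit frees up within the timeout.
    boolean trySend(long timeoutMillis) throws InterruptedException {
        return semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Called when a response (or a timeout) completes the request.
    void onComplete() {
        semaphore.release();
    }

    int available() {
        return semaphore.availablePermits();
    }
}
```

The permit must be released on every completion path, including timeouts and send failures; otherwise permits leak and the server eventually refuses all async requests.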

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
...
public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) {
// Set the one-way and async semaphore permits
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
// Native Netty class, Netty's main server bootstrap
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
// Thread count for the public (async callback) executor; falls back to 4 if the configured value (default 0) is not positive
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// Create a fixed-size thread pool with custom thread names
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
/**
* Create a single-threaded NIO event loop group
* This "boss" group accepts clients' TCP connection requests
*/
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
}
});
/**
* Create a fixed-size event loop group, 3 threads by default
* Use EpollEventLoopGroup on Linux when useEpollNativeSelector is true, otherwise NioEventLoopGroup
* This event loop group performs the actual I/O reads and writes
*/
if (RemotingUtil.isLinuxPlatform() //
&& nettyServerConfig.isUseEpollNativeSelector()) {
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
}
...
}

NamesrvController.initialize() calls the private registerProcessor() method, which registers the request-processing logic with NettyRemotingServer:

public class NamesrvController {
...
/**
* Register the Netty server's business-processing logic
* The purpose of NamesrvConfig's clusterTest field is unclear for now, so only the false branch is examined
*/
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(
new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
...
}

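registerDefaultProcessor() stores the processor together with its executor in NettyRemotingServer's defaultRequestProcessor member. When NettyRemotingServer starts, it registers a NettyServerHandler that overrides channelRead0(); this method is called whenever the server receives a request and eventually calls NettyRemotingAbstract.processMessageReceived(), which uses the defaultRequestProcessor object. These methods are analyzed later; here we focus on the two arguments in the code above.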

  • DefaultRequestProcessor: the default request-processing class
  • remotingExecutor: the worker thread pool

DefaultRequestProcessor Class

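This class implements the NettyRequestProcessor interface and overrides its two methods, processRequest() and rejectRequest(). rejectRequest() is simple: its body just returns false, meaning requests are never rejected. processRequest() performs a different operation for each request type; the request types are defined in RequestCode, and Alibaba Cloud has an annotated version of that class. processRequest() is declared as follows: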

public class DefaultRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
...
}
}
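Structurally, processRequest() is a switch on the request code that routes each RequestCode to a handler and wraps the result in a response whose code is 0 on success. The sketch below shows that dispatch shape with hypothetical types; the request and response code values are illustrative only and are not RocketMQ's real RequestCode/ResponseCode numbers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical response holder: code 0 = success, non-zero = error (as for RemotingCommand.code).
final class MiniResponse {
    final int code;
    final String remark;

    MiniResponse(int code, String remark) {
        this.code = code;
        this.remark = remark;
    }
}

// Hypothetical processor with the same overall shape as DefaultRequestProcessor.
class MiniRequestProcessor {
    // Illustrative request codes only; RocketMQ's real values are defined in RequestCode.
    static final int PUT_KV_CONFIG = 1;
    static final int GET_KV_CONFIG = 2;
    // Illustrative response codes: 0 = success, anything else = error.
    static final int SUCCESS = 0;
    static final int NOT_FOUND = 1;
    static final int NOT_SUPPORTED = 2;

    private final Map<String, String> kvTable = new HashMap<>();

    // Dispatches on the request code, like the switch inside processRequest().
    MiniResponse processRequest(int code, String key, String value) {
        switch (code) {
            case PUT_KV_CONFIG:
                kvTable.put(key, value);
                return new MiniResponse(SUCCESS, null);
            case GET_KV_CONFIG: {
                String found = kvTable.get(key);
                return found != null
                        ? new MiniResponse(SUCCESS, found)
                        : new MiniResponse(NOT_FOUND, "no value for key " + key);
            }
            default:
                return new MiniResponse(NOT_SUPPORTED, "request code " + code + " not supported");
        }
    }

    // Mirrors DefaultRequestProcessor.rejectRequest(): requests are never rejected.
    boolean rejectRequest() {
        return false;
    }
}
```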

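We won't go through this method in detail; it contains many request-specific operations, such as registering a broker. One thing worth expanding on is that its parameter and its return value share a common class, RemotingCommand, which encapsulates RocketMQ's network protocol. More than one class is involved in the protocol, so let's sort this part out.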

RocketMQ Network Protocol

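All network communication, whether sending messages, pulling messages, sending heartbeats or modifying configuration, uses the same protocol, and requests and responses share the same format: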
<length> <header length> <header data> <body data>

  • length: a 4-byte int holding the combined size of header length, header data and body data
  • header length: a 4-byte int holding the size of header data
  • header data: the serialized message header
  • body data: the message body
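To make the layout concrete, here is a minimal encoder/decoder for the four-part frame, following the layout exactly as described above and using java.nio.ByteBuffer (big-endian by default). FrameCodec is hypothetical; header and body are treated as opaque bytes, so only the framing is modeled. Note that length excludes its own 4 bytes, so a whole frame occupies length + 4 bytes.

```java
import java.nio.ByteBuffer;

// Hypothetical codec for <length><header length><header data><body data>.
final class FrameCodec {
    private FrameCodec() {}

    static ByteBuffer encode(byte[] header, byte[] body) {
        int length = 4 + header.length + body.length; // header-length field + header + body
        ByteBuffer buf = ByteBuffer.allocate(4 + length);
        buf.putInt(length);
        buf.putInt(header.length);
        buf.put(header);
        buf.put(body);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    // Decodes one complete frame and returns {header, body}.
    static byte[][] decode(ByteBuffer frame) {
        int length = frame.getInt();
        int headerLength = frame.getInt();
        byte[] header = new byte[headerLength];
        frame.get(header);
        byte[] body = new byte[length - 4 - headerLength];
        frame.get(body);
        return new byte[][] {header, body};
    }
}
```

decode() assumes the buffer holds exactly one complete frame. Cutting a TCP byte stream into such frames is the job NettyDecoder delegates to LengthFieldBasedFrameDecoder, which only needs to know where the length field sits and how many bytes it takes.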

Now the header fields:

public class RemotingCommand {
...
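// Header fields
/**
* Request : request operation code, corresponds to RequestCode and selects the handling logic via switch
* Response: response code, corresponds to ResponseCode; 0 means success, non-zero is an error code
*/
private int code;
/**
* Request : language of the requester, JAVA by default
* Response: language of the responder, JAVA by default
*/
private LanguageCode language = LanguageCode.JAVA;
/**
* Request : version of the requester
* Response: version of the responder
*/
private int version = 0;
/**
* Request : identifies which request this is on a given connection
* The server returns this identifier in its response, so a requester can share one connection
* across threads and still match each response to its request
* Response: returned unchanged by the responder
* requestId is an AtomicInteger: its current value is taken and then incremented
*/
private int opaque = requestId.getAndIncrement();
/**
* Marks the communication type as a 2-bit bit mask, using the two constants below
* private static final int RPC_TYPE = 0;
* private static final int RPC_ONEWAY = 1;
* Low bit 0 means request, 1 means response
* High bit 0 means a normal RPC that waits for a result, 1 means a one-way request
*/
private int flag = 0;
/**
* Request : custom text sent with the request
* Response: detailed error description
*/
private String remark;
/**
* Request : custom request fields
* Response: custom response fields
*/
private HashMap<String, String> extFields;
/**
* Custom request header
* When the command is sent over the network, customHeader is converted into extFields
*/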
private transient CommandCustomHeader customHeader;
public void markOnewayRPC() {
int bits = 1 << RPC_ONEWAY;
this.flag |= bits;
}
@JSONField(serialize = false)
public boolean isOnewayRPC() {
int bits = 1 << RPC_ONEWAY;
return (this.flag & bits) == bits;
}
public void markResponseType() {
int bits = 1 << RPC_TYPE;
this.flag |= bits;
}
@JSONField(serialize = false)
public boolean isResponseType() {
int bits = 1 << RPC_TYPE;
return (this.flag & bits) == bits;
}
...
}
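The four flag methods above amount to setting and testing two bits. The standalone sketch below copies the RPC_TYPE and RPC_ONEWAY constants from the comment and rewrites the methods as pure functions on an int, so each combination can be checked directly; FlagDemo is a hypothetical class, not part of RocketMQ.

```java
// Hypothetical standalone copy of RemotingCommand's flag bit logic.
final class FlagDemo {
    private static final int RPC_TYPE = 0;   // bit 0: 0 = request, 1 = response
    private static final int RPC_ONEWAY = 1; // bit 1: 0 = normal RPC, 1 = one-way

    private FlagDemo() {}

    static int markResponseType(int flag) {
        return flag | (1 << RPC_TYPE);
    }

    static int markOnewayRPC(int flag) {
        return flag | (1 << RPC_ONEWAY);
    }

    static boolean isResponseType(int flag) {
        int bits = 1 << RPC_TYPE;
        return (flag & bits) == bits;
    }

    static boolean isOnewayRPC(int flag) {
        int bits = 1 << RPC_ONEWAY;
        return (flag & bits) == bits;
    }
}
```

A fresh request has flag 0, marking it one-way gives 2 (binary 10), and marking a normal request's reply as a response gives 1 (binary 01).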

The related encode and decode methods are also in RemotingCommand. By default it serializes to JSON using Alibaba's own fastjson; we won't go into the details here.

Name Server Startup

Starting the Name Server calls NamesrvController.start(), which is simple: it just calls start() on the remotingServer member, a NettyRemotingServer. Here is that start() method:

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
...
@Override
public void start() {
// Create the event executor group that runs the pipeline handlers (codec and business), 8 threads by default
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
// Configure Netty's server bootstrap
ServerBootstrap childHandler = this.serverBootstrap
// group() sets the boss and selector (worker) event loop groups
.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
// Use NioServerSocketChannel as the channel type
.channel(NioServerSocketChannel.class)
// Maximum length of the queue of pending (not yet accepted) connections
.option(ChannelOption.SO_BACKLOG, 1024)
// Allow reuse of the local address and port
.option(ChannelOption.SO_REUSEADDR, true)
// Disable TCP keepalive probes; idle channels are handled by IdleStateHandler instead
.option(ChannelOption.SO_KEEPALIVE, false)
// Disable Nagle's algorithm, suited to sending small packets immediately
.childOption(ChannelOption.TCP_NODELAY, true)
// Set the send buffer size
.option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
// Set the receive buffer size
.option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
// Set the port Netty listens on
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
// Bind the handler that processes I/O events
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
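// Executor group that runs the handlers below
defaultEventExecutorGroup,
// Encoder
new NettyEncoder(),
/**
* Decoder, based on LengthFieldBasedFrameDecoder
* Handles TCP packet sticking and splitting automatically; it only needs the offset of the length field and the number of bytes the length field occupies
*/
new NettyDecoder(),
/**
* Idle detection, 120 seconds by default
* When a client connection has been idle for the configured time, userEventTriggered() is fired
* userEventTriggered() is implemented in NettyConnetManageHandler and closes the idle channel
*/
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
// Netty connection management handler
new NettyConnetManageHandler(),
// Handler for the actual business logic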
new NettyServerHandler());
}
});
// Use Netty's pooled ByteBuf allocator; the pool itself is not analyzed here, maybe later when reading Netty's source
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
// Bind the port and wait synchronously until it succeeds
ChannelFuture sync = this.serverBootstrap.bind().sync();
// Get the bound address and port
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
/**
* If channelEventListener is not null, start nettyEventExecuter
* During Name Server startup, channelEventListener is the BrokerHousekeepingService instance
* nettyEventExecuter calls channelEventListener's methods according to the channel's state
*/
if (this.channelEventListener != null) {
this.nettyEventExecuter.start();
}
/**
* Scheduled task: starts after 3 seconds and runs every second
* Scans responseTable to support asynchronous callbacks, i.e. the client's invokeAsync
*/
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Exception e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
...
}
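The timer task calls scanResponseTable(), whose body this article does not show. Assuming it works the way its name and the opaque field suggest, in-flight requests sit in a table keyed by opaque, a response finds its request through that id, and entries that outlive their timeout are removed and their callbacks fired. The sketch below models only that bookkeeping; ResponseTable, PendingRequest and the explicit nowMillis parameter are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical model of a response table keyed by the request's opaque id.
class ResponseTable {
    static final class PendingRequest {
        final long beginTimestamp;
        final long timeoutMillis;
        final Consumer<String> callback; // receives the response, or null on timeout

        PendingRequest(long beginTimestamp, long timeoutMillis, Consumer<String> callback) {
            this.beginTimestamp = beginTimestamp;
            this.timeoutMillis = timeoutMillis;
            this.callback = callback;
        }
    }

    private final Map<Integer, PendingRequest> table = new ConcurrentHashMap<>();

    void register(int opaque, PendingRequest request) {
        table.put(opaque, request);
    }

    // A response carries the same opaque as its request, which is how it finds the callback.
    boolean onResponse(int opaque, String response) {
        PendingRequest request = table.remove(opaque);
        if (request == null) {
            return false; // unknown id, or already expired by scan()
        }
        request.callback.accept(response);
        return true;
    }

    // Periodic scan: expire overdue requests and notify each callback exactly once.
    int scan(long nowMillis) {
        int expired = 0;
        for (Map.Entry<Integer, PendingRequest> entry : table.entrySet()) {
            PendingRequest request = entry.getValue();
            boolean overdue = request.beginTimestamp + request.timeoutMillis <= nowMillis;
            // remove(key, value) only succeeds if onResponse() has not already claimed the entry
            if (overdue && table.remove(entry.getKey(), request)) {
                request.callback.accept(null);
                expired++;
            }
        }
        return expired;
    }
}
```

Using remove(key, value) on both paths means a late response and the timeout scan can race safely: whichever removes the entry first owns the callback.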

Name Server Shutdown

There isn't much to say about shutdown: it calls NamesrvController.shutdown(), which shuts down all the thread pools in order.

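That's it for the Name Server source. Some parts are not fully understood yet, but going deeper into the source should reveal the full picture of RocketMQ. What struck me most is that you really need to know Netty; otherwise the cooperation between the various threads cannot be understood clearly. My own Netty knowledge is patchy, so it will be the top priority from here on.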