
前言
- 本文需网络知识、Netty、线程池、多线程以及并发工具锁等基础知识。
- 本文基于 RocketMQ_V4_9_3 版本、JDK1.8的环境进行源码分析。
- RocketMQ源码github地址 :RocketMQ源码下载 ,对应的是namesrv模块。
Namesrv
Namesrv是RocketMQ的"大脑",与生产者、broker、消费者都有密切的关系,尤其是存储了broker的元信息,并且提供了服务注册与发现以及路由管理。
Namesrv的网络架构
Namesrv使用的是Netty网络框架、单Reactor-多线程模式,如下图:

- 上面流程图我们大致可以知道 BOSS线程、Handler、工作线程 这三种概念 :
- Boss线程 : 通过 select或者poll或者epoll 监控客户端请求事件,收到事件后,通过 dispatch 进行分发。收到的事件有不同的类型,比如是建立连接请求类型,那么由 Acceptor 通过 accept 处理连接请求,然后创建一个 Handler 对象处理完成连接后的各种事件。如果不是建立连接请求类型,那么则分发调用连接对应的 Handler 来处理。
- Handler : 只负责将read读取数据后,分发给工作线程去执行某个请求的业务代码,然后收到工作线程的响应后,通过write去发送给client。
- 工作线程 : 负责真正的执行业务代码线程,执行结果会返回给Handler。
【PS : 如果没有Netty的知识体系,目前了解到这三个概念就行,尤其是Handler,因为Handler可以看到具体的代码实现,而且Handler可能是有多个组成的,比如一些编码、解码、根据长度读取byte、byte转换为对象、执行我们的逻辑代码等等】
NameSrv的启动、初始化、运行
启动前可配置一些环境变量、运行变量 :

通过调用NamesrvStarup的主函数来启动服务 :
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
//①创建NamesrvController
NamesrvController controller = createNamesrvController(args);
//②启动Netty服务
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
创建 NamesrvController --- 控制器
NamesrvController控制器维护着几个属性,包括RouteInfoManager(维护broker、topic、queue等存储结构信息)、2个属性配置对象(NamesrvConfig、NettyServerConfig)、RemotingServer(Netty服务的一些封装)等等属性。
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
//设置版本号...
//获取运行参数...
//①创建NamesrvConfig、NettyServerConfig对象
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
//②设置监听port,即默认为9876
nettyServerConfig.setListenPort(9876);
//③查看启动时有没有-c命令指定配置文件的路径,如果有,那么将会基于指定的配置文件去设置属性值
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
//设置属性值
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
//④查看启动时有没有-p命令,如果有,那么会打印所有NamesrvConfig、NettyServerConfig的属性值,然后退出进程
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
//⑤再次执行写入NamesrvConfig的配置
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
//⑥这里需要配置下环境变量
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
//日志相关的代码...
//⑦创建控制器
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
//把刚刚加载到的配置属性注册到Configuration对象的allConfigs
controller.getConfiguration().registerConfig(properties);
return controller;
}
createNamesrvController()逻辑代码不复杂,我们这里画个流程图总结一下 :

接下来具体看看NamesrvConfig、NettyServerConfig的属性以及作用 :
NamesrvConfig
- 这个配置类的属性是与Namesrc相关的 :
- rocketmqHome : 主目录,通过环境变量ROCKETMQ_HOME的配置或者-Drocketmq.home.dir=path。
- kvConfigPath : NameServer存储KV配置属性的持久化路径。
- configStorePath : 默认的配置文件路径,默认是不生效的,要通过配置文件去配置属性的话还是通过-c命令。
- orderMessageEnable : 是否支持顺序消息,默认不支持。
NettyServerConfig
- 这个配置类的属性是与Netty相关的,需要结合上面的网络架构的单Reactor多线程模式 :
- listenPort : 监听端口号,默认是9876.
- serverWorkerThreads : 工作线程,对应的就是执行Handler业务代码的线程数,默认8。
- serverCallbackExecutorThreads : 属于公有线程,只是封装成另外一个线程池去处理其他类型的业务逻辑,例如处理响应borker的请求,默认是4,即小于等于0时取4条线程。
- serverSelectorThreads : 处理Handler的read、write的IO线程,主要就是处理网络请求、解析请求包、然后转发到Handler给各大工作线程执行具体的业务操作,默认3。
- serverOnewaySemaphoreValue : 这个是控制broker跟namesrv的单向调用时的并发请求数,默认256。
- serverAsyncSemaphoreValue : 这个是控制broker跟namesrv的异步调用时的并发请求数,默认64。
- serverChannelMaxIdleTimeSeconds : 网络连接的最大空闲数,120s,即broker和namesrv的空闲时间超过120s,连接将关闭。
- serverSocketSndBufSize : socket的发送缓存大小,默认64k。
- serverSocketRcvBufSize : socket的接收缓存大小,默认64k。
- writeBufferHighWaterMark : 高水位线,用于高并发情况下的限流,如果写入缓冲区中排队的字节数超过高水位线,Channel.isWritable()将开始返回false,默认64k。
- writeBufferLowWaterMark : 低水位线,用于高并发情况下的限流,如果写入缓冲区中排队的字节数超过高水位线,然后下降到低水位线以下,Channel.isWritable()将再次开始返回true,默认32k。
- serverSocketBacklog : 服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝,默认1024。PS : select最大支持1024个并发请求,epoll没有限制。
- serverPooledByteBufAllocatorEnable : ByteBuffer是否开启缓存,默认是true。
- useEpollNativeSelector : 是否开启epoll IO模型,Linux环境下建议开启。
小结
NamesrvConfig :
- namesrv通过-c配置文件去加载配置属性的话,优先级是最高的,优先级第二是VM启动参数,最后才使用默认值。
NettyServerConfig :
- Netty采用了单Reactor多线程的模式,把网络I/O线程和业务线程分离开来,将任务交给业务线程池异步执行降低了Netty的I/O线程的占用时间、减轻了压力,但同时业务线程池增加了线程上下文切换的次数,所以serverWorkerThreads参数调优也是个潜在问题。
- Netty提供了很多网络参数,相关连接推荐 ==> Netty相关网络参数
初始化 Netty
加载完上面的配置参数后,就正式调用start()初始化Netty了 :
public static NamesrvController start(final NamesrvController controller) throws Exception {
//①初始化Netty
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
//很好的一个编程规范,大致的意思就是注册一个JVM钩子函数,在JVM关闭之前,先将线程池关闭,及时释放资源
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
controller.shutdown();
return null;
}));
//②启动Netty
controller.start();
return controller;
}
初始化Netty函数为 : initialize() :
public boolean initialize() {
// 将磁盘上指定的 KV json文件载入到内存来,存储到KVConfigManager的configTable
// 目前还没找到这个文件配置属性的作用,后期看到了再补上
this.kvConfigManager.load();
//① NettyRemotingServer 这个对象就是封装了Netty启动时的所需对象、环境
// 第二个参数是监听器,监听、管理这些连接,例如与broker断开连接后需要清理broker的相关数据
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//② 创建一个RemotingExecutor线程池,虽然使用了serverWorkerThreads的参数,但这个线程池并不是Reactor的工作线程池
this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//③ 注册一下处理器,其实就是处理器和线程池绑定起来,即处理器的业务代码都是交给与之绑定的线程池来执行
this.registerProcessor();
//④ 定时器,延迟5s后执行,每10s执行一次
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
//⑤ 定时器,延迟1s后执行,每10分钟执行一次
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);
//下面是TSL相关的
return true;
}
Step① : 创建NettyRemotingServer对象,即初始化与Netty相关的变量,如ServerBootstrap、eventLoopGroupSelector、eventLoopGroupBoss、defaultEventExecutorGroup,还有一些监听器(channelEventListener)、公有线程池(publicExecutor)等,该类的父类是NettyRemotingAbstract,也有一些属性,比如与broker单向、异步调用时的基于信号量的并发度控制等等。
#Step①
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
// 设置父类的2个信号量,与borker单向、异步调用的并发度控制相关
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
// 创建Netty的相关变量
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
// 创建监听连接的监听器
this.channelEventListener = channelEventListener;
// 计算公有线程池的线程数
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// 创建公有线程池 : NettyServerPublicExecutor
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
// 判断是否使用Epoll : 如果是Linux系统 且 配置了开启使用Epool的属性
if (useEpoll()) {
// 创建1条Boss线程的线程池 : NettyEPOLLBoss
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
}
});
// 创建3条I/O线程的线程池 : NettyServerEPOLLSelector
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
// 创建1条Boss线程的线程池 : NettyNIOBoss
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
}
});
// 创建3条I/O线程的线程池 : NettyServerNIOSelector
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
// SSL相关的
loadSslContext();
}
// 判断依据 : 如果是Linux系统 且 配置了开启使用Epool的属性
private boolean useEpoll() {
return RemotingUtil.isLinuxPlatform()
&& nettyServerConfig.isUseEpollNativeSelector()
&& Epoll.isAvailable();
}
Step②: 创建了8条线程的线程池,这个线程池会与默认处理器绑定,即默认处理器的业务代码执行会交给这个线程池执行,处理器不知道可以看看Step③。
#Step②
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
Step③: 注册处理器,即设置父类NettyRemotingAbstract的defaultRequestProcessor属性,这个默认处理器DefaultRequestProcessor与Handler是有密切关系的。
#Step③
private void registerProcessor() {
// 使用的是默认处理器DefaultRequestProcessor,这个处理器是与Handler有密切关系的,目前知道这一点就行
// 这个处理器的业务代码执行交给remotingExecutor来执行
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
Step④: 定时器,延迟5s后执行,每10s执行一次,扫描哪些broker已经超过120s没联系了。
#Step④
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
// 扫描函数
public int scanNotActiveBroker() {
int removeCount = 0;
// 获取broker迭代器
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
// 如果距离上次的心跳检测时间已经超过120s
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
// 关闭连接
RemotingUtil.closeChannel(next.getValue().getChannel());
// 剔除该broker
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
// 删除broker、topic、queue等相关信息
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
removeCount++;
}
}
return removeCount;
}
Step⑤: 定时器,延迟1s后执行,每10分钟执行一次,只是打印KV Json配置的参数信息。
#Step⑤
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);
启动 Netty
initialize()初始化完Netty,接下来就是调用控制器的start()来启动Netty :
public void start() throws Exception {
// 核心看这里,真正的实现类是NettyRemotingServer
this.remotingServer.start();
// 这里开启了一条监听文件的后台线程,与TSL相关
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
public void start() {
// 这个就是Reactoc的工作线程池 : NettyServerCodecThread
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
//① 初始化Handler,Handler对于Netty来说是个很重要的概念
prepareSharableHandlers();
// Netty相关的参数配置
ServerBootstrap childHandler =
// 绑定Boss线程组、I/O线程组
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
// 把所有Handler载入到ChannelPipeline中
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// Handler的代码由工作线程来执行
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
// 编码器Handler
encoder,
// 解码器Handler
new NettyDecoder(),
// 120s的心跳检测Handler
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
// 建立连接管理的Handler
connectionManageHandler,
// 处理器Handler,这个可以看到具有的请求、响应的代码执行逻辑
serverHandler
);
}
});
//这里都是一些Netty一些网络参数的配置....,不懂的可以结合上面给的链接去查找说明
try {
//异步启动
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
//这个结合上面的监听器,就是上面创建NettyRemotingServer的时候传送构造器的第二个参数brokerHousekeepingService
if (this.channelEventListener != null) {
//② 开启一条后台线程执行监听连接
this.nettyEventExecutor.start();
}
//每1s都去扫描response连接
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
Step①: prepareSharableHandlers()初始化Handler :
#Step①
private void prepareSharableHandlers() {
//TSL相关的Handler
handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
//编码器Handler
encoder = new NettyEncoder();
//连接管理Handler
connectionManageHandler = new NettyConnectManageHandler();
//处理请求、响应的Handler,也是最核心的一个
serverHandler = new NettyServerHandler();
}
Step②: 开启一条后台线程执行监听、管理连接,实际是父类NettyRemotingAbstract的NettyEventExecutor属性,是一个自己内部类的一个线程对象。这个线程超时(3s)从阻塞队列中获取一个连接事件来根据状态执行不同的操作 :
【PS : 下面的图画错了,不是每3s去获取任务,而是不断的循环拿任务,如果没有任务的情况下,会阻塞3s后返回null】
#Step②:
public void run() {
log.info(this.getServiceName() + " service started");
// 获取监听对象,就是上面创建NettyRemotingServer的时候传送构造器的第二个参数brokerHousekeepingService
final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
while (!this.isStopped()) {
try {
//超时3s从阻塞队列获取事件
NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
if (event != null && listener != null) {
switch (event.getType()) {
//根据类型执行不同操作...
//例如 listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
}
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info(this.getServiceName() + " service end");
}
到此,Netty的初始化以及启动就基本介绍完了,现在我们停下脚步,画个流程图总结一下 :

- NamesrvController是核心 :
- 配置类信息 : NamesrvConfig、NettyServerConfig。
- Netty相关类 : NettyRemotingServer、NettyRemotingAbstract。
- 监听管理broker类 : BrokerHousekeepingService,后台监听线程妹3s都会基于连接事件的不同状态去执行的相关broker操作,那么这个执行的相关broker操作就是通过BrokerHousekeepingService来执行。
- ScheduledExecutorService : 定时线程池,每10s扫描下有没有超过120s没联系的broker,满足这个条件则剔除。
- 对于Netty来说,NettyRemotingServer是核心 :
- 单Reactor多线程 : 1条Boss线程、3条I/O线程、8条工作线程。
- Handler : NettyServerHandler。
本问还有很多疑惑
- broker与namesrv的注册、发现、路由是如何实现的?
- namesrv是怎么存储broker这些元信息的?
- 处理器为什么要跟RemotingExecutor绑定在一起?
- 公有线程池(publicExecutor)一般都是处理什么?
- 上图中没有被圈红的都是没串联起来或者说还没学到的点,上面4个问题能否串联起来?答案是可以的,下篇文章继续分析。
结束语
- 原创不易
- 希望看完这篇文章的你有所收获!
相关参考资料
- RocketMQ源码,V4_9_3 版本 ,对应的是namesrv模块
- 《RocketMQ技术内幕》【书籍】