把学习当成一种习惯
选择往往大于努力,越努力越幸运

前言

  • 本文需网络知识、Netty、线程池、多线程以及并发工具锁等基础知识。
  • 本文基于 RocketMQ_V4_9_3 版本、JDK1.8的环境进行源码分析。
  • RocketMQ源码github地址 :RocketMQ源码下载 ,对应的是namesrv模块。

Namesrv

  Namesrv是RocketMQ的"大脑",与生产者、broker、消费者都有密切的关系,尤其是存储了broker的元信息,并且提供了服务注册与发现以及路由管理。

Namesrv的网络架构

Namesrv使用的是Netty网络框架、单Reactor-多线程模式,如下图:

  • 上面流程图我们大致可以知道 BOSS线程、Handler、工作线程 这三种概念 :
    • Boss线程 : 通过 select或者poll或者epoll 监控客户端请求事件,收到事件后,通过 dispatch 进行分发。收到的事件有不同的类型,比如是建立连接请求类型,那么由 Acceptor 通过 accept 处理连接请求,然后创建一个 Handler 对象处理完成连接后的各种事件。如果不是建立连接请求类型,那么则分发调用连接对应的 Handler 来处理。
    • Handler : 只负责将read读取数据后,分发给工作线程去执行某个请求的业务代码,然后收到工作线程的响应后,通过write去发送给client。
    • 工作线程 : 负责真正的执行业务代码线程,执行结果会返回给Handler。

【PS : 如果没有Netty的知识体系,目前了解到这三个概念就行,尤其是Handler,因为Handler可以看到具体的代码实现,而且Handler可能是有多个组成的,比如一些编码、解码、根据长度读取byte、byte转换为对象、执行我们的逻辑代码等等】

NameSrv的启动、初始化、运行

启动前可配置一些环境变量、运行变量 :

通过调用NamesrvStarup的主函数来启动服务 :


public static void main(String[] args) {
        main0(args);
    }

public static NamesrvController main0(String[] args) {
        try {
            //①创建NamesrvController
            NamesrvController controller = createNamesrvController(args);
            //②启动Netty服务
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
        return null;
    }

创建 NamesrvController --- 控制器

NamesrvController控制器维护着几个属性,包括RouteInfoManager(维护broker、topic、queue等存储结构信息)、2个属性配置对象(NamesrvConfig、NettyServerConfig)、RemotingServer(Netty服务的一些封装)等等属性。


public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        //设置版本号...
        //获取运行参数...

        //①创建NamesrvConfig、NettyServerConfig对象
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        //②设置监听port,即默认为9876
        nettyServerConfig.setListenPort(9876);
        //③查看启动时有没有-c命令指定配置文件的路径,如果有,那么将会基于指定的配置文件去设置属性值
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                //设置属性值
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }
        //④查看启动时有没有-p命令,如果有,那么会打印所有NamesrvConfig、NettyServerConfig的属性值,然后退出进程
        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }
        //⑤再次执行写入NamesrvConfig的配置
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
        //⑥这里需要配置下环境变量
        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }
        
        //日志相关的代码...

        //⑦创建控制器
        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        //把刚刚加载到的配置属性注册到Configuration对象的allConfigs
        controller.getConfiguration().registerConfig(properties);
        
        return controller;
    }


createNamesrvController()逻辑代码不复杂,我们这里画个流程图总结一下 :

接下来具体看看NamesrvConfig、NettyServerConfig的属性以及作用 :

NamesrvConfig

  • 这个配置类的属性是与Namesrc相关的 :
    • rocketmqHome : 主目录,通过环境变量ROCKETMQ_HOME的配置或者-Drocketmq.home.dir=path。
    • kvConfigPath : NameServer存储KV配置属性的持久化路径。
    • configStorePath : 默认的配置文件路径,默认是不生效的,要通过配置文件去配置属性的话还是通过-c命令。
    • orderMessageEnable : 是否支持顺序消息,默认不支持。

NettyServerConfig

  • 这个配置类的属性是与Netty相关的,需要结合上面的网络架构的单Reactor多线程模式 :
    • listenPort : 监听端口号,默认是9876.
    • serverWorkerThreads : 工作线程,对应的就是执行Handler业务代码的线程数,默认8。
    • serverCallbackExecutorThreads : 属于公有线程,只是封装成另外一个线程池去处理其他类型的业务逻辑,例如处理响应borker的请求,默认是4,即小于等于0时取4条线程。
    • serverSelectorThreads : 处理Handler的read、write的IO线程,主要就是处理网络请求、解析请求包、然后转发到Handler给各大工作线程执行具体的业务操作,默认3。
    • serverOnewaySemaphoreValue : 这个是控制broker跟namesrv的单向调用时的并发请求数,默认256。
    • serverAsyncSemaphoreValue : 这个是控制broker跟namesrv的异步调用时的并发请求数,默认64。
    • serverChannelMaxIdleTimeSeconds : 网络连接的最大空闲数,120s,即broker和namesrv的空闲时间超过120s,连接将关闭。
    • serverSocketSndBufSize : socket的发送缓存大小,默认64k。
    • serverSocketRcvBufSize : socket的接收缓存大小,默认64k。
    • writeBufferHighWaterMark : 高水位线,用于高并发情况下的限流,如果写入缓冲区中排队的字节数超过高水位线,Channel.isWritable()将开始返回false,默认64k。
    • writeBufferLowWaterMark : 低水位线,用于高并发情况下的限流,如果写入缓冲区中排队的字节数超过高水位线,然后下降到低水位线以下,Channel.isWritable()将再次开始返回true,默认32k。
    • serverSocketBacklog : 服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝,默认1024。PS : select最大支持1024个并发请求,epoll没有限制。
    • serverPooledByteBufAllocatorEnable : ByteBuffer是否开启缓存,默认是true。
    • useEpollNativeSelector : 是否开启epoll IO模型,Linux环境下建议开启。

小结

NamesrvConfig :

  • namesrv通过-c配置文件去加载配置属性的话,优先级是最高的,优先级第二是VM启动参数,最后才使用默认值。

NettyServerConfig :

  • Netty采用了单Reactor多线程的模式,把网络I/O线程和业务线程分离开来,将任务交给业务线程池异步执行降低了Netty的I/O线程的占用时间、减轻了压力,但同时业务线程池增加了线程上下文切换的次数,所以serverWorkerThreads参数调优也是个潜在问题。
  • Netty提供了很多网络参数,相关连接推荐 ==> Netty相关网络参数

初始化 Netty

加载完上面的配置参数后,就正式调用start()初始化Netty了 :


public static NamesrvController start(final NamesrvController controller) throws Exception {

        //①初始化Netty
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        //很好的一个编程规范,大致的意思就是注册一个JVM钩子函数,在JVM关闭之前,先将线程池关闭,及时释放资源
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
            controller.shutdown();
            return null;
        }));
        //②启动Netty
        controller.start();
        return controller;
    }

初始化Netty函数为 : initialize() :


public boolean initialize() {

        // 将磁盘上指定的 KV json文件载入到内存来,存储到KVConfigManager的configTable
        // 目前还没找到这个文件配置属性的作用,后期看到了再补上
        this.kvConfigManager.load();
        
        //① NettyRemotingServer 这个对象就是封装了Netty启动时的所需对象、环境
        // 第二个参数是监听器,监听、管理这些连接,例如与broker断开连接后需要清理broker的相关数据
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        
        //② 创建一个RemotingExecutor线程池,虽然使用了serverWorkerThreads的参数,但这个线程池并不是Reactor的工作线程池
        this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
        
        //③ 注册一下处理器,其实就是处理器和线程池绑定起来,即处理器的业务代码都是交给与之绑定的线程池来执行
        this.registerProcessor();   
        
        //④  定时器,延迟5s后执行,每10s执行一次
        this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);

        //⑤  定时器,延迟1s后执行,每10分钟执行一次
        this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);

        //下面是TSL相关的

        return true;
    }

Step① : 创建NettyRemotingServer对象,即初始化与Netty相关的变量,如ServerBootstrap、eventLoopGroupSelector、eventLoopGroupBoss、defaultEventExecutorGroup,还有一些监听器(channelEventListener)、公有线程池(publicExecutor)等,该类的父类是NettyRemotingAbstract,也有一些属性,比如与broker单向、异步调用时的基于信号量的并发度控制等等。

 #Step①
 public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
        // 设置父类的2个信号量,与borker单向、异步调用的并发度控制相关
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        // 创建Netty的相关变量
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        // 创建监听连接的监听器
        this.channelEventListener = channelEventListener;
        // 计算公有线程池的线程数
        int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }
        // 创建公有线程池 : NettyServerPublicExecutor
        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });
        // 判断是否使用Epoll : 如果是Linux系统 且 配置了开启使用Epool的属性
        if (useEpoll()) {
            // 创建1条Boss线程的线程池 : NettyEPOLLBoss
            this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });
            
            // 创建3条I/O线程的线程池 : NettyServerEPOLLSelector
            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            // 创建1条Boss线程的线程池 : NettyNIOBoss
            this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });
            // 创建3条I/O线程的线程池 : NettyServerNIOSelector
            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }
        // SSL相关的
        loadSslContext();
    }

    // 判断依据 : 如果是Linux系统 且 配置了开启使用Epool的属性
    private boolean useEpoll() {
        return RemotingUtil.isLinuxPlatform()
            && nettyServerConfig.isUseEpollNativeSelector()
            && Epoll.isAvailable();
    }

Step②: 创建了8条线程的线程池,这个线程池会与默认处理器绑定,即默认处理器的业务代码执行会交给这个线程池执行,处理器不知道可以看看Step③。

#Step②
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

Step③: 注册处理器,即设置父类NettyRemotingAbstract的defaultRequestProcessor属性,这个默认处理器DefaultRequestProcessor与Handler是有密切关系的。

    #Step③
    private void registerProcessor() {
         // 使用的是默认处理器DefaultRequestProcessor,这个处理器是与Handler有密切关系的,目前知道这一点就行
         // 这个处理器的业务代码执行交给remotingExecutor来执行
         this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
    }

Step④: 定时器,延迟5s后执行,每10s执行一次,扫描哪些broker已经超过120s没联系了。

#Step④
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);

// 扫描函数
public int scanNotActiveBroker() {
        int removeCount = 0;
        // 获取broker迭代器
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            // 如果距离上次的心跳检测时间已经超过120s
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                // 关闭连接
                RemotingUtil.closeChannel(next.getValue().getChannel());
                // 剔除该broker
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                // 删除broker、topic、queue等相关信息
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
                removeCount++;
            }
        }

        return removeCount;
    }

Step⑤: 定时器,延迟1s后执行,每10分钟执行一次,只是打印KV Json配置的参数信息。

#Step⑤
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);

启动 Netty

initialize()初始化完Netty,接下来就是调用控制器的start()来启动Netty :


    public void start() throws Exception {
        // 核心看这里,真正的实现类是NettyRemotingServer
        this.remotingServer.start();
        // 这里开启了一条监听文件的后台线程,与TSL相关
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }
    
    
    public void start() {
        // 这个就是Reactoc的工作线程池 : NettyServerCodecThread
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });
        //① 初始化Handler,Handler对于Netty来说是个很重要的概念
        prepareSharableHandlers();
        // Netty相关的参数配置
        ServerBootstrap childHandler =
            // 绑定Boss线程组、I/O线程组
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                // 把所有Handler载入到ChannelPipeline中
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            // Handler的代码由工作线程来执行
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                // 编码器Handler
                                encoder,
                                // 解码器Handler
                                new NettyDecoder(),
                                // 120s的心跳检测Handler
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                // 建立连接管理的Handler
                                connectionManageHandler,
                                // 处理器Handler,这个可以看到具有的请求、响应的代码执行逻辑
                                serverHandler
                            );
                    }
                });
        
        //这里都是一些Netty一些网络参数的配置....,不懂的可以结合上面给的链接去查找说明

        try {
            //异步启动
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        //这个结合上面的监听器,就是上面创建NettyRemotingServer的时候传送构造器的第二个参数brokerHousekeepingService
        if (this.channelEventListener != null) {
            //② 开启一条后台线程执行监听连接
            this.nettyEventExecutor.start();
        }

        //每1s都去扫描response连接
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

Step①: prepareSharableHandlers()初始化Handler :

   #Step①
   private void prepareSharableHandlers() {
        //TSL相关的Handler
        handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
        //编码器Handler
        encoder = new NettyEncoder();
        //连接管理Handler
        connectionManageHandler = new NettyConnectManageHandler();
        //处理请求、响应的Handler,也是最核心的一个
        serverHandler = new NettyServerHandler();
    }

Step②: 开启一条后台线程执行监听、管理连接,实际是父类NettyRemotingAbstract的NettyEventExecutor属性,是一个自己内部类的一个线程对象。这个线程超时(3s)从阻塞队列中获取一个连接事件来根据状态执行不同的操作 :

【PS : 下面的图画错了,不是每3s去获取任务,而是不断的循环拿任务,如果没有任务的情况下,会阻塞3s后返回null】

    #Step②:
    public void run() {
            log.info(this.getServiceName() + " service started");

            // 获取监听对象,就是上面创建NettyRemotingServer的时候传送构造器的第二个参数brokerHousekeepingService
            final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();

            while (!this.isStopped()) {
                try {
                    //超时3s从阻塞队列获取事件
                    NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
                    if (event != null && listener != null) {
                        switch (event.getType()) {
                            //根据类型执行不同操作...
                            //例如 listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
                        }
                    }
                } catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }
            log.info(this.getServiceName() + " service end");
        }

到此,Netty的初始化以及启动就基本介绍完了,现在我们停下脚步,画个流程图总结一下 :

  • NamesrvController是核心 :
    • 配置类信息 : NamesrvConfig、NettyServerConfig。
    • Netty相关类 : NettyRemotingServer、NettyRemotingAbstract。
    • 监听管理broker类 : BrokerHousekeepingService,后台监听线程妹3s都会基于连接事件的不同状态去执行的相关broker操作,那么这个执行的相关broker操作就是通过BrokerHousekeepingService来执行。
    • ScheduledExecutorService : 定时线程池,每10s扫描下有没有超过120s没联系的broker,满足这个条件则剔除。
  • 对于Netty来说,NettyRemotingServer是核心 :
    • 单Reactor多线程 : 1条Boss线程、3条I/O线程、8条工作线程。
    • Handler : NettyServerHandler。

本问还有很多疑惑

  • broker与namesrv的注册、发现、路由是如何实现的?
  • namesrv是怎么存储broker这些元信息的?
  • 处理器为什么要跟RemotingExecutor绑定在一起?
  • 公有线程池(publicExecutor)一般都是处理什么?
  • 上图中没有被圈红的都是没串联起来或者说还没学到的点,上面4个问题能否串联起来?答案是可以的,下篇文章继续分析。

结束语

  • 原创不易
  • 希望看完这篇文章的你有所收获!

相关参考资料


目录