把学习当成一种习惯
选择往往大于努力,越努力越幸运

前言

  • 基于第一篇文章RocketMQ:Namesrv源码分析(一) ,我们已经大致知道了Namesrv的配置加载、Netty的初始化与启动、一些定时、后台线程以及属性的作用。
  • 本问在第一篇文章的基础上继续分析 :
    • namesrv上元信息的存储结构。
    • client端(例如broker、producer、cunsurmer)的request到达Namesrv后,以及response,Namesrv是如何处理。

元信息的存储结构

  Namesrv作为RocketMQ的"大脑",他的本地必须存储一些数据,例如topic、queue信息、broker的基本信息、集群信息、borker的状态(例如心跳检测)等等,这些数据我们这里称这为元信息,Namesrv通过RouteInfoManager来存储这些元信息,核心控制器(NamesrvController)就包含了这个对象属性。RouteInfoManager的数据结构如下图 :

  • topicQueueTable : topic消息队列信息表,存储了topic跟queue的映射关系,消息发送时可以基于这张表来进行负载均衡 : 这是一个双Map集合,因为一个topic有多个queue队列,一个broker默认是4个读队列、4个写队列。
    // topic消息队列信息表
    private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;
    // queue数据结构
    public class QueueData implements Comparable<QueueData> {
        // broker名称
        private String brokerName;
        // 读队列
        private int readQueueNums;
        // 写队列
        private int writeQueueNums;
        // 权限
        private int perm;
        //topic同步表示
        private int topicSysFlag;
    
        //get、set...
    }
  • brokerAddrTable : broker基本信息,包括属于哪个集群、集群中主备信息。PS : 多个broker可以组成Master-Slave集群,brokerId=0的就是Master,大于0的都是Slave.
  #brokerAddrTable
  private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
  #BrokerData
  public class BrokerData implements Comparable<BrokerData> {
    // 集群名称
    private String cluster;
    // broker名称
    private String brokerName;
    // 主备关系 : 0为Master,大于0是Slave
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

    private final Random random = new Random();
    
    // 其他函数
    }
  • clusterAddrTable: 记录了每个集群下有哪些broker,如下图 :
    #clusterAddrTable
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
  • brokerLiveTable: broker的状态信息,记录了broker每次心跳检测的时间戳。
    #brokerLiveTable
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    
    class BrokerLiveInfo {
    
    // 上一次心跳检测的时间戳
    private long lastUpdateTimestamp;
    // 版本信息
    private DataVersion dataVersion;
    // 长连接Channel
    private Channel channel;
    // 如果是slave,那么这个属性存储的就是master的ip地址
    // 如果是master,那么这个属性存储的是master的IP + ":" + haListenPort(默认10912) 目前不知道为啥这么设计
    private String haServerAddr;
    
    }

元信息的存储结构相对简单,只要理清cluster、broker、topic、queue之间的关系就行。

Handler、处理器

  Handler上执行着真正的逻辑代码,所以对于namesrv的request或者response的处理,都会涉及到Handler上,而真正逻辑代码的实现,实际上是处理器。从NettyRemotingServer的start()可以看到具体的几个Hadnler :

  不同的Handler有各自的作用 : TSL相关的Handler、解码器Handler、编码器Handler、120s心跳检测Handler、连接管理Handler、对外提供服务Handler,其中连接管理(NettyConnectManageHandler)、和对外提供服务Handler(NettyServerHandler)是我们的重点关注。

  • NettyDecoder(解码) : 客户端通过request过来是byte类型,服务端需要将byte解码成RemotingCommand对象,这里的NettyDecoder就是负责将byte解码成RemotingCommand对象。
  • NettyEncoder(编码) : 服务端response结果给客户端也是基于byte传输,必须对RemotingCommand对象进行编码,这里的NettyEncoder就是负责编码成将RemotingCommand对象编码成byte传输。
  • IdleStateHandler(心跳检测) : Netty自带的一个心跳检测Handler,主要负责120s的心跳检测,具体的可以去深入了解。
  • NettyConnectManageHandler : 管理着不同状态的连接。
  • NettyServerHandler : 处理namesrv所有的request、response请求。

客户端与namesrv是基于RemotingCommand对象来交流的,RemotingCommand的几个重要属性 :

    public class RemotingCommand {
        // 默认是request类型
        private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND
        // 默认是单向请求类型
        private static final int RPC_ONEWAY = 1; // 0, RPC
        // 请求Id,递增的
        private int opaque = requestId.getAndIncrement();
        // 处理请求消息时根据这个字段来走不同的消息处理类
        private int code;
        // 自定义字段
        private HashMap<String, String> extFields;
        // 自定义头,不进行序列化
        private transient CommandCustomHeader customHeader;
        // 内容
        private transient byte[] body;
        //....
    }

  • RPC_ONEWAY做个说明 : Rocketmq对于request,支持三种方式,即同步(sync)、异步(async)、单向(oneway),前两种是双向的,即会收到response,而最后一种是单向的,不会收到response。
    • 同步(sync) : 同步等待调用,直到服务端处理完成后才返回结果,所以客户端会阻塞在服务端,服务端不会占用太多的系统资源;
    • 异步(async) : 异步调用,会立即返回,客户端不阻塞,服务端会注册回调任务;交给后台线程池去处理,所以需要限流保证系统资源
    • 单向(oneway) : 单向调用,会立即返回,客户端不阻塞,无需等待也无需注册回调任务,简单的说就是只管发,所以也需要限流保证系统资源;

NettyConnectManageHandler

  NettyConnectManageHandler : 从名字看出是管理着连接的一个Handler ,它会根据不同的连接状态封装为不同的NettyEvent,然后注册到父类的定时线程NettyEventExecutor的阻塞队列,NettyEventExecutor你可以理解为一个简易版的线程池。

将事件对象NettyEvent存储到阻塞队列中,并且后台线程NettyEventExecutor不断的循环获取 :

    #将事件对象NettyEvent添加到阻塞队列
    public void putNettyEvent(final NettyEvent event) {
        this.nettyEventExecutor.putNettyEvent(event);
    }

BrokerHousekeepingService : 不同的事件状态会执行不同的操作,统一BrokerHousekeepingService来管理 :

#获取监听管理对象BrokerHousekeepingService
final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();

最终是RouteInfoManager来执行元信息数据的更新 : onChannelDestroy()会去删除该broker相关的元信息数据,这里就不贴出来了。

现在,与上一篇的后台监听线程NettyEventExecutor、BrokerHousekeepingService的知识点就串联起来了 :

再看看下图的紫色部分 :

  小结一下 : NettyConnectManageHandler主要的作用就是来管理、监听这些连接的,会根据不同的触发事件,封装成事件对象供后台线程不断的轮询去获取这些事件对象,后台线程根据不同的事件方式调用给BrokerHousekeepingService去处理,BrokerHousekeepingService的职责就是跟RouteInfoManager打交道,即更新元信息数据。

NettyServerHandler

NettyServerHandler : 是最最核心的一个Handler了,它负责了所有request、response的逻辑代码的处理。

具体看processMessageReceived() :

    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            // 根据不同的请求类型执行不同的操作
            switch (cmd.getType()) {
                // 处理request类型
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                // 处理response类型
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

REQUEST_COMMAND : 处理request

    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        //① 根据请求code从processorTable获取处理器、线程池
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        // 如果没获取到,则使用默认处理器
        // 这个上一篇文章有讲到默认处理器DefaultRequestProcessor与线程池RemotingExecutor的绑定
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        // 获取请求id
        final int opaque = cmd.getOpaque();
        
        if (pair != null) {
            // 创建任务执行对象
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                        // 远程调用前一些参数的设置
                        doBeforeRpcHooks(remoteAddr, cmd);
                        // 创建回调实例
                        final RemotingResponseCallback callback = new RemotingResponseCallback() {
                            @Override
                            public void callback(RemotingCommand response) {
                                // 远程调用一些参数的设置
                                doAfterRpcHooks(remoteAddr, cmd, response);
                                //判断是否单向请求(RPC_ONEWAY),不是的话需要返回response
                                if (!cmd.isOnewayRPC()) {
                                    if (response != null) {
                                        response.setOpaque(opaque);
                                        // 需要响应,设置为response请求类型
                                        response.markResponseType();
                                        try {
                                            //响应
                                            ctx.writeAndFlush(response);
                                        } catch (Throwable e) {
                                            //一些错误日志打印...
                                        }
                                    } else {
                                    }
                                }
                            }
                        };
                        // 是否使用同步的处理器类型 : AsyncNettyRequestProcessor
                        if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                            // 这个没有具体实现,目前还不清楚作用
                            AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
                            processor.asyncProcessRequest(ctx, cmd, callback);
                        } else {
                            // 使用默认处理器 DefaultRequestProcessor
                            // 先主要看看这个
                            NettyRequestProcessor processor = pair.getObject1();
                            //② 执行具体的逻辑代码
                            RemotingCommand response = processor.processRequest(ctx, cmd);
                            // 执行回调,即执行response
                            callback.callback(response);
                        }
                    } catch (Throwable e) {
                        //一些错误日志打印...
                        
                        // 同上
                        if (!cmd.isOnewayRPC()) {
                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        }
                    }
                }
            };
            // 如果出现system busy,即拒绝请求
            if (pair.getObject1().rejectRequest()) {
                //创建新的响应对象,响应给client
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }

            try {
                // 创建异步执行任务对象
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                //③ 交给线程池异步执行
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                // 一些错误日志打印...
                
                //判断是否单向请求,不是的话需要返回response
                if (!cmd.isOnewayRPC()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
        } else {
            // 没有获取到处理器,返回失败报错
            String error = " request type " + cmd.getCode() + " not supported";
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        }
    }
  • Step①: processorTable : 根据请求code存储了不同处理器对应不同的线程池,目前来看namesrc使用的是默认处理器DefaultRequestProcessor,并且没有注册进来,而是注册在了全局变量defaultRequestProcessor。
  • Step②: DefaultRequestProcessor : namesrv对于request是使用默认处理器,真正的实现代码函数是processRequest()。
  • Step③: namesrc对于request是交给线程池RemotingExecutor异步处理的。

DefaultRequestProcessor.processRequest() :

这里就不具体每个都去分析了,后期会根据不同的场景请求去分析。

现在,与上一篇的后台线程池RemotingExecutor、默认处理器DefaultRequestProcessor的知识点就串联起来了 :

再看看下图的橙色部分 :

  小结一下 : Netty通过NettyServerHandler实现了对request、response请求的处理,目前先总结request,对于request,namesrv是通过默认处理器DefaultRequestProcessor、后台线程池RemotingExexcutor去异步执行处理。

RESPONSE_COMMAND : 处理response

  在上面的request中,对于RemotingResponseCallback的回调函数的执行,会触发响应,即执行NettyServerHandler的RESPONSE_COMMAND。

    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        // 请求ID
        final int opaque = cmd.getOpaque();
        //① 从正在执行请求的集合(responseTable)中获取ResponseFuture
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);

            responseTable.remove(opaque);
            // 是否有实现异步执行时的回调类InvokeCallback
            // 如果有,说明该请求是异步的请求方式,需要异步执行一些注册的回调对象
            if (responseFuture.getInvokeCallback() != null) {
                //②异步执行
                executeInvokeCallback(responseFuture);
            } else {
                // 否则无需异步执行一些注册的回调对象
                responseFuture.putResponse(cmd);
                //③释放资源,对应的NettyRemotingAbstract的semaphoreOneway
                //控制并发度,保护系统资源
                responseFuture.release();
            }
        } else {
           // 说明该请求是同步请求,直接返回
        }
    }

Step① : 从正在执行请求的集合(responseTable)中获取ResponseFuture。
ResponseFuture : response结果对象,具体的作用就是封装了需要response的连接,即正在执行同步或者异步的连接的response对象。

    #ResponseFuture
    public class ResponseFuture {
    //请求id
    private final int opaque;
    //连接管道
    private final Channel processChannel;
    //超时时间
    private final long timeoutMillis;
    //执行异步时必须注册的回调基类
    private final InvokeCallback invokeCallback;
    // 开始时间
    private final long beginTimestamp = System.currentTimeMillis();
    // 控制同步请求的超时时间
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    // 单向\异步请求时的并发度控制
    // 这里不能只看名称,其实异步调用时也是使用的这个基类。。。。
    private final SemaphoreReleaseOnlyOnce once;

    private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
    private volatile RemotingCommand responseCommand;
    private volatile boolean sendRequestOK = true;
    private volatile Throwable cause;
    //...
    }

  responseTable : responseTable的作用就是保存了正在请求(request)还没有处理response的ResponseFuture。那什么时候会封装这些正在请求还没有处理response的ResponseFuture放进responseTable呢,答案就是NettyRemotingAbstract实现的三种不同的请求方式 :

  • 同步 : invokeSyncImpl,没有注册回调对象InvokeCallback,是null,并且做了超时控制,同时也把ResponseFuture保存到responseTable :
  • 异步 : invokeAsyncImpl(),注册了回调对象InvokeCallback,并且做了限流,同时也把ResponseFuture保存到responseTable :
  • 单向 : invokeOnewayImpl(),做了限流,由于是单向调用,无需注册ResponseFuture :

Step② : executeInvokeCallback() : 基于上面的invokeAsyncImpl()注册的回调对象InvokeCallback的子类去异步执行。

    private void executeInvokeCallback(final ResponseFuture responseFuture) {
        boolean runInThisThread = false;
        // 获取publicExecutor线程池
        ExecutorService executor = this.getCallbackExecutor();
        if (executor != null) {
            try {
                // 异步提交
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //异步执行
                            responseFuture.executeInvokeCallback();
                        } catch (Throwable e) {
                            log.warn("execute callback in executor exception, and callback throw", e);
                        } finally {
                            //释放资源
                            responseFuture.release();
                        }
                    }
                });
            } catch (Exception e) {
                runInThisThread = true;
                log.warn("execute callback in executor exception, maybe executor busy", e);
            }
        } else {
            runInThisThread = true;
        }

        if (runInThisThread) {
            try {
                responseFuture.executeInvokeCallback();
            } catch (Throwable e) {
                log.warn("executeInvokeCallback Exception", e);
            } finally {
                responseFuture.release();
            }
        }
    }

另外这里提一下与responseTable相关的一条后台定时线程。

    #NettyRemotingServer.start()后会去启动这么一条定时线程
    # 每1s扫描一次
    this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    // 看下图
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);

这么设计可以做到加快异步响应。

再看看下图的绿色部分 :

小结一下 :
  对于response类型的处理,需要分为三大类去处理,即同步、异步、单向,同步、单向请求无需注册回调任务,而异步请求需要注册回调任务对象(InvokeCallback)来异步执行回调响应到客户端。
  InvokeCallback的异步执行是交给publicExecutor线程池来异步处理。
  异步、单向都做了并发度控制,是因为在比较高并发的情况下,如果是异步请求,那么对于服务端来说可能会导致太多Channel同时request、response处理不过来导致拥堵,对于单向请求,那么瓶颈在于request,所以这两必须限流的,而对于同步请求,其实已经是用线程去控制了Channel了。

基于上篇文章的疑惑

  • broker与namesrv的注册、发现、路由是如何实现的?(独立到第三篇文章)
  • namesrv是怎么存储broker这些元信息的?(已分析)
  • 处理器为什么要跟RemotingExecutor绑定在一起?(异步处理request,分担Reactor工作线程的压力)
  • 公有线程池(publicExecutor)一般都是处理什么?(异步处理response,分担Reactor工作线程的压力)
  • 上图中没有被圈红的都是没串联起来或者说还没学到的点,上面4个问题能否串联起来?(基本串联起来了)

结束语

  • 原创不易
  • 希望看完这篇文章的你有所收获!

相关参考资料


目录