
前言
- 基于第一篇文章RocketMQ:Namesrv源码分析(一) ,我们已经大致知道了Namesrv的配置加载、Netty的初始化与启动、一些定时、后台线程以及属性的作用。
- 本问在第一篇文章的基础上继续分析 :
- namesrv上元信息的存储结构。
- client端(例如broker、producer、cunsurmer)的request到达Namesrv后,以及response,Namesrv是如何处理。
元信息的存储结构
Namesrv作为RocketMQ的"大脑",他的本地必须存储一些数据,例如topic、queue信息、broker的基本信息、集群信息、borker的状态(例如心跳检测)等等,这些数据我们这里称这为元信息,Namesrv通过RouteInfoManager来存储这些元信息,核心控制器(NamesrvController)就包含了这个对象属性。RouteInfoManager的数据结构如下图 :

- topicQueueTable : topic消息队列信息表,存储了topic跟queue的映射关系,消息发送时可以基于这张表来进行负载均衡 : 这是一个双Map集合,因为一个topic有多个queue队列,一个broker默认是4个读队列、4个写队列。
// topic消息队列信息表
private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;
// queue数据结构
public class QueueData implements Comparable<QueueData> {
// broker名称
private String brokerName;
// 读队列
private int readQueueNums;
// 写队列
private int writeQueueNums;
// 权限
private int perm;
//topic同步表示
private int topicSysFlag;
//get、set...
}

- brokerAddrTable : broker基本信息,包括属于哪个集群、集群中主备信息。PS : 多个broker可以组成Master-Slave集群,brokerId=0的就是Master,大于0的都是Slave.
#brokerAddrTable
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
#BrokerData
public class BrokerData implements Comparable<BrokerData> {
// 集群名称
private String cluster;
// broker名称
private String brokerName;
// 主备关系 : 0为Master,大于0是Slave
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
private final Random random = new Random();
// 其他函数
}

- clusterAddrTable: 记录了每个集群下有哪些broker,如下图 :
#clusterAddrTable
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;

- brokerLiveTable: broker的状态信息,记录了broker每次心跳检测的时间戳。
#brokerLiveTable
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
class BrokerLiveInfo {
// 上一次心跳检测的时间戳
private long lastUpdateTimestamp;
// 版本信息
private DataVersion dataVersion;
// 长连接Channel
private Channel channel;
// 如果是slave,那么这个属性存储的就是master的ip地址
// 如果是master,那么这个属性存储的是master的IP + ":" + haListenPort(默认10912) 目前不知道为啥这么设计
private String haServerAddr;
}

元信息的存储结构相对简单,只要理清cluster、broker、topic、queue之间的关系就行。
Handler、处理器
Handler上执行着真正的逻辑代码,所以对于namesrv的request或者response的处理,都会涉及到Handler上,而真正逻辑代码的实现,实际上是处理器。从NettyRemotingServer的start()可以看到具体的几个Hadnler :

不同的Handler有各自的作用 : TSL相关的Handler、解码器Handler、编码器Handler、120s心跳检测Handler、连接管理Handler、对外提供服务Handler,其中连接管理(NettyConnectManageHandler)、和对外提供服务Handler(NettyServerHandler)是我们的重点关注。
- NettyDecoder(解码) : 客户端通过request过来是byte类型,服务端需要将byte解码成RemotingCommand对象,这里的NettyDecoder就是负责将byte解码成RemotingCommand对象。
- NettyEncoder(编码) : 服务端response结果给客户端也是基于byte传输,必须对RemotingCommand对象进行编码,这里的NettyEncoder就是负责编码成将RemotingCommand对象编码成byte传输。
- IdleStateHandler(心跳检测) : Netty自带的一个心跳检测Handler,主要负责120s的心跳检测,具体的可以去深入了解。
- NettyConnectManageHandler : 管理着不同状态的连接。
- NettyServerHandler : 处理namesrv所有的request、response请求。
客户端与namesrv是基于RemotingCommand对象来交流的,RemotingCommand的几个重要属性 :
public class RemotingCommand {
// 默认是request类型
private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND
// 默认是单向请求类型
private static final int RPC_ONEWAY = 1; // 0, RPC
// 请求Id,递增的
private int opaque = requestId.getAndIncrement();
// 处理请求消息时根据这个字段来走不同的消息处理类
private int code;
// 自定义字段
private HashMap<String, String> extFields;
// 自定义头,不进行序列化
private transient CommandCustomHeader customHeader;
// 内容
private transient byte[] body;
//....
}
- RPC_ONEWAY做个说明 : Rocketmq对于request,支持三种方式,即同步(sync)、异步(async)、单向(oneway),前两种是双向的,即会收到response,而最后一种是单向的,不会收到response。
- 同步(sync) : 同步等待调用,直到服务端处理完成后才返回结果,所以客户端会阻塞在服务端,服务端不会占用太多的系统资源;
- 异步(async) : 异步调用,会立即返回,客户端不阻塞,服务端会注册回调任务;交给后台线程池去处理,所以需要限流保证系统资源
- 单向(oneway) : 单向调用,会立即返回,客户端不阻塞,无需等待也无需注册回调任务,简单的说就是只管发,所以也需要限流保证系统资源;
NettyConnectManageHandler
NettyConnectManageHandler : 从名字看出是管理着连接的一个Handler ,它会根据不同的连接状态封装为不同的NettyEvent,然后注册到父类的定时线程NettyEventExecutor的阻塞队列,NettyEventExecutor你可以理解为一个简易版的线程池。

将事件对象NettyEvent存储到阻塞队列中,并且后台线程NettyEventExecutor不断的循环获取 :
#将事件对象NettyEvent添加到阻塞队列
public void putNettyEvent(final NettyEvent event) {
this.nettyEventExecutor.putNettyEvent(event);
}

BrokerHousekeepingService : 不同的事件状态会执行不同的操作,统一BrokerHousekeepingService来管理 :
#获取监听管理对象BrokerHousekeepingService
final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();

最终是RouteInfoManager来执行元信息数据的更新 : onChannelDestroy()会去删除该broker相关的元信息数据,这里就不贴出来了。
现在,与上一篇的后台监听线程NettyEventExecutor、BrokerHousekeepingService的知识点就串联起来了 :

再看看下图的紫色部分 :

小结一下 : NettyConnectManageHandler主要的作用就是来管理、监听这些连接的,会根据不同的触发事件,封装成事件对象供后台线程不断的轮询去获取这些事件对象,后台线程根据不同的事件方式调用给BrokerHousekeepingService去处理,BrokerHousekeepingService的职责就是跟RouteInfoManager打交道,即更新元信息数据。
NettyServerHandler
NettyServerHandler : 是最最核心的一个Handler了,它负责了所有request、response的逻辑代码的处理。

具体看processMessageReceived() :
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
// 根据不同的请求类型执行不同的操作
switch (cmd.getType()) {
// 处理request类型
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
// 处理response类型
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
REQUEST_COMMAND : 处理request
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
//① 根据请求code从processorTable获取处理器、线程池
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
// 如果没获取到,则使用默认处理器
// 这个上一篇文章有讲到默认处理器DefaultRequestProcessor与线程池RemotingExecutor的绑定
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
// 获取请求id
final int opaque = cmd.getOpaque();
if (pair != null) {
// 创建任务执行对象
Runnable run = new Runnable() {
@Override
public void run() {
try {
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
// 远程调用前一些参数的设置
doBeforeRpcHooks(remoteAddr, cmd);
// 创建回调实例
final RemotingResponseCallback callback = new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
// 远程调用一些参数的设置
doAfterRpcHooks(remoteAddr, cmd, response);
//判断是否单向请求(RPC_ONEWAY),不是的话需要返回response
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
// 需要响应,设置为response请求类型
response.markResponseType();
try {
//响应
ctx.writeAndFlush(response);
} catch (Throwable e) {
//一些错误日志打印...
}
} else {
}
}
}
};
// 是否使用同步的处理器类型 : AsyncNettyRequestProcessor
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
// 这个没有具体实现,目前还不清楚作用
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
// 使用默认处理器 DefaultRequestProcessor
// 先主要看看这个
NettyRequestProcessor processor = pair.getObject1();
//② 执行具体的逻辑代码
RemotingCommand response = processor.processRequest(ctx, cmd);
// 执行回调,即执行response
callback.callback(response);
}
} catch (Throwable e) {
//一些错误日志打印...
// 同上
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
// 如果出现system busy,即拒绝请求
if (pair.getObject1().rejectRequest()) {
//创建新的响应对象,响应给client
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
// 创建异步执行任务对象
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
//③ 交给线程池异步执行
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
// 一些错误日志打印...
//判断是否单向请求,不是的话需要返回response
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
// 没有获取到处理器,返回失败报错
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
- Step①: processorTable : 根据请求code存储了不同处理器对应不同的线程池,目前来看namesrc使用的是默认处理器DefaultRequestProcessor,并且没有注册进来,而是注册在了全局变量defaultRequestProcessor。
- Step②: DefaultRequestProcessor : namesrv对于request是使用默认处理器,真正的实现代码函数是processRequest()。
- Step③: namesrc对于request是交给线程池RemotingExecutor异步处理的。
DefaultRequestProcessor.processRequest() :

这里就不具体每个都去分析了,后期会根据不同的场景请求去分析。
现在,与上一篇的后台线程池RemotingExecutor、默认处理器DefaultRequestProcessor的知识点就串联起来了 :

再看看下图的橙色部分 :

小结一下 : Netty通过NettyServerHandler实现了对request、response请求的处理,目前先总结request,对于request,namesrv是通过默认处理器DefaultRequestProcessor、后台线程池RemotingExexcutor去异步执行处理。
RESPONSE_COMMAND : 处理response
在上面的request中,对于RemotingResponseCallback的回调函数的执行,会触发响应,即执行NettyServerHandler的RESPONSE_COMMAND。
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
// 请求ID
final int opaque = cmd.getOpaque();
//① 从正在执行请求的集合(responseTable)中获取ResponseFuture
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);
responseTable.remove(opaque);
// 是否有实现异步执行时的回调类InvokeCallback
// 如果有,说明该请求是异步的请求方式,需要异步执行一些注册的回调对象
if (responseFuture.getInvokeCallback() != null) {
//②异步执行
executeInvokeCallback(responseFuture);
} else {
// 否则无需异步执行一些注册的回调对象
responseFuture.putResponse(cmd);
//③释放资源,对应的NettyRemotingAbstract的semaphoreOneway
//控制并发度,保护系统资源
responseFuture.release();
}
} else {
// 说明该请求是同步请求,直接返回
}
}
Step① : 从正在执行请求的集合(responseTable)中获取ResponseFuture。
ResponseFuture : response结果对象,具体的作用就是封装了需要response的连接,即正在执行同步或者异步的连接的response对象。
#ResponseFuture
public class ResponseFuture {
//请求id
private final int opaque;
//连接管道
private final Channel processChannel;
//超时时间
private final long timeoutMillis;
//执行异步时必须注册的回调基类
private final InvokeCallback invokeCallback;
// 开始时间
private final long beginTimestamp = System.currentTimeMillis();
// 控制同步请求的超时时间
private final CountDownLatch countDownLatch = new CountDownLatch(1);
// 单向\异步请求时的并发度控制
// 这里不能只看名称,其实异步调用时也是使用的这个基类。。。。
private final SemaphoreReleaseOnlyOnce once;
private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
private volatile RemotingCommand responseCommand;
private volatile boolean sendRequestOK = true;
private volatile Throwable cause;
//...
}
responseTable : responseTable的作用就是保存了正在请求(request)还没有处理response的ResponseFuture。那什么时候会封装这些正在请求还没有处理response的ResponseFuture放进responseTable呢,答案就是NettyRemotingAbstract实现的三种不同的请求方式 :
- 同步 : invokeSyncImpl,没有注册回调对象InvokeCallback,是null,并且做了超时控制,同时也把ResponseFuture保存到responseTable :

- 异步 : invokeAsyncImpl(),注册了回调对象InvokeCallback,并且做了限流,同时也把ResponseFuture保存到responseTable :

- 单向 : invokeOnewayImpl(),做了限流,由于是单向调用,无需注册ResponseFuture :

Step② : executeInvokeCallback() : 基于上面的invokeAsyncImpl()注册的回调对象InvokeCallback的子类去异步执行。
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
// 获取publicExecutor线程池
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) {
try {
// 异步提交
executor.submit(new Runnable() {
@Override
public void run() {
try {
//异步执行
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("execute callback in executor exception, and callback throw", e);
} finally {
//释放资源
responseFuture.release();
}
}
});
} catch (Exception e) {
runInThisThread = true;
log.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
}
if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("executeInvokeCallback Exception", e);
} finally {
responseFuture.release();
}
}
}
另外这里提一下与responseTable相关的一条后台定时线程。
#NettyRemotingServer.start()后会去启动这么一条定时线程
# 每1s扫描一次
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
// 看下图
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
这么设计可以做到加快异步响应。

再看看下图的绿色部分 :

小结一下 :
对于response类型的处理,需要分为三大类去处理,即同步、异步、单向,同步、单向请求无需注册回调任务,而异步请求需要注册回调任务对象(InvokeCallback)来异步执行回调响应到客户端。
InvokeCallback的异步执行是交给publicExecutor线程池来异步处理。
异步、单向都做了并发度控制,是因为在比较高并发的情况下,如果是异步请求,那么对于服务端来说可能会导致太多Channel同时request、response处理不过来导致拥堵,对于单向请求,那么瓶颈在于request,所以这两必须限流的,而对于同步请求,其实已经是用线程去控制了Channel了。
基于上篇文章的疑惑
- broker与namesrv的注册、发现、路由是如何实现的?(独立到第三篇文章)
- namesrv是怎么存储broker这些元信息的?(已分析)
- 处理器为什么要跟RemotingExecutor绑定在一起?(异步处理request,分担Reactor工作线程的压力)
- 公有线程池(publicExecutor)一般都是处理什么?(异步处理response,分担Reactor工作线程的压力)
- 上图中没有被圈红的都是没串联起来或者说还没学到的点,上面4个问题能否串联起来?(基本串联起来了)
结束语
- 原创不易
- 希望看完这篇文章的你有所收获!
相关参考资料
- RocketMQ源码,V4_9_3 版本 ,对应的是namesrv模块
- 《RocketMQ技术内幕》【书籍】