
前言
- 本文重点对Producer实现的client端的架构进行源码分析,以便理解Producer的发送消息API、本地缓存路由信息进行负载均衡、后台定时线程等特性。
- 本文基于 RocketMQ_V4_9_3 版本、JDK1.8的环境进行源码分析。
- RocketMQ源码github地址 :RocketMQ源码下载 ,对应的是client模块。
- 本文你可以了解到Producer的整体架构分析,即每个类的作用,包括每个类的核心属性的作用,在理解完整个架构后,我们后期对例如发送消息的API可以更好的深入学习。
Producer
RocketMQ中有生产者的角色,它与namesrv、broker都有着密切的关系,其最核心的任务就是发送消息到broker,相关特性有负载均衡、重试机制、同步、单向还是异步发送、甚至事务消息等等。
架构
以DefaultMQProducer的使用为例,如下 :
//① 创建DefaultMQProducer,加入到abin_test_group生产组中
DefaultMQProducer producer = new DefaultMQProducer("abin_test_group");
// 设置实例名称
producer.setInstanceName("abin-instance");
// 设置异步调用失败后的重试次数
producer.setRetryTimesWhenSendAsyncFailed(5);
// 设置namesrv的地址
producer.setNamesrvAddr("192.168.XX.XXX:9876");
try {
//② 启动
producer.start();
// 异步的方式循环发送100条消息到broker
for (int i = 1; i <= 100; i++) {
// 创建消息对象,绑定topic : abin_topic_test
Message msg = new Message("abin_topic_test", ("Hello,abin" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//③ 异步发送,设置SendCallback回调对象
producer.send(msg, new SendCallback() {
// 发送成功的回调函数
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult.toString());
}
// 发送失败的回调函数
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
}
} catch (MQClientException | InterruptedException | UnsupportedEncodingException | RemotingException e) {
e.printStackTrace();
}
//producer.shutdown();
PS : 上面是一个简单的发送消息使用例子,Step①、②是本文的重点分析,Step③发送消息将会独立一篇文章。
以DefaultMQProducer的使用为例,Producer的整体架构图如下 :

Producer采用了分层的思想,每一层的类都有各自的作用,在这里我们先简单说一下各个类的作用,然后再进行各自分析其实现细节 :
- DefaultMQProducer : 供我们使用的一个默认Producer类,提供了很多API给我们调用,例如对于Producer最重要的就是不同方式的send消息到broker的API,同时它也充当一个配置环境的作用,通过它启动前可以对一些参数进行配置、调优。
- DefaultMQProducerImpl : DefaultMQProducer的实现类,实现了DefaultMQProducer的API的逻辑代码,同时还负责创建实例(MQClientInstance)、异步调用线程池、缓存了topic路由信息等等。
- MQClientInstance : Producer有实例的概念,这对于整个架构来说是一个很重要的概念。启动一个DefaultMQProducer就是一个实例,同一个JVM进程可以初始化多个实例,但是要求名称不能相同;再来说说维护了哪些信息 :缓存了topic的路由信息、broker的地址以及主从信息、版本信息、后台定时线程任务、客户端远程调用服务端的的实现类(MQClientAPIImpl)、客户端的request、response处理器(ClientRemotingProcessor)、当然还有一些Consumer客户端相关的信息等等。
- PS : MQClientInstance这个类是Producer、Consumer共用的,所以会参杂这两个client的实现功能,本文就以Producer相关的信息为主;
- PS : 这里可以对比一下InnoDB存储引擎的Buffer Pool也支持初始化多个实例。
- MQClientAPIImpl : 主要与NettyRemotingClient打交道,暴露了远程处理服务端的实现API。
- NettyRemotingClient : 启动、初始化Netty,真正实现了远程处理服务端的API功能。
- PS : 这个类与namesrv实现的NettyRemotingServer是大同小异的,只是职责不同,一个是server一个是client。
接下来开始具体分析每个类的实现细节。
DefaultMQProducer
DefaultMQProducer是暴露给我们使用的,有很多不同的构造函数、各种API等来供我们使用,同时它还有一个配置属性的作用,我们先解释一些属性,然后再看构造器 :
# 继承了配置对象ClientConfig
public class DefaultMQProducer extends ClientConfig implements MQProducer {
// DefaultMQProducerImpl实现类,用组合的方式,通过构造器创建
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
// 生产组名称
private String producerGroup;
// 发送超时控制,默认3s
private int sendMsgTimeout = 3000;
// 同步调用失败时的重试次数,默认是2+1次,即3次
private int retryTimesWhenSendFailed = 2;
// 异步调用失败时的重试次数,默认是2次
private int retryTimesWhenSendAsyncFailed = 2;
// 同步调用失败时,是否开启选择其他broker进行重试,对一些重要的消息支持开启
private boolean retryAnotherBrokerWhenNotStoreOK = false;
// Producer调用的消息跟踪,对应的实现类是AsyncTraceDispatcher,其实也是一个实例
private TraceDispatcher traceDispatcher = null;
// 当消息内容(boby)达到4k时启动压缩
private int compressMsgBodyOverHowmuch = 1024 * 4;
// 一条消息最大4MB大小
private int maxMessageSize = 1024 * 1024 * 4;
// 一些函数...
}
public class ClientConfig {
// namesrc地址
private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses();
// 客户端IP
private String clientIP = RemotingUtil.getLocalAddress();
// 实例名称,默认是DEFAULT
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
// 健康心跳检测时间
private int heartbeatBrokerInterval = 1000 * 30;
// 超时时间
private int mqClientApiTimeout = 3 * 1000;
// 一些函数...
}
再来看看构造器 : DefaultMQProducer为我们提供了很多构造器,满参的构造器是下面这一个。
# namespace : namesrv地址
# producerGroup : 生产组名称
# rpcHook : RPC远程调用前、返回后可能需要执行一些命令
# enableMsgTrace : 是否开启消息跟踪,与上面的TraceDispatcher相关
# customizedTraceTopic : 是否配置消息跟踪的topic名称,不配置时使用默认的RMQ_SYS_TRACE_TOPIC
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
boolean enableMsgTrace, final String customizedTraceTopic) {
this.namespace = namespace;
this.producerGroup = producerGroup;
// 创建DefaultMQProducerImpl
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
// 是否开启消息跟踪
if (enableMsgTrace) {
try {
// 其实也是个实例,读懂整篇文章后来看他就懂了
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl);
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
this.defaultMQProducerImpl.registerEndTransactionHook(
new EndTransactionTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
#我们使用的构造器
public DefaultMQProducer(final String producerGroup) {
this(null, producerGroup, null);
}
#最终调用,没有使用消息跟踪
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
// 实现DefaultMQProducerImpl
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
总结一下DefaultMQProducer的作用 :
- 启动前支持对一些核心参数进行设置、配置;
- 创建DefaultMQProducerImpl;
- 提供API,例如start()启动服务、发送消息等API;
- 支持初始化并开启消息调用跟踪;
- 下图蓝色部分 :

DefaultMQProducerImpl
DefaultMQProducer的构造函数会创建它的实现类DefaultMQProducerImpl,这一点通过名称也可以看出来,DefaultMQProducer调用的API,这些API底层的逻辑代码实现其实都是DefaultMQProducerImpl来完成。
statr()启动服务
DefaultMQProducer通过调用statr()启动服务,实际上是调用了DefaultMQProducerImpl.start() :
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
// 调用DefaultMQProducerImpl.start() :
this.defaultMQProducerImpl.start();
// 判断是否开启消息跟踪
if (null != traceDispatcher) {
try {
// traceDispatcher其实也是一个实例
// 最终也是调用DefaultMQProducerImpl.start()
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
看start()之前,我们先看DefaultMQProducerImpl的几个重要的属性值和构造器 :
#属性
// 缓存了路由信息
// key : topic
// value : broker、queue的信息
// 这个有后台定时线程向namesrv获取,保持最新的数据
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>();
// 维护了后台异步线程池 : 例如异步发送消息
private final ExecutorService defaultAsyncSenderExecutor;
// 实例
private MQClientInstance mQClientFactory;
// 负载均衡策略 : 选择一个MessageQueue来发送消息
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
#构造器
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
this(defaultMQProducer, null);
}
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
// 初始化工作队列
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
// 初始化异步线程池,核心线程数和最大线程数为服务器的核数
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
}
DefaultMQProducerImpl.start() :

创建实例
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);


MQClientManager : 一个JVM下管理着多个实例,初始化时保证每个实例的实例名称不同即可。
注册DefaultMQProducerImpl
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

总结一下DefaultMQProducerImpl的作用 :
- DefaultMQProducer的实现类,实现了DefaultMQproducerImpl的API逻辑代码;
- 通过构造函数初始化异步线程池来供异步执行,例如异步发送消息,并且实现了默认的负载均衡策略(MQFaultStrategy);
- 创建、启动实例,并且同一个JVM下支持初始化多个实例,只要保证实例名称不相同即可;
- 维护了topic路由信息;
- 下图橙色部分 :

MQClientInstance
DefaultMQProducerImpl完成了对MQClientInstance实例的创建,并且调用实例的start()进行初始化实例。
看start()之前,我们先看MQClientInstance的几个重要的属性值和构造器 :
// 这个是存储了生产组与DefaultMQProducerImpl的映射,
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
// 这个是MQ客户端的真正的远程调用实现类
private final MQClientAPIImpl mQClientAPIImpl;
// 缓存了topic的路由信息
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
// broker的集群地址信息
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
new ConcurrentHashMap<String, HashMap<Long, String>>();
// broker的版本信息
private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
new ConcurrentHashMap<String, HashMap<String, Integer>>();
// 后台定时线程
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MQClientFactoryScheduledThread");
}
});
// Netty接收到的request、response的处理器
private final ClientRemotingProcessor clientRemotingProcessor;
// 自己也创建了个DefaultMQProducerImpl
private final DefaultMQProducer defaultMQProducer;
#构造器
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId) {
this(clientConfig, instanceIndex, clientId, null);
}
#DefaultMQProducerImpl是通过这个构造器创建的实例
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
// 创建Netty参数配置文件对象
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
// 创建Netty的request、response的处理器
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
// 创建MQ客户端远程调用实现类,并将客户端处理器绑定到Netty
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
this.clientId = clientId;
// 下面几个与Producer无关的
this.mQAdminImpl = new MQAdminImpl(this);
this.pullMessageService = new PullMessageService(this);
this.rebalanceService = new RebalanceService(this);
// 创建内部的DefaultMQProducer,供自己使用
// 这个对象的初始化上面分析过了
// 定义为内部类型 : MixAll.CLIENT_INNER_PRODUCER_GROUP
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
// 设置配置文件对象的属性
this.defaultMQProducer.resetClientConfig(clientConfig);
// 与消费组有关的
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
this.instanceIndex,
this.clientId,
this.clientConfig,
MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}
MQClientInstance.start() :

启动后台定时任务
this.startScheduledTask();
- 定时任务1 : 如果没有配置namesrv地址,那么每2分钟就远程获取namesrc的地址,说明namesrc是支持动态配置的 :
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 远程获取
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
- 定时任务2 : 每30s会根据topic更新路由信息,这里重点分析一下 :
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 更新路由信息
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

#调用的是这个
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
return updateTopicRouteInfoFromNameServer(topic, false, null);
}


在来看看namesrc的DefaultRequestProcessor处理器 :



最终DefaultMQProducerImpl的topicPublishInfoTable、MQClientInstance的topicRouteTable都缓存了topic路由信息。
- 定时任务3 : 定时更新broker地址、版本信息 :
#这个就不详细说明了
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 定时更新brokerAddrTable地址信息
MQClientInstance.this.cleanOfflineBroker();
// 定时与broker保持心跳检查,更新brokerVersionTable版本信息
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
启动内部使用的DefaultMQProducerImpl
MQClientInstance自己也注册了DefaultMQProducerImpl,唯一的区别就是不再去创建、启动实例了。
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

总结一下MQClientInstance的作用 :
- 创建MQClientAPIImpl,远程调用服务端是通过它来调用的;
- 缓存了topic路由信息、broker地址、版本信息;
- 定时线程更新DefaultMQProducerImpl的topicPublishInfoTable、MQClientInstance的topicRouteTable的topic路由信息,以及自己本地缓存的broker地址、版本信息;
- 注册了自己内部使用的DefaultMQProducerImpl;
- 启动MQClientAPIImpl;
- 下图紫色部分 :

MQClientAPIImpl
MQClientInstance实例通过构造器完成了对MQClientAPIImpl的创建,并且调用start()进行初始化Netty。
看start()之前,我们先看MQClientAPIImpl的几个重要的属性值和构造器 :
// 实现Netty客户端的对象
private final RemotingClient remotingClient;
// Netty对request、response的处理类
private final ClientRemotingProcessor clientRemotingProcessor;
// namesrv地址
private String nameSrvAddr = null;
#构造器
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig) {
this.clientConfig = clientConfig;
topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
// 创建Netty客户端
this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
// 创建Netty客户端处理器
this.clientRemotingProcessor = clientRemotingProcessor;
this.remotingClient.registerRPCHook(rpcHook);
// 下面都是请求码和处理器、线程池的映射关系以及注册
this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}

MQClientAPIImpl.start() :
public void start() {
// 启动Netty客户端
this.remotingClient.start();
}
总结一下MQClientAPIImpl的作用 :
- 创建Netty客户端对象NettyRemotingClient,真正的远程调用服务端是通过它来调用的;
- 根据请求码注册了处理器;
- 启动NettyRemotingClient,即启动Netty客户端;
- 下图红色部分 :

初始化、启动Netty客户端
MQClientAPIImpl完成了对NettyRemotingClient的创建,并且调用start()启动Netty客户端。
看start()之前,我们先看NettyRemotingClient的构造器 :
public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
final ChannelEventListener channelEventListener) {
// 单向、异步的限流发送
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
this.nettyClientConfig = nettyClientConfig;
this.channelEventListener = channelEventListener;
// 公有线程池数量
int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// 初始化公有线程池
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
// Boss线程池
this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
}
});
}
NettyRemotingClient.start() : 对于namesrc的NettyRemotingServer是差不多的,重点看看NettyClientHandler。

对于request、response的处理都是由NettyClientHandler来处理 : 这个方法在namesrc的文章已经分析过。

ClientRemotingProcessor.processRequest() : 发送请求。

ClientRemotingProcessor.processResponseCommand() : 处理响应,这个方法在namesrc已经解释过。
总结一下NettyRemotingClient的作用 :
- Netty客户端,由异步线程池负责处理ClientRemotingProcessor处理器的request、response逻辑代码;
- 下图绿色部分 :

总结
DefaultMQProducer支持同一JVM进程下进行多实例初始化,只要保证实例名称不相同即可。
本地缓存了topic路由信息、broker地址、版本信息,并且后台定时线程每2分钟会向namesrc拉取更新。
Netty客户端的request、response的逻辑代码是ClientRemotingProcessor,并且是异步处理的。
本文把DefaultMQProducer的整体架构分析完毕,主要涉及DefaultMQProducer、DefaultMQProducerImpl、MQClientInstance、MQClientAPIImpl、NettyRemotingClient。
后期会分析发送消息以及事务消息。
结束语
- 原创不易
- 希望看完这篇文章的你有所收获!
相关参考资料
- RocketMQ源码,V4_9_3 版本 ,对应的是client模块
- 《RocketMQ技术内幕》【书籍】