把学习当成一种习惯
选择往往大于努力,越努力越幸运

前言

  • 本文重点对Producer实现的client端的架构进行源码分析,以便理解Producer的发送消息API、本地缓存路由信息进行负载均衡、后台定时线程等特性。
  • 本文基于 RocketMQ_V4_9_3 版本、JDK1.8的环境进行源码分析。
  • RocketMQ源码github地址 :RocketMQ源码下载 ,对应的是client模块。
  • 本文你可以了解到Producer的整体架构分析,即每个类的作用,包括每个类的核心属性的作用,在理解完整个架构后,我们后期对例如发送消息的API可以更好的深入学习。

Producer

  RocketMQ中有生产者的角色,它与namesrv、broker都有着密切的关系,其最核心的任务就是发送消息到broker,相关特性有负载均衡、重试机制、同步、单向还是异步发送、甚至事务消息等等。

架构

以DefaultMQProducer的使用为例,如下 :

        //① 创建DefaultMQProducer,加入到abin_test_group生产组中
        DefaultMQProducer producer = new DefaultMQProducer("abin_test_group");
        // 设置实例名称
        producer.setInstanceName("abin-instance");
        // 设置异步调用失败后的重试次数
        producer.setRetryTimesWhenSendAsyncFailed(5);
        // 设置namesrv的地址
        producer.setNamesrvAddr("192.168.XX.XXX:9876");
        try {
            //② 启动
            producer.start();
            // 异步的方式循环发送100条消息到broker
            for (int i = 1; i <= 100; i++) {
                // 创建消息对象,绑定topic : abin_topic_test
                Message msg = new Message("abin_topic_test", ("Hello,abin" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                //③ 异步发送,设置SendCallback回调对象
                producer.send(msg, new SendCallback() {
                    // 发送成功的回调函数
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println(sendResult.toString());
                    }
                    // 发送失败的回调函数
                    @Override
                    public void onException(Throwable e) {
                        e.printStackTrace();
                    }
                });
            }
        } catch (MQClientException | InterruptedException | UnsupportedEncodingException | RemotingException e) {
            e.printStackTrace();
        }
        //producer.shutdown();

PS : 上面是一个简单的发送消息使用例子,Step①、②是本文的重点分析,Step③发送消息将会独立一篇文章。

以DefaultMQProducer的使用为例,Producer的整体架构图如下 :

Producer采用了分层的思想,每一层的类都有各自的作用,在这里我们先简单说一下各个类的作用,然后再进行各自分析其实现细节 :

  • DefaultMQProducer : 供我们使用的一个默认Producer类,提供了很多API给我们调用,例如对于Producer最重要的就是不同方式的send消息到broker的API,同时它也充当一个配置环境的作用,通过它启动前可以对一些参数进行配置、调优。
  • DefaultMQProducerImpl : DefaultMQProducer的实现类,实现了DefaultMQProducer的API的逻辑代码,同时还负责创建实例(MQClientInstance)、异步调用线程池、缓存了topic路由信息等等。
  • MQClientInstance : Producer有实例的概念,这对于整个架构来说是一个很重要的概念。启动一个DefaultMQProducer就是一个实例,同一个JVM进程可以初始化多个实例,但是要求名称不能相同;再来说说维护了哪些信息 :缓存了topic的路由信息、broker的地址以及主从信息、版本信息、后台定时线程任务、客户端远程调用服务端的的实现类(MQClientAPIImpl)、客户端的request、response处理器(ClientRemotingProcessor)、当然还有一些Consumer客户端相关的信息等等。
    • PS : MQClientInstance这个类是Producer、Consumer共用的,所以会参杂这两个client的实现功能,本文就以Producer相关的信息为主;
    • PS : 这里可以对比一下InnoDB存储引擎的Buffer Pool也支持初始化多个实例。
  • MQClientAPIImpl : 主要与NettyRemotingClient打交道,暴露了远程处理服务端的实现API。
  • NettyRemotingClient : 启动、初始化Netty,真正实现了远程处理服务端的API功能。
    • PS : 这个类与namesrv实现的NettyRemotingServer是大同小异的,只是职责不同,一个是server一个是client。

接下来开始具体分析每个类的实现细节。

DefaultMQProducer

  DefaultMQProducer是暴露给我们使用的,有很多不同的构造函数、各种API等来供我们使用,同时它还有一个配置属性的作用,我们先解释一些属性,然后再看构造器 :

    # 继承了配置对象ClientConfig
    public class DefaultMQProducer extends ClientConfig implements MQProducer {
    // DefaultMQProducerImpl实现类,用组合的方式,通过构造器创建
    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
    // 生产组名称
    private String producerGroup;
    // 发送超时控制,默认3s
    private int sendMsgTimeout = 3000;
    // 同步调用失败时的重试次数,默认是2+1次,即3次
    private int retryTimesWhenSendFailed = 2;
    // 异步调用失败时的重试次数,默认是2次
    private int retryTimesWhenSendAsyncFailed = 2;
    // 同步调用失败时,是否开启选择其他broker进行重试,对一些重要的消息支持开启
    private boolean retryAnotherBrokerWhenNotStoreOK = false;
    // Producer调用的消息跟踪,对应的实现类是AsyncTraceDispatcher,其实也是一个实例
    private TraceDispatcher traceDispatcher = null;
    // 当消息内容(boby)达到4k时启动压缩
    private int compressMsgBodyOverHowmuch = 1024 * 4;
    // 一条消息最大4MB大小
    private int maxMessageSize = 1024 * 1024 * 4;
    // 一些函数...
    }
    public class ClientConfig {
    // namesrc地址
    private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses();
    // 客户端IP
    private String clientIP = RemotingUtil.getLocalAddress();
    // 实例名称,默认是DEFAULT
    private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
    // 健康心跳检测时间
    private int heartbeatBrokerInterval = 1000 * 30;
    // 超时时间
    private int mqClientApiTimeout = 3 * 1000;
    
    // 一些函数...
    }

再来看看构造器 : DefaultMQProducer为我们提供了很多构造器,满参的构造器是下面这一个。

    # namespace : namesrv地址
    # producerGroup : 生产组名称
    # rpcHook : RPC远程调用前、返回后可能需要执行一些命令
    # enableMsgTrace : 是否开启消息跟踪,与上面的TraceDispatcher相关
    # customizedTraceTopic : 是否配置消息跟踪的topic名称,不配置时使用默认的RMQ_SYS_TRACE_TOPIC
    public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
        boolean enableMsgTrace, final String customizedTraceTopic) {
        this.namespace = namespace;
        this.producerGroup = producerGroup;
        // 创建DefaultMQProducerImpl
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
        // 是否开启消息跟踪
        if (enableMsgTrace) {
            try {
                // 其实也是个实例,读懂整篇文章后来看他就懂了
                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
                dispatcher.setHostProducer(this.defaultMQProducerImpl);
                traceDispatcher = dispatcher;
                this.defaultMQProducerImpl.registerSendMessageHook(
                    new SendMessageTraceHookImpl(traceDispatcher));
                this.defaultMQProducerImpl.registerEndTransactionHook(
                    new EndTransactionTraceHookImpl(traceDispatcher));
            } catch (Throwable e) {
                log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
            }
        }
    }
    
    #我们使用的构造器
    public DefaultMQProducer(final String producerGroup) {
        this(null, producerGroup, null);
    }
    
    #最终调用,没有使用消息跟踪
    public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
        this.namespace = namespace;
        this.producerGroup = producerGroup;
        // 实现DefaultMQProducerImpl
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    }

总结一下DefaultMQProducer的作用 :

  • 启动前支持对一些核心参数进行设置、配置;
  • 创建DefaultMQProducerImpl;
  • 提供API,例如start()启动服务、发送消息等API;
  • 支持初始化并开启消息调用跟踪;
  • 下图蓝色部分 :

DefaultMQProducerImpl

  DefaultMQProducer的构造函数会创建它的实现类DefaultMQProducerImpl,这一点通过名称也可以看出来,DefaultMQProducer调用的API,这些API底层的逻辑代码实现其实都是DefaultMQProducerImpl来完成。

statr()启动服务

  DefaultMQProducer通过调用statr()启动服务,实际上是调用了DefaultMQProducerImpl.start() :

    public void start() throws MQClientException {
        this.setProducerGroup(withNamespace(this.producerGroup));
        // 调用DefaultMQProducerImpl.start() :
        this.defaultMQProducerImpl.start();
        // 判断是否开启消息跟踪
        if (null != traceDispatcher) {
            try {
                // traceDispatcher其实也是一个实例
                // 最终也是调用DefaultMQProducerImpl.start()
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

  看start()之前,我们先看DefaultMQProducerImpl的几个重要的属性值和构造器 :

    #属性
    // 缓存了路由信息
    // key : topic
    // value : broker、queue的信息
    // 这个有后台定时线程向namesrv获取,保持最新的数据
    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();
    // 维护了后台异步线程池 : 例如异步发送消息
    private final ExecutorService defaultAsyncSenderExecutor;
    // 实例
    private MQClientInstance mQClientFactory;
    // 负载均衡策略 : 选择一个MessageQueue来发送消息
    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
    
    #构造器
    public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
        this(defaultMQProducer, null);
    }
    
    public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
        this.defaultMQProducer = defaultMQProducer;
        this.rpcHook = rpcHook;
        // 初始化工作队列
        this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
        // 初始化异步线程池,核心线程数和最大线程数为服务器的核数
        this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.asyncSenderThreadPoolQueue,
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
                }
            });
    }
     

DefaultMQProducerImpl.start() :

创建实例

this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

MQClientManager : 一个JVM下管理着多个实例,初始化时保证每个实例的实例名称不同即可。

注册DefaultMQProducerImpl

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

总结一下DefaultMQProducerImpl的作用 :

  • DefaultMQProducer的实现类,实现了DefaultMQproducerImpl的API逻辑代码;
  • 通过构造函数初始化异步线程池来供异步执行,例如异步发送消息,并且实现了默认的负载均衡策略(MQFaultStrategy);
  • 创建、启动实例,并且同一个JVM下支持初始化多个实例,只要保证实例名称不相同即可;
  • 维护了topic路由信息;
  • 下图橙色部分 :

MQClientInstance

  DefaultMQProducerImpl完成了对MQClientInstance实例的创建,并且调用实例的start()进行初始化实例。

  看start()之前,我们先看MQClientInstance的几个重要的属性值和构造器 :

    // 这个是存储了生产组与DefaultMQProducerImpl的映射,
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    // 这个是MQ客户端的真正的远程调用实现类
    private final MQClientAPIImpl mQClientAPIImpl;
    // 缓存了topic的路由信息
    private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
    // broker的集群地址信息
    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
            new ConcurrentHashMap<String, HashMap<Long, String>>();
    // broker的版本信息  
    private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
            new ConcurrentHashMap<String, HashMap<String, Integer>>();
    // 后台定时线程
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MQClientFactoryScheduledThread");
        }
    });
    // Netty接收到的request、response的处理器
    private final ClientRemotingProcessor clientRemotingProcessor;
    // 自己也创建了个DefaultMQProducerImpl
    private final DefaultMQProducer defaultMQProducer;
    
    #构造器
    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId) {
        this(clientConfig, instanceIndex, clientId, null);
    }
    
    #DefaultMQProducerImpl是通过这个构造器创建的实例
    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
        this.clientConfig = clientConfig;
        this.instanceIndex = instanceIndex;
        // 创建Netty参数配置文件对象
        this.nettyClientConfig = new NettyClientConfig();
        this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
        this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
        // 创建Netty的request、response的处理器
        this.clientRemotingProcessor = new ClientRemotingProcessor(this);
        // 创建MQ客户端远程调用实现类,并将客户端处理器绑定到Netty
        this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
        
        if (this.clientConfig.getNamesrvAddr() != null) {
            this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
            log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
        }
        
        this.clientId = clientId;
        // 下面几个与Producer无关的
        this.mQAdminImpl = new MQAdminImpl(this);

        this.pullMessageService = new PullMessageService(this);

        this.rebalanceService = new RebalanceService(this);
        // 创建内部的DefaultMQProducer,供自己使用
        // 这个对象的初始化上面分析过了
        // 定义为内部类型 : MixAll.CLIENT_INNER_PRODUCER_GROUP
        this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
        // 设置配置文件对象的属性
        this.defaultMQProducer.resetClientConfig(clientConfig);
        // 与消费组有关的
        this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);

        log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
            this.instanceIndex,
            this.clientId,
            this.clientConfig,
            MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
    }
    

MQClientInstance.start() :

启动后台定时任务

this.startScheduledTask();    
  • 定时任务1 : 如果没有配置namesrv地址,那么每2分钟就远程获取namesrc的地址,说明namesrc是支持动态配置的 :
     if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                     // 远程获取
                     MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }
  • 定时任务2 : 每30s会根据topic更新路由信息,这里重点分析一下 :
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    // 更新路由信息
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    #调用的是这个
    public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }

在来看看namesrc的DefaultRequestProcessor处理器 :

  最终DefaultMQProducerImpl的topicPublishInfoTable、MQClientInstance的topicRouteTable都缓存了topic路由信息。

  • 定时任务3 : 定时更新broker地址、版本信息 :
    #这个就不详细说明了
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    // 定时更新brokerAddrTable地址信息
                    MQClientInstance.this.cleanOfflineBroker();
                    // 定时与broker保持心跳检查,更新brokerVersionTable版本信息
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

启动内部使用的DefaultMQProducerImpl

MQClientInstance自己也注册了DefaultMQProducerImpl,唯一的区别就是不再去创建、启动实例了。

this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

总结一下MQClientInstance的作用 :

  • 创建MQClientAPIImpl,远程调用服务端是通过它来调用的;
  • 缓存了topic路由信息、broker地址、版本信息;
  • 定时线程更新DefaultMQProducerImpl的topicPublishInfoTable、MQClientInstance的topicRouteTable的topic路由信息,以及自己本地缓存的broker地址、版本信息;
  • 注册了自己内部使用的DefaultMQProducerImpl;
  • 启动MQClientAPIImpl;
  • 下图紫色部分 :

MQClientAPIImpl

  MQClientInstance实例通过构造器完成了对MQClientAPIImpl的创建,并且调用start()进行初始化Netty。

  看start()之前,我们先看MQClientAPIImpl的几个重要的属性值和构造器 :

// 实现Netty客户端的对象
private final RemotingClient remotingClient;
// Netty对request、response的处理类
private final ClientRemotingProcessor clientRemotingProcessor;
// namesrv地址
private String nameSrvAddr = null;

#构造器
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
        final ClientRemotingProcessor clientRemotingProcessor,
        RPCHook rpcHook, final ClientConfig clientConfig) {
        this.clientConfig = clientConfig;
        topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
        // 创建Netty客户端
        this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
        // 创建Netty客户端处理器
        this.clientRemotingProcessor = clientRemotingProcessor;

        this.remotingClient.registerRPCHook(rpcHook);
        
        // 下面都是请求码和处理器、线程池的映射关系以及注册
        this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);

        this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);

        this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);

        this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);

        this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);

        this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);

        this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
    }

MQClientAPIImpl.start() :

    public void start() {
        // 启动Netty客户端
        this.remotingClient.start();
    }

总结一下MQClientAPIImpl的作用 :

  • 创建Netty客户端对象NettyRemotingClient,真正的远程调用服务端是通过它来调用的;
  • 根据请求码注册了处理器;
  • 启动NettyRemotingClient,即启动Netty客户端;
  • 下图红色部分 :

初始化、启动Netty客户端

  MQClientAPIImpl完成了对NettyRemotingClient的创建,并且调用start()启动Netty客户端。

  看start()之前,我们先看NettyRemotingClient的构造器 :

    public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
        final ChannelEventListener channelEventListener) {
        // 单向、异步的限流发送
        super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
        this.nettyClientConfig = nettyClientConfig;
        this.channelEventListener = channelEventListener;
        // 公有线程池数量
        int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }
        // 初始化公有线程池
        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });
        // Boss线程池
        this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
            }
        });
    }

  NettyRemotingClient.start() : 对于namesrc的NettyRemotingServer是差不多的,重点看看NettyClientHandler。

对于request、response的处理都是由NettyClientHandler来处理 : 这个方法在namesrc的文章已经分析过。

ClientRemotingProcessor.processRequest() : 发送请求。

ClientRemotingProcessor.processResponseCommand() : 处理响应,这个方法在namesrc已经解释过。

总结一下NettyRemotingClient的作用 :

  • Netty客户端,由异步线程池负责处理ClientRemotingProcessor处理器的request、response逻辑代码;
  • 下图绿色部分 :

总结

  • DefaultMQProducer支持同一JVM进程下进行多实例初始化,只要保证实例名称不相同即可。

  • 本地缓存了topic路由信息、broker地址、版本信息,并且后台定时线程每2分钟会向namesrc拉取更新。

  • Netty客户端的request、response的逻辑代码是ClientRemotingProcessor,并且是异步处理的。

  • 本文把DefaultMQProducer的整体架构分析完毕,主要涉及DefaultMQProducer、DefaultMQProducerImpl、MQClientInstance、MQClientAPIImpl、NettyRemotingClient。

  • 后期会分析发送消息以及事务消息。

结束语

  • 原创不易
  • 希望看完这篇文章的你有所收获!

相关参考资料


目录