RocketMq源码解析四:生产者Producer启动

news2024/12/23 12:39:09

一、主要接口和类

        生产者服务核心接口和类的关系如下图所示:

        MQProducer是生产者解耦,这里找几个有代表性的方法

// 同步发送消息

SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;

// 同步超时发送消息 如果超过了timeout的时间就抛出异常

SendResult send(final Message msg, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;

// 异步发送消息

void send(final Message msg, final SendCallback sendCallback) throws MQClientException,RemotingException, InterruptedException;
// 异步超时发送消息 如果超过了timeout的时间就抛出异常
void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException;

// 指定消息队列同步发送消息

SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;

        DefaultMQProducer除了实现MQProducer的方法外,还继承了ClientConfig类,ClientConfig中主要记录了客户端的一些连接配置信息,我们重点看下DefaultMQProducer中有哪些核心属性

producerGroup:生产者所属组
createTopickey:默认Topic

defaultTopicQueueNums:默认主题在每一个Broker队列数量

sendMsgTimeout:发送消息默认超时时间,默认3s

compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认4k

retryTimeswhensendFailed:同步方式发送消息重试次数,默认为2,总共执行3次retryTimeswhensendAsyncFailed:异步方法发送消息重试次数,默认为2

retryAnotherBrokerwhenNotstoreOK:消息重试时选择另外一个Broker时,是否不等待存储结果就返回,默认为false

maxMessagesize:允许发送的最大消息长度,默认为4M 

        我们看到 DefaultMQProducer中持有了一个transient 修饰的DefaultMQProducerImpl类的成员属性defaultMQProducerImpl,实际上核心的功能都封装在了这个DefaultMQProducerImpl类中,下面我们逐一来为读者展开说明。

二、生产者启动流程 

        我们先来看生产者启动的方法 DefaultMQProducer::start

    @Override
    public void start() throws MQClientException {
        this.setProducerGroup(withNamespace(this.producerGroup));
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

        第一步是获取并设置生产者组的信息;

        第二步调用defaultMQProducerImpl的start方法,我们上文讲过DefaultMQProducer的大部分核心功能都是封装在DefaultMQProducerImpl类中。我们来看下defaultMQProducerImpl中的start方法:

        流程图如下:

        源码如下:

public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            // 如果是启动
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // 1、先检查一下配置
                this.checkConfig();
                // 2、设置自身的客户端名称为进程ID
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                // 3、获取MQClientManager并获得MQClientInstance实例
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                // 4、把当前的生产者注入MQClientFactory中
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                
                // 5、调用start方法启动
                if (startFactory) {
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            ...
        }
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        RequestFutureHolder.getInstance().startScheduledTask(this);
    }

       重点讲一下第3步:获取MQClientManager并获得MQClientInstance实例。整个JVM中只存在一个MQClienManager实例,维护一个MQClientInstance缓存表

ConcurrentMap<String/* clientld */, MQClientinstance> factoryTable = newConcurrentHashMap<String,MQClientlnstance>():

        同一个clientld只会创建一个MQClientInstance。MQClientinstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道
        代码:MQClientManager::getAndCreateMQClientInstance 

    public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }

a:构建客户端的id;

b:从缓存表factoryTable中获取对应clientId的实例;

c:如果没有就生成一个并放入到缓存表中;

d:返回 

        最后我们来看下mQClientFactory.start()当中的源码。

// 先把服务状态改为失败

this.serviceState = ServiceState.START_FAILED;
// 如果配置中的namesrv地址为空,重新获取
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel  启动一个netty服务用于处理请求
this.mQClientAPIImpl.start();
// Start various schedule tasks 启动一系列定时任务用于更新namesrv地址,topic消费情况等
this.startScheduledTask();
// Start pull service 启动拉取消息服务
this.pullMessageService.start();
// Start rebalance service 启动relalance服务
this.rebalanceService.start();
// Start push service 启动推送服务
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);

// 把服务状态改为运行中
this.serviceState = ServiceState.RUNNING;

        至此生产者启动流程已经讲述完毕。

        

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1700692.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何使用Cloudways搭建WordPress网站

如今&#xff0c;搭建网站已经变得非常简单&#xff0c;这主要得益于开源的CMS建站系统的兴起。即使是不懂编程的人也能轻松搭建自己的网站&#xff0c;这些CMS系统提供了丰富的主题模板和插件&#xff0c;使用户可以通过简单的拖放和配置操作来建立自己的网站。 WordPress是目…

mysql中单表查询的成本

大家好。我们知道MySQL在执行一个查询时&#xff0c;经常会有多个执行方案&#xff0c;然后从中选取成本最低或者说代价最低的方案去真正的执行查询。今天我们来聊一聊单表查询的成本。 那么到底什么是成本呢&#xff1f;这里我们说的成本或者代价是由两方面组成的&#xff1a…

全新交友盲盒+付费进群二合一源码 包含全套源码+教程

盲盒交友脱单系统源码&#xff0c;带教程&#xff0c;免授权这套源码已经替你们搭建测试过了 附带进群系统&#xff0c;定位是正常的 申明需要无限回调&#xff0c;没有回调的搭建出来不能用不要说源码不能用 全新系统方便大家使用&#xff0c;已经录制好详细的教程&#xf…

解决问题的多样手段:不止律师

在我们日常生活和工作中&#xff0c;总是会遇到各种各样的问题。有时我们会不由自主地想到找律师打官司&#xff0c;认为这是解决问题的唯一途径。然而&#xff0c;解决问题其实有很多手段&#xff0c;律师和法庭只是其中的一种。事实上&#xff0c;只要能够发现问题并及时解决…

大模型应用:LLM基本原理及应用场景

1.背景 23年以来&#xff0c;随着OpenAI公司的ChatGPT横空出世&#xff0c;大模型一词开始火爆全球。国内外以OpenAI、Google、百度、阿里、字节等大厂为代表&#xff0c;相继推出一系列大模型及其应用&#xff0c;涉及社交、问答、代码助手等多个方面。 目前主流的大模型及产…

从零构建vue3+ts+vite项目打包及项目依赖配置

❗️❗️❗️❗️ 写在最前: 本文是根据B站作者 月光分层 视频vuets 工程化配置以及作者笔记稍作整理 &#x1f496;&#x1f496;作者B站地址https://space.bilibili.com/14110850 &#x1f496;&#x1f496;视频教程地址vuets 工程化配置 &#x1f496;&#x1f496;作者微信…

自反馈 Transformer:一种针对真实世界胰腺神经内分泌肿瘤数据的多标签诊断模型

文章目录 Self-feedback Transformer: A Multi-label Diagnostic Model for Real-World Pancreatic Neuroendocrine Neoplasms Data摘要方法实验结果 Self-feedback Transformer: A Multi-label Diagnostic Model for Real-World Pancreatic Neuroendocrine Neoplasms Data 摘…

最早做“转化医学”的国货护肤品牌,发力了!

文章来自化妆品行业媒体青眼 作者小朱 放眼全球护肤市场&#xff0c;皮肤科学的力量正在前所未有地凸显&#xff0c;多个国际美妆巨头专门设立了皮肤科学部门&#xff0c;国内皮肤科医生参与护肤品牌创建也成为一股风潮。 据青眼不完全统计&#xff0c;近年来&#xff0c;至少…

使用阿里云服务器部署(完整步骤)

部署项目前需要环境&#xff1a;阿里云云服务器ECS&#xff0c;宝塔面板 阿里云云服务器ECS实例创建过程 先登录阿里云网站注册账号,进入控制台左侧导航栏中云服务器ECS页面根据自己的需求去创建一个新的实例&#xff08;需要付费&#xff09;如果是学生的话&#xff0c;完成…

粤嵌—2024/5/13—删除排序链表中的重复元素(✔)

代码实现&#xff1a; /*** Definition for singly-linked list.* struct ListNode {* int val;* struct ListNode *next;* };*/ struct ListNode* deleteDuplicates(struct ListNode *head) {if (head NULL || head->next NULL) {return head;}struct ListNode *…

FFmpeg开发笔记(三十一)使用RTMP Streamer开启APP直播推流

RTMP Streamer是一个安卓手机端的开源RTMP直播推流框架&#xff0c;可用于RTMP直播和RTSP直播&#xff0c;其升级版还支持SRT直播&#xff08;腾讯视频云就采用SRT协议&#xff09;。RTMP Streamer支持的视频编码包括H264、H265、AV1等等&#xff0c;支持的音频编码包括AAC、G7…

fastadmin二次开发 修改默认的前端弹出样式

需要修改fastadmin后台默认的弹出提示样式效果&#xff1a; 在项目里搜索这个关键词&#xff1a;Toastr 首先这个文件&#xff0c;里面的success和error就是弹出提示的方法。 public/assets/js/fast.js 然后是下面这个文件&#xff1a; public/assets/js/require-form.js 你…

ROS2入门21讲__第20讲__RQT:模块化可视化工具

目录 前言 rqt介绍 日志显示 图像显示 发布话题数据/调用服务请求 绘制数据曲线 数据包管理 节点可视化 前言 ROS中的Rviz功能已经很强大了&#xff0c;不过有些场景下&#xff0c;我们可能更需要一些简单的模块化的可视化工具&#xff0c;比如只显示一个摄像头的图像…

【北京市政府网_注册安全分析报告】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造成亏损无底洞…

CDC 数据实时同步入湖的技术、架构和方案(截至2024年5月的现状调研)

近期&#xff0c;对 “实时摄取 CDC 数据同步到数据湖” 这一技术主题作了一系列深入的研究和验证&#xff0c;目前这部分工作已经告一段落&#xff0c;本文把截止目前&#xff08;2024年5月&#xff09;的研究结果和重要结论做一下梳理和汇总。为了能给出针对性的技术方案&…

深入分析 Android Activity (六)

文章目录 深入分析 Android Activity (六)1. Activity 的权限管理1.1 在 Manifest 文件中声明权限1.2 运行时请求权限1.3 处理权限请求结果1.4 处理权限的最佳实践 2. Activity 的数据传递2.1 使用 Intent 传递数据2.2 使用 Bundle 传递复杂数据 3. Activity 的动画和过渡效果3…

照片处理软件哪个好?爆款图片编辑工具分享

照片处理软件哪个好&#xff1f;在数字时代&#xff0c;照片处理软件已经成为我们日常生活和工作中不可或缺的工具。无论是为了美化照片、修复旧照&#xff0c;还是进行专业的图像处理&#xff0c;都有各种软件可以满足我们的需求。以下是一些值得一试的照片处理软件&#xff0…

海山数据库(He3DB)代理ProxySQL使用详解:(二)功能实测

读写分离实测 ProxySQL官方demo演示了三种读写分离的方式&#xff1a;使用不同的端口进行读写分离、使用正则表达式进行通用的读写分离、使用正则和digest进行更智能的读写分离。最后一种是针对特定业务进行的优化调整&#xff0c;也可将其归结为第二种方式&#xff0c;下边分…

IO流:字节流 字符流 缓冲流详解

IO流&#xff1a;字节流 字符流 缓冲流详解 &#x1f4da; 【Java】IO流&#xff1a;字节流 字符流 缓冲流详解 &#x1f4da;摘要引言一、“流”的概念1. “流”的分类1.1 输入流和输出流1.2 字节流和字符流字节和字符的区别&#xff1f;为什么要有字符流&#xff1f; 1.3 节点…