rocketmq源码-producer启动流程

news2025/1/12 10:53:10

前言

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");

producer.setNamesrvAddr("127.0.0.1:9876");

producer.start();

创建、启动producer的逻辑,是写在业务系统中的,根据rocketmq源码中的demo,只需要这几行代码,就可以启动producer
这几行代码的逻辑,也比较简单,就是创建一个producer对象,创建对象时,需要指定producerGroup’然后设置nameSrv地址;最后调用其start()方法即可
所以,启动producer最为核心的代码,是在start()方法中

在这里插入图片描述
在start()方法中,最重要的是mqClientFactory.start()方法
在这里插入图片描述

在mqClientFactory.start()方法中,有几个关键的点
在这里插入图片描述

  1. 如果内存中没有nameSrv的地址信息,手动去查询一次,更新内存
  2. 启动nettyClient
  3. 启动了一些定时任务
  4. 启动拉取消息和负载均衡的service,但是这两个service理论上,对于producer是不生效的,因为这两个service是consumer在启动的时候会用到,这个start()方法,producer和consumer在启动的时候,共用的

我们来看启动的定时任务有哪些?

private void startScheduledTask() {
    /**
     * 定时拉取nameServer的地址信息到内存中
     * 每2分钟执行一次
     */
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    /**
     * 从nameServer更新topic的路由信息
     * 更新topic以及对应的broker信息到内存中
     */
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    /**
     * 清理已经下线的broker
     * 定时发送心跳到所有的broker
     */
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}

这里的第二个定时任务,更新topic路由信息,在producer发送消息的时候,会用到,可以先关注下

总结

总结来看,producer启动的流程是比较简单的

  1. 启动nettyClient服务
  2. 启动了一些定时任务,定时更新内存中的一些数据;这些数据在producer发送消息的时候都会用到

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/86075.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[附源码]Node.js计算机毕业设计电影推荐网站Express

项目运行 环境配置: Node.js最新版 Vscode Mysql5.7 HBuilderXNavicat11Vue。 项目技术: Express框架 Node.js Vue 等等组成,B/S模式 Vscode管理前后端分离等等。 环境需要 1.运行环境:最好是Nodejs最新版,我…

对长尾识别任务中解耦方法的改进

来源:投稿 作者:TransforMe 编辑:学姐 贡献 在长尾识别任务上,解耦(二阶段)的方法取得了巨大的进步,详情参考https://blog.csdn.net/weixin_41246832/article/details/115718084。本文详细分析…

Android实现SSH Client

本文实现的是如何使用JSCH在Android上实现一个简易版本的ssh client,来远程执行ssh命令。 1、启动ssh服务,本文以mac为例。 打开设置-->共享-->选择远程登录 2、检验ssh server是否开启成功。 打开shell ssh dongxuliip 输入dongxuli账户的密码&…

P3884 [JLOI2009]二叉树问题——树化图Floyd+dfs

[JLOI2009]二叉树问题 题目描述 如下图所示的一棵二叉树的深度、宽度及结点间距离分别为: 深度:444宽度:444结点 8 和 6 之间的距离:888结点 7 和 6 之间的距离:333 其中宽度表示二叉树上同一层最多的结点个数&…

[附源码]Python计算机毕业设计Django高校体育场馆管理系统

项目运行 环境配置: Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术: django python Vue 等等组成,B/S模式 pychram管理等等。 环境需要 1.运行环境:最好是python3.7.7,…

web前端期末大作业——基于HTML+CSS+JavaScript蓝色的远程监控设备系统后台管理界面模板

🎉精彩专栏推荐 💭文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 💂 作者主页: 【主页——🚀获取更多优质源码】 🎓 web前端期末大作业: 【📚毕设项目精品实战案例 (10…

6-1和6-2矩阵键盘

江科大自动化单片机学习记录使用到的设备以及软件今天的学习内容弱上拉和强下拉代码LCD1602.cMatrixKey.c生成.h文件调用主函数main总结记录学习单片机的过程学习内容的视频链接:江科大自化协:51单片机入门教程-2020版,程序全程纯手打 使用到的设备以及软件 普中科技的嵌入式…

DC-2靶机教程

masscan -p1-65535 192.168.250.180 --rate10000nmap -sC -sV -p- -A -T4 192.168.250.180扫描看到80需要添加解析 C:\Windows\System32\drivers\etc添加记录:192.168.250.180 dc-2 同时我们也可以用cmseek扫描到用户名和相关的漏洞 或者使用 wpscan --url htt…

【图像处理OpenCV(C++版)】——2.3 灰度/彩色图像数字化

前言: 😊😊😊欢迎来到本博客😊😊😊 🌟🌟🌟 本专栏主要结合OpenCV和C来实现一些基本的图像处理算法并详细解释各参数含义,适用于平时学习、工作快…

基于无人机的移动边缘计算网络(Matlab代码实现)

目录 💥1 概述 📚2 运行结果 🎉3 参考文献 👨‍💻4 Matlab代码 💥1 概述 空中无人机(UAV)长期以来一直被用作移动网络中的网络处理器,但它们现在被用作移动边缘计算…

29-Vue之ECharts-散点图

ECharts-散点图前言散点图特点散点图实现步骤散点图常见效果气泡效果涟漪动画效果完整代码前言 本篇来学习散点图的实现 散点图特点 散点图可以帮助我们推断出不同维度数据之间的相关性, 比如:看得出身高和体重是正相关, 身 高越高, 体重越重散点图也经常用在地图…

解决 AssertionError Torch not compiled with CUDA enabled

最近在矩池云的的Tesla K80机子上跑MMYOLO,跟着MMYOLO官方文档《自定义数据集 标注训练测试部署 全流程 》操作到 “2.1.1 软件或者算法辅助”时,利用预训练模型官方脚本去辅助标注时,一按下回车就报错: 报错信息 AssertionError…

技巧分享:你知道视频转文字怎么操作?

随着科技的发展,很多东西都开通了“线上”这个渠道,例如线上教学、线上问诊等等。而我们也越来越习惯“线上”,因为它不仅方便,还更节省时间。例如我,学习一些知识或技能时,我会在网上寻找教学视频或报线上…

flink单机部署和简单使用

flink单机部署 Java版本:1.8.0_45 flink下载:https://archive.apache.org/dist/flink/flink-1.7.2/flink-1.7.2-bin-scala_2.11.tgz 解压安装包: [rootvm-9f-mysteel-dc-ebc-test03 opt]# tar -zxvf flink-1.7.2-bin-scala_2.11.tgz flin…

莽荒纪人物出场数据统计

今天继续给大家介绍Python相关知识,本文主要内容莽荒纪人物出场数据统计。 一、中文文本词频统计思路 在上文Python英文词频统计(哈姆雷特)程序示例中,我们进行了英文单词的统计。今天,我们进行中文人物出场频率统计…

java服装经销系统服装进销系统

简介 Ssm服装经销系统,主要分为6个角色:管理员、资料员、采购员、仓库员、售卖员、财务。采购员进行采购入库;仓库员进行采购入库、退货入库、提货出库、折损出库等库存管理;售卖员进行填单的创建,然后去仓库那里提货…

Score Matching

目录简介Score Function求解方法emm参考简介 score matching算法是一种求解概率密度函数的参数的算法。 在很多情况下,概率密度函数可以表示为: p(ξ;θ)1Z(θ)q(ξ;θ)p(\xi;\theta)\frac{1}{Z(\theta)}q(\xi;\theta) p(ξ;θ)Z(θ)1​q(ξ;θ) 假设我…

[Java] 什么是锁?什么是并发控制?线程安全又是什么?锁的本质是什么?如何实现一个锁?

文章目录前言并发控制并发访问控制是什么?如何实现并发访问控制?并发访问控制 与 线程安全锁是什么?1. 加锁操作2. 解锁操作锁状态是什么?如何实现一个锁?笔者相关博客连接结语前言 多线程编程中,锁是最重要…

oracle (+)学习

最近工作需要将oracle的存储过程转化为hive的sql脚本。遇到很多不一样的地方,例如oracle连接中有()号的用法。 借鉴这篇文章,但是这个排版比较烂。。。 oracle ()的,Oracle中()的作用_大雪菜的博客-CSDN博客 先建表和插入数据 --生成部门表CREATE TA…

2014年蓝桥杯Java C组——猜年龄

2014年蓝桥杯Java C组——猜年龄 标题:猜年龄 小明带两个妹妹参加元宵灯会。别人问她们多大了,她们调皮地说:“ 我们俩的年龄之积是年龄之和的6倍”。 小明又补充说:“她们可不是双胞胎,年龄差肯定也不超过8岁啊。” 请你写出:小明的较小的…