不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息

news2025/1/18 6:57:35

前言

目前有两套RocketMQ集群,集群A包含topic名称为cluster_A_topic,集群B包含topic名称为cluster_B_topic,在应用服务OrderApp上通过RocketMQ Client创建两个DefaultMQProducer实例发送消息给集群A和集群B,架构图如下:

根据上述架构图,我们给出的示例代码如下:

// 创建第一个DefaultMQProducer
DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
    // 设置nameServer地址
    producer1.setNamesrvAddr("192.168.2.230:9876");
    try {
      producer1.start();
      // 发送消息
      SendResult result1 = producer1.send(new Message("cluster_A_topic", "ping".getBytes(StandardCharsets.UTF_8)));
      switch (result1.getSendStatus()) {
        case SEND_OK:
          System.out.println("cluster_A_topic 发送成功!");
          break;
        case FLUSH_DISK_TIMEOUT:
          System.out.println("cluster_A_topic 持久化失败!");
          break;
        case FLUSH_SLAVE_TIMEOUT:
          System.out.println("cluster_A_topic 同步slave失败!");
          break;
        case SLAVE_NOT_AVAILABLE:
          System.out.println("cluster_A_topic 副本不可用!");
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    // 创建第二个DefaultMQProducer
    DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_2");
    // 设置nameServer地址
    producer2.setNamesrvAddr("192.168.2.231:9876");
    try {
      producer2.start();
      // 发送消息
      SendResult result2 = producer2.send(new Message("cluster_B_topic", "ping".getBytes(StandardCharsets.UTF_8)));
      switch (result2.getSendStatus()) {
        case SEND_OK:
          System.out.println("cluster_B_topic 发送成功!");
          break;
        case FLUSH_DISK_TIMEOUT:
          System.out.println("cluster_B_topic 持久化失败!");
          break;
        case FLUSH_SLAVE_TIMEOUT:
          System.out.println("cluster_B_topic 同步slave失败!");
          break;
        case SLAVE_NOT_AVAILABLE:
          System.out.println("cluster_B_topic 副本不可用!");
      }
      return "ok";
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      producer1.shutdown();
      producer2.shutdown();
    }
复制代码

结果竟然报错了,报错内容时cluster_B_topic不存在:

经过不断的测试,发现只有放在最前面启动的DefaultMQProducer会生效,后面启动的DefaultMQProducer发送消息就报错说对应的topic不存在,而且报错的broker竟然是前面启动的DefaultMQProducer对应的broker。这就不科学了,难道RocketMQ不允许在一个应用上创建多个生产者?

问题定位

首先说明一下,当前使用的RocketMQ Client版本是4.8.0。为了确定是哪儿出了问题,不得不对源码来一波探索[哭泣脸😢]。

我们都知道生产者是发送消息给Broker的,获取Broker信息是通过连接NameServer获取的。既然报错的Broker和目标Broker竟然不对应,肯定是后面启动的生产者获取的Broker不对。有了最基本的判断,我们先从DefaultMQProducer#start()入手,最终我们定位到这样一段代码DefaultMQProducerImpl#start(final boolean startFactory)

public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
​
                this.checkConfig();
// 如果生产者group名称不是`CLIENT_INNER_PRODUCER`,那么修改InstanceName值
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
            // 创建MQClientInstance实例
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // 注册生产者实例到MQClientInstance中
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
            // 添加TBW102对应的topic信息,broker设置autoCreateTopicEnable = true才起作用
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
​
                if (startFactory) {
                    // 启动刚刚创建的MQClientInstance实例
                    mQClientFactory.start();
                }
​
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                // 修改服务状态为RUNNING
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
复制代码

上面的代码主要是创建了MQClientInstance实例,并且通过start()方法启动。

通过针对这两段代码的debug,我们发现创建的两个DefaultMQProducer对象是共用了一个MQClientInstance实例,并且所有针对NameServerBroker的远程操作全部是通过MQClientInstance实例来做的。比如发送消息的时候需要找到对应的Broker下的消息队列:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            // 从NameServer更新topic路由
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
​
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
复制代码

最终我们发现两个DefaultMQProducer对象都是去同一个NameServer下获取对应的topic信息,这下问题就定位到了:因为使用了同一个MQClientInstance实例导致不同的DefaultMQProducer去访问了同一个NameServer,同一个集群需要同时接收两个topic的消息,也就出现了前面的报错说topic不存在的情况。

如何解决

我们来看看MQClientInstance实例是如何保证唯一性的:

public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        // 生成clientID
        String clientId = clientConfig.buildMQClientId();
        // 从缓存中获取MQClientInstance
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            // 没有缓存的话就创建一个MQClientInstance
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            // 新创建出来的再放进缓存
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }
        // 返回MQClientInstance实例
        return instance;
    }
复制代码

我们之所以拿到的MQClientInstance实例是同一个,是因为在同一个服务下创建的clientId相同:

    public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClientIP());
​
        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }
​
        return sb.toString();
    }
复制代码

两个clientId都是192.168.18.173@14933,为了防止clientId相同,我们可以在创建DefaultMQProducer实例是加上unitName值,保证两个unitName值不同来避免共享同一个MQClientInstance

DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
producer1.setNamesrvAddr("192.168.2.230:9876");
producer1.setUnitName("producer1");
producer1.start();
​
DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_1");
producer2.setNamesrvAddr("192.168.2.231:9876");
producer2.setUnitName("producer2");
producer2.start();
复制代码

通过上述代码修改后,两个消息都发送成功了。

另一个办法就是升级RocketMQ Client4.9.0,我们来看一下RocketMQ Client 4.9.0是怎么解决这个问题的:

    public void changeInstanceNameToPID() {
        if (this.instanceName.equals("DEFAULT")) {
            this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
        }
    }
复制代码

RocketMQ Client 4.9.0在后面补充了一个纳秒值,之前的代码是这样的:

    public void changeInstanceNameToPID() {
        if (this.instanceName.equals("DEFAULT")) {
            this.instanceName = String.valueOf(UtilAll.getPid());
        }
    }
复制代码

也就是说,在新的版本中,一个应用服务内创建多个DefaultMQProducer就会有多个MQClientInstance实例对应,不会再出现我们前面的报错。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/18689.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用Vue脚手架配置代理服务器的两种方式

1 前言 本文主要介绍使用Vue脚手架配置代理服务器的两种方式 注意:Vue脚手架给我们提供了两种配置代理服务器的方式,各有千秋,使用的时候只能二选一,不能同时使用 2 代理 除了cros和jsonp,还有一种代理方式&#x…

传奇GM调整极品属性的命令------技术分享

传奇GM调整极品属性的命令 GM命令supermake命令用法! 以下格式皆为supermake a b c   以上命令含义:调整A(装备)的B(属性)到C(点数) supermake 1 0 10  1代表武器  0代表攻击 10代表调整的点数 B参数代表需要调整的那项属性如攻击 魔法 道术 …

黑*头条_第4章_文章搜索前后端成形记 实名认证审核

黑*头条_第4章_文章搜索前后端成形记 & 实名认证审核 文章目录黑*头条_第4章_文章搜索前后端成形记 & 实名认证审核文章搜索前后端成形记 & admin实名认证审核1 文章详情-前端开发1.1登录接口1.1.1 基本定义1.1.2 code定义1.1.3 mapper实现1.1.4 service代码实现1.…

宝塔一键安装wordpress

使用宝塔面板来部署网站是非常方便的,以WordPress网站为例来说: 一般有两种方式安装WordPress网站,第一种是上传网站程序到网站根目录手动安装,另外一种是在宝塔面板后台左侧菜单,找到“WordPress一键部署”&#xff…

Map 和 Set

模型 一般我们把搜索的数据称为 关键字(key) , 关键字对应的值叫做 值(value) , 将之称为 key-value 键值对. 衍生出两种模型: 1. 纯 key 模型 例如 : 班级上点名, 在花名册上找人的名字. 2. key-value 模型 例如 : 统计一个字符串中每个字母出现的次数, 结果是每个字母和它对…

简易版 图书管理系统

目录 1. Book包 1.1 Book类 1.2 BookList类 2. User包 2.1 User抽象类 2.2 AdminUser类 2.3 NormalUser类 3. Operate包 3.1 MyOperate接口 3.2 AddOperation类 3.3 DelOperation类 3.4 ExitOperation 3.5 FindOperation类 3.6 ShowOperation类 3.7 BorrowedOpe…

MySQL——数据库、表的操作

文章目录数据库的操作创建数据库创建数据库例子字符集和校验规则查看数据库支持的字符集查看默认的字符校验规则校验规则对数据库的影响查看数据库显示详细的创建数据库语句修改数据库删除数据库查看连接情况表的操作创建表显示创建表的详细过程不同的数据库引擎查看表结构修改…

【C语言】操作符与优先级详解

C的操作符 文章目录C的操作符前言一、算术操作符二、移位操作符三、位操作符四、赋值操作符五、单目操作符六、条件操作符七、逻辑操作符八、条件操作符九、逗号表达式十、下标引用、函数调用和结构成员十一、表达式求值11.1 隐式类型转换12.2 算术转换12.2 操作符的属性总结前…

解决Vue前后端跨域问题的多种方式

1 前言 本文主要介绍借助解决Vue前后端跨域问题的几种方式 说到ajax请求,就不得不说下xhr(XMLHttpRequest)了,它可以说是鼻祖,但是实际开发中,我们不会直接使用它,而是进行二次封装或者使用成熟的第三方封装&#xf…

Zookeeper:分布式过程协同技术

Zookeeper 是一个高性能的分布式一致系统,在分布式系统中有着广泛的应用。基于它,可以实现诸如“分布式同步”、“配置管理”、“命名空间管理”等众多功能,是分布式系统中常见的基础系统。Zookeeper 主要用来解决分布式集群中应用系统的一致…

http,https,ip,tcp,udp

http:超文本传输协议,明文传输,不安全 超文本:早期,文本存在本地,文本可以被计算机解析为二进制的数据包,随着发展,出现图片,视频,链接等,成为超文本 传输&a…

批量生成Excel文件,可以按模板进行自动生成

目录 一、文件目录结构 二、编辑生成名单 三、编辑模板 四、生成操作 软件描述:根据Excel模板 和 生成名单 可以批量生成相同格式的文件,可以应用于考核、工资单等文件的批量生成。方便快捷,有需求的小伙伴可以到最下面点击下载 注&#…

mysql 客户端简单搭建

主要使用的是mysql开发包中的api接口 操作流程 1.初始化mysql操作句柄 MYSQL *mysql_init(MYSQL *mysql); 对传人的句柄进行初始化 若传入的句柄为NULL,则内部会动态申请空间,进行初始化,并返回句柄首地址 返回值:若…

基于51单片机的ds18b20数字华氏温度计

资料编号:114 下面是相关功能视频演示: 114-基于51单片机的数字华氏温度计报警(源码仿真全套资料)功能讲解: 采用51单片机采集DS18B20的温度,LCD1602显示,并且可以设置上下限值,超…

Unity UI 框架

开源地址: GitHub - NRatel/NRFramework.UI: 基于 Unity UGUI 的 UI 开发框架基于 Unity UGUI 的 UI 开发框架. Contribute to NRatel/NRFramework.UI development by creating an account on GitHub.https://github.com/NRatel/NRFramework.UI 一、需求/功能要点…

headscale的部署方法和使用教程

headscale的部署方法和使用教程1. headscale文件下载2. 上传并赋予文件权限3. 创建以及修改相关配置文件3.1 创建配置目录:3.2 创建目录用来存储数据与证书:3.3 创建空的 SQLite 数据库文件:3.4 创建 Headscale 配置文件:3.5 创建…

Vue 组件间通信并不是每一次操作都会触发新的通信

需求:新增或者修改都需要组件间立马通信。 操作:把B组件(子组件,这里指的是三级联动组件)的数据传输过来,在A(父组件)组件中处理 即 子传父 这里指的是修改页面或者新增页面三级联动下拉选择完之后 点击 提交 会执行A组件的修改操…

开启安全测试评估赛道,永信至诚发布“数字风洞”产品体系

11月19日,永信至诚产品战略发布会上,面向安全测试评估领域的“数字风洞”产品体系战略发布,标志着永信至诚作为网络靶场和人才建设领军企业,再次以“产品乘服务”的价值体系,开启网络安全测试评估专业赛道。 数字化时代…

MySQL安装

本笔记来自B站黑马程序员讲解的MySQL的使用。 目录 ​编辑 一、MySQL的安装 1、数据库基础概念 2、MySQL下载并安装​编辑 三、启动MySQL 四、连接MySQL数据库 1、使用MySQL提供的客户端命令来连接 2、使用Windows 命令打开: 第一步 配置path的环境变量 第…

uniapp入门:常用事件绑定与数据同步

1.常见事件与事件绑定 1.1点击事件bindtap 1.2 文本输入事件bindinput 1.3 切换事件bindtouchend 2.数据同步 2.1事件回调 2.2逻辑层中page对象中的中数据如何进行改变 2.3页面触发事件如何传参到page中数据 …