RocketMQ消息发送

news2025/1/11 14:48:48

消息发送示例代码:

public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876"); //naseServer地址
        producer.start();
        for (int i = 0; i < 1000; i++) {
            try {
              //消息
                Message msg = new Message("TopicTest" ,
                    "TagA",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
              //消息发送
                SendResult sendResult = producer.send(msg,10000);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }

DefaultMQProducer是消息发送者的默认实现类,它的继承关系是:

MQAdmin是MQ的基本管理接口,它的功能主要接口包括创建Topic,查找消费消息物理偏移量等等。

MQProducer是消息发送者接口,包括消息发送,开启、关闭等功能接口在这里面定义。

来看一下几个常用的消息发送API。

这两个接口都是都是同步发送消息,一个不指定超时时间,另一个可以指定超时时间。(不指定超时时间,那么默认为3s):

SendResult send(final Message msg);
SendResult send(final Message msg, final long timeout);

 异步消息发送,SendCallback是消息发送之后的回调接口,区别依然是指定超时时间:

void send(final Message msg, final SendCallback sendCallback);
void send(final Message msg, final SendCallback sendCallback, final long timeout);

单向发送:

void sendOneway(final Message msg); 

其他发送AIP:

//同步指定消息队列进行发送
SendResult send(final Message msg, final MessageQueue mq);
//异步指定消息队列进行消息发送
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback);
//单向指定消息队列进行消息发送
void sendOneway(final Message msg, final MessageQueue mq);
//消息发送时可以使用自定义的队列负载机制,同样的也还有异步、单向的API
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg);
//批量消息发送、批量发送只有同步发送模式
SendResult send(final Collection<Message> msgs) 

  • 同步sync

消息发送之后,同步等待服务器返回结果

  • 异步async

    消息发送之后,立即返回、消息发送者线程不阻塞,消息发送成功或者失败对在一个新的线程中回调定义的回调接口。

  • 单向oneway

    消息发送之后立即返回,不关心发送接口,也没有回调函数。

消息发送流程:

然后来看一下producer启动的代码:

当调用 producer.start() 后他经历了下面这些步骤

 this.serviceState = ServiceState.START_FAILED;
//检查生产者组是否符合要求,改变生产者的instanceName为进程id
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
  this.defaultMQProducer.changeInstanceNameToPID();
}
//创建mQClientInstance实例
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
//向MQClientInstance注册服务,将当前生产者加入MQClientInstance管理,方便后续调用网络请求,进行心跳检测等等
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
  this.serviceState = ServiceState.CREATE_JUST;
  throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                              + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                              null);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
//启动
if (startFactory) {
  mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
         this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;

当启动了Producer之后,就会有一个MQClientInstance实例了,他就主要承担着与RocketMQ服务器进行交互了。

消息发送流程:

首先我们来看 producer.send(msg,10000)方法:

1.一直点进来之后就会来到 sendDefaultImpl 方法:

//校验message,校验topic是否合法,判断消息体是否是空的或者消息题内容是否超过4M。
Validators.checkMessage(msg, this.defaultMQProducer);

 2.获取路由信息:

//获取路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

根据topic获取路由信息

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
  			//尝试从本地缓存获取
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); 
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
          	//nameserver获取
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);//#@1 
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
          	//使用默认主题去查路由信息
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);//#@2
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

 

然后再说一下上面代码中的 #@1 和#@2两个地方的代码。

#@1是根据topic从NameServer获取路由信息,在前面一章文章讲NameServer的路由发现时,讲过NameServer提供了根据topic找路由信息的功能。那么这里这个地方代码较多我就不全部粘贴出来了。其实这两个地方吊的事一个方法,只不过一个使用默认主题去查而第一次没有用默认主题,来看一下这个方法。

第一部分:

是否使用默认主题获取路由信息,若使用默认主题查询信息,就将topic的队列替换为生产者默认的队列个数,不是默认主题就用该主题去过去路由信息。

 TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
  //1.用默认主题查询路由信息,如果查到了就将路由信息中读写队列个数替换为生产者默认的队列个数
  topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);
  if (topicRouteData != null) {
    for (QueueData data : topicRouteData.getQueueDatas()) {
      int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
      data.setReadQueueNums(queueNums);
      data.setWriteQueueNums(queueNums);
    }
  }
} else {
  //不是默认主题,根据topic从nameserver获取路由信息
  topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}

没找到路由信息就返回了。

第二部分找到路由信息:

就根据topic从本地获取原来的路由信息进行比较看是够发送了变化。若发生了变化就需要更新地址缓存表。

if (topicRouteData != null) { //找到路由信息
    //获取本地路由信息
    TopicRouteData old = this.topicRouteTable.get(topic);
    //判断是够发生改变
    boolean changed = topicRouteDataIsChange(old, topicRouteData);
    if (!changed) {
    	changed = this.isNeedUpdateTopicRouteInfo(topic);
    } else {
    	log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
    }
    //发生改变,更新地址缓存表
    if (changed) {
      TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
      //更新brokerAddrTable
      for (BrokerData bd : topicRouteData.getBrokerDatas()) {
      	this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
      }
      // Update Pub info
      {
      //路由信息转换为topic信息
      TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
      publishInfo.setHaveTopicRouterInfo(true);
      Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
      while (it.hasNext()) {
        Entry<String, MQProducerInner> entry = it.next();
        MQProducerInner impl = entry.getValue();
        if (impl != null) {
          impl.updateTopicPublishInfo(topic, publishInfo);
        }
      }
    }
    // Update sub info
    {
      Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
      Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
      while (it.hasNext()) {
        Entry<String, MQConsumerInner> entry = it.next();
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
          impl.updateTopicSubscribeInfo(topic, subscribeInfo);
        }
      }
    }
    log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
    this.topicRouteTable.put(topic, cloneTopicRouteData);
    return true;
}

3.选择一个消息队列

在这里面会获取一个自增值然后与消息队列个数取模然后返回这个队列

//选择一个消息队列,故障规避机制也在这里面,默认是不启用故障规避机制的
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

4.消息发送

在这里面会根据broker地址,然后构建请求,选择消息发送模式(同步、异步、单向),然后将消息发送出去。

//消息发送
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); 

这一块里面代码也是蛮多的,我也不罗列出来了,感兴趣的可以自己去点开看一看。

本篇文章简单说了一下消息发送的集中API,以及消息发送的整个流程,但是后面对于代码逻辑这部分讲的也是比较粗糙的,因为这块的代码确实挺多的,我一一粘出来可能这篇笔记的长度是现在的3倍了,我罗列了几个关键点代码,并且简单的说明了一下。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/552205.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leetcode-743. 网络延迟时间

1.思路分析&#xff1a; 一道Dijkstra模板题 推荐Dijkstra算法讲解教程 Dijkstra&#xff08;有向图某点到其他所有点的最短路径问题&#xff09; Dijkstra算法的基本思想是贪心策略&#xff0c;每次从未确定最短路径的顶点中选择距离源点最近的一个&#xff0c;然后以该顶点…

Python使用正则表达式

正则表达式&#xff08;Regular Expression&#xff09;&#xff0c;又称规则表达式&#xff0c;是一个计算机科学的概念&#xff0c;通常被用来检索和替换符合某些规则的文本。 1. 正则表达式语法 正则表达式就是记录文本规则的代码。 1. 行定位符 行定位符就是用来描述字…

【STM32G431RBTx】备战蓝桥杯嵌入式→决赛试题→第十二届

文章目录 前言一、题目二、模块初始化三、代码实现interrupt.h:interrupt.c:main.h:main.c: 四、完成效果五、总结 前言 无 一、题目 二、模块初始化 1.LCD这里不用配置&#xff0c;直接使用提供的资源包就行 2.ADC:开启ADCsingle-ended 3.LED:开启PC8-15,PD2输出模式就行了…

MySQL高级(InnoDB引擎)

&#xff08;一&#xff09;逻辑存储结构 表空间&#xff08;ibd文件&#xff09;&#xff0c;会生成ibd文件&#xff0c;一个mysql实例可以对应多个表空间&#xff0c;用于存储记录、索引等数据。 段&#xff0c;分为数据段&#xff08;Leaf node segment&#xff09;、索引段…

学弟研一,有几篇SCI论文,做过前端,读博 or 走开发进国企?

同学你好&#xff0c;在正面先抛开选择就业的方面的问题&#xff0c;其实我觉得生活种的很多选择&#xff0c;都可以从以下的几点进行斟酌与考虑&#xff1a; &#xff08;1&#xff09;你最擅长的是哪个方面&#xff1f;&#xff08;2&#xff09;你的兴趣爱好是在哪个方面&am…

从0开始搭建完整UVM工程(可直接用于实际的工程中)、含源码(包括makefile文件)、可直接运行,及详细注释

一、说明 网上的实现uvm工程代码都是抄自张强所著的《UVM实战》,都是讲所有文件放到一个文件夹,且不涉及到实际工程中的uvm结构,以及多文件层级结构,让人理解起来较为困难,本文则将会从0开始教大家如何搭建一个具有实际工程效果的UVM框架: 其对应的书中的框架图如下所示:…

chatgpt赋能Python-pythoncontinue

简介 Python是一种高级编程语言&#xff0c;受到越来越多的人们的欢迎。其中&#xff0c;continue是Python语言中的一个很重要的关键字&#xff0c;它的出现可以很好地帮助程序员们实现自己的编程目标。在本文中&#xff0c;我们将介绍continue关键字&#xff0c;并解释它在Py…

Fourier分析入门——第5章——连续函数

目录 第 5 章 连续函数 5.1 引言 5.2 内积和正交性(Inner products and orthogonality) 5.3 对称性(Symmetry) 5.4 复数值函数 第 5 章 连续函数 5.1 引言 在前面的章节中&#xff0c;我们只考虑了在有限区间 L 上定义的离散函数的Fourier级数模型。此类函数在实验科学…

spring 源码

bean的创建 获取类class 推断构造方法 Autoware 创建一个普通对象 依赖注入 populateBean 把一些属性注入 初始化之前 PostConstruct 注解 初始注入 实际可以用构造方法啊 初始化 initializationBean 1.判断是否有aware接口 invokeAwareMethods 2.执行 applyBeanP…

enq: TM - contention等待事件引起的数据库卡顿分析

用户的数据库系统在2022年5月31日下午17:25至17:45出现严重的锁等待&#xff0c;导致对应的应用程序出现卡顿等情况&#xff0c;业务系统的正常使用受到影响&#xff0c;无法正常办理业务&#xff1b;在此情况下需要排查出锁问题的深层原因&#xff0c;从而从根本上解决问题。 …

【Python psycopg2】零基础也能轻松掌握的学习路线与参考资料

Python psycopg2是一个Python库&#xff0c;在Python中提供了一个连接PostgreSQL数据库的接口。它可以让Python应用程序和PostgreSQL数据库之间进行通信和数据传输。学习Python psycopg2的路线和教程可以在查阅资料和实践中快速入门。 一、学习前置知识 学习Python psycopg2需…

23 memset 的调试

前言 同样是一个 很常用的 glibc 库函数 不管是 用户业务代码 还是 很多类库的代码, 基本上都会用到 内存数据的设置 不过 我们这里是从 具体的实现 来看一下 它的实现 主要是使用 汇编 来进行实现的, 因此 理解需要一定的基础 测试用例 就是简单的使用了一下 memcpy,…

去面试测试开发工程师要做哪些准备?大厂真实面试题汇总

目录 1.黑盒测试和白盒测试的区别特点和方法。 2.单元测试、集成测试、系统测试、验收测试、回归测试 3.集成测试和系统测试的区别和应用场景 4.α测试、β测试&#xff0c;以及它们的区别 5.给你一个字符串&#xff0c;你怎么判断是不是ip地址&#xff1f;手写这段代码&…

大数据之RDD的算子分类

文章目录 前言一、RDD的算子分类二、Transformation转换算子三、Action动作算子总结 前言 #博学谷IT学习技术支持# 上一篇文章主要讲述了两种RDD的创建方式&#xff0c;本篇文章接着讲RDD的算子及其分类。 一、RDD的算子分类 RDD的算子主要有两种类型&#xff0c;一种是Tran…

docker面试题:docker容器虚拟化与传统虚拟机比较

容器就是将软件打包成标准化单元&#xff0c;以用于开发、交付和部署。 容器镜像是轻量的、可执行的独立软件包 &#xff0c;包含软件运行所需的所有内容&#xff1a;代码、运行时环境、系统工具、系统库和设置。容器化软件在任何环境中都能够始终如一地运行。容器赋予了软件独…

什么是强化学习?强化学习有哪些框架、算法、应用?

什么是强化学习&#xff1f; 强化学习是人工智能领域中的一种学习方式&#xff0c;其核心思想是通过一系列的试错过程&#xff0c;让智能体逐步学习如何在一个复杂的环境中进行最优的决策。这种学习方式的特点在于&#xff0c;智能体需要通过与环境的交互来获取奖励信号&#…

JAVA常用API - 正则表达式

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 目录 文章目录 前言 一&#xff0c;正则表达式是什么&#xff1f; 二&#xff0c;正则表达式符号 三&#xff0c;常用正则表达式组合 四&#xff0c;正则表达…

利用MySQL的binlog恢复误删的数据库

1 查看当前数据库内容并备份数据库 查看数据库信息&#xff1a; 备份数据库&#xff1a; [rootlocalhost ~]# mysqldump -u root -p t > /mnt/t.sql Enter password: [rootlocalhost ~]# ll /mnt/t.sql -rw-r--r-- 1 root root 1771 Aug 25 11:56 /mnt/t.sql2 开启bin…

java-----web项目部署到新服务器以及服务器的部署

目录 一、服务器安装jdk1.8 二、安装mysql5.7 2.1下载mysql 2.2修改root账号密码 2.3设置远程登录 三、项目的部署 3.1导入数据库 3.2将项目打成jar包 3.3项目上传 服务器部署项目的方式&#xff0c;本次以打成jar包的形式讲解&#xff08;以springboot项目为例&#xf…

2023年贵工程团体程序设计赛--部分题解

作者&#xff1a;杨书瑶 单位&#xff1a;贵州工程应用技术学院 本次比赛由大学生程序设计协会(cpa)举办,共计17道题&#xff0c;295分。其中5分题三道&#xff0c;10分题三道&#xff0c;15分题两道&#xff0c;20分题三道&#xff0c;25分题两道&#xff0c;30分题两道…