Java小案例-RocketMQ的11种消息类型,你知道几种?(请求应答消息)

news2025/1/16 14:09:37

前言

Rocket的请求应答消息是指在使用Rocket(这里可能是RocketMQ或者Rocket框架)进行通信时,客户端发送一个请求到服务端,然后服务端处理该请求并返回一个响应的过程中的数据交换。

在RocketMQ中:

请求应答消息通常涉及到以下几个步骤:

  1. 生产者(Producer)创建一个消息,并将其发送到Broker(消息中间件服务器)。
  2. Broker接收到消息后,可能需要进行存储、路由或者其他处理操作。
  3. 如果请求是需要立即响应的(例如RPC调用),Broker会在处理完消息后生成一个响应消息,并通过网络返回给生产者。
  4. 生产者接收到响应消息后,可以根据响应内容进行相应的业务处理。

在Rocket框架中:

请求应答消息通常涉及到HTTP请求和响应:

  1. 客户端(通常是Web浏览器或者API客户端)向Rocket应用服务器发送一个HTTP请求,请求可能包含JSON、XML或者其他格式的数据。
  2. Rocket框架接收到请求后,根据路由规则将请求分发到对应的处理器函数(handler)。
  3. 处理器函数处理请求,这可能包括查询数据库、计算结果或者其他业务逻辑。
  4. 处理完成后,处理器函数构建一个HTTP响应,响应中包含处理结果以及可能的状态码、头部信息等。
  5. Rocket框架将响应返回给客户端,客户端解析响应并进行相应的处理。

无论是RocketMQ还是Rocket框架,请求应答消息都是系统间或者组件间通信的基本机制,用于实现功能调用、数据交换或者状态同步。

请求应答消息

这个消息类型比较有意思,类似一种RPC的模式

生产者发送消息之后可以阻塞等待消费者消费这个消息的之后返回的结果

生产者通过过调用request方法发送消息,接收回复消息

public class Producer {
    public static void main(String[] args) throws Exception {
        //创建一个生产者,指定生产者组为 sanyouProducer
        DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");
        // 指定NameServer的地址
        producer.setNamesrvAddr("192.168.200.143:9876");
        // 启动生产者
        producer.start();


        Message message = new Message("sanyouTopic", "三友的java日记".getBytes());
        
        //发送消息,拿到响应结果, 3000代表超时时间,3s内未拿到响应结果,就超时,会抛出RequestTimeoutException异常
        Message result = producer.request(message, 3000);
        System.out.println("接收到响应消息:" + result);

        // 关闭生产者
        producer.shutdown();
    }

}

而对于消费者来着,当消费完消息之后,也要作为生产者,将响应的消息发送出去

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        //创建一个生产者,指定生产者组为 sanyouProducer
        DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");
        // 指定NameServer的地址
        producer.setNamesrvAddr("192.168.200.143:9876");
        // 启动生产者
        producer.start();


        // 通过push模式消费消息,指定消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");
        // 指定NameServer的地址
        consumer.setNamesrvAddr("192.168.200.143:9876");

        // 订阅这个topic下的所有的消息
        consumer.subscribe("sanyouTopic", "*");
        // 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("消费消息:%s", new String(msg.getBody()) + "\n");

                    try {
                        // 用RocketMQ自带的工具类创建响应消息
                        Message replyMessage = MessageUtil.createReplyMessage(msg, "这是响应消息内容".getBytes(StandardCharsets.UTF_8));
                        // 将响应消息发送出去,拿到发送结果
                        SendResult replyResult = producer.send(replyMessage, 3000);
                        System.out.println("响应消息的结果 = " + replyResult);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

这种请求-应答消息实现原理也比较简单,如下图所示

生产者和消费者,会跟RocketMQ服务端进行网络连接

所以他们都是通过这个连接来发送和拉取消息的

当服务端接收到回复消息之后,有个专门处理回复消息的类

这个类就会直接找到发送消息的生产者的连接,之后会通过这个连接将回复消息发送给生产者

RocketMQ底层是基于Netty通信的,所以如果你有用过Netty的话,应该都知道,就是通过Channel来发送的

联系方式

关于文章中大家有任何疑问可以通过关注公众号《编程乐学》进行留言,同时,公众号还有更多有趣的项目以及关于学习编程的笔记资料大家可以看看,欢迎大家进行留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1318479.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL性能测试(完整版)

MySQL性能测试之SysBench 一、SysBench安装 1、mac安装命令&#xff1a;&#xff08;其他系统安装对应的命令即可&#xff0c;不影响后面的使用&#xff09; brew install sysbench2、查看是否安装成功&#xff1b; sysbench --version附&#xff1a; &#xff08;1&#x…

The Grid – Responsive WordPress Grid响应式网格插件

点击阅读The Grid – Responsive WordPress Grid响应式网格插件原文 The Grid – Responsive WordPress Grid响应式网格插件是一个高级 wordpress 网格插件&#xff0c;它允许您在完全可定制且响应迅速的网格系统中展示任何自定义帖子类型。 Grid WordPress 非常适合展示您的博…

runCatching异常捕获onSuccess/onFailure返回函数,Kotlin

runCatching异常捕获onSuccess/onFailure返回函数&#xff0c;Kotlin fun test(a: Int, b: Int) {runCatching {a / b}.onSuccess {println("onSuccess: $it")return ok(it)}.onFailure {println("onFailure: $it")return fail(it)} }fun ok(o: Any) {prin…

QT第一步

文章目录 软件下载软件安装QT的程序组新建项目 软件下载 qt下载网址&#xff1a;https://download.qt.io/archive/qt/   关于版本&#xff1a;     我选择的版本是5.14.2&#xff0c;这个版本是最后的二进制安装包的版本&#xff0c;在往后的版本就需要在线安装了。并且5…

常用网安渗透工具及命令(扫目录、解密爆破、漏洞信息搜索)

目录 dirsearch&#xff1a; dirmap&#xff1a; 输入目标 文件读取 ciphey&#xff08;很强的一个自动解密工具&#xff09;&#xff1a; john(破解密码)&#xff1a; whatweb指纹识别&#xff1a; searchsploit&#xff1a; 例1&#xff1a; 例2&#xff1a; 例3&…

QT5 CMake进行开发

配置环境 因为是使用CMake进行开发&#xff0c;所以推荐使用的QT版本是 5.14.2。因为楼主有 vs2015的环境&#xff0c;所以在安装QT时选择的是 msvc 2015 64bit msvc 2017 32bit 勾选了所有需要的模块。kit配置如下 图中画框的地方是比较关键的地方&#xff0c;1. 指定编译器…

3. cgal 示例 GIS (Geographic Information System)

GIS (Geographic Information System) 地理信息系统 原文地址: https://doc.cgal.org/latest/Manual/tuto_gis.html GIS 应用中使用的许多传感器&#xff08;例如激光雷达&#xff09;都会生成密集的点云。此类应用程序通常利用更先进的数据结构&#xff1a;例如&#xff0c;不…

【STM32】STM32学习笔记-对射式红外传感器计次 旋转编码器计次(12)

00. 目录 文章目录 00. 目录01. NVIC相关函数1.1 NVIC_PriorityGroupConfig函数1.2 NVIC_PriorityGroup类型1.3 NVIC_Init函数1.4 NVIC_InitTypeDef类型 02. 外部中断相关API2.1 GPIO_EXTILineConfig2.2 EXTI_Init2.3 EXTI_GetITStatus2.4 EXTI_ClearITPendingBit2.5 中断回调函…

一篇文章了解Flutter Json系列化和反序列化

目录 一. 使用dart:convert实现JSON格式编解码1. 生成数据模型类2. 将JSON数据转化成数据模型类3. 数据模型类转化成JSON字符串 二、借助json_serializable实现Json编解码1.添加json_annotation、build_runner、json_serializable依赖2. 创建一个数据模型类3. 使用命令行生成JS…

【XR806开发板试用】+ 通过网络控制led并上报按键状态

通过网络控制led并上报按键状态 本次做一个手机通过mqtt服务器控制板子上的LED亮灭&#xff0c;板子也可以将按钮状态变化通过mqtt服务器上报给手机的功能 硬件上&#xff0c;从原理图看&#xff0c;LED接到了PA21&#xff0c;高电平点亮。 按键则时接到了PA11&#xff0c;并…

kali虚拟机无网络

1.查看虚拟机的网卡模式 在虚拟机设置里&#xff0c;一般选择桥接模式&#xff0c;也可以选择NAT模式。 2、你的IP地址是否写死了&#xff08;设置为静态IP&#xff09; vim编辑模式下的命令&#xff1a; 按a或i进入编辑模式&#xff0c;然后按esc键退出编辑模式&#xff0c;s…

多条件三元表达式如何写?

在某些业务需求情况下&#xff0c;如何书写多条件三元表达式&#xff1f;&#xff08;例如&#xff0c;父组件传值给子组件&#xff0c;子组件根据不同的值去响应不同的颜色变化该如何实现&#xff1f;&#xff09; 父组件&#xff1a; 父组件传testData的值给子组件&#xff…

diffuser库之 Load pipelines, models, and schedulers 的阅读记录

加载不同pipeline safe checker pipeline转换 加载模型配置 远程库与本地库使用区别 本地库必须引入variant参数用于选择加载哪一种模型 保存模型 修改pipeline的scheduler pipeline class以及各个子模块的定义

保护性地编写readObject方法

在Java中&#xff0c;通过谨慎保护性地编写 readObject 方法&#xff0c;我们可以在对象反序列化的过程中加入额外的安全检查和验证&#xff0c;以确保反序列化后的对象的状态是合法和安全的。以下是一个简单的例子&#xff0c;演示如何保护性地编写 readObject 方法&#xff1…

表格el-tooltip和show-overflow-tooltip衝突

表格el-tooltip和show-overflow-tooltip衝突&#xff1a; 二、产品需要实现的效果如下 三、解决文案 1、HTML代码 <el-table:data"tableData"header-row-class-name"custom-table-header"header-cell-class-name"custom-table-header-cell"…

【高效写作技巧】文章质量分有什么用?如何提高质量分

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏:《写作技巧》 《C嘎嘎干货基地》 ⛺️生活的理想&#xff0c;就是为了理想的生活! &#x1f4cb; 前言 &#x1f308;hello&#xff01; 各位铁铁们大家好啊&#xff0c;这段时间接触了一些刚开始写作的新人…

Command line is too long. Shorten command line for Application or also

一、问题描述 Error running ‘Application’: Command line is too long. Shorten command line for Application or also for Spring Boot default configuration? 二、原因分析 springboot项目启动命令过长&#xff01; 三、解决方案 第1步:点击项目启动配置项 第2步…

Python 爬虫之简单的爬虫(二)

爬取百度热搜榜 文章目录 爬取百度热搜榜前言一、展示哪些东西二、基本流程三、前期数据获取1.引入库2.请求解析获取 四、后期数据处理1.获取保存 总结 前言 每次打开浏览器&#xff0c;我基本上都会看一下百度热搜榜。这篇我就写一下如何获取百度的热搜榜信息吧。 如果到最后…

C++ list常用操作

目录 一、介绍 二、list的常用操作 1、构造 2、迭代器 3、元素访问 4、容量操作 一、介绍 std::list文档链接 list是可以在常数范围内在任意位置进行插入和删除的序列式容器&#xff0c;并且该容器可以前后双向迭代。list的底层是双向链表结构&#xff0c;双向链表中每个…

D3D12可编程渲染流水线

一、初始化D3D库 启用 DirectX数学库 x86需要启用SSE2指令集&#xff0c;所有平台均需将浮点模型设置为fast。默认为&#xff1a; 精度 (/fp:precise)。 #include <DirectXMath.h> #include <DirectXPackedVector.h> 启用调试模式下的内存泄漏检测 // Enabl…