RabbitMQ学习整理————基于RabbitMQ实现RPC

news2024/11/24 17:31:00

基于RabbitMQ实现RPC

  • 前言
  • 什么是RPC
  • RabbitMQ如何实现RPC
  • RPC简单示例
  • 通过Spring AMQP实现RPC

前言

这边参考了RabbitMQ的官网,想整理一篇关于RabbitMQ实现RPC调用的博客,打算把两种实现RPC调用的都整理一下,一个是使用官方提供的一个Java client,还有一个是Spring AMQP的整合使用。
代码路径:https://github.com/yzh19961031/blogDemo/tree/master/rabbitmq

什么是RPC

RPC是远程过程调用(Remote Procedure Call)的缩写形式,简单说就是一个节点去请求另一个节点上面的服务并获得响应结果。
我们之前总结的工作模式都是发送消息到指定的队列,再由相关的消费者进行消费,如果存在这样的场景,比如消费者消费完消息需给生产者一个具体的响应,然后生产者再根据这个响应进行其他的业务逻辑,这样就需要使用到RabbitMQ提供的RPC能力。

RabbitMQ如何实现RPC

官方有很详细的介绍文档,这边贴一下地址:https://www.rabbitmq.com/tutorials/tutorial-six-java.html
RabbitMQ实现RPC很简单,正常的流程就是请求以及响应,我们只需要在请求的消息的属性里面添加一个响应队列的地址,这边需要使用到一个BasicProperties这个类。具体配置如下:

// 指定一个回调队列
callbackQueueName = channel.queueDeclare().getQueue();
// 设置replyTo的属性为指定的回调队列
BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

BasicProperties这个类中提供了很多的属性,有14个,很多基本上很少用到,常用的就是几个,我这边也贴一下,其实在我上一篇文章中基于RabbitMQ实现的一个RPC工具里面都有用到这些属性。

  1. contentType 这个属性用来表明消息的类型,默认是"application/octet-stream"这种流的类型,还有常用的比如"application/json","text/plain"等,这些在我的RPC工具里面都有用到。
  2. replyTo 这个就是上面指定的回调队列。
  3. correlationId 这个id可以用来进行消息的确认,将相应与请求相关联。主要是可以确认服务端收到的消息是不是指定客户端发过来的,用于确认。

首先先贴一张官方提供的图,这个是RabbitMQ实现RPC的主要工作流程:
在这里插入图片描述
实现RPC的具体工作流程:

  1. 首先客户端发送一个请求消息,这个请求消息里面有两个属性,一个是replyTo回调队列的地址,一个是correlationId用于标识当前消息唯一的id信息。
  2. 这个消息是发送到指定的rpc_queue这个队列上面。
  3. 对应我们的服务端Server就会等待rpc_queue上面的请求消息,当请求消息来得时候,服务端会进行处理,处理完成会将相应的消息再发送到请求消息属性中的replyTo回调的队列上面。
  4. 客户端发送消息之后,会等待replyTo队列中的消息。当有消息来得时候,会检查响应消息中correlationId属性和请求消息中correlationId是否一致,完成一次PRC调用。

RPC简单示例

我这边根据官网上面提供的例子简单修改整理了一下,这边提供一个大小写转换的功能,就是客户端发送一段小写的字符串,服务端将字符串转为大写再响应过来。详细逻辑可以看下代码中注释,具体代码如下:
首先服务端:

/**
 * RPC服务端
 *
 * @author yuanzhihao
 * @since 2020/11/21
 */
public class RPCServer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 首先还是正常获得connection以及channel对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.108");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // 定义一个rpc的队列
        String queueName = "test_rpc";
        channel.queueDeclare(queueName, false, false, false, null);

        Object monitor = new Object();
        // 具体的消费代码里面实现
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // 消费者将请求消息中的correlationId信息再作为响应传回replyTo队列
            AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(delivery.getProperties().getCorrelationId())
                    .build();

            String response = "";
            try {
                // 提供一个大小写转换的方法
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println("toUpperCase(" + message + ")");
                response = toUpperCase(message);
            } catch (RuntimeException e) {
                System.out.println(e.toString());
            } finally {
                // 将响应传回replyTo队列
                channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));
                // 设置了手动应答 需要手动确认消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                // 执行完成会释放主线程的锁
                // RabbitMq consumer worker thread notifies the RPC server owner thread
                synchronized (monitor) {
                    monitor.notify();
                }
            }
        };

        // 监听"test_rpc"队列
        channel.basicConsume(queueName, false, deliverCallback, (consumerTag -> { }));
        // 这个锁对象是确保我们server的调用逻辑执行完成 首先挂起主线程
        // Wait and be prepared to consume the message from RPC client.
        while (true) {
            synchronized (monitor) {
                try {
                    monitor.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // 提供一个大小写转换的方法
    private static String toUpperCase(String msg) {
        return msg.toUpperCase();
    }
}

客户端:

/**
 * RPC客户端
 *
 * @author yuanzhihao
 * @since 2020/11/21
 */

public class RPCClient {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 创建connection以及channel对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.108");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        try ( Connection connection = connectionFactory.newConnection();
              Channel channel = connection.createChannel()) {
            // 声明一个队列
            String queueName = "test_rpc";

            // 请求消息中需要带一个唯一标识ID 
            String corrId = UUID.randomUUID().toString();
            // 声明一个回调队列
            String replayQueueName = channel.queueDeclare().getQueue();
            // 将correlationId以及回调队列设置在消息的属性中
            AMQP.BasicProperties properties = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(corrId)
                    .replyTo(replayQueueName)
                    .build();
            // 具体消息内容
            String msg = "hello rpc";
            // 发送请求消息
            channel.basicPublish("",queueName,properties,msg.getBytes());
            // 设置一个阻塞队列  等待服务端的响应
            final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

            String ctag = channel.basicConsume(replayQueueName, true, (consumerTag, message) -> {
                // 注意 这边根据correlationId进行下判断
                if (message.getProperties().getCorrelationId().equals(corrId)) {
                    response.offer(new String(message.getBody(), StandardCharsets.UTF_8));
                }
            }, consumerTag -> {});

            // 获取响应结果
            String take = response.take();
            System.out.println("rpc result is "+ take);
            channel.basicCancel(ctag);
        }
    }
}

执行代码,具体的客户端与服务端运行结果在这里插入图片描述 在这里插入图片描述

通过Spring AMQP实现RPC

通过Spring来实现RPC也很简单,主要通过spring提供的一个RabbitTemplate对象中sendAndReceive方法来实现,这个方法是发送消息然后一直等待响应。监听器里面实现的和之前的逻辑大致相同,都需要将response响应消息发送到对应的replyTo回调队列上。下面直接贴一下代码。
首先是服务端,我这边直接是使用配置类的形式,具体一些的配置项可以参考下我之前的那篇博客或者上网搜一下~

/**
 * 主配置类
 *
 * @author yuanzhihao
 * @since 2021/1/9
 */
@Configuration
public class RabbitMQConfig {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);

    // 注入connectionFactory对象
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("192.168.1.108:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    // 声明队列
    @Bean
    public Queue rpcQueue() {
        return new Queue("test_rpc",false);
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    // 创建初始化RabbitAdmin对象
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    // 消息监听器
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(RabbitTemplate rabbitTemplate) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        // 监听的队列
        container.setQueues(rpcQueue());
        MessageListener messageListener = message -> {
            String receiveMsg = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("Receive a message message is {}", receiveMsg);
            // 执行对应逻辑
            String responseMsg = toUpperCase(receiveMsg);
            MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().
                    setCorrelationId(message.getMessageProperties().getCorrelationId()).
                    build();
            // 响应消息 这边就是如果没有绑定交换机和队列的话 消息应该直接传到对应的队列上面
            rabbitTemplate.send("", message.getMessageProperties().getReplyTo(), new Message(responseMsg.getBytes(StandardCharsets.UTF_8), messageProperties));
        };
        // 设置监听器
        container.setMessageListener(messageListener);
        return container;
    }

    // 提供一个大小写转换的方法
    private String toUpperCase(String msg) {
        return msg.toUpperCase();
    }
}

客户端我采用test单元测试的形式

/**
 * spring amqp rpc 测试类
 *
 * @author yuanzhihao
 * @since 2021/1/9
 */
@ContextConfiguration(classes = {RabbitMQConfig.class})
@RunWith(SpringRunner.class)
public class RabbitMQRpcTest {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 测试RPC客户端
    @Test
    public void testRpcClient() {
        // 设置correlationId
        String corrId = UUID.randomUUID().toString();
        String msg = "hello rpc";
        MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setCorrelationId(corrId).build();
        // 注意 这边如果使用sendAndReceive不指定replyTo回调队列 spring会默认帮我们添加一个回调队列
        // 格式默认 "amq.rabbitmq.reply-to" 前缀
        Message message = rabbitTemplate.sendAndReceive("", "test_rpc", new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties));
        log.info("The response is {}", new String(message.getBody(), StandardCharsets.UTF_8));

    }
}

具体实现可以看下代码的注释
代码执行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1465543.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

适用于生物行业的样本管理系统

在生物样本管理系统的应用中&#xff0c;我们首先需要了解生物样本的特点和要求。生物样本具有多样性和易变性&#xff0c;需要被妥善保存和跟踪&#xff0c;以确保其质量和可用性。 因此&#xff0c;一个有效的生物样本管理系统需要具备以下特点&#xff1a; 全面性&#xff1…

测试用例设计方法:招式组合,因果判定出世

1 引言 上篇讲了等价类划分和边界值分析法&#xff0c;而这两种方法只考虑了单个的输入条件&#xff0c;并未考虑输入条件的各种组合、输入条件之间的相互制约关系的场景。基于此短板&#xff0c;因果图法和判定表法应运而生。 2 因果图法 2.1 概念及原理 2.1.1 定义 一种…

外包干了两个月,技术退步明显。。。。。

先说一下自己的情况&#xff0c;本科生&#xff0c;19年通过校招进入广州某软件公司&#xff0c;干了接近4年的功能测试&#xff0c;今年年初&#xff0c;感觉自己不能够在这样下去了&#xff0c;长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了四年的功能测试…

2024/02/23

使用消息队列完成两个进程间相互通信 A.c #include<myhead.h> struct msgbuf {long mtype;char mtext[1024]; }; //定义表示正文内容大小的宏 #define MSGSIZE sizeof(struct msgbuf)-sizeof(long)int main(int argc, const char *argv[]) {//创建一个key值key_t key;ke…

快速学习安全框架 Springsecurity最新版(6.2)--用户授权模块

简介 上一节Springsecurity 用户认证 Springsecurity 拥有强大的认证和授权功能并且非常灵活&#xff0c;,一来说我们都i有以下需求 可以帮助应用程序实现以下两种常见的授权需求&#xff1a; 用户-权限-资源&#xff1a;例如张三的权限是添加用户、查看用户列表&#xff0c;李…

springboot214基于springboot的多媒体素材库的开发与应用

多媒体素材库的设计与实现 摘要 近年来&#xff0c;信息化管理行业的不断兴起&#xff0c;使得人们的日常生活越来越离不开计算机和互联网技术。首先&#xff0c;根据收集到的用户需求分析&#xff0c;对设计系统有一个初步的认识与了解&#xff0c;确定多媒体素材库的总体功…

CentOS使用Docker搭建Halo网站并实现无公网ip远程访问

&#x1f525;博客主页&#xff1a; 小羊失眠啦. &#x1f3a5;系列专栏&#xff1a;《C语言》 《数据结构》 《C》 《Linux》 《Cpolar》 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&…

阿里云2024年优惠政策合集,你要的阿里云优惠政策都在这!

2024年阿里云优惠活动大全&#xff0c;包括阿里云服务器优惠活动清单、配置价格表、域名优惠活动、阿里云建站活动、阿里云优惠代金券免费领取、对象存储OSS活动、企业邮箱优惠、无影云电脑优惠、CDN特惠等等&#xff0c;阿里云百科aliyunbaike.com分享2024阿里云优惠活动大全_…

Linux RocketMQ 安装及卸载(附控制台搭建)

一、前言 在安装 RocketMQ 前需要确保 JDK 已安装并正确配置环境变量 二、下载安装 1.下载 下载 | RocketMQ 2.安装 # 打开存放目录 cd /usr/local # 创建目录 mkdir rocketMQ # 进入目录 cd rocketMQ # 把下载的压缩包上传到 rocketMQ 目录中 # 解压 $ unzip rocketmq-all-…

Elasticsearch:基于 Langchain 的 Elasticsearch Agent 对文档的搜索

在今天的文章中&#xff0c;我们将重点介绍如何使用 LangChain 提供的基础设施在 Python 中构建 Elasticsearch agent。 该 agent 应允许用户以自然语言询问有关 Elasticsearch 集群中数据的问题。 Elasticsearch 是一个强大的搜索引擎&#xff0c;支持词法和向量搜索。 Elast…

人工智能 — 数字图像

目录 一、图像1、像素2、图像分辨率3、RGB 模型4、灰度5、通道6、对比度7、RGB 转化为 Gray8、RGB 值转化为浮点数9、二值化10、常用视觉库11、频率12、幅值 二、图像的取样与量化1、数字图像2、取样3、量化 三、上采样与下采样1、上采样&#xff08;upsampling&#xff09;2、…

NOIP2018-J-4-对称二叉树的题解

原题描述&#xff1a; 题目描述 时间&#xff1a;1s 空间&#xff1a;256M 一棵有点权的有根树如果满足以下条件&#xff0c;则被轩轩称为对称二叉树&#xff1a; 1. 二叉树&#xff1b; 2. 将这棵树所有节点的左右子树交换&#xff0c;新树和原树对应位置的结构相同且…

【机器学习的基本术语和概念】

曾梦想执剑走天涯&#xff0c;我是程序猿【AK】 目录 简述概要知识图谱 简述概要 提示&#xff1a;简要描述文章内容&#xff0c;适合哪些人观看 知识图谱 样本&#xff08;Sample&#xff09;/实例&#xff08;Instance&#xff09;&#xff1a;在机器学习中&#xff0c;我…

vue-利用属性(v-if)控制表单(el-form-item)显示/隐藏

表单控制属性 v-if 示例&#xff1a; 通过switch组件作为开关&#xff0c;控制表单的显示与隐藏 <el-form-item label"创建数据集"><el-switch v-model"selectFormVisible"></el-switch></el-form-item><el-form-item label&…

AndroidStudio 2024-2-21 Win10/11最新安装配置(Kotlin快速构建配置,gradle镜像源)

AndroidStudio 2024 Win10/11最新安装配置 教程目的&#xff1a; (从安装到卸载) &#xff0c;针对Kotlin开发配置&#xff0c;gradle-8.2-src/bin下载慢&#xff0c;以及Kotlin构建慢的解决 好久没玩AS了,下载发现装个AS很麻烦,就觉得有必要出个教程了(就是记录一下:嘻嘻) 因…

❤ hexo主题+Gitee搭建个人博客

Hexo的基本使用 ​官网 官网地址&#xff1a;https://hexo.io/zh-cn/ Hexo是一个快速、简洁且高效的博客框架。Hexo 使用 Markdown&#xff08;或其他渲染引擎&#xff09;解析文章&#xff0c;在几秒内&#xff0c;即可利用靓丽的主题生成静态网页。即把用户的markdown文件…

开源LLMs导览:工作原理、顶级LLM列表对比

目录 一、开源 LLM 是什么意思&#xff1f;二、开源LLM如何工作&#xff1f;2.1 预训练2.2 代币化2.3 开源LLM的微调2.4 输入编码2.5 训练与优化2.6 推理 三、开源LLM对组织的好处3.1 增强的数据安全和隐私3.2 节约成本3.3 减少供应商依赖性3.4 代码透明度 四、哪种LLM模式最好…

AcrelEMS-HIM高速公路综合能效系统在高速公路的案例

摘 要&#xff1a;我国新型工业化、信息化、城镇化和农业现代化加快发展&#xff0c;经济结构加快转型&#xff0c;交通运输总量将保持较快增长态势&#xff0c;各项事业发展要求提高国家公路网的服务能力和水平。高速公路沿线的收费站、互通枢纽、服务区、隧道等配置的供配电、…

白令海峡的题解

目录 原题描述&#xff1a; 题目描述 输入格式 输出格式 样例输入 样例输出 样例解释 数据规模 主要思路&#xff1a; 小细节&#xff1a; 代码code: 原题描述&#xff1a; 时间限制: 1000ms 空间限制: 524288kB 题目描述 很久很久以前&#xff0c;一座大陆桥横…

云图极速版限时免费活动

产品介绍 云图极速版是针对拥有攻击面管理需求的用户打造的 SaaS 应用&#xff0c;致力于协助用户发现并管理互联网资产攻击面。 实战数据 (2023.11.6 - 2024.2.23) 云图极速版上线 3 个月以来&#xff0c;接入用户 3,563 家&#xff0c;扫描主体 19,961 个&#xff0c;累计发…