RabbitMQ消息队列实战(5)—— 发后即忘和远程RPC数据传输模型

news2025/1/21 0:56:35

        本文我们学习下使用RabbitMQ实现的几种数据发送的模型——发后即忘模型和远程RPC调用。二者实际上是从业务的角度定义的一个RabbitMQ的使用模型。发后即忘模型,强调发送时不太关心消息接收者的执行结果,仅仅是为了发送信息。而远程RPC调用模型强调,另外开辟通道获取消息接收者的执行结果,而且执行的结果直接影响业务。

        从业务上来划分,通常我们通过MQ发出的信息可以分为三种:消息、命令和事件。对于消息来说,我们发送之后不期望会得到回复,或者说不期望马上得到回复,类似于我们接收到手机短信,只是知道这件事情。然后我们怎么去处理或者去不去处理,实际上给我们发送短信的人并不关心,所以这种情况下比较适合使用发后即忘模型。当发送的是命令时,信息的发送者明确知道接受者是谁,通过命令的方式让接收者去进行某项业务,并期望得到反馈,这种情况下比较适合采用远程RPC调用的模型。而最后一种事件,更像是在EDA(Event Driven Architecture)的系统中定义的一种命令,不过命令的格式紧紧和业务模型绑定,所以这里单独提出来叫做事件。很显然,也是使用远程RPC调用的数据发送模型比较合适。

        接下来,我们将以实例的方式分别介绍发后即忘模型和远程RPC调用模型的使用。

一、发后即忘模型

        我们用代码模拟这样一种业务——业务日志的记录。业务日志其实最符合发后即忘模型的要求,因为日志的记录和我们完成一个业务无关(日志记录成功与否都不会影响业务的成败)。有过编程经验的童鞋都知道,日志按照级别来划分从低到高,可以分为三种:debug、info和error。在这个模型中,我们创建一个topic exchange,然后分别以debug、info和error为主题分别绑定到三个队列。不同级别的日志消费者订阅不同的队列,然后记录到不同的日志文件(或者同一个文件使用不同的标识区分)中。

        整个消息的流通图如下:

 

        消息由生产者产生之后,通过一个topic交换机,根据不同的topic发送到响应的队列中,然后定义了3个消费者,每个消费者订阅了存放不同级别日志的通道,获取消息后进行相应的处理。

        我们决定采用spring boot集成RabbitMQ的方式实现,首先配置相关的exchange、binding和queue,如下代码:

@Configuration
public class RabbitConfig {
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }
    @Bean
    TopicExchange logTopicExchange() {
        return new TopicExchange("logTopicExchange", true, false);
    }
    @Bean
    public Queue debugQueue() {
        return new Queue("debugQueue", true, false, false);
    }
    @Bean
    public Queue infoQueue() {
        return new Queue("infoQueue", true, false, false);
    }
    @Bean
    public Queue errorQueue() {
        return new Queue("errorQueue", true, false, false);
    }
    @Bean
    Binding bindingDebugQueue() {
        return BindingBuilder.bind(debugQueue()).to(logTopicExchange()).with("debug");
    }
    @Bean
    Binding bindingInfoQueue() {
        return BindingBuilder.bind(infoQueue()).to(logTopicExchange()).with("info");
    }
    @Bean
    Binding bindingErrorQueue() {
        return BindingBuilder.bind(errorQueue()).to(logTopicExchange()).with("error");
    }
}

第1行:通过@Configuration注解开启配置支持

第3~10行:引入配置文件中的RabbitMQ的配置信息

第11~19行:创建链接工厂ConnectionFactory 实例。

第20~25行:创建RabbitTemplate实例,后面将用它来发送消息。

第26~29行:创建了名称为logTopicExchange的主题交换机

第30~41行:创建三个队列。

第42~53行:分别将队列绑定到交换机上。

        完成了上述生产者端的配置,接下来我们看下发送消息的代码:

public class LogServiceImpl implements LogService {

    private ExecutorService executorService = Executors.newFixedThreadPool(10);

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void sendMsg(String routeKey, String msg) {
        MessageProperties messageProperties = new MessageProperties();
        // 设置过期时间,单位:毫秒,30分钟
        messageProperties.setExpiration("1800000");
        messageProperties.setContentType("text/plain");
        messageProperties.setContentEncoding("UTF-8");
        byte[] msgBytes = msg.getBytes();
        Message message = new Message(msgBytes, messageProperties);
        CompletableFuture.runAsync(() -> rabbitTemplate.convertAndSend(
                "logTopicExchange",
                routeKey,
                message), executorService);
    }
    }

第10~16行:这是在通过传过来的消息来设置Message对象,可以看到,为了防止消息不能被及时读取而大量堆积,这里设置了消息的超时为半个小时。

第17~20行:我们选择了异步发送消息的方法,主要是考虑到业务日志的写入不应该影响业务的实现,而又不会关心日志写入的结果,所以这里采用了异步的方式。

        建立单元测试,发送消息之后可以看到,交换机和队列都已经创建,而且消息已经正确路由到了队列中。

生成的交换机:

 生成的队列:

         生产者一方的准备工作做好之后,我们看下消费者的处理。相比生产者,消费者的实现要简单的多,有关RabbitMQ的配置这里不再重复列举,只看下消费者的监听部分代码:

@Service
public class LogServiceImpl implements LogService {
    @Override
    @RabbitListener(queues = {"debugQueue"})
    public void writeDebug(Message message) {
        String str=new String(message.getBody());
        System.out.println(str);
    }

    @Override
    @RabbitListener(queues = {"infoQueue"})
    public void writeInfo(Message message) {
        String str=new String(message.getBody());
        System.out.println(str);
    }

    @Override
    @RabbitListener(queues = {"errorQueue"})
    public void writeError(Message message) {
        String str=new String(message.getBody());
        System.out.println(str);
    }
}

第1行,@Service必不可少,需要将监听的服务类托管到IOC中。

第4、11、18行,使用3个 @RabbitListener注解来监听debugQueue、infoQueue和errorQueue三个队列

        以上就是我们简单实现的一个发后即忘模型的案例。虽然简单,但是足以作为一个经典案例。而且有些细节需要注意,比如:在发送消息时要考虑异步发送,才不会对业务代码进行干扰。接下来我们开始用实例解释下RabbitMQ远程RPC调用的方式。

二、远程RPC调用模型

        所谓远程RPC方式调用模型,在上文中我们已经介绍过,简单理解就是发送信息后,生产者一直等待消费者返回消费后的结果。那么问题来了,消费者是怎么把消费的结果返回给生产者呢?毋庸置疑,消费者返回的肯定也是一个消息,那么这个消息要通过哪个交换机?到达哪个通道?下面我们就来一一解决这些问题。

        首先,看下远程RPC方式调用模型的示意:

 

        笔者来解释下整个过程:

(1)生产者向业务交换机里面发送业务命令或者事件,同时需要创建一个只有自己能够监听的而且是保证队列名称唯一的私密队列,然后开始监听这个队列。

(2)发送消息的消息头中具有一个叫做reply_to的字段,这个字段设置为上一步骤创建的队列名称。

(3)消费者获取到业务命令或者事件之后,开始执行业务。执行完成业务之后,将回复消息通过默认的交换机传递到reply_to队列里面。

(4)生产者接收到消费者回复的消息之后,完成业务,结束等待。

        下面,我们来看下生产者端的代码。ConnectionFactory等基本配置我这里不再展示,需要特别注意的是引入了一个新的Bean——simpleMessageListenerContainer,主要用来手动添加监听的队列以及监听器。

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
    return container;
}

        接下来是远程RPC调用的方法:

public void sendRPCMsg(String routeKey, String msg) {
    RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    Queue replytoQueue = admin.declareQueue();
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setReplyTo(replytoQueue.getName());
    byte[] msgBytes = msg.getBytes();
    Message message = new Message(msgBytes, messageProperties);
    rabbitTemplate.convertAndSend(
            "eventTopicExchange",
            routeKey,
            message);
    Thread currentThread = Thread.currentThread();
    simpleMessageListenerContainer.addQueues(replytoQueue);
    simpleMessageListenerContainer.setMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message) {
            String str = new String(message.getBody());
            System.out.println(str);
            simpleMessageListenerContainer.removeQueues(replytoQueue);
            LockSupport.unpark(currentThread);
        }
    });
    LockSupport.park();
}

第1~2行:创建一个RabbitAdmin对象,这个对象可以手动创建交换机、队列等等。

第4~11行:将队列名称放到消息的reply_to头部,并且进行消息的发送。

第13~14行:使用simpleMessageListenerContainer监听新创建的队列,并且设置监听对象。

第24行:保持线程阻塞,然后在第21行解除阻塞状态。

        我们在第2行创建了一个队列,我们看下declareQueue方法的定义:

public Queue declareQueue() {
   try {
      DeclareOk declareOk = this.rabbitTemplate.execute(Channel::queueDeclare);
      return new Queue(declareOk.getQueue(), false, true, true); // NOSONAR never null
   }
   catch (AmqpException e) {
      logOrRethrowDeclarationException(null, "queue", e);
      return null;
   }
}

     注意上述代码的第4行,在这里实际上创建了一个随机名称的队列,RabbitMQ会保证队列名称的唯一,而创建的Queue对象的后面三个boolean类型的参数指明了队列是不可持久化的、排他的、以及自动删除,也就是说创建的队列只能当前的channel自己监听,而且一旦队列里面没有消息或者channel关闭队列就会消失。就是这些属性,保证了创建了一个临时性的队列,而且其他消费者无法进行监听。

        最后,我们再看下消费者的处理逻辑:

@RabbitListener(queues = {"eventQueue"})
public void getMsg(Message message) {
    String str = new String(message.getBody());
    System.out.println(str);
    String replayTo = message.getMessageProperties().getReplyTo();
    System.out.println("replayTo =" + replayTo);
    byte[] msgBytes = "我收到了".getBytes();
    MessageProperties messageProperties = new MessageProperties();
    Message replayMessage = new Message(msgBytes, messageProperties);
    try {
        rabbitTemplate.send(replayTo, replayMessage);
    } catch (AmqpException e) {
        e.printStackTrace();
    }
}

第1行:使用RabbitListener监听名称为eventQueue的队列。

第5行:从接受到的消息中获取replay_to的队列名称。

第11行:向生产者回复消息

        我们看到,相比生产者,消费者代码要简单的多,就是多了一个获取replay_to队列并发送消息的过程。下面看下replay_to队列的庐山真面目,如下图红色圈出部分:

 

三、总结

        本文主要介绍了RabbitMQ发后即忘和远程RPC调用两种数据发送模型,现总结如下:

(1)发后即忘数据发送模型针对发送的信息生产者不关心对方的处理结果这一业务前提实现,实现起来比较简单,但是需要注意发送消息时应该采用异步发送,避免消息的发送影响业务。

(2)如果需要等待消费者的返回结果,应该采用远程RPC调用数据发送模型。生产者自己创建接受回复消息的队列,而且应该保证队列名称唯一、队列私有和支持自动删除,通过消息的reply_to头部将队列名称发送给消费者,消费者再通过RabbitMQ的默认交换机向reply_to队列回复消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/511198.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vulnhub靶场之double:1

1.信息收集 探测存活主机,发现192.168.239.178存活 对目标主机192.168.239.176进行端口扫描,发现存活22(SSH)、25(smtp)、80、8080端口。 浏览器访问http://192.168.239.178,发现有两个链接。查看源码发现一个是跳转到/production&#xf…

David Silver Lecture 6: Value function approximation

1 Introduction pipeline大致讲完了,开始到数值计算的部分。 1.1 大规模的运算 对于这种大规模运算,如何拓展前面两个章节的内容,进行实战。 1.1.1 回顾value function approximation 1.1.3 which function approximator 强化学习中的值函…

java源码----集合系列1----ArrayList,linkedList

Arraylist 基础信息 底层是一个object数组 Arraylist 是java里面Collection 标准的一个集合,其底层是一个object数组。当new一个空参的ArrayList的时候,会默认生成一个空数组。 Arraylist上限是 Integer.MAX_VALUE - 8(Integer.MAX_VALUE 2^31-1);…

一文搞定接口测试及常用接口测试工具解析

目录 首先,什么是接口呢? 一、常见接口: 二、前端和后端: 三、什么是接口测试: 四、接口组成 五、为什么要做接口测试: 六、接口测试怎么测: 七、用什么工具测 首先,什么是接…

软件工程开发文档写作教程(06)—项目建议书写作规范

本文原创作者:谷哥的小弟作者博客地址:http://blog.csdn.net/lfdfhl本文参考资料:电子工业出版社《软件文档写作教程》 马平,黄冬梅编著 项目建议书概述 项目建议书一般是由主策划或者项目经理负责编写的。进行可行性分析是一个自…

3.编写油猴脚本之-helloword

3.编写油猴脚本之-helloword Start 通过上一篇文章的学习,我们安装完毕了油猴插件。今天我们来编写一个helloword的脚步,体验一下油猴。 1. 开始 点击油猴插件>添加新脚本 默认生成的脚本 // UserScript // name New Userscript // name…

Linux介绍及环境搭建

文章目录 🎬1.Linux背景💻1.1 计算机的发展💻1.2 操作系统的故事💻1.3 Linux操作系统💻1.4 Linux的应用场景💻1.5 Linux版本 🔌2. Linux环境💾2.1 环境选择💾2.2 云服务器…

u1s1,查问题已经从百度到Google,再从Google到gpt了

现在查问题,查资料,基本都是问gpt。 感觉AI的回答会比较智能。 除了解释说明,还会附录Demo源码。 而且没有广告和其他杂七杂八的。 方便/快捷,提高了工作效率。 举例 上传图片后无法渲染的文章,发现数据库的图片地址前缀带blob,可…

数据结构-查找-线性结构(顺序、折半、分块)查找

目录 一、顺序查找 *查找效率分析 二、折半查找 *查找效率分析 三、分块查找 *查找效率分析 一、顺序查找 有称线性查找, 算法思想:从头到尾挨个查找(反过来也行) typedef struct{int *elem; //数据int TableLen; …

MySQL原理(六):日志

前言 上一篇介绍了 MySQL 的锁,这一篇将介绍日志相关的内容。 MySQL 中最常见的日志有三类: undo log(回滚日志):是 Innodb 存储引擎层生成的日志,实现了事务中的原子性,主要用于事务回滚和 …

MATLAB程序在设备端部署实例

背景介绍 MATLAB广泛应用于物理系统建模、测量测试、系统控制以及深度学习等,在工程实践中具有非常重要的地位,具体如图1所示。调研发现,科研人员能够编写各种matlab代码,通过建模仿真来更好的认识世界。近年来,随着物…

《LeetCode》—— 摆动序列

今天,我们要讲解的是 “摆动序列” 这道题目。对于这道题目,我们可以从贪心的思想去解决,也可以使用动态规划的方法。接下来,我通过这两种方法的讲解让你轻松拿捏它! 目录 (一)贪心算法 1、上下…

跑在笔记本里的大语言模型 - GPT4All

何为GPT4All GPT4All 官网给自己的定义是:一款免费使用、本地运行、隐私感知的聊天机器人,无需GPU或互联网。 从官网可以得知其主要特点是: 本地运行(可包装成自主知识产权🐶)无需GPU(穷人适配…

sort、uniq、tr、cut的使用

管理文件内容的使用 一、sort命令二、uniq命令三、tr命令四、cut命令 一、sort命令 sort命令是以行为单位对文件内容进行排序,也可以根据不同的数据类型来排序,比较原则是从首字符向后,依次按ASCII码进行比较,最后将他们按升序输…

Linux:rpm查询安装 yum安装

环境: 需要插入安装镜像 镜像内有所需的安装库 我这里使用的虚拟机直接连接光盘 连接的光盘挂载在/dev/cdrom 由于我们无法直接进入,所以选择把/dev/cdrom挂载到别的地方即可 mount /dev/cdrom /123 将/dev/cdrom 挂载到 /123 目录下 Packages下就是…

C++笔记—— 第十七篇 智能指针 C++11来了(下)

目录 1. 为什么需要智能指针 2. 内存泄漏 2.1 什么是内存泄漏,内存泄漏的危害 2.2 内存泄漏分类 2.3如何避免内存泄漏 3.智能指针的使用及原理 3.1 RAII 3.2 智能指针的原理 3.3 std::auto_ptr 3.4 std::unique_ptr 3.5 std::shared_ptr shared_ptr的线…

JVM性能调优

一、JVM内存模型及垃圾收集算法 1.根据Java虚拟机规范,JVM将内存划分为: New(年轻代) Tenured(年老代) 永久代(Perm) 其中New和Tenured属于堆内存,堆内存会从JVM启动参…

【牛客刷题专栏】0x28:JZ30 包含min函数的栈(C语言编程题)

前言 个人推荐在牛客网刷题(点击可以跳转),它登陆后会保存刷题记录进度,重新登录时写过的题目代码不会丢失。个人刷题练习系列专栏:个人CSDN牛客刷题专栏。 题目来自:牛客/题库 / 在线编程 / 剑指offer: 目录 前言问…

【神经网络】tensorflow实验9--分类问题

1. 实验目的 ①掌握逻辑回归的基本原理,实现分类器,完成多分类任务; ②掌握逻辑回归中的平方损失函数、交叉熵损失函数以及平均交叉熵损失函数。 2. 实验内容 ①能够使用TensorFlow计算Sigmoid函数、准确率、交叉熵损失函数等&#xff0c…

(浙大陈越版)数据结构 第二章 线性结构 2.4 多项式的加法和乘法运算实现

目录 2.4.1多项式的加法运算实现 如何设计一个函数分别求两个一元多项式的和? 算法思路:两个指针p1,p2分别指向两个多项式的第一个结点(最高项)并循环 循环: 2.4.2 多项式的乘积 1.多项式的表示 2.程…