RabbitMQ入门指南(三):Java入门示例

news2025/1/12 23:03:32

专栏导航

RabbitMQ入门指南

从零开始了解大数据


目录

专栏导航

前言

一、AMQP协议

1.AMQP

2.Spring AMQP

二、使用Spring AMQP实现对RabbitMQ的消息收发

1.案例准备阶段

2.入门案例(无交换机)

3.任务模型案例(Work Queues)

总结


前言

RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了AMQP、Spring AMQP和使用SpringAMQP实现对RabbitMQ的消息收发等内容。


一、AMQP协议

1.AMQP

全称为Advanced Message Queuing Protocol,是一种用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。通过AMQP,不同的应用程序可以在不改变各自实现方式的情况下进行跨平台、跨语言的消息通信。

AMQP协议定义了消息的传输方式和消息的元数据,例如消息的发送者、接收者、消息体、消息类型等。这些元数据可以帮助应用程序对消息进行正确的处理。

2.Spring AMQP

在Spring框架中,有一个Spring AMQP的项目,它基于AMQP协议定义了一套API规范,提供了模板来发送和接收消息。这个项目包含两部分,其中spring-amqp是基础抽象,而spring-rabbit是底层的默认实现。

Spring AMQP通过提供模板和抽象层,简化了应用程序与RabbitMQ的交互。它提供了一组易于使用的API,用于发送和接收消息。这些API可以帮助开发人员更专注于业务逻辑,而不是消息的发送和接收细节。

spring-rabbit是Spring AMQP的一部分,它基于RabbitMQ实现了AMQP协议。spring-rabbit提供了对RabbitMQ的封装,使开发人员可以通过简单的配置和API调用与RabbitMQ进行交互。

Spring AMQP 主要功能:

  • 自动声明和配置队列、交换机及其绑定关系:通过简化队列和交换器的创建和管理过程,Spring AMQP 帮助开发人员专注于实现业务逻辑,而不是手动配置消息中间件。
  • 基于注解的监听器模式,实现异步消息接收:通过注解,Spring AMQP 可以自动将方法与特定的队列或交换机绑定,从而实现异步接收和处理消息。这种模式提高了应用程序的响应性能和吞吐量。
  • 封装了 RabbitTemplate 工具,用于发送消息:RabbitTemplate 是 RabbitMQ 的核心类之一,用于发送和接收消息。Spring AMQP 提供了对这个工具的封装,使得开发人员可以方便地使用它来发送消息。

官方文档:

Spring AMQPicon-default.png?t=N7T8https://spring.io/projects/spring-amqp

二、使用Spring AMQP实现对RabbitMQ的消息收发

1.案例准备阶段

项目结构如下:

项目结构介绍:

  • mq-demo:父工程,管理项目依赖

  • publisher:消息的发送者

  • consumer:消息的消费者

在父工程引入spring-amqp依赖:

<!--AMQP依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

项目完整依赖如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.rye.demo</groupId>
    <artifactId>mq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>publisher</module>
        <module>consumer</module>
    </modules>
    <packaging>pom</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.15</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--AMQP依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--单元测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <!--Jackson-->
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
        </dependency>
    </dependencies>
</project>

在application.yml中配置RabbitMQ服务端信息(每个微服务都需要配置):

spring:
  rabbitmq:
    host: 10.0.0.100
    port: 5672
    virtual-host: /demo
    username: user
    password: 123456

2.入门案例(无交换机)

案例模型:

在RabbitMQ管理控制台新建队列:

查看新建结果:

在publisher服务中编写测试类,并利用RabbitTemplate实现消息发送

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testSendMessage2Queue() {
        // 队列名称
        String queueName = "demo.queue";
        // 消息
        String msg = "First demo";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, msg);
    }

运行测试用例,查看结果:

在consumer服务中新建一个类实现消息接收

@Component
public class MqListener {

    @RabbitListener(queues = "demo.queue")
    public void listenSimpleQueue(String msg) {
        System.out.println("消息:" + msg);
    }
}

启动consumer服务,查看消息(一旦监听的队列中有了消息,就会推送给当前服务):

3.任务模型案例(Work Queues)

让多个消费者绑定一个队列,共同消费队列中的消息。

案例模型:

在RabbitMQ管理控制台新建队列:

查看新建结果:

在publisher服务中的测试类添加一个测试方法(通过循环发送,模拟大量消息堆积现象 ):

    @Test
    void testWorkQueue() throws InterruptedException {
        String queueName = "work.queue";
        for (int i = 1; i <= 50; i++) {
            String msg = "Work Queues " + i;
            rabbitTemplate.convertAndSend(queueName, msg);
            Thread.sleep(20);
        }
    }

在consumer服务的类中添加2个新的方法,模拟多个消费者绑定同一个队列 :

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收work.queue消息:" + msg);
    }

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2接收work.queue消息:" + msg);
    }

运行结果:

修改consumer服务类中的方法:

  • 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息

  • 消费者2 sleep了200毫秒,相当于每秒处理5个消息

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收work.queue消息:" + msg);
        Thread.sleep(20);
    }

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2接收work.queue消息:" + msg);
        Thread.sleep(200);
    }

重启后查看运行结果:

以上结果表明:默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者,并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。

修改consumer服务的application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息(每次只能获取一条消息,处理完成才能获取下一个消息):

spring:
  rabbitmq:
    host: 10.0.0.100
    port: 5672
    virtual-host: /demo
    username: user
    password: 123456
    listener:
      simple:
        prefetch: 1

重启后查看运行结果:


总结

RabbitMQ是一个开源的消息队列软件,旨在提供可靠的消息传递和消息队列功能。本文主要介绍了AMQP、Spring AMQP和使用Spring AMQP实现对RabbitMQ的消息收发等内容,希望对大家有所帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1325008.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

论文笔记:Accurate Localization using LTE Signaling Data

1 intro 论文提出LTELoc&#xff0c;仅使用信令数据实现精准定位 信令数据已经包含在已在LTE系统中&#xff0c;因此这种方法几乎不需要数据获取成本仅使用TA&#xff08;时序提前&#xff09;和RSRP【这里单位是瓦】&#xff08;参考信号接收功率&#xff09; TA值对应于信号…

5.6 Linux rsync 服务

1、rsync 概念介绍 官方网站&#xff1a;rsync rsync(Remote Sync) 是一个Unix/linux系统下的文件同步和传输工具。Rsync通过“rsync算法”提供了一个客户机和远程服务器的文件同步的快速方法。 采用C/S模式 端口tcp:873 a. rsync 特性 ① 可以镜像保存整个目录树和文件系…

月薪30k的软件测试工程师,是一个什么样的工作状态?

一位大佬的亲身经历 用了大概6年的时间&#xff0c;成为了年薪30w的测试开发。 回顾我从功能测试到测试开发的成长路径&#xff0c;基本上是伴随着“3次能力飞跃”实现的。 年名企大厂测试岗位内推文末获取&#xff01;2022年名企大厂测试岗位内推文末获取&#xff01; 第一…

OpenCV消除高亮illuminationChange函数的使用

学更好的别人&#xff0c; 做更好的自己。 ——《微卡智享》 本文长度为1129字&#xff0c;预计阅读4分钟 导语 上一篇《OpenCV极坐标变换函数warpPolar的使用》中介绍了极坐标变换的使用&#xff0c;文中提到过因为手机拍的照片&#xff0c;部分地方反光厉害。OpenCV本身也有一…

使用yarn安装electron时手动选择版本

访问1Password或者其他可以提供随机字符的网站&#xff0c;获取随机密码运行安装命令 操作要点&#xff0c;必须触发Couldnt find any versions for "electron" that matches "*"才算成功 将复制的随机密码粘贴到后面 例如&#xff1a;yarn add --dev elec…

CAS-源码分析引出Unsafe类、Unsafe类详解

CASDemo演示 public class CASDemo {public static void main(String[] args) {AtomicInteger atomicInteger new AtomicInteger(5);System.out.println(atomicInteger.compareAndSet(5, 2022) "\t" atomicInteger.get());//true 2022System.out.println(atomicI…

vit-transfomers 逐段精读

Vision Transformer Explained | Papers With Code 有趣的特性 在cnn中处理的不太好&#xff0c;但是在transformers 都能处理的很好的例子。 Intriguing Properties of Vision Transformers | Papers With Code 标题 AN IMAGE IS WORTH 16X16 WORDS: TRANSFORMERS FOR IMAGE…

基于ETM+遥感数据的城市热岛效应现状研究的解决方案

1.引言 城市热岛效应&#xff08;Urban Heat Island Effect&#xff09;是指城市中的气温明显高于外围郊区的现象。在近地面温度图上&#xff0c;郊区气温变化很小&#xff0c;而城区则是一个高温区&#xff0c;就像突出海面的岛屿&#xff0c;由于这种岛屿代表高温的城市区域&…

【已解决】vs2015操作创建声明定义由于以下原因无法完成

本博文解决这样的一个问题&#xff0c;就是vs2015下用qt&#xff0c;在快速创建槽函数时给笔者报了个错误&#xff0c;错误的完整说法是这样子的”操作创建声明/定义“由于下列原因无法完成&#xff0c;所选的文本不包含任何函数签名。第一次遇到这种花里胡哨的问题&#xff0c…

[CVPR-23] PointAvatar: Deformable Point-based Head Avatars from Videos

[paper | code | proj] 本文的形变方法被成为&#xff1a;Forward DeformationPointAvatar基于点云表征动态场景。目标是根据给定的一段单目相机视频&#xff0c;重建目标的数字人&#xff0c;并且数字人可驱动&#xff1b;通过标定空间&#xff08;canonical space&#xff09…

Jmeter实现CSV数据批量导入

CSV&#xff1a;逗号分隔值&#xff0c;是一种简洁且常见的数据存储格式。 1、参数化&#xff1a; 在Jmeter中&#xff0c;可以通过“用户自定义的变量”来实现参数化使操作方便&#xff0c;使用语法位&#xff1a;${参数名}&#xff0c;如下图&#xff1a; 而CSV也同理&…

第二百一十八回 如何修改CircleAvatar的大小

文章目录 1. 概念介绍2. 使用方法3. 代码与效果3.1 示例代码3.2 运行效果 4. 内容总结 我们在上一章回中介绍了"修改页面导航中遇到的问题"沉浸式状态样相关的内容&#xff0c;本章回中将介绍如何修改avada的大小.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1.…

代码随想录算法训练营第四十一天|198.打家劫舍 ,213.打家劫舍II ,337.打家劫舍III

198. 打家劫舍 - 力扣&#xff08;LeetCode&#xff09; 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统&#xff0c;如果两间相邻的房屋在同一晚上被小偷闯入&#…

支持向量机 支持向量机概述

支持向量机概述 支持向量机 Support Vector MachineSVM ) 是一类按监督学习 ( supervisedlearning)方式对数据进行二元分类的广义线性分类器 (generalized linear classifier) &#xff0c;其决策边界是对学习样本求解的最大边距超亚面 (maximum-margin hyperplane)与逻辑回归和…

第二百一十七回 修改页面导航中遇到的问题

文章目录 1. 问题介绍2. 使用方法3. 代码与分析3.1 示例代码3.2 代码分析4. 内容总结我们在上一章回中介绍了"分享一种更新页面数据的方法"相关的内容,本章回中将介绍修改页面导航中遇到的问题.闲话休提,让我们一起Talk Flutter吧。 1. 问题介绍 我们在页面之间导…

C++刷题 -- 字符串

C刷题 – 字符串 文章目录 C刷题 -- 字符串1.重复的子字符串 1.重复的子字符串 https://leetcode.cn/problems/repeated-substring-pattern/submissions/490209402/ 暴力解法 第一个for循环用来标定子串的末尾&#xff0c;根据末尾取出子串 第二个while循环用来检查原字符串是…

WEB渗透—PHP反序列化(六)

Web渗透—PHP反序列化 课程学习分享&#xff08;课程非本人制作&#xff0c;仅提供学习分享&#xff09; 靶场下载地址&#xff1a;GitHub - mcc0624/php_ser_Class: php反序列化靶场课程&#xff0c;基于课程制作的靶场 课程地址&#xff1a;PHP反序列化漏洞学习_哔哩…

AT32 F435简介4/N ChibiOS porting plan

AT32 F435简介4/N ChibiOS porting plan 1. 源由2. 框图3. Makefile4. 分析4.1 Startup Code4.2 RT Port Layer4.3 HAL Board Layer4.4 HAL Port Layer 5. 总结6. 参考资料 1. 源由 对比STM32 F405进行AT32 F435 MCU的资料研读&#xff0c;期望获取更多差异化信息&#xff1b;…

Android 架构 - 组件化

一、概念 组件化是对单个功能进行开发&#xff0c;使得功能可以复用。将多个功能组合起来就是一个业务模块&#xff0c;因此去除了模块间的耦合&#xff0c;使得按业务划分的模块成了可单独运行的业务组件。&#xff08;一定程度上的独立&#xff0c;还是依附于整个项目中&…

Python 多维数组详解(numpy)

文章目录 1 概述1.1 numpy 简介1.2 ndarray 简介 2 数组操作2.1 创建数组&#xff1a;array()2.2 裁切数组&#xff1a;切片2.3 拼接数组&#xff1a;concatenate()2.4 拆分数组&#xff1a;array_split()2.5 改变数组形状&#xff1a;reshape() 3 元素操作3.1 获取元素&#x…