springboot集成rabbitmq

news2025/1/12 8:04:15

简介

RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

相关概念

通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

  • 左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。

  • 中间即是 RabbitMQ,其中包括了 交换机 和 队列。

  • 右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。

    那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。

  • 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单, RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机“/”。

  • 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。

  • 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。

交换机(Exchange)

交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout

  • Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
  • Topic:按规则转发消息(最灵活)
  • Headers:设置 header attribute 参数类型的交换机
  • Fanout:转发消息到所有绑定队列

Direct Exchange

Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。

第一个 X - Q1 就有一个 binding key,名字为 orange; X - Q2 就有 2 个 binding key,名字为 black 和 green。当消息中的 路由键 和 这个 binding key 对应上的时候,那么就知道了该消息去到哪一个队列中。

Ps:为什么 X 到 Q2 要有 black,green,2个 binding key呢,一个不就行了吗? - 这个主要是因为可能又有 Q3,而Q3只接受 black 的信息,而Q2不仅接受black 的信息,还接受 green 的信息。

Topic Exchange

Topic Exchange 转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。

在这种交换机模式下:

  • 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。
  • 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements…b.*,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。 井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是 agreements.eu.berlin.#,那么,以agreements.eu.berlin 开头的路由键都是可以的。

具体代码发送的时候还是一样,第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息。如下:

rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is  RabbitMQ!");

topic 和 direct 类似, 只是匹配上支持了”模式”, 在”点分”的 routing_key 形式中, 可以使用两个通配符:

  • *表示一个词.
  • #表示零个或多个词.

Headers Exchange

headers 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型. 在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.

Fanout Exchange

Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了 routing_key 会被忽略。

Spring Boot 集成 RabbitMQ

Spring Boot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,Spring Boot 提供了spring-boot-starter-amqp 项目对消息各种支持。

1、添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置文件

##rabbitmq配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3、生产者

/**
 * 消息生产者
 */
@Component
public class Send implements RabbitTemplate.ConfirmCallback {

    private RabbitTemplate rabbitTemplate;

    /**
     * 构造方法注入
     */
    @Autowired
    public Send(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
    }

    public void sendMsg(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, RabbitConfig.ROUTINGKEY_A, content, correlationId);
    }

    /**
     * 回调
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println(" 回调id:" + correlationData);
        if (ack) {
            System.out.println("消息成功消费");
        } else {
            System.out.println("消息消费失败:" + cause);
        }
    }

}

4、消费者

/**
 * 消费消费者
 */
@Configuration
public class RabbitConfig {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    public static final String EXCHANGE = "amq.direct";

    public static final String QUEUE_A = "test";

    public static final String ROUTINGKEY_A = "test_A";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1",5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);//必须要设置,才能进行消息的回调。
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    //必须是prototype类型
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    /**
     * 针对消费者配置
     * 1. 设置交换机类型
     * 2. 将队列绑定到交换机
     FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
     HeadersExchange :通过添加属性key-value匹配
     DirectExchange:按照routingkey分发到指定队列
     TopicExchange:多关键字匹配
     */
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE);
    }

    /**
     * 获取队列A
     * @return
     */
    @Bean
    public Queue queueA() {
        return new Queue(QUEUE_A, true); //队列持久
    }


	 /**
     * 队列和交换机绑定
     * @return
     */
    @Bean
    public Binding bindingA() {
        return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
    }


    @Bean
    public SimpleMessageListenerContainer messageContainer() {
        //加载处理消息A的队列
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        //设置接收多个队列里面的消息,这里设置接收队列A
        //假如想一个消费者处理多个队列里面的信息可以如下设置:
        //container.setQueues(queueA(),queueB(),queueC());
        container.setQueues(queueA());
        container.setExposeListenerChannel(true);
        //设置最大的并发的消费者数量
        container.setMaxConcurrentConsumers(1);
        //最小的并发消费者的数量
        container.setConcurrentConsumers(1);
        //设置确认模式手工确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                /**通过basic.qos方法设置prefetch_count=1,这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message,
                 换句话说,在接收到该Consumer的ack前,它不会将新的Message分发给它 */
                channel.basicQos(1);
                byte[] body = message.getBody();
                logger.info("接受到的数据为:{}",new String(body));
                /**为了保证永远不会丢失消息,RabbitMQ支持消息应答机制。
                 当消费者接收到消息并完成任务后会往RabbitMQ服务器发送一条确认的命令,然后RabbitMQ才会将消息删除。*/
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        });
        return container;
    }

}

5. 发送消息

@Configuration
@EnableScheduling
public class RabbitMqJob {

    private static final Logger log = LoggerFactory.getLogger(RabbitMqJob.class);

    @Autowired
    private Send send;

    //每30s发送一次
    @Scheduled(cron = "0/30 * * * * ?")
    public void sendMessage(){
    	log.info("开始发送消息:"+ Instant.now());
        try
            send.sendMsg("hello world");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

结果如下:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/627364.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【LeetCode】HOT 100(4)

题单介绍&#xff1a; 精选 100 道力扣&#xff08;LeetCode&#xff09;上最热门的题目&#xff0c;适合初识算法与数据结构的新手和想要在短时间内高效提升的人&#xff0c;熟练掌握这 100 道题&#xff0c;你就已经具备了在代码世界通行的基本能力。 目录 题单介绍&#…

容器(第三篇)docker-cgroup资源限制

Docker 通过 Cgroup 来控制容器使用的资源配额&#xff0c;包括 CPU、内存、磁盘三大方面&#xff0c; 基本覆盖了常见的资源配额和使用量控制。 Cgroup 是 ControlGroups 的缩写&#xff0c;是 Linux 内核提供的一种可以限制、记录、隔离进程组所使用的物理资源(如 CPU、内存、…

一路狂飙,性能测试流程与性能测试主要指标整理,直接上高速...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 性能测试实战 性…

什么是真正的需求,如何才能找到?

此为内容创作模板&#xff0c;在发布之前请将不必要的内容删除 对需求本身的误判&#xff0c;比错误本身更为恐怖&#xff0c;直接导致必然失败的局面。 工作失误必不可免&#xff0c;好工作核心在于有需求&#xff0c;自己需要去做&#xff0c;有动力&#xff0c;别人需要你…

【Cloudgetway网关】 GetWay网关入门使用

一、概述 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ow0KO8iV-1686286922163)(null)] GateWay是zuul的替代品&#xff0c;由于Zuul2.0迟迟没有出来&#xff0c;SpringCloud社区推出了gateWay网关来替代zuul1.x版本。提供了以下功能: 底层使用n…

使用 LabVIEW调用LeNet快速搭建手写数字识别系统(内含源码)

‍‍&#x1f3e1;博客主页&#xff1a; virobotics的CSDN博客&#xff1a;LabVIEW深度学习、人工智能博主 &#x1f384;所属专栏&#xff1a;『LabVIEW深度学习实战』 &#x1f37b;上期文章&#xff1a; 【图像分类】基于OpenVINO实现PyTorch ResNet50图像分类 &#x1f4f0…

PyCaret解决二分类任务教程示例

PyCaret是一个Python中的开源、低代码机器学习库&#xff0c;可以自动化机器学习工作流。它是一个端到端的机器学习和模型管理工具&#xff0c;可以成倍地加快实验周期&#xff0c;提高工作效率。 与其他开源机器学习库相比&#xff0c;PyCaret是一个替代的低代码库&#xff0c…

多分类问题与卷积模型的优化

文章目录 1. 创建自定义Dataset类2. 基础卷积模型3. Dropout抑制过拟合4. 批标准化5. 学习速率衰减6. 最终优化整合代码 首先导入用到的库: import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim import numpy as np import matp…

文章写作的诀窍:10个技巧让你的文章升华

首先要找到自己的写作声音和确定文章的中心思想&#xff0c;其次要使用简单明了和描述性的语言&#xff0c;增加细节并结构化文章&#xff1a; 找到你的写作声音&#xff1a;找到适合自己的写作风格和声音&#xff0c;这有助于让读者更容易地理解和记住你的文章。确定文章的中心…

QxRibbon 知:搭建 CMake 构建环境

文章目录 前言安装 cmake问题处理qtcreator 检测 CMake 异常 参考资料 前言 高版本的 QtCreator 已经集成了 cmake 工具&#xff0c;并支持以 CMakelists.txt 文件作为工程开发项目。 https://www.qt.io/blog/2019/07/30/update-on-cmake-project-support-in-qt-creator 安装…

NodeLocal DNS介绍及部署应用

目录 一、NodeLocal DNS是什么&#xff1f; 二、为什么使用NodeLocal DNS&#xff1f; 三、工作原理 四、安装NodeLocal DNS 五、在应用中使用NodeLocal DNSCache 六、验证 一、NodeLocal DNS是什么&#xff1f; NodeLocal DNSCache 通过在集群节点上运行一个 DaemonSet …

qrcodejs2生成二维码,通过canvas绘制带边框+中间logo的二维码图片,下载二维码

文章目录 一、通过qrcodejs2生成一个二维码二、点击【下载配置服务器二维码】来下载二维码1、通过canvas去绘制 边框二维码logo&#xff08;1&#xff09;为canvas增加绘制圆角矩形的方法&#xff08;canvas本身不提供&#xff09;&#xff08;2&#xff09;通过canvas绘制 圆角…

饮酒过多和腌制食品是导致中风的最大导火索

中风是一种常见的疾病&#xff0c;它的发生和饮食习惯有很大关系。近年来&#xff0c;我国中风病患人数和病发率都呈现出了不同程度的上升趋势&#xff0c;这给我们的健康带来了很大的威胁。下面我们可以通过数据可视化大屏来了解一下饮食健康与预防中风有哪些影响&#xff0c;…

ESP32-S3 边缘人工智能|使用加速度计数据和 ESP-DL 识别人体活动

边缘计算是一种分布式计算范例&#xff0c;指在更靠近设备的地方进行数据存储和计算。边缘人工智能&#xff08;边缘 AI&#xff09;是边缘计算中一项振奋人心的成果&#xff0c;可以令传统技术更高效地运行&#xff0c;在降低功耗的同时又有更好的性能。训练好的神经网络可以在…

通信算法之167: (低空无人机)机载视频通信传输系统基带算法设计

一.物理层基带仿真 通信系统的链路级仿真主要可以分成5个部分。 1.系统参数 2.发送机算法 3.信道模型 4.接收机算法 5.统计性能 其中主要组成部分很明显是中间三部分&#xff0c;即发送&#xff0c;信道&#xff0c;接收。但系统参数和统计性能这两部分的适当设计会大大…

linux基础命令系列之10 分钟掌握 ln 命令:创建链接,软链接,硬链接,递归链接,打印详细输出

文章目录 前言一. ln命令介绍二. 语法格式及常用选项三. 参考案例3.1 ln命令创建硬链接3.1.1 创建硬链接3.1.2 源文件被删除&#xff0c;不影响链接文件的正常使用3.1.3 硬链接不能跨分区创建 3.2 为什么目录刚刚创建的时候&#xff0c;链接数为23.3 ln -s 软链接的创建3.3.1 l…

【漏洞修复】node-exporter被检测出来pprof调试信息泄露漏洞

node-exporter被检测出来pprof调试信息泄露漏洞 说在前面解决方法结语 说在前面 惯例开篇吐槽&#xff0c;有些二五仔习惯搞点自研的安全扫描工具&#xff0c;然后加点DIY元素&#xff0c;他也不管扫的准不准&#xff0c;就要给你报个高中危的漏洞&#xff0c;然后就要去修复&…

C++元模板技术与traits解析:根据类型的特性来调整代码的行为,解决没有重载运算符的情况

C元模板技术与traits解析 第一章、C元模板技术简介 (C Meta-template Introduction)1.1 元模板的定义与概念 (Definition and Concepts)1.2 元模板技术的发展历程 (Evolution of Meta-templates)1.3 元模板应用场景举例 (Examples of Meta-template Applications) 第二章、 tra…

[数据结构初阶]顺序表

目录 静态顺序表 动态顺序表 初始化 销毁 尾插 ​编辑 尾删 头插 头删 Insert erase find查找 顺序表是用一段物理地址连续的存储单元依次存储数据元素的线性结构&#xff0c;一般情况下采用数组存储。在数组上完成数据的增删查改。 静态顺序表 定义结构体&#xff1…

Talk | 北卡罗来纳州立大学唐圣坤浙江大学张磊: 数据为中心的高效视觉语言学习—动态退出与数据蒸馏

本期为TechBeat人工智能社区第504期线上Talk&#xff01; 北京时间6月8日(周四)20:00&#xff0c;北卡罗来纳州立大学在读博士生—唐圣坤与浙江大学硕士生—张磊的Talk将准时在TechBeat人工智能社区开播&#xff01; 他们与大家分享的主题是: “数据为中心的高效视觉语言学习…