RabbitMQ讲解与整合

news2024/11/16 13:58:04

RabbitMq安装

类型概念

租户
RabbitMQ 中有一个概念叫做多租户,每一个 RabbitMQ 服务器都能创建出许多虚拟的消息服务器,这些虚拟的消息服务器就是我们所说的虚拟主机(virtual host),一般简称为 vhost。
每一个 vhost 都是一个独立的小型 RabbitMQ 服务器,这个 vhost 中会有自己的消息队列、消息交换机以及相应的绑定关系等等,并且拥有自己独立的权限,不同的 vhost 中的队列和交换机不能互相绑定,这样技能保证运行安全又能避免命名冲突
交换机
在这里插入图片描述

交换机
属性意义意义
type类型direct默认的直接交换机
根据交换机下队列绑定的routingKey直接匹配
fanout扇形交换机
简单来说就是发布订阅
队列直接绑定在交换机下,统一发布消息
headers头部交换机,通过message header头部信息进行比对
可以根据定义全匹配、部分匹配等规则
topic主题交换机
通过绑定routingKey进行模糊匹配
Durability耐用
(持久化)
durable持久化,数据存放于硬盘
transient瞬态,数据存放于内存
Auto delete自动删除Yes没有绑定队列时自动删除,针对的是曾经有过但后来没有的事物
No不自动删除
Internal内部使用Yes该路由绑定的队列不会被用户消费
No不自动删除

队列

在这里插入图片描述

队列
属性意义意义
type类型Default for virtual host租户配置的默认选项,下列三种其一
默认Classic无需设置
Classic传统的队列类型
数据存储在单个节点上
不具备quorum队列的高可用性和数据保护特性
ps:单机时使用
Quorum高可用性队列
数据会被复制到多个节点
提供更好的数据可靠性和持久性
ps:部署多节点时使用
Stream特殊类型的队列
用于支持事件流处理(event streaming)
具有类似于Kafka的流式处理特性
ps:听说不成熟,暂时用不上
Durability耐用
(持久化)
durable持久化,数据存放于硬盘
transient瞬态,数据存放于内存

参数:

显示参数实际参数作用
Auto expirex-expires设置队列的过期时间,单位为毫秒。当队列在指定时间内未被使用,将会被自动删除
Message TTLx-message-ttl设置队列中消息的过期时间(Time-To-Live),单位为毫秒。消息在队列中存放的时间超过设定的过期时间后会被自动删除
Overflow behaviourx-overflow设置队列溢出行为,可选值为 drop-head(删除最旧的消息)或 reject-publish(拒绝发布新消息)
Single active consumerx-single-active-consumer配置队列是否只允许单个消费者消费消息。当设置了x-single-active-consumer参数时,表示队列只允许有一个消费者活跃地消费消息,其他消费者将被阻塞,直到当前的消费者停止消费或断开连接
Dead letter exchangex-dead-letter-exchange设置队列中的死信消息转发到的交换机名称。当消息成为死信时,将会被转发到指定的交换机
Dead letter routing keyx-dead-letter-routing-key设置死信消息转发时的路由键。死信消息将通过指定的路由键转发到目标交换机
Max lengthx-max-length设置队列的最大长度,即队列中消息的最大数量。当队列中消息数量达到设定的最大长度后,新消息将无法入队
Max length bytesx-max-length-bytes设置队列消息的最大总字节数。当队列中消息的总字节数达到设定的最大值后,新消息将无法入队
Leader locatorx-queue-leader-locator配置队列的领导者(Leader)定位器,集群中使用

SpringBoot整合

引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.6.3</version>
</dependency>

配置数据源

spring: 
  rabbitmq:
    addresses: xxx.xxx.xx.xx:5672
    username: admin
    password: xxxxxx
    virtual-host: /

配置交换机和队列

@Component
public class RabbitMqConfig {
	// 定义交换机名称
    public static final String FANOUT_EXCHANGE = "fanout.test";
    @Bean(name = FANOUT_EXCHANGE)
    public FanoutExchange fanoutExchange() {
    	// 交换机类型按需创建,这里用的是Fanout,发布订阅,绑定在该交换机下的队列都会收到消息
    	// 参数2:是否持久化
    	// 参数3:是否自动删除
        return new FanoutExchange(FANOUT_EXCHANGE, true, false);
    }
	
	//  定义队列
    public static final String FANOUT_QUEUE1 = "queue1";
    @Bean(name = FANOUT_QUEUE1)
    public Queue fanoutQueue1() {
    	// 后三个不写也行,这是默认值
    	// 参数2:是否持久化数据到磁盘(防止意外关闭数据丢失)
    	// 参数3:是否具有排他性
    	// 参数4:队列不再使用时是否自动删除
        return new Queue(FANOUT_QUEUE1, true, false, false);
    }
	public static final String FANOUT_QUEUE2 = "queue2";
    @Bean(name = FANOUT_QUEUE2)
    public Queue fanoutQueue2() {
        return new Queue(FANOUT_QUEUE2, true, false, false);
    }
		
    @Bean
    public Binding bindingSimpleQueue1(@Qualifier(FANOUT_QUEUE1) Queue fanoutQueue1,
                                       @Qualifier(FANOUT_EXCHANGE) FanoutExchange fanoutExchange) {
        // 将交换机和队列绑定
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    @Bean
    public Binding bindingSimpleQueue2(@Qualifier(FANOUT_QUEUE2) Queue fanoutQueue2,
                                       @Qualifier(FANOUT_EXCHANGE) FanoutExchange fanoutExchange) {
        // 将交换机和队列绑定
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

测试发一条消息到队列

@SpringBootTest(classes = TemplateApplication.class)
public class RabbitMQTest {

    @Autowired
    RabbitMessagingTemplate rabbitMessagingTemplate;

    @Test
    public void testSent(){
		//指定交换机->指定队列(因为创建的交换机是FanoutExchange,所以绑定该交换机的队列都会收到一条消息)
        rabbitMessagingTemplate.convertAndSend("fanout.test","发送数据到FanoutExchange");
    	// 如果创建队列不绑定交换机和路由键,那么实际上会有默认的交换机和路由键,均为空,直接将消息发送给队列,队列名则和路由键保持一致,仍然可以成功发送消息。
    }
}

测试接收队列消息

写个监听类接收消息:

@Component
public class RabbitMqListenter {
    @RabbitListener(queues = {RabbitMqConfig.FANOUT_QUEUE1,RabbitMqConfig.FANOUT_QUEUE2})
    public void reciveLogAll(String msg) throws Exception {
        System.out.println("消费到数据:" + msg);
    }
}

-------------基础的使用到这里就结束了-------------

拓展事项

rabbitMqPusher

自己封装一个更加方便使用的发送工具,可有可无,其中可以使用RabbitMessagingTemplate和RabbitTemplate,RabbitMessagingTemplate和RabbitTemplate都是Spring AMQP提供的用于与RabbitMQ进行交互的工具类如果只是简单使用,那么RabbitMessagingTemplate就够用了,如果需要更精细的控制,可以选择使用RabbitTemplate

,但它们在使用方式和功能上有一些不同点:

RabbitMessagingTemplate:

RabbitMessagingTemplate是MessagingTemplate的子类,用于在Spring应用程序中发送和接收消息。
它提供了一种更高级别的抽象,使得在Spring框架中更容易使用消息发送和接收的功能。
可以直接与Spring的消息通道(MessageChannel)集成,方便进行消息的发送和接收。

RabbitTemplate:

RabbitTemplate是Spring AMQP提供的用于与RabbitMQ进行交互的核心类,提供了丰富的方法来发送和接收消息。
它是一个强大而灵活的工具,可以直接与RabbitMQ的交互进行细粒度的控制。
可以设置消息的属性、监听发送确认、接收确认等功能,更加灵活地处理消息发送和接收的细节。

package com.template.rabbitmq.producer.impl;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
@Slf4j
public class RabbitMqPusher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     * @param quene   队列名称 或 交换机名称
     * @param message 消息内容
     */
    public void send(String quene, String message) {
        rabbitTemplate.send(quene, MessageBuilder.withBody(message.getBytes()).build());
        log.info("发送消息---> quene:{} ---> message:{}", message, quene);
    }

    /**
     * 直接发送消息到队列
     * 超过有效期丢弃,如果队列没有声明x-message-ttl属性则无效
     *
     * @param quene      队列名称
     * @param message    消息内容
     * @param expiration 有效期(毫秒)
     */
    public void send(String quene, String message, Integer expiration) {
        rabbitTemplate.send(quene, MessageBuilder.withBody(message.getBytes()).setExpiration(String.valueOf(expiration)).build());
        log.info("发送消息---> quene:{} ---> message:{} ---> expiration:{}", quene, message, expiration);
    }


    /**
     * 发送消息
     * 超过有效期丢弃,如果队列没有声明x-message-ttl属性则无效
     *
     * @param exchange   交换机名称
     * @param routingKey 路由键
     * @param message    消息内容
     * @param expiration 有效期(毫秒)
     */
    public void send(String exchange, String routingKey, String message, Integer expiration) {
        rabbitTemplate.send(exchange, routingKey, MessageBuilder.withBody(message.getBytes()).setExpiration(String.valueOf(expiration)).build());
        log.info("发送消息---> exchange:{} ---> routingKey:{} ---> message:{} ---> expiration:{}", exchange, routingKey, message, expiration);
    }

    /**
     * 发送消息
     *
     * @param exchange   交换机名称
     * @param routingKey 路由键
     * @param message    消息内容
     */
    public void send(String exchange, String routingKey, String message) {
        rabbitTemplate.send(exchange, routingKey, MessageBuilder.withBody(message.getBytes()).build());
        log.info("发送消息---> exchange:{} ---> routingKey:{} ---> message:{}", exchange, routingKey, message);
    }


}

在RabbitMQ中,如果队列没有设置过期时间(即没有声明x-message-ttl属性),那么即使在发送消息时设置了消息的过期时间也会失效。消息的过期时间只有在队列设置了过期时间的情况下才会生效。
实测以上列代码的方式直接对消息设置有效期是生效的。

死信队列

和普通队列一样,只不过是对其他队列进行配置,将过期的消息路由到死信队列中。
创建死信交换机和死信路由

	// 配置交换机的文件中继续增加配置
	public static final String DIRECT_GP_DEAD_LETTER_EXCHANGE = "DIRECT_GP_DEAD_LETTER_EXCHANGE";
    public static final String DIRECT_GP_DEAD_LETTER_QUEUE = "DIRECT_GP_DEAD_LETTER_QUEUE";
    @Bean(DIRECT_GP_DEAD_LETTER_EXCHANGE)
    public DirectExchangedirectDeadLetterExchange() {
        return new DirectExchange(DIRECT_GP_DEAD_LETTER_EXCHANGE, true, false, new HashMap<>());
    }
    @Bean(DIRECT_GP_DEAD_LETTER_QUEUE)
    public Queue directDeadLetterQueue() {
        return new Queue(DIRECT_GP_DEAD_LETTER_QUEUE, true, false, false, new HashMap<>());
    }

设置队列消息有效期并绑定死信队列

	@Bean(name = DIRECT_QUEUE1)
    public Queue directQueue1() {
        HashMap<String, Object> headers = new HashMap<>();
        // 配置消息有效期,消息发送到队列10秒后如果未被消费者消费,则过期
        headers.put("x-message-ttl",10000);
        // 配置超期交换机,消息过期后会发送到此交换机
        headers.put("x-dead-letter-exchange",DIRECT_GP_DEAD_LETTER_EXCHANGE);
        // 配置超期routingKey,消息过期后转移消息时指定的routingKey
        headers.put("x-dead-letter-routing-key",DIRECT_GP_DEAD_LETTER_QUEUE);
        // 如果只配置了有效期,未配置交换机和routingKey,则消息会被直接丢弃
        return new Queue(DIRECT_QUEUE1, true, false, false,headers);
    }

配置完成后,尝试向DIRECT_QUEUE1发送一条消息,不启动消费者,10秒后消息会自动转移到死信队列中,可在可视化管理界面进行验证。

延时队列
延时队列场景举例:

预定一个会议室,两个小时后开始,要求提前十分钟通知参会人员进行开会。
如果不使用延时队列,那么就需要不断轮询,查看是否到达需要通知的时间,进行消息通知。

延时队列的实现方式:

死信队列+消息有效期
预定时间到提前十分钟通知中间有110分钟,那么创建一条通知消息,设置有效期110分钟丢入队列,不用消费者去监听,等待消息过期后路由到指定的死信队列,再去消费死信队列中的消息即可。
所以延时队列实际上是一种实现方案,而不是一种特定的队列类型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1476810.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Java程序设计】【C00331】基于Springboot的驾校预约学习系统(有论文)

基于Springboot的驾校预约学习系统&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的驾校预约学习系统&#xff0c;本系统有管理员、用户和教练三种角色&#xff1b; 管理员&#xff1a;个人中心、管理员管理、教练…

哈夫曼树的介绍

定义 路径长度&#xff1a;从根结点到该结点所经过的边数。 叶子结点的带权路径长度&#xff1a;叶子结点的权值*路径长度 树的带权路径长度&#xff1a;所有叶子结点的带权路径长度之和 哈夫曼树&#xff1a;带权路径长度最小的树&#xff0c;也称最优二叉树。 构造 反复选…

Scala Intellij编译错误:idea报错xxxx“is already defined as”

今天写scala代码时,Idea报了这样的错误&#xff0c;如下图所示&#xff1a; 一般情况下原因分两种&#xff1a; 第一是我们定义的类或对象重复多次出现&#xff0c;编译器无法确定使用哪个定义。 这通常是由于以下几个原因导致的&#xff1a; 重复定义&#xff1a;在同一个文件…

LNMP架构的源码编译环境下部署Discuz!社区论坛与Wordpress博客

一.编译安装Nginx 1.关闭防火墙 systemctl stop firewalld systemctl disable firewalld setenforce 0 2.安装依赖包 yum -y install pcre-devel zlib-devel gcc gcc-c make 3.创建运行用户 nginx 服务程序默认 以 nobody 身份运行&#xff0c;建议为其创建专门的用户账户&…

飞天使-学以致用-devops知识点4-SpringBoot项目CICD实现

文章目录 代码准备创建jenkins 任务测试推送使用项目里面的jenkinsfile 进行升级操作 代码准备 推送代码到gitlab 代码去叩叮狼教育找 k8s 创建jenkins 任务 创建一个k8s-cicd-demo 流水线任务 将jenkins 里面构建时候的地址还有token&#xff0c; 给到gitlab里面的webhooks…

MySQL 的数据库操作,利用Spring Boot实现MySQL数据库的自动创建

执行 show databases; 命令可以查看当前数据库的所有数据库。 注意在 MySQL 客户端执行 SQL 语句的时候要带上分号 ; 并按下 enter 键&#xff0c;不然 MySQL 会认为你还没有输入完&#xff0c;会换一行继续等待你输入。 OK&#xff0c;像上面截图中的 information_schema、mys…

2024/02/28

绘制思维导图 将今天的模拟面试内容进行整合并上传作业 1、什么是回调函数? 回调函数是一种作为参数传递给其他函数的函数&#xff0c;在 C 语言中&#xff0c;函数指针允许我们将函数作为参数传递给其他函数&#xff0c;从而实现回调函数的功能&#xff0c;例如线程的创建函…

【Vue】插槽-slot

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;Vue ⛺️稳中求进&#xff0c;晒太阳 插槽 作用&#xff1a;让组件内部一些 结构 支持 自定义 插槽的分类&#xff1a; 默认插槽。具名插槽。 基础语法 组件内需要定制的结构部分&…

如何利用HubSpot出海营销CRM实现品牌建设与传播的有效管理?

利用HubSpot出海营销CRM优化客户互动和沟通可以通过以下方式实现&#xff1a; 个性化客户管理&#xff1a; 利用HubSpot的客户管理功能&#xff0c;集中管理客户信息&#xff0c;并根据客户的行为、偏好和历史数据等信息进行个性化分类和标记。这样可以更好地了解客户需求&am…

[CSS]文字旁边的竖线以及布局知识

场景&#xff1a;文字前面常见加竖线。 .center-title { 常见内容color: #FFF;font-family: "Source Han Sans CN";font-size: 50px;font-style: normal;font-weight: 700;line-height: normal;position: relative; 要定位left: 16px; 这里是想拉开间距margin-b…

力扣-跳跃游戏

问题 给你一个非负整数数组 nums &#xff0c;你最初位于数组的 第一个下标 。数组中的每个元素代表你在该位置可以跳跃的最大长度。 判断你是否能够到达最后一个下标&#xff0c;如果可以&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 解答 class Solu…

CUMT---图像处理与视觉感知---期末复习重点

文章目录 一、概述 本篇文章会随课程的进行持续更新中&#xff01; 一、概述 1. 图像的概念及分类。  图像是用各种观测系统以不同形式和手段观测客观世界而获得的、可以直接或间接作用于人的视觉系统而产生的视知觉实体。  图像分为模拟图像和数字图像&#xff1a;(1) 模拟图…

开源的 Python 数据分析库Pandas 简介

阅读本文之前请参阅-----如何系统的自学python Pandas 是一个开源的 Python 数据分析库&#xff0c;它提供了高性能、易用的数据结构和数据分析工具。Pandas 特别适合处理表格数据&#xff0c;例如时间序列数据、异构数据等。以下是对 Pandas 的简明扼要的介绍&#xff0c;包括…

基于React, Redux实现的俄罗斯方块游戏及源码

分享一个俄罗斯方块游戏游戏框架使用的是 React Redux&#xff0c;其中再加入了 Immutable&#xff0c;用它的实例来做来Redux的state。&#xff08;有关React和Redux的介绍可以看 安装 npm install运行 npm start浏览自动打开 http://127.0.0.1:8080/ 打包编译 npm run …

Vue源码系列讲解——生命周期篇【七】(模板编译阶段)

目录 1. 前言 2. 模板编译阶段分析 2.1 两种$mount方法对比 2.2 完整版的vm.$mount方法分析 3. 总结 1. 前言 前几篇文章中我们介绍了生命周期的初始化阶段&#xff0c;我们知道&#xff0c;在初始化阶段各项工作做完之后调用了vm.$mount方法&#xff0c;该方法的调用标志…

mongoDB 优化(1)索引

1、创建复合索引&#xff08;多字段&#xff09; db.collection_test1.createIndex({deletedVersion: 1,param: 1,qrYearMonth: 1},{name: "deletedVersion_1_param_1_qrYearMonth_1",background: true} ); 2、新增索引前&#xff1a; 执行查询&#xff1a; mb.r…

第3部分 原理篇2去中心化数字身份标识符(DID)(4)

3.2.3. DID解析 3.2.3.1. DID解析参与方 图3-5 DID 解析过程 本聪老师&#xff1a;我们之前提到过&#xff0c;DID 解析过程是将 DID 转换为对应的 DID 文档。这样做的目的是验证 DID 所代表的主体的身份。那么解析过程会涉及哪些概念呢&#xff1f;我们看图3-&#xff0c;DI…

uniapp 微信小程序使用高德地图Vue3不兼容Vue2问题

1. uniapp 微信小程序使用高德地图Vue3不兼容Vue2问题 1.1. 问题 uniapp Vue3项目引用高德地图报错 import amapPlugin from ‘…/…/…/js_sdk/js_amap/amap-wx.130’; "default" is not exported by "../../../MyProject/Base/Szy/js_sdk/js_amap/amap-wx.1…

springboot+vue网站开发-后端管理框架-vue-admin-template

为了方便国内用户下载&#xff0c;我把自己的百度网盘分享给大家一份地址&#xff0c;可以去下载。 如果你有上网盒子软件&#xff0c;那就自己去下载&#xff0c;很小。不到1MB. 链接&#xff1a;https://pan.baidu.com/s/15LJ2MoSWToFGFp28VaxBeQ?pwdbaby 提取码&#xff…

C++之queue和dqueue

1、queue queue&#xff08;队列&#xff09;&#xff0c;一种数据结构&#xff0c;可以让某些数据结构的操作变得简单。队列&#xff08;queue&#xff09;最大的特点就是先进先出。就是说先放入queue容器的元素一定是要先出队列之后&#xff0c;比它后进入队列的元素才能够出…