Springboot + RabbitMq 消息队列

news2025/1/14 17:57:37

前言

一、RabbitMq简介

1、RabbitMq场景应用,RabbitMq特点

场景应用
以订单系统为例,用户下单之后的业务逻辑可能包括:生成订单、扣减库存、使用优惠券、增加积分、通知商家用户下单、发短信通知等等。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成订单)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独消费线程拉取MQ的消息(或者由 MQ 推送消息。
特点
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

  • AMQP :Advanced Message
    Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

  • 可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

  • 灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange
    来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个
    Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

  • 消息集群(Clustering)
    多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

  • 高可用(Highly Available Queues)
    队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

  • 多种协议(Multi-protocol)
    RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等多种消息中间件协议。

  • 多语言客户端(Many Clients)
    RabbitMQ 几乎支持所有常用语言,比如Java、Python 、Ruby 、PHP 、C# 、JavaScript 等。

  • 管理界面(Management UI)
    RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

  • 跟踪机制(Tracing)
    如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

  • 插件机制(Plugin System)
    RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

2、RabbitMq工作流程

  • publisher连接到rabbitMQ Broker,创建connection,开启channel。声明交换机类型、名称、是否持久化等。

  • 消息可靠投递(开启confirmCallback returnCallback),并指定消息是否持久化等属性和routing key。

  • exchange收到消息之后,根据routing key路由到跟当前交换机绑定的相匹配的(存)队列里面。

  • consumer监听接收到消息之后开始业务处理,然后发送一个ack确认告知消息已经被消费。
    rabbitMQ Broker收到ack之后将对应的消息从队列里面删除掉。
    如下图:
    在这里插入图片描述

  • Message
    消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

  • Publisher
    消息的生产者,也是一个向交换器发布消息的客户端应用程序。

  • Exchange
    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

  • Binding
    绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

  • Queue
    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

  • Connection
    网络连接,比如一个TCP连接。

  • Channel
    信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

  • Consumer
    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

  • Virtual Host
    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

  • Broker
    表示消息队列服务器实体。

二 引用RabbitMq

将RabbitMq配置成插件使用
1.创建工程 pom.xml 引用jar
在这里插入图片描述

		<!--mq 模块-->
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-bus-amqp</artifactId>
		</dependency>
		<!--提供服务发现能力-->
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-commons</artifactId>
		</dependency>
  1. 创建RabbitMQConfig

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {

    public static final String DELAYED_SWITCH_OFF_QUEUE_NAME = "delay.queue.switchOff.delay.queue";
    public static final String DELAYED_SWITCH_OFF_EXCHANGE_NAME = "delay.queue.switchOff.delay.exchange";
    public static final String DELAYED_SWITCH_OFF_ROUTING_KEY = "delay.queue.switchOff.delay.routingkey";

    /**
     * @desciption 控制设备开关延时关闭
     * @author zhangbl
     * @Date : 2022-08-08 16:29
     **/
    @Bean
    public Queue immediateSwitchOffQueue() {
        return new Queue(DELAYED_SWITCH_OFF_QUEUE_NAME,true);
    }

    @Bean
    public CustomExchange customSwitchOffExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_SWITCH_OFF_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingSwitchOffNotify() {
        return BindingBuilder.bind(immediateSwitchOffQueue()).to(customSwitchOffExchange()).with(DELAYED_SWITCH_OFF_ROUTING_KEY).noargs();
    }
}
  1. 创建RabbitMq注解类 EnableRabbitMq
import com.common.rabbitmq.config.RabbitMQConfig;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;

/**
 * <p>
 * 开启支持RabbitMQ
 */
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import({ RabbitMQConfig.class })
public @interface EnableRabbitMq {

}
  1. 服务引用Mq pom.xml添加mq插件jar包
<dependency>
	<groupId>com.mq</groupId>
	<artifactId>common-rabbitmq</artifactId>
</dependency>
  1. 启动类引用 @EnableRabbitMq 注解
@EnableRabbitMq
@SpringBootApplication
public class TestApplication{
    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }
}
  1. 创建消费者实现类
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class SwitchOffListener {


    @Resource
    private RedisTemplate strRedisTemplate;
	@Resource
	private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = RabbitMQConfig.DELAYED_SWITCH_OFF_QUEUE_NAME)
    public void onExpired(Message message, Channel channel) throws IOException {
		String msg = new String(message.getBody());
		//获取锁,设置有效期,防止程序异常没有释放锁导致死锁
		String str = UUID.randomUUID().toString();
		String lockKey = RabbitMQConfig.DELAYED_SWITCH_OFF_QUEUE_NAME + msg;
		Boolean lock = strRedisTemplate.opsForValue().setIfAbsent(lockKey, str, Duration.ofSeconds(5));
		try {
			if(lock){
				log.info("========进入开关延时关闭处理==========");
				//具体实现
			}
			log.info("开关延时关闭成功!!!");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			//释放锁
			if (str.equals(strRedisTemplate.opsForValue().get(lockKey))){
				strRedisTemplate.delete(lockKey);
			}
		}
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
  1. 业务调用
 @Autowired
 private RabbitTemplate rabbitTemplate;
  
public void machDeviceCloseTime(String msg) {
		if(StringUtils.isNotBlank(msg){
			try {
				rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_SWITCH_OFF_EXCHANGE_NAME,RabbitMQConfig.DELAYED_SWITCH_OFF_ROUTING_KEY, message, listener ->{
						listener.getMessageProperties().setDelay(1000);//1毫秒后发送mq消息
						return listener;
					});
				}
			} catch (JsonProcessingException e) {
				throw new RuntimeException(e);
			}
		}
	}
  1. application.yml 添加配置
spring:
  rabbitmq:
  	# mq单机模式配置
  	host: 127.0.0.1
  	port: 5672
  	# mq集群模式配置方式
    addresses: 127.0.0.1:5672,127.0.0.2:5672,127.0.0.3:5672
    username: admin
    password: 123456
    virtual-host: /test
    #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调
    publisher-confirm-type: correlated
    #保证交换机能把消息推送到队列中
    publisher-returns: true
    #这个配置是保证消费者会消费消息,手动确认
    listener:
      simple:
        acknowledge-mode: manual
    template:
      mandatory: true

配置Rabbitmq 集群参照 https://blog.csdn.net/zhangbinlong/article/details/127067044?spm=1001.2014.3001.5502

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/331393.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【23种设计模式】创建型模式详细介绍

前言 本文为 【23种设计模式】创建型模式详细介绍 相关内容介绍&#xff0c;下边具体将对单例模式&#xff0c;工厂方法模式&#xff0c;抽象工厂模式&#xff0c;建造者模式&#xff0c;原型模式&#xff0c;具体包括它们的特点与实现等进行详尽介绍~ &#x1f4cc;博主主页&…

计算机组成原理(一)

1.了解计算机硬件的发展和软件的发展历程&#xff1b; 硬件&#xff1a;   电子管时代&#xff08;1946-1959&#xff09;&#xff1a;电子管、声汞延迟线、磁鼓   晶体管时代&#xff08;1959-1964&#xff09;&#xff1a;晶体管、磁芯   中、小规模集成电路时代&#…

OpenStack云平台搭建(1) | 基础环境准备

目录 一、环境准备 1.1、关闭selinxu 1.2、关闭防火墙 1.3、修改主机名 1.4、配置时间同步服务器 1.5、配置域名 二、安装OpenStack库 2.1、启用OpenStack仓库的包 2.2、安装python-openstackclient 2.3、controller安装数据库 2.4、安装消息队列 2.5、配置缓存 2.…

Linux驱动开发基础__中断的线程化处理

目录 1 引入 2 内核机制 2.1 调用 request_threaded_irq 后内核的数据结构 2.2 request_threaded_irq 2.3 中断的执行过程 1 引入 复杂、耗时的事情&#xff0c;尽量使用内核线程来处理。工作队列用起来挺简单&#xff0c;但是它有一个缺点&#xff1a;工作队列中有多个 …

【Java 面试合集】HashMap中为什么要使用红黑树

HashMap中为什么要使用红黑树1. 概述 从源码的结构方面讲述下为什么HashMap要使用红黑树。那没有红黑树的时候&#xff0c;底层是基于什么逻辑进行存储的。 2. 底层结构 如果忽略红黑树的话&#xff0c;HashMap底层的数据存储结构如下&#xff1a; 总体而言就是数组 链表的形…

Vscode使用

我是四五年的webstorm忠粉&#xff0c;一直觉得它是世界上最好用、强大、方便的编辑器。 为了它深谙各种破解方法&#xff0c;突然有一天我知道的几种方法都不奏效了。破解的实在太累了&#xff0c;算了&#xff0c;尝试尝试不同的工具吧。 含泪挥别webstrom&#xff0c;捏着…

JDBC编程复习

文章目录JDBC1.概念2.原理3. 如何使用JDBC编程1. 下载mysql的jdbc驱动2. 项目中引入驱动4. JDBC使用1. 和数据库建立连接2.获取连接3. Statement对象4. 释放资源JDBC 1.概念 JDBC,即Java Database Connectivity&#xff0c;java数据库连接。是Java提供的API用来执行SQL语句&a…

SWPU新生赛WriteUp

一个线上赛&#xff0c;这个NSSCTF最爽的就是没有靶机操作的一分钟冷却&#xff0c;10.11比赛结束&#xff0c;但是我还要看看工控&#xff0c;所以不打这个比赛了&#xff0c;先把wp写了&#xff0c;pwn入门真TM艰难 WEB 前面送分题&#xff0c;中间的也是基础题&#xff0c;…

SQL概述及数据定义

文章目录一、SQL概述1.简介2.特点3.组成4.SQL分类5.书写规范二、数据定义1.模式的定义与删除①定义模式②删除模式2.基本表的定义、删除与修改①定义基本表②数据类型③模式与表④修改基本表⑤删除基本表3.索引的建立与删除①建立索引②删除索引一、SQL概述 1.简介 SQL (Stru…

使用IDA查看汇编代码上下文去辅助排查C++软件异常问题

目录 1、概述 2、汇编指令能最直接反映异常崩溃的原因 2.1、查看异常Code码及对应的异常类型 2.2、查看发生崩溃的那条汇编指令 3、阅读汇编代码上下文需要掌握一定的基础汇编知识 4、Windbg中显示的函数调用堆栈中的C代码行号&#xff0c;和最新的代码对不上了 5、Wind…

openGL学习之GLFW和GLAD的下载和编译

背景:为什么使用GLFW和GLADOPenGL环境 目前主流的桌面平台是GLFW和GLAD之前使用的GLUT和Free GLUT已经基本淘汰了&#xff0c;所以记录一下如何下载GLFW和GLAD并且编译.GLFW下载:An OpenGL library | GLFW复制到你想存放的位置,我这里就存放到C盘Libaray文件夹下了,这里是我存放…

算法训练营 day37 贪心算法 K次取反后最大化的数组和 加油站 分发糖果

算法训练营 day37 贪心算法 K次取反后最大化的数组和 加油站 分发糖果 K次取反后最大化的数组和 1005. K 次取反后最大化的数组和 - 力扣&#xff08;LeetCode&#xff09; 给你一个整数数组 nums 和一个整数 k &#xff0c;按以下方法修改该数组&#xff1a; 选择某个下标…

Eclipse 版本升级记录

前言 Eclipse 不是不能在线升级&#xff0c;至少没有找对方法&#xff0c;下面就让我来一步一步带你学会它、使用它吧! 一、概念 Eclipse主要分为两个概念&#xff0c;一个是在线升级 Eclipse 新版本&#xff0c;另一个是在线升级 Eclipse 插件&#xff0c;这两个是有很大区…

【记录】ChatGPT使用记录

文章目录2023年02月08日数学哲学Java其他2023年02月09日ChatGPT网络根据对话的日期、问题的类型进行整理 2023年02月08日 数学 感想&#xff1a;数学应该没啥问题&#xff0c;感觉只要自然语言没理解错了&#xff0c;剩下就不是事 积分 代数 哲学 哲学问题的回答就属于模棱…

JetpackCompose从入门到实战学习笔记7—Dialog的简单使用

JetpackCompose从入门到实战学习笔记7—Dialog的简单使用 1.Dialog对话框 Dialog的参数如下: Composable fun Dialog(onDismissRequest: (() -> Unit)?, //关闭对话框的回调properties: DialogProperties! DialogProperties(), //自定义对话框的属性content: ( Compose…

智能无障碍轮椅——汇总

文章目录一、设计内容二、控制理论三、材料列表四、控制图五、硬件介绍1、TB6612FNG电机驱动模块2、DX-BT04 2.0蓝牙模块3、MPU6050陀螺仪模块4、电源模块5、520编码器直流减速电机六、PID理论与算法控制七、代码解析八、参考博文一、设计内容 使用 STM32 作为主处理器进行开发…

Web3.0:互联网新阶段

Web3.0 定义&#xff1a;从后端生产关系革新开始。 Web3.0 是结合了去中心化和代币&#xff08;Token&#xff09;经济学等概念&#xff0c;基于区块链技术的全新的互联网迭代方向&#xff0c;意在解决 Web2.0 带来的生态不平衡、发展不透明等问题。与 AR/VR 等前端创新相比&am…

oracle外键约束、级联删除

根据约束名称查询&#xff1a;select * from user_constraints t where t.CONSTRAINT_NAME 约束名称举例&#xff1a;字段解析&#xff1a;1、CONSTRAINT_NAME&#xff1a;约束名称。2、CONSTRAINT_TYPE&#xff1a;约束类型。3、TABLE_NAME&#xff1a;约束所在的表。4、R_CO…

FreeModbus RTU 移植指南

FreeModbus 简介 FreeModbus 是一个免费的软件协议栈&#xff0c;实现了 Modbus 从机功能&#xff1a; 纯 C 语言支持 Modbus RTU/ASCII支持 Modbus TCP 本文介绍 Modbus RTU 移植。 移植环境&#xff1a; 裸机Keil MDK 编译器Cortex-M3 内核芯片&#xff08;LPC1778/88&…

Intel x86_64 PMU简介

文章目录前言一、性能监控概述二、CPUID information三、架构性能监控3.1 架构性能监控 Version 13.1.1 架构性能监控 Version 1 Facilities3.1.2 预定义的体系结构性能事件3.1.3 cmask demo测试参考资料前言 Intel 64 和 IA-32 架构提供了 PMU&#xff08;Performance Monito…