【SpringCloud】SpringAMQP

news2024/11/20 6:27:45

文章目录

  • 1、AMQP
  • 2、基本消息模型队列
  • 3、WorkQueue模型
  • 4、发布订阅模型
  • 5、发布订阅-Fanout Exchange
  • 6、发布订阅-DirectExchange
  • 7、发布订阅-TopicExchange

1、AMQP

Advanced Message Queuing Protocol,高级消息队列协议。是用于在应用程序之间传递业务消息的开放标准。

该协议与语言和平台无关,更符合微服务中独立性的要求。

在这里插入图片描述

Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。

它包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

在这里插入图片描述

接下来对几种常见的MQ模型,用SpringAMQP来演示一下具体实现。

2、基本消息模型队列

消息发送

  • 引入AMQP依赖(publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程)
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>    		
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

  • 在publisher服务中编写application.yml,添加mq连接信息
spring:
  rabbitmq:
      host: 192.168.150.101 # 主机名
      port: 5672 # 端口
      virtual-host: / # 虚拟主机 
      username: root # 用户名
      password: 123321 # 密码

  • 在publisher服务中新建一个测试类,注入RabbitTemplate,调用convertAndSend方法,传入队列名称和消息内容即可
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() { 
           String queueName = "simple.queue";
           String message = "hello, spring amqp!";
           rabbitTemplate.convertAndSend(queueName, message);
    }
}

消息接收

  • 在consumer服务中编写application.yml,添加mq连接信息
spring:
  rabbitmq:
      host: 192.168.150.101 # 主机名
      port: 5672 # 端口
      virtual-host: / # 虚拟主机 
      username: root # 用户名
      password: 123321 # 密码

  • 在consumer服务中新建一个类,编写消费逻辑
//定义类,添加@Component注解
@Component
public class SpringRabbitListener {
	@RabbitListener(queues = "simple.queue")    
	public void listenSimpleQueueMessage(String msg) throws InterruptedException {        
		System.out.println("spring 消费者接收到消息 :【" + msg + "】");    
	}
}

类中声明方法,添加@RabbitListener注解,注解上写明要消费的队列名称,此时,方法的参数就是消息。

在这里插入图片描述

3、WorkQueue模型

和基本模型不一样,WorkQueue模型,即工作队列,有多个消费者。可以提高消息处理速度,避免队列消息堆积。

在这里插入图片描述

案例:实现一个队列绑定多个消费者

思路如下:

  • 在publisher服务中定义测试方法,每秒产生50条消息,发送到队列simple.queue
  • 在consumer服务中定义两个消息监听者,都监听simple.queue队列,但消费者1每秒能处理50条消息,而消费者2每秒只能处理10条消息

演示代码如下,先写生产者:

//在publisher服务中添加一个测试方法,循环发送50条消息到simple.queue队列

@Test
public void testWorkQueue() throws InterruptedException {    
	// 队列名称    
	String queueName = "simple.queue";    
	// 消息   
	String message = "hello, message__";    
	for (int i = 0; i < 50; i++) {        
		// 发送消息
		rabbitTemplate.convertAndSend(queueName, message + i);
        // 避免发送太快        
        Thread.sleep(20);    
     }
}

接下来编写两个消费者,都监听上面的队列simple.queue:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage1(String msg) throws InterruptedException {
    System.out.println("spring 消费者1接收到消息:【" + msg + "】");
    Thread.sleep(25);
}

@RabbitListener(queues = "simple.queue") 
public void listenSimpleQueueMessage2(String msg) throws InterruptedException {    
	System.err.println("spring 消费者2接收到消息:【" + msg + "】");    
	Thread.sleep(100);

}

上面一个用System.out.println一个用System.error.println,这样一红一白,在控制台查看消费的效果:

在这里插入图片描述

可以看到,消费者2处理消息慢(代码里用处理一次休眠的旧来模拟),但它从队列里拿的消息却和消费者1一样,因此导致消费总时间过长。对消费者2来说,这就是没有那个金刚钻,却揽了这么多瓷器活儿。这个现象的原因是 ⇒ 消息预取机制

预取机制,通俗说就是消息到了队列后,消费者通过通道一人一个先拿过来,能不能快速处理完的另说,先取走再说。

接下来加一个消费预取限制。通过设置prefetch来控制消费者预取的消息数量。修改消费者的application.yml文件,设置preFetch这个值,可以控制预取消息的上限:

spring:
  rabbitmq:
      host: 192.168.150.101 # 主机名
      port: 5672 # 端口
      virtual-host: / # 虚拟主机 
      username: root # 用户名
      password: 123321 # 密码
	  listener:      
	  	simple:        
	  	  prefetch: 1  # 每次只能获取一条消息,处理完成才能获取下一个消息

prefetch默认无限,这里改为1,即 每次只能获取一条消息,处理完成才能获取下一个消息。重启,这次同样生产50条消息,消费总时间变短了,可以看到消费者2处理的慢,取的也慢。

在这里插入图片描述

4、发布订阅模型

和前面两种模型不同,一个消息被一个消费者消费完就没了。发布订阅模式允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。 当前上一章说的支付功能的实现,就得用这个模型。

在这里插入图片描述

消息被路由到哪些队列中,由exchange决定。常见exchange类型包括:

  • Fanout:广播
  • Direct:路由
  • Topic:话题

注意:exchange负责消息路由,而不是存储,路由失败则消息丢失

5、发布订阅-Fanout Exchange

Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue

在这里插入图片描述

SpringAMQP提供了声明交换机、队列、绑定关系的API:

在这里插入图片描述

接下来写演示代码:

  • 步骤1:在消费consumer服务声明Exchange、Queue,并Binding绑定
//在consumer服务配置目录下创建一个类,添加@Configuration注解
@Configuration
public class FanoutConfig {
    // 声明FanoutExchange交换机
    @Bean    
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("exchange.fanout");
    }    
    // 声明第1个队列
    @Bean    
    public Queue fanoutQueue1(){        
    	return new Queue("fanout.queue1");    
	}   

	//绑定队列1和交换机,方法形参就是队列和交换机类型的参数  
	@Bean    
	public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){        
		return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);    
	}    

	//以相同方式声明第2个队列,并完成绑定到上面的交换机
	@Bean    
    public Queue fanoutQueue2(){        
    	return new Queue("fanout.queue2");    
	}   

	//绑定队列2和交换机  
	@Bean    
	public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){        
		return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);    
	}    
}

//将来Spring读到这些Bean,就会向RabbitMQ去声明这些队列和交换机,并绑定

启动消费服务,可以看到队列创建并且绑定到了交换机,以后交换机有消息,和交换机绑定的队列都能收到一份!

在这里插入图片描述

  • 步骤2:在consumer服务声明两个消费者,即添加两个方法,分别监听fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
   System.out.println("消费者1接收到fanout.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2") 
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到fanout.queue2的消息:【" + msg + "】");
}

//这里在一个服务里用两个消费者方法模拟被多个服务消费
//到此,两个队列就分别和这两个消费者方法勾搭上了
//重启消费者服务。
  • 步骤3:在publisher服务发送消息到FanoutExchange
//这次不再是send到队列,而是发到交换机
@Test
public void testFanoutExchange() {    // 队列名称    String exchangeName = "itcast.fanout";    // 消息    String message = "hello, everyone!";    // 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息     rabbitTemplate.convertAndSend(exchangeName, "", message);}

//执行这段测试代码,往交换机发消息

查看消费者服务控制台,可以看到一次发送,被多个消费者收到

在这里插入图片描述

小总结:

交换机的作用是什么?

➢ 接收publisher发送的消息
➢ 将消息按照规则路由到与之绑定的队列
➢ 不能缓存消息,路由失败,消息丢失
➢FanoutExchange的会将消息路由到每个绑定的队列

6、发布订阅-DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

在这里插入图片描述

当然两个队列可以设置同一个key,此时的效果就和上面的Fanout Exchange一样了。

在这里插入图片描述

步骤1:在consumer服务声明Exchange、Queue。这里不再用Bean,而是利用@RabbitListener声明Exchange、Queue、RoutingKey

@RabbitListener(bindings = @QueueBinding(        
		value = @Queue(name = "direct.queue1"),        
		exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),       
		key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到Direct消息:【"+msg+"】");
}


@RabbitListener(bindings = @QueueBinding(        
	value = @Queue(name = "direct.queue2"),        
	exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),        
	key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到Direct消息:【"+msg+"】 ");
}

看IDEA的提示类型写就行:exchange的type属性默认就是direct类型,这里不加也行。

在这里插入图片描述
启动消费者服务,到此,队列和交换机都被创建和绑定好了,查看RabbitMQ控制台:

在这里插入图片描述

步骤2:在publisher服务发送消息到DirectExchange,此时convertAndSend方法的第二个参数就要写Routing key了

//在publisher服务的SpringAmqpTest类中添加测试方法
@Test
public void testDirectExchange() {    
	// 队列名称    
	String exchangeName = "itcast.direct";    
	// 消息     
	String message = "hello,blue!";    
	// 发送消息,参数依次为:交换机名称,RoutingKey,消息
	 rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}

运行这个测试方法,查看消费服务:

在这里插入图片描述

小总结:

Direct交换机与Fanout交换机的差异?
➢ Fanout交换机将消息路由给每一个与之绑定的队列
➢ Direct交换机根据RoutingKey判断路由给哪个队列
➢ 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

基于@RabbitListener注解声明队列和交换机的常用注解有:
@Queue
@Exchange

7、发布订阅-TopicExchange

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/591357.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ElasticSearch安装部署——超详细

ElasticSearch安装部署 简介 全文搜索属于最常见的需求&#xff0c;开源的 Elasticsearch &#xff08;以下简称 es&#xff09;是目前全文搜索引擎的首选。 它可以快速地储存、搜索和分析海量数据。维基百科、Stack Overflow、Github 都采用它。 Elasticsearch简称es&…

JS中手撕防抖函数和节流函数

1.防抖函数 1.1定义 说明&#xff1a;在一定时间内&#xff0c;频繁执行事件&#xff0c;只执行最后一次函数。(英雄联盟回城) 1.2步骤&#xff1a; 声明定时器函数判断是否有定时器函数&#xff0c;如果有定时器函数的话就清除定时器。。如果没有定时器函数的话&#xff0…

chatgpt赋能python:Python中的三角函数介绍

Python中的三角函数介绍 Python作为一种高级编程语言&#xff0c;可以处理基础算术运算、三角函数等高等数学的操作。其中&#xff0c;三角函数是常用的数学函数之一&#xff0c;Pyhon中的三角函数包括正弦函数、余弦函数、正切函数等。 正弦函数 正弦函数在三角学中是最基本…

chatgpt赋能python:精度问题在Python编程中的影响及解决方法

精度问题在Python编程中的影响及解决方法 Python是一种解释性编程语言&#xff0c;以其简单易学、开发效率高等特点而广受欢迎。然而&#xff0c;Python中的浮点数精度问题却经常困扰着程序员。在本文中&#xff0c;我们将详细介绍Python中精度问题的影响及解决方法。 精度问…

华为OD机试真题B卷 Java 实现【字符串分隔】,附详细解题思路

一、题目描述 输入一个字符串&#xff0c;请按长度为8拆分每个输入字符串并进行输出&#xff0c;长度不是8整数倍的字符串请在后面补数字0&#xff0c;空字符串不处理。 二、输入描述 连续输入字符串(每个字符串长度小于等于100)。 三、输出描述 依次输出所有分割后的长度…

layui框架学习(25:弹出层模块_加载框询问框)

layui框架的弹出层模块layer中最重要的函数即layer.open&#xff0c;基于该函数&#xff0c;layer模块封装了很多常用弹出框&#xff0c;上文已介绍了消息框和提示框函数&#xff0c;本文学习加载框和询问框函数的基本用法&#xff0c;同时继续学习layer模块中基础参数的用法。…

Part1:使用 TensorFlow 和 Keras 的 NeRF计算机图形学和深度学习——计算机图形学世界中相机的工作原理

Part1&#xff1a;使用 TensorFlow 和 Keras 的 NeRF计算机图形学和深度学习 1. 效果图2. 原理2.0 前向成像模型2.1 世界坐标系2.2 相机坐标系2.3 坐标变换2.4 投影转换2.5 数据 3. 源码参考 是否有一种方法可以仅从一个场景多张不同视角的照片中捕获整个3D场景&#xff1f; 有…

有奖励!2023陕西省首台(套)重大技术装备产品项目申报条件、认定材料

本文整理了2023陕西省首台&#xff08;套&#xff09;重大技术装备产品项目申报条件&#xff0c;认定材料等相关内容&#xff0c;感兴趣的朋友快跟小编一起来看看吧&#xff01; 一、重点支持方向及领域 重点支持方向及领域&#xff1a;高档工业母机、电力装备、大型矿山和冶金…

【MySQL】MySQL 字段为 NULL 的5大坑

数据准备 建立一张表 数据如下&#xff1a; 1.count 数据丢失 count(*) 会统计值为 NULL 的行&#xff0c;而 count(列名) 不会统计此列为 NULL 值的行。 select count(*),count(name) from person; ----------------------- count(*) | count(name)10 | 8-------…

如何使用Python自动化测试工具Selenium进行网页自动化?

引言 Selenium是一个流行的Web自动化测试框架&#xff0c;它支持多种编程语言和浏览器&#xff0c;并提供了丰富的API和工具来模拟用户在浏览器中的行为。Selenium可以通过代码驱动浏览器自动化测试流程&#xff0c;包括页面导航、元素查找、数据填充、点击操作等。 与PyAuto…

关于linux的ssh(出现的问题以及ubuntu的ssh配置和ssh连接超时问题)

目录 Ubuntu进行ssh连接 关于ssh报错排错 备注&#xff1a;防火墙和selinux可能对ssh连接存在限制&#xff0c;但是我在操作的时候并没对我照成影响 查看selinux状态 ssh_config和sshd_config的区别 Ubuntu进行ssh连接 1.首先需要安装SSH服务器&#xff0c;在ubuntu终端输…

Java学习路线(19)——IO流(下)

一、缓冲流 1、概念&#xff1a; 一种自带缓冲区的字节流、可提高原始字节流、字符流读写数据的性能。 2、缓冲流高性能原理&#xff1a; 磁盘与内存之间有一块存储区域&#xff0c;当磁盘向内存传输数据时&#xff0c;先传输到缓冲区&#xff0c;当缓冲区满了之后&#xff0…

[强网杯 2019]随便注 1【SQL注入】解析过程

1.首先启动并访问靶机&#xff0c;有一个输入框&#xff0c;随便输入1 or 1 1&#xff0c;测试一下是否存在sql注入。 2.提交后提示error 1064 : You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syn…

chatgpt赋能python:Python中的_--了解这个神秘的下划线

Python中的_ – 了解这个神秘的下划线 Python是一种流行的编程语言&#xff0c;它具有简单易学的语法和强大的功能。一些Python的特殊语法经常会让初学者感到困惑。其中&#xff0c;一个神秘的下划线符号在Python中出现的频率非常高&#xff0c;而且它的含义和使用也非常多样化…

chatgpt赋能python:Python中符号的用法

Python中符号的用法 在Python编程中&#xff0c;符号是非常重要的一部分。通过合理使用符号&#xff0c;我们可以轻松地实现许多功能和操作。下面是Python中一些常用的符号的介绍和用法。 赋值符号 斜杠等于号&#xff08;&#xff09;被用来赋值。例如&#xff0c;如果我们要…

Windows的Powershell终端增强

Ubuntu上一直用的Oh My Zsh强化终端&#xff0c;体验非常nice。最近在Win上做东西比较多&#xff0c;于是也想把Powershell这个简陋的终端加强一下。 说干就干&#xff0c;网上查了一圈&#xff0c;发现大部分人用Oh My Posh来操作&#xff0c;因此试了一下&#xff0c;发现卡…

UFS 1-UFS架构简介1

UFS 1-UFS架构简介 1 UFS是什么&#xff1f;1.1 UFS1.2 一般特征1.2.1 Target performance1.2.2 Target host applications1.2.3 Target device types1.2.4 Topology1.2.5 UFS Layering 1.3 Interface Features1.3.1 Three power supplies1.3.2 Signaling as defined by [MIPI-…

Linux系统下imx6ull QT编程—— C++类和对象(三)

Linux QT编程 文章目录 Linux QT编程一、类和对象 一、类和对象 C 在 C 语言的基础上增加了面向对象编程&#xff0c;C 支持面向对象程序设计。类是 C 的核心特性&#xff0c;通常被称为用户定义的类型。类用于指定对象的形式&#xff0c;它包含了数据表示法和用于处理数据的方…

C++ 函数对象 详解

目录 &#x1f914;函数对象&#xff1a; &#x1f914;本质&#xff1a; &#x1f914;特点&#xff1a; 代码示例&#xff1a; 运行结果&#xff1a; &#x1f914; 内置函数对象&#xff1a; 1.算数仿函数 代码示例&#xff1a; 运行结果&#xff1a; 2.关系仿函数 …

四轴姿态解算-imu算法

理论篇 欧拉角四元数方向余弦矩阵 强调三者描述的是坐标系A,A之间的变换关系 欧拉角&#xff0c;四元数&#xff0c;方向余弦矩阵都可以描述四轴的姿态变换 注意这里强调的是变换 三者转换公式 一阶龙格库塔法 核心要点简介: 假设一阶函数随时间关系如: y a * T1b 则,在经…