【SpringCloud】SpringAMQP总结

news2024/11/18 11:48:33

文章目录

  • 1、AMQP
  • 2、基本消息模型队列
  • 3、WorkQueue模型
  • 4、发布订阅模型
  • 5、发布订阅-Fanout Exchange
  • 6、发布订阅-DirectExchange
  • 7、发布订阅-TopicExchange
  • 8、消息转换器

1、AMQP

Advanced Message Queuing Protocol,高级消息队列协议。是用于在应用程序之间传递业务消息的开放标准。

该协议与语言和平台无关,更符合微服务中独立性的要求。

在这里插入图片描述

Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。

它包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

在这里插入图片描述

接下来对几种常见的MQ模型,用SpringAMQP来演示一下具体实现。

2、基本消息模型队列

消息发送

  • 引入AMQP依赖(publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程)
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>    		
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

  • 在publisher服务中编写application.yml,添加mq连接信息
spring:
  rabbitmq:
      host: 192.168.150.101 # 主机名
      port: 5672 # 端口
      virtual-host: / # 虚拟主机 
      username: root # 用户名
      password: 123321 # 密码

  • 在publisher服务中新建一个测试类,注入RabbitTemplate,调用convertAndSend方法,传入队列名称和消息内容即可
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() { 
           String queueName = "simple.queue";
           String message = "hello, spring amqp!";
           rabbitTemplate.convertAndSend(queueName, message);
    }
}

消息接收

  • 在consumer服务中编写application.yml,添加mq连接信息
spring:
  rabbitmq:
      host: 192.168.150.101 # 主机名
      port: 5672 # 端口
      virtual-host: / # 虚拟主机 
      username: root # 用户名
      password: 123321 # 密码

  • 在consumer服务中新建一个类,编写消费逻辑
//定义类,添加@Component注解
@Component
public class SpringRabbitListener {
	@RabbitListener(queues = "simple.queue")    
	public void listenSimpleQueueMessage(String msg) throws InterruptedException {        
		System.out.println("spring 消费者接收到消息 :【" + msg + "】");    
	}
}

类中声明方法,添加@RabbitListener注解,注解上写明要消费的队列名称,此时,方法的参数就是消息。

在这里插入图片描述

3、WorkQueue模型

和基本模型不一样,WorkQueue模型,即工作队列,有多个消费者。可以提高消息处理速度,避免队列消息堆积。

在这里插入图片描述

案例:实现一个队列绑定多个消费者

思路如下:

  • 在publisher服务中定义测试方法,每秒产生50条消息,发送到队列simple.queue
  • 在consumer服务中定义两个消息监听者,都监听simple.queue队列,但消费者1每秒能处理50条消息,而消费者2每秒只能处理10条消息

演示代码如下,先写生产者:

//在publisher服务中添加一个测试方法,循环发送50条消息到simple.queue队列

@Test
public void testWorkQueue() throws InterruptedException {    
	// 队列名称    
	String queueName = "simple.queue";    
	// 消息   
	String message = "hello, message__";    
	for (int i = 0; i < 50; i++) {        
		// 发送消息
		rabbitTemplate.convertAndSend(queueName, message + i);
        // 避免发送太快        
        Thread.sleep(20);    
     }
}

接下来编写两个消费者,都监听上面的队列simple.queue:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage1(String msg) throws InterruptedException {
    System.out.println("spring 消费者1接收到消息:【" + msg + "】");
    Thread.sleep(25);
}

@RabbitListener(queues = "simple.queue") 
public void listenSimpleQueueMessage2(String msg) throws InterruptedException {    
	System.err.println("spring 消费者2接收到消息:【" + msg + "】");    
	Thread.sleep(100);

}

上面一个用System.out.println一个用System.error.println,这样一红一白,在控制台查看消费的效果:

在这里插入图片描述

可以看到,消费者2处理消息慢(代码里用处理一次休眠的旧来模拟),但它从队列里拿的消息却和消费者1一样,因此导致消费总时间过长。对消费者2来说,这就是没有那个金刚钻,却揽了这么多瓷器活儿。这个现象的原因是 ⇒ 消息预取机制

预取机制,通俗说就是消息到了队列后,消费者通过通道一人一个先拿过来,能不能快速处理完的另说,先取走再说。

接下来加一个消费预取限制。通过设置prefetch来控制消费者预取的消息数量。修改消费者的application.yml文件,设置preFetch这个值,可以控制预取消息的上限:

spring:
  rabbitmq:
      host: 192.168.150.101 # 主机名
      port: 5672 # 端口
      virtual-host: / # 虚拟主机 
      username: root # 用户名
      password: 123321 # 密码
	  listener:      
	  	simple:        
	  	  prefetch: 1  # 每次只能获取一条消息,处理完成才能获取下一个消息

prefetch默认无限,这里改为1,即 每次只能获取一条消息,处理完成才能获取下一个消息。重启,这次同样生产50条消息,消费总时间变短了,可以看到消费者2处理的慢,取的也慢。

在这里插入图片描述

4、发布订阅模型

和前面两种模型不同,一个消息被一个消费者消费完就没了。发布订阅模式允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。 当前上一章说的支付功能的实现,就得用这个模型。

在这里插入图片描述

消息被路由到哪些队列中,由exchange决定。常见exchange类型包括:

  • Fanout:广播
  • Direct:路由
  • Topic:话题

注意:exchange负责消息路由,而不是存储,路由失败则消息丢失

5、发布订阅-Fanout Exchange

Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue

在这里插入图片描述

SpringAMQP提供了声明交换机、队列、绑定关系的API:

在这里插入图片描述

接下来写演示代码:

  • 步骤1:在消费consumer服务声明Exchange、Queue,并Binding绑定
//在consumer服务配置目录下创建一个类,添加@Configuration注解
@Configuration
public class FanoutConfig {
    // 声明FanoutExchange交换机
    @Bean    
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("exchange.fanout");
    }    
    // 声明第1个队列
    @Bean    
    public Queue fanoutQueue1(){        
    	return new Queue("fanout.queue1");    
	}   

	//绑定队列1和交换机,方法形参就是队列和交换机类型的参数  
	@Bean    
	public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){        
		return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);    
	}    

	//以相同方式声明第2个队列,并完成绑定到上面的交换机
	@Bean    
    public Queue fanoutQueue2(){        
    	return new Queue("fanout.queue2");    
	}   

	//绑定队列2和交换机  
	@Bean    
	public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){        
		return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);    
	}    
}

//将来Spring读到这些Bean,就会向RabbitMQ去声明这些队列和交换机,并绑定

启动消费服务,可以看到队列创建并且绑定到了交换机,以后交换机有消息,和交换机绑定的队列都能收到一份!

在这里插入图片描述

  • 步骤2:在consumer服务声明两个消费者,即添加两个方法,分别监听fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
   System.out.println("消费者1接收到fanout.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2") 
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到fanout.queue2的消息:【" + msg + "】");
}

//这里在一个服务里用两个消费者方法模拟被多个服务消费
//到此,两个队列就分别和这两个消费者方法勾搭上了
//重启消费者服务。
  • 步骤3:在publisher服务发送消息到FanoutExchange
//这次不再是send到队列,而是发到交换机
@Test
public void testFanoutExchange() {    // 队列名称    String exchangeName = "itcast.fanout";    // 消息    String message = "hello, everyone!";    // 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息     rabbitTemplate.convertAndSend(exchangeName, "", message);}

//执行这段测试代码,往交换机发消息

查看消费者服务控制台,可以看到一次发送,被多个消费者收到

在这里插入图片描述

小总结:

交换机的作用是什么?

➢ 接收publisher发送的消息
➢ 将消息按照规则路由到与之绑定的队列
➢ 不能缓存消息,路由失败,消息丢失
➢FanoutExchange的会将消息路由到每个绑定的队列

6、发布订阅-DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

在这里插入图片描述

当然两个队列可以设置同一个key,此时的效果就和上面的Fanout Exchange一样了。

在这里插入图片描述

步骤1:在consumer服务声明Exchange、Queue。这里不再用Bean,而是利用@RabbitListener声明Exchange、Queue、RoutingKey

@RabbitListener(bindings = @QueueBinding(        
		value = @Queue(name = "direct.queue1"),        
		exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),       
		key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到Direct消息:【"+msg+"】");
}


@RabbitListener(bindings = @QueueBinding(        
	value = @Queue(name = "direct.queue2"),        
	exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),        
	key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到Direct消息:【"+msg+"】 ");
}

看IDEA的提示类型写就行:exchange的type属性默认就是direct类型,这里不加也行。

在这里插入图片描述
启动消费者服务,到此,队列和交换机都被创建和绑定好了,查看RabbitMQ控制台:

在这里插入图片描述

步骤2:在publisher服务发送消息到DirectExchange,此时convertAndSend方法的第二个参数就要写Routing key了

//在publisher服务的SpringAmqpTest类中添加测试方法
@Test
public void testDirectExchange() {    
	// 队列名称    
	String exchangeName = "itcast.direct";    
	// 消息     
	String message = "hello,blue!";    
	// 发送消息,参数依次为:交换机名称,RoutingKey,消息
	 rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}

运行这个测试方法,查看消费服务:

在这里插入图片描述

小总结:

Direct交换机与Fanout交换机的差异?
➢ Fanout交换机将消息路由给每一个与之绑定的队列
➢ Direct交换机根据RoutingKey判断路由给哪个队列
➢ 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

基于@RabbitListener注解声明队列和交换机的常用注解有:
@Queue
@Exchange

7、发布订阅-TopicExchange

TopicExchange与DirectExchange类似,区别在于它的routingKey必须是多个单词的列表,并且以 . 分割

在这里插入图片描述

#:代指0个或多个单词
*:代指一个单词

举例:

china.news 代表有中国的新闻消息;
china.weather 代表中国的天气消息;
japan.news 则代表日本新闻
japan.weather 代表日本的天气消息

总之就是用通配符简化了队列和exchange的绑定。用代码演示一下这个TopicExchange类型的使用,测试结构:

在这里插入图片描述
步骤1: 在consumer服务利用@RabbitListener声明Exchange、Queue、RoutingKey

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
        key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到Topic消息:【"+msg+"】");
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue2"),
        exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
        key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到Topic消息:【"+msg+"】");
}

//注意key和exchange中的type

步骤2: 在publisher服务发送消息到TopicExchange

//在publisher服务的SpringAmqpTest类中添加测试方法
@Test
public void testTopicExchange() {
    // 队列名称    
    String exchangeName = "itcast.topic";    
    // 消息     
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);

}

8、消息转换器

SpringAMQP的发送方法convertAndSend()中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送

//这次不再是用@RabbitListener,因为它用在消费方法上,我们要看消息,不希望它被消费
//因此使用Bean声明队列
@Bean
public Queue objectMessageQueue(){
    return new Queue("object.queue");
}

在publisher中发送消息以测试:

@Test
public void testSendMap() throws InterruptedException {    
	// 准备消息    
	Map<String,Object> msg = new HashMap<>();
	msg.put("name", "Jack");     
	msg.put("age", 21);   
	// 发送消息
	rabbitTemplate.convertAndSend("object.queue", msg);
}

重启服务,查看RabbitMQ中的消息:

在这里插入图片描述
数据展示有问题,Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。接下来修改序列化的方式。

修改序列化方式只需要定义一个MessageConverter 类型的Bean即可

  • 在父工程(或者消息生产者服务)中引入依赖
<dependency>
   <groupId>com.fasterxml.jackson.core</groupId> 
   <artifactId>jackson-databind</artifactId>
</dependency>

  • 在配置类中声明MessageConverter的Bean(直接写启动类中也行)
//我们一旦声明MessageConverter,就会覆盖Spring默认的MessageConverter(SpringBoot自动装配的原理)

@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter(); 
}

此时再重发消息,RabbitMQ中消息展示正常。接下来看消费者服务:

  • 引入Jackson依赖(如果上一步引入依赖放在了父工程,那这里就不用重复引入了)
<dependency>
   <groupId>com.fasterxml.jackson.core</groupId> 
   <artifactId>jackson-databind</artifactId>
</dependency>

  • 在consumer服务定义MessageConverter:
@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter(); 
}
  • 定义一个消费者,监听object.queue队列并消费消息
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String, Object> msg) {
    System.out.println("收到消息:【" + msg + "】"); 
}

重启消费者服务,序列化方式修改成功!

SpringAMQP中消息的序列化和反序列化是怎么实现的?

利用MessageConverter实现的,默认是JDK的序列化,可重新定义MessageConverter类型的Bean来修改。

注意发送方与接收方必须使用相同的MessageConverter

System.out.println("End!");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/604382.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深入理解设计原则之单一职责原则(SRP)

系列文章目录 C高性能优化编程系列 深入理解设计原则系列 深入理解设计模式系列 高级C并发线程编程 SRP&#xff1a;单一职责原则 系列文章目录1、单一职责原则的定义和解读2、单一职责原则案例解读2.1、违背单一职责原则反面案例2.2、违背单一职责原则反面案例 - 解决方案 3…

《嵌入式存储器架构、电路与应用》----学习记录(三)

第4章 嵌入式内存 4.1 Flash的发展背景 Flash是非易失存储器&#xff0c;具有存储密度高、容错能力强和读写速度相对较慢等特点&#xff0c;传统Flash广泛应用于外部大数据存储。为了满足微控制器芯片(MCU)高速运算的需求&#xff0c;嵌入式Flash(eFlash)往往作为MCU的内部数…

【花雕学AI】ChatGPT的四大语言处理神器:文本生成、问答、创意生成和内容优化的技巧和实例

引言&#xff1a;ChatGPT是一个人工智能聊天机器人&#xff0c;它可以理解和交流多种语言&#xff0c;例如中文、英文、日文、西班牙语、法语、德语等。它是由OpenAI开发的&#xff0c;基于GPT-3.5和GPT-4这两个大型语言模型。它不仅可以与用户进行对话&#xff0c;还可以根据用…

Centos7切换到Alibaba Cloud Linux3

通过控制台自动导入迁移源目前仅支持迁移源的类型为物理机/虚拟机/云服务器和阿里云ECS云服务器&#xff0c;若需要迁移其他类型的迁移源&#xff0c;则可以选择手动导入迁移源。 第一步&#xff0c;登录SMC客户端。 登录网址&#xff1a;阿里云登录 - 欢迎登录阿里云&#x…

MYSQL 8 Too many connections error 还在继续,这对DBA 是不公平的

开头还是介绍一下群&#xff0c;如果感兴趣polardb ,mongodb ,mysql ,postgresql ,redis 等有问题&#xff0c;有需求都可以加群群内有各大数据库行业大咖&#xff0c;CTO&#xff0c;可以解决你的问题。加群请联系 liuaustin3 &#xff0c;在新加的朋友会分到2群&#xff08;共…

c++11 标准模板(STL)(std::bitset)(二)

定义于头文件 <bitset> template< std::size_t N > class bitset; 类模板 bitset 表示一个 N 位的固定大小序列。可以用标准逻辑运算符操作位集&#xff0c;并将它与字符串和整数相互转换。 bitset 满足可复制构造 (CopyConstructible) 及可复制赋值 (CopyAssig…

qgis二次开发环境搭建(qgis-3.28.6+ubuntu22.04+qt5.15)

背景 一个Ros2项目中用到了qgis&#xff0c;特此整理一下用到的qgis二次开发代码。 linux搭建Debug环境(省事简单apt一把梭) 下载 ubuntu22.04qgis-3.28.6Qt version 5.15.3 编译 参考qgis编译文档 ubuntu22.04 使用上图jammy的安装命令上图中的apt-get安装命令安装的…

PyQt5桌面应用开发(19):事件过滤器

本文目录 PyQt5桌面应用系列再来点事件事件过滤器例子这是什么恶毒巫术?需求分析代码额外的细节知 总结 PyQt5桌面应用系列 PyQt5桌面应用开发&#xff08;1&#xff09;&#xff1a;需求分析 PyQt5桌面应用开发&#xff08;2&#xff09;&#xff1a;事件循环 PyQt5桌面应用开…

从零到无搭建Vue项目及代码风格规范

注&#xff1a;已经有vue项目的可以跳过项目初始化 Vue项目搭建 环境搭建 安装nvm 方便后续切换不通的node版本 nvm官网 傻瓜安装就行 或者搜下自己&#xff08;非本文重点&#xff09;nvm 安装好后 安装一个Node版本 本文使用的 有了环境开始创建Vue项目 打开命令行 cmd n…

Redis底层学习(五)—存储类型-Set篇

文章目录 特点具体服务器操作命令底层结构应用场景 特点 适⽤场景&#xff1a;存储有去重需求的数据&#xff0c;⽐如&#xff1a;针对⼀篇⽂章⽤户进⾏点赞操作。 它的特点是内部元素⽆序且不重复。它的内部实现相当于⼀个特殊的字典&#xff0c;字典中所有的 value 的值都为…

这么好看的头像,岂不拿下!

❝ 如此好看的头像&#xff0c;怎么能不喜欢&#xff1f;&#xff1f;&#xff1f; ❞ 代码放在了最后 后续还会出一个工具&#xff0c;以便于随时打开下载。 看上述的头像是不是还是很不错的。看着网站还是✨✨每天都会有更新的✨✨。 所以&#xff0c;我动手了&#xff0c;下…

5.2 案例引入

博主简介&#xff1a;一个爱打游戏的计算机专业学生博主主页&#xff1a; 夏驰和徐策所属专栏&#xff1a;算法设计与分析 1.什么是大数据时代的到来&#xff1f; 大数据时代指的是在现代社会中&#xff0c;产生和积累的数据规模庞大、速度快、种类多样的时代。随着计算机技术…

《商用密码应用与安全性评估》第四章密码应用安全性评估实施要点4.4密码应用安全性评估测评过程指南

目录 概述 1.基本原则 2.风险测评控制 3.测评过程 密码应用评估方案 1.主要内容 1&#xff09;密码应用解决方案评估要点 2&#xff09;实施方案评估要点 3&#xff09;应急处置方案评估要点 2.主要任务 3.密码应用方案评估的输出文档 测评准备活动 1.测评准…

不愧是阿里,扣的真细。

铜三铁四已经过去了&#xff0c;今天的行情虽然没有以前好&#xff0c;但是相比去年来说也算是好了一些了。有一些人已经在这个招聘季拿到了不错的Offer了。 今天给大家分享一份面经&#xff0c;今天这位朋友的背景是Java五年本&#xff0c;2023年前被毕业后投入了面试大军怀抱…

Baseline Profile 安装时优化在西瓜视频的实践

‍ 动手点关注 干货不迷路 背景 在Android上&#xff0c;Java/Kotlin代码会编译为DEX字节码&#xff0c;在运行期由虚拟机解释执行。但是&#xff0c;字节码解释执行的速度比较慢。所以&#xff0c;通常虚拟机会在解释模式基础上做一些必要的优化。 在Android 5&#xff0c;Goo…

chatgpt赋能python:Python列表:完整介绍与使用指南

Python列表&#xff1a;完整介绍与使用指南 Python是一种非常受欢迎的编程语言&#xff0c;而Python列表是Python编程中最基本的数据结构之一。列表在Python中的使用频率极高&#xff0c;因为列表可以存储许多不同类型的数据&#xff0c;并且可以很方便地进行操作和修改。在本…

Unity Addressables学习笔记(2)---创建远程服务器对象

1.先创建对象 我的做法是&#xff1a; 先拖动一张图片到Resources/img下就是我选中的这张文件夹 2.把图片拖动到Hierarchy里变成一个对象&#xff0c;再把对象拖动到Resources/prefabs里&#xff0c;图片里的单词敲错了哈哈哈哈。 这样这个图片就变成了预制体&#xff0c;然…

AMD在数据中心领域举步维艰,竞争越来越难

来源&#xff1a;猛兽财经 作者&#xff1a;猛兽财经 数据中心业务举步维艰 2023年第一季度&#xff0c;AMD的数据中心业务收入为13亿美元&#xff0c;几乎没有同比增长&#xff0c;反而环比下降了22%。与此同时&#xff0c;它的主要竞争对手英伟达(NVDA)却获得了越来越多的订…

行为型设计模式04-状态模式

✨作者&#xff1a;猫十二懿 ❤️‍&#x1f525;账号&#xff1a;CSDN 、掘金 、个人博客 、Github &#x1f389;公众号&#xff1a;猫十二懿 状态模式 1、状态模式介绍 状态模式&#xff08;State&#xff09;是一种行为型设计模式&#xff0c;当一个对象的内在状态改变时…

python---条件语句(1)

顺序语句 按照写的顺序执行 条件语句 条件语句的一些注意事项: 1.情况1 2.情况2 bbb已经不属于条件语句中的内容了 3.情况3 通常使用4个空格或一个制表符tab来表示! if语句的嵌套 当有多级条件嵌套时,当前的语句属于哪个代码块,完全取决于缩进的级别.