一篇文章快速入门Spring AMQP

news2024/9/30 13:19:49

文章目录

  • 一、AMQP
  • 二、Spring AMQP
    • 2.1 介绍
    • 2.2 SpringAMQP发送消息
    • 2.3 SpringAMQP接收消息
    • 2.4 WorkQueue模型
      • 2.4.1 概念
      • 2.4.2 示例
    • 2.5 发布订阅模型
      • 2.5.1 介绍
      • 2.5.2 Fanout Exchange
      • 2.5.3 Direct Exchange
      • 2.5.4 Topic Exchange
    • 2.6 消息转换器
      • 2.6.1 介绍
      • 2.6.2 切换消息转换器

一、AMQP

  AMQP(高级消息队列协议)是一个标准的消息传递协议,用于在应用程序之间进行消息传递,特别是在分布式系统中。
在这里插入图片描述

  AMQP提供了一个中间件消息传递机制,使不同的应用程序能够可靠地、安全地和高效地进行通信。它允许应用程序通过交换消息来进行通信,而不必直接进行网络通信。这种机制使得应用程序之间的通信更加灵活,因为它们可以独立地进行通信,而不必考虑其他应用程序的状态和可用性。

  AMQP支持消息传递的各种场景,包括点对点通信、发布/订阅模式和请求/响应模式等。它具有诸如持久化消息、事务、消息确认和优先级等高级特性,使得它非常适合处理复杂的分布式应用程序。

二、Spring AMQP

2.1 介绍

  Spring AMQP是基于Spring Framework的AMQP(高级消息队列协议)客户端库,用于在Java应用程序中使用AMQP进行消息传递。
在这里插入图片描述

  Spring AMQP提供了一个高级的抽象层,使得开发人员可以很方便地使用AMQP进行消息传递,而不必直接处理AMQP的复杂性。它支持各种AMQP实现,例如RabbitMQ和Apache ActiveMQ等。

Spring AMQP提供了许多有用的特性,例如:

1.声明式配置:使用注释和Java配置声明式地配置消息交换和队列。

2.简化的消息发布和订阅:使用Spring AMQP,开发人员可以很方便地将消息发送到队列或从队列中接收消息。

3.异步处理:Spring AMQP提供了异步消息处理机制,可以轻松地处理大量消息。

4.异常处理:Spring AMQP提供了对异常处理的支持,以便在发生错误时进行适当的处理。

2.2 SpringAMQP发送消息

  1. 引入依赖:
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 在application.yml文件配置RabbitMQ的连接信息:
spring:
  rabbitmq:
  	#主机名
    host: 192.168.XX.XX
    #端口
    port: 5672
    #用户名
    username: caterpillar
    #密码
    password: 123456
    #虚拟主机
    virtual-host: /
  1. 调用RabbitTemplate的convertAndSend方法:
//自动注入RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void SendMessage(){
    //队列名称
    String queueName = "simple.queue";
    //所发送的消息
    String message = "hello,World!";
    //调用convertAndSend方法发送信息
    rabbitTemplate.convertAndSend(queueName,message);
}

2.3 SpringAMQP接收消息

  1. 在application.yml文件配置RabbitMQ的连接信息:
spring:
  rabbitmq:
  	#主机名
    host: 192.168.XX.XX
    #端口
    port: 5672
    #用户名
    username: caterpillar
    #密码
    password: 123456
    #虚拟主机
    virtual-host: /
  1. 在消费者的服务中新建一个类接收消息并处理
//声明为Bean
@Component
public class SpringRabbitListener {
    //指定消息队列的名称
    @RabbitListener(queues = "simple.queue")
    public void listenQueue(String msg){
        //处理指定消息队列所收到的信息
        System.out.println("收到的消息:" + msg);
    }
}

2.4 WorkQueue模型

2.4.1 概念

  Work Queue(工作队列)模型是一种经典的消息队列模型,也被称为任务队列模型。它用于将耗时的任务分配给多个工作进程以便并行执行。
在这里插入图片描述

  在Work Queue模型中,一个生产者将消息发送到一个队列中,多个消费者从队列中接收消息并处理它们。一个消息只能被一个消费者处理,即消息的消费是互斥的。

Work Queue模型的主要特点包括:

  1. 生产者将消息发送到队列中,而不是发送到特定的消费者。
  2. 消费者从队列中获取消息,并将其处理。
  3. 消息的处理是互斥的,即一个消息只能被一个消费者处理。
  4. 消费者可以平均分配任务,以便并行处理。

2.4.2 示例

  1. 生产者测试代码
@Test
public void testSendMessage2WorkQueue() throws InterruptedException {
    String queueName = "simple.queue";
    String message = "message -- ";
    //发送50次消息
    for (int i = 1; i <= 50; i++) {
        rabbitTemplate.convertAndSend(queueName, message + i);
        //每次循环延时20毫秒
        Thread.sleep(20);
    }
}
  1. 消费者代码
@Component
public class SpringRabbitListener {
	//消息队列1
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消息队列1收到的消息:" + msg + " -- " + LocalTime.now());
        Thread.sleep(20);
    }
	//消息队列2
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        //使用err是为了输出时颜色不同便于区分
        System.err.println("消息队列2收到的消息:" + msg + " -- " + LocalTime.now());
        Thread.sleep(200);
    }
}
  1. 消费者配置添加
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 #每次只预取一条消息,使消费能力的队列更强的做更多的接收

2.5 发布订阅模型

2.5.1 介绍

  发布-订阅模型(Publish-Subscribe)是一种消息队列模型,主要用于将消息同时发给多个消费者。在发布-订阅模型中,消息的生产者将消息发送到一个交换机中,交换机会将消息同时发给多个与之绑定的队列,多个消费者可以分别从这些队列中获取消息并进行处理。
在这里插入图片描述

  与点对点模型(P2P)不同,发布-订阅模型中的消息生产者并不需要知道消息的接收者,而是只需要将消息发送到交换机中即可。交换机会将消息广播给与之绑定的所有队列,多个消费者可以从不同的队列中获取并处理消息。消息的处理是并行的,多个消费者可以同时处理不同的消息,提高了系统的处理效率和吞吐量。

常见的exchange类型包括:

  • Fanout:广播
  • Direct:路由
  • Topic:话题

2.5.2 Fanout Exchange

  Fanout Exchange是一种简单的Exchange类型,它会将所有收到的消息广播给与之绑定的所有队列,所有的队列都会收到相同的消息。在Fanout Exchange中,不需要指定Routing Key,消息会直接广播给所有的队列。Fanout Exchange通常用于需要将消息广播给多个消费者的场景,例如在线聊天室、实时数据分析等。
在这里插入图片描述

示例:

  1. 生产者测试代码
@Test
public void testSendFanoutExchange(){
    //交换机名称
    String exchangName = "caterpillar.fanout";
    //消息
    String message = "hello";
    //发送消息
    rabbitTemplate.convertAndSend(exchangName,"",message);
}	
  1. 添加一个类作为消费者的配置类
//声明为一个配置
@Configuration
public class FanoutConfig {
    //声明交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("caterpillar.fanout");
    }
    //声明队列1
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    //将队列1绑定到交换机
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    //声明队列2
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
    //将队列2绑定到交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}
  1. 消费者代码
@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg){
        System.out.println("fanout1收到的消息:" + msg);
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenfanoutQueue(String msg){
        System.out.println("fanout2收到的消息:" + msg);
    }
}

2.5.3 Direct Exchange

  Direct Exchange是一种常见的Exchange类型,它会将消息发送给与之匹配的队列。在Direct Exchange中,生产者将消息发送到指定的交换机中,并指定一个Routing Key,Exchange会将消息发送到与之绑定的Routing Key相同的队列中。Direct Exchange通常用于一些点对点的场景,例如订单处理、日志记录等。
在这里插入图片描述

示例:

  1. 生产者测试代码
@Test
public void testSendDirectExchange(){
    //交换机名称
    String exchangName = "caterpillar.direct";
    //消息
    String message = "hello,blue";
    //发送消息,只会发送给相同key的消费者,第二个参数是Routing Key
    rabbitTemplate.convertAndSend(exchangName,"red",message);
}
  1. 消费者代码
@RabbitListener(bindings = @QueueBinding(
	//声明队列
	value = @Queue(name = "direct.queue1"),
	//声明交换机
	exchange = @Exchange(name = "caterpillar.direct",type = ExchangeTypes.DIRECT),
	//声明绑定key
	key = {"red","blue"}
))
public void listenDirectQueue1(String msg){
	System.out.println("direct1收到的消息:" + msg);
}
@RabbitListener(bindings = @QueueBinding(
	//声明队列
	value = @Queue(name = "direct.queue2"),
	//声明交换机
	exchange = @Exchange(name = "caterpillar.direct",type = ExchangeTypes.DIRECT),
	//声明绑定key
	key = {"red","yellow"}
))
public void listenDirectQueue2(String msg){
	System.out.println("direct2收到的消息:" + msg);
}

  在此示例中两个消费者都会收到消息,如果生产者指定的key为blue则只有direct1收到消息,如果生产者指定的key为yellow则只有direct2收到消息。

2.5.4 Topic Exchange

  Topic Exchange是一种强大的Exchange类型,它会将消息发送到与之匹配的队列中。在Topic Exchange中,生产者可以指定一个带有通配符的Routing Key(*),Exchange会将消息发送到所有与之匹配的队列中。Topic Exchange支持通配符的Routing Key匹配,可以根据消息的主题进行路由,非常适合一些消息订阅和过滤的场景,例如新闻订阅、事件通知等。
在这里插入图片描述

Topic Exchange支持两种通配符:* 和 #。

  1. *:表示匹配一个单词,可以用于匹配某个特定单词,例如"*.apple"可以匹配"green.apple"、“red.apple"等,但不能匹配"green.big.apple”。
  2. #:表示匹配零个或多个单词,可以用于匹配某个前缀或者所有单词,例如"fruit.#“可以匹配"fruit.apple”、“fruit.orange”、"fruit.apple.red"等。

示例:

  1. 生产者测试代码
@Test
public void testSendTopicExchange(){
    //交换机名称
    String exchangName = "caterpillar.topic";
    //消息
    String message = "毛毛虫被网易招聘了!!!";
    //发送消息
    rabbitTemplate.convertAndSend(exchangName,"china.news",message);
}
  1. 消费者代码
@RabbitListener(bindings = @QueueBinding(
        //声明队列
        value = @Queue(name = "topic.queue1"),
        //声明交换机
        exchange = @Exchange(name = "caterpillar.topic",type = ExchangeTypes.TOPIC),
        //声明绑定key
        key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("topic1收到的消息:" + msg);
}
@RabbitListener(bindings = @QueueBinding(
        //声明队列
        value = @Queue(name = "topic.queue2"),
        //声明交换机
        exchange = @Exchange(name = "caterpillar.topic",type = ExchangeTypes.TOPIC),
        //声明绑定key
        key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("topic2收到的消息:" + msg);
}

  在此示例中两个消费者都会收到消息,如果生产者指定的key为china.weather则只有topic1收到消息,如果生产者指定的key为Canada则只有topic2收到消息。

2.6 消息转换器

2.6.1 介绍

  Spring AMQP提供了消息转换器(MessageConverter)的支持,用于将消息对象转换为字节数组或者将字节数组转换为消息对象,使得消息生产者和消费者之间可以方便地传输POJO对象。

Spring AMQP提供了以下几种消息转换器:

  1. SimpleMessageConverter:默认的消息转换器,可以将Java对象转换为字节数组并发送到队列中,也可以从队列中接收字节数组并将其转换为Java对象。支持文本消息和字节消息的转换。
  2. Jackson2JsonMessageConverter:基于Jackson库的消息转换器,可以将Java对象转换为JSON字符串并发送到队列中,也可以从队列中接收JSON字符串并将其转换为Java对象。支持文本消息和字节消息的转换。
  3. Jackson2XmlMessageConverter:基于Jackson库的消息转换器,可以将Java对象转换为XML字符串并发送到队列中,也可以从队列中接收XML字符串并将其转换为Java对象。支持文本消息和字节消息的转换。
  4. MarshallingMessageConverter:使用Spring的Marshaller和Unmarshaller进行消息转换的消息转换器。支持XML和Java Serialization格式。
  5. ByteArrayMessageConverter:将Java对象转换为字节数组并发送到队列中,也可以从队列中接收字节数组并将其转换为Java对象。只支持字节消息的转换。

2.6.2 切换消息转换器

以切换为Jackson2JsonMessageConverter为例:

  1. 在主类pom文件引入依赖
<!--数据绑定模块-->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
  1. 在生产者和消费者的配置类中声明消息转换器的Bean
//消息转换器
@Bean
public MessageConverter messageConverter(){
    //Jackson消息转换工具
    return new Jackson2JsonMessageConverter();
}
  1. 消费者监听队列消息时接收的消息要和生产者中的类型一致
@RabbitListener(/*queues = 队列名称*/)
public void listenObjectQueue(/*消息类型 消息实例*/){
    //消息操作
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/437975.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

重写equlas时为什么一定要重写hashcode方法?

equals方法和hashCode方法都是Object类中的两个基本方法&#xff0c;它们共同来判断两个对象是否相等。为什么要两个方法结合起来使用呢&#xff1f;原因是在 ‘性能’ 上面。 使用过 hashMap 我们知道&#xff0c;通过 hash 计算 &#xff0c;可以快速的在常量时间内找到某个…

webpack基本认知,它是什么,做什么的

一、基本概述 webpack本质是, 一个第三方模块包, 用于分析, 并打包代码 支持所有类型文件的打包支持less/sass > css支持ES6/7/8 > ES5压缩代码, 提高加载速度 二、安装 创建一个文件并运行以下命令&#xff1a; npm init -y npm i webpack webpack-cli -S 运行命令…

DNS域名解析服务

目录 一、DNS的简介 1&#xff09;DNS 数据结构分布 2&#xff09;服务器的类型 3&#xff09;DNS 域名解析方式 4&#xff09;DNS的查询方式 递归查询 迭代查询 二、DNS配置 1&#xff09;两台主从服务器进行配置操作 ​编辑 2&#xff09;DNS主域名服务器配置&am…

ITSS服务经理 、服务工程师线上开班在即

为了促进企业信息技术服务-运行维护服务能力&#xff0c;全面系统的提升员工的IT服务知识和技能水平&#xff0c;且更好的满足参训企业的时间需求&#xff0c;我司将于5月份开展ITSS服务经理、服务工程师线上班。 日期和形式 五月份&#xff1a;ITSS服务项目经理&#xff1a;…

Qlik Sense 集合表达式详解

文章目录 1 概述2 集合表达式 expression2.1 标识符 identifiers2.2 修饰符 modifiers2.2.1 多值用 &#xff0c;隔开2.2.2 引号区分大小写2.2.3 搜索 2.3 运算符 operators 3 应用 1 概述 #mermaid-svg-bQWKUrD934SlJaj9 {font-family:"trebuchet ms",verdana,arial…

电子招标采购系统源码:营造全面规范安全的电子招投标环境,促进招投标市场健康可持续发展

营造全面规范安全的电子招投标环境&#xff0c;促进招投标市场健康可持续发展 传统采购模式面临的挑战 一、立项管理 1、招标立项申请 功能点&#xff1a;招标类项目立项申请入口&#xff0c;用户可以保存为草稿&#xff0c;提交。 2、非招标立项申请 功能点&#xff1a;非招标…

CompletableFuture的基本使用和原理

CompletableFuture CompletableFuture是对Future的扩展和增强。CompletableFuture实现了Future接口&#xff0c;并在此基础上进行了丰富的扩展&#xff0c;完美弥补了Future的局限性&#xff0c;同时CompletableFuture实现了对任务编排的能力。借助这项能力&#xff0c;可以轻…

如何将matlab的m文件转换成python文件

因为matlab的内存实在太大了&#xff0c;所以我只在实验室电脑安装了matlab&#xff0c;自己电脑没有安装&#xff0c;现在跑实验需要把matlab文件转成python文件。在网上找到可以使用smop小工具。 我是在本地的anaconda转换的。先创建一个新环境到指定路径 conda create --pr…

HttpWebRequest类

HttpWebRequest类与HttpRequest类的区别。 HttpRequest类的对象用于服务器端&#xff0c;获取客户端传来的请求的信息&#xff0c;包括HTTP报文传送过来的所有信息。而HttpWebRequest用于客户端&#xff0c;拼接请求的HTTP报文并发送等。 HttpWebRequest这个类非常强大&#…

Spring MVC 接收 json 和返回 json (14)

目录 总入口 测试case 源码分析 1. 针对RequestBody的参数解析 2. 针对 ResponseBody 的返回值处理 总入口 通过上一篇Spring MVC 参数解析&#xff08;13&#xff09;_chen_yao_kerr的博客-CSDN博客的说明&#xff0c;相信大家对Sping MVC的参数解析有了一定的了解&…

2.微服务项目实战---环境搭建,实现电商中商品、订单、用户

使用的电商项目中的商品、订单、用户为案例进行讲解。 2.1 案例准备 2.1.1 技术选型 maven &#xff1a; 3.3.9 数据库&#xff1a; MySQL 5.7 持久层 : SpingData Jpa 其他 : SpringCloud Alibaba 技术栈 2.1.2 模块设计 springcloud-alibaba 父工程 shop-common …

【观察】构建“零信任”架构,筑起制造业安全“护城河”

中国是全球制造业大国&#xff0c;过去40年&#xff0c;中国制造业规模增长了18倍&#xff0c;其附加值达到2.2万亿美元&#xff0c;制造业在中国GDP比重高达40%&#xff0c;其之于中国经济的重要性可见一斑。 与此同时&#xff0c;中国制造业在高速发展的同时&#xff0c;也普…

使用全球融合CDN的10大优势

根据预估&#xff0c;今年的全球内容交付网络&#xff08;CDN&#xff09;市场预计将达到424亿美元。而由于移动应用程序的激增和人工智能尤其是ChatGPT等相关领域的快速发展将进一步带来CDN市场的快速增长&#xff0c;可以说全球CDN的黄金时代才刚开始。 融合CDN和多CDN战略是…

32道子网划分练习题详细解析含答案

目录 1 子网划分概念&#xff1a; 2 划分方法&#xff1a; 子网划分方法&#xff1a;段&#xff0c;块&#xff0c;数的计算三步。 段就是确定ip地址段中既有网络地址&#xff0c;又有主机地址的那一段是四段中的那一段&#xff1f; 块就确定上一步中确定的那一段中的主机…

企业云成本优化:减少企业云支出的终极指南

向云的转移使企业的技术领导者能够实现基础设施的现代化&#xff0c;并提高应用程序的可用性、可扩展性和性能。然而优化云成本对很多以互联网业务为主体的公司都是一项挑战&#xff0c;因为需要执行可持续的云成本管理战略。随着世界经济近年来走向低迷&#xff0c;尤其是互联…

【Linux网络服务】DNS域名解析服务服务

一、BIND域名服务基础 服务背景 1在日常生活中人们习惯使用域名访问服务器&#xff0c;但机器向互相只认IP地址&#xff0c;域名与IP地址之间是多对一的关系&#xff0c;一个IP址不一定只对应一个域名&#xff0c;且一个完成域名只可以对应一个IP地址&#xff0c;它们之间转换…

[ARM+Linux] 基于wiringPi库的串口通信

wiringOP-master/examples/serialTest.c中&#xff0c;wiringPi库中自带的串口程序&#xff1a; #include <stdio.h> #include <string.h> #include <errno.h>#include <wiringPi.h> #include <wiringSerial.h>int main () {int fd ;int count …

JavaSE-part1

文章目录 Day01 面向对象特性1.java继承注意点2.多态2.1多态概述2.2多态中成员的特点:star::star:2.3多态的转型:star::star: 3.Super4.方法重写:star::star:5.Object类:star::star: Day02 面向对象特性1.代码块:star:(主要是初始化变量&#xff0c;先于构造器)2.单例设计模式:…

服务器初始化

Linux基础系类 提示&#xff1a;个人学习总结&#xff0c;仅供参考。 一、Linux系统部署 二、服务器初始化 提示&#xff1a;文档陆续更新整理 服务器初始化 Linux基础系类简介一、配置IP地址二、配置YUM源&#xff08;yum本地源和yum网络源&#xff09;1.简介2.准备工作3.配置…

数据结构与算法——深度寻路算法

&#x1f4d6;作者介绍&#xff1a;22级树莓人&#xff08;计算机专业&#xff09;&#xff0c;热爱编程&#xff1c;目前在c&#xff0b;&#xff0b;阶段&#xff0c;因为最近参加新星计划算法赛道(白佬)&#xff0c;所以加快了脚步&#xff0c;果然急迫感会增加动力>——…