微服务08-认识和使用SpringAMQP

news2025/2/25 9:09:31

1.AMQP的认识

1.1 介绍

AMQP是什么?看完你就知道了_hello_读书就是赚钱的博客-CSDN博客_amqp

在这里插入图片描述
好处:
什么connection:消息队列的连接、channel:服务发送接收消息的通道、Queue:消息队列——>这些你都不需要自己编写

工作过程:
发布者(Publisher)发布消息(Message),经由交换机(Exchange)。

交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。

最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

深入理解:
1、发布者、交换机、队列、消费者都可以有多个。同时因为 AMQP 是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以分别存在于不同的设备上。

2、发布者发布消息时可以给消息指定各种消息属性(Message Meta-data)。有些属性有可能会被消息代理(Brokers)使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用。

3、从安全角度考虑,网络是不可靠的,又或是消费者在处理消息的过程中意外挂掉,这样没有处理成功的消息就会丢失。基于此原因,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制:当一个消息从队列中投递给消费者后,不会立即从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才完全从队列中删除

4、在某些情况下,例如当一个消息无法被成功路由时(无法从交换机分发到队列),消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。

1.2 实战:消息发送与消息接收

消息发送
在这里插入图片描述

@RunWith(SpringRunner.class)
@EnableRabbit
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    public void testSendMessageSimpleQueue(){
        String queueName="simple.queue";
        String messgae="Hello,SpringAMQP";
        rabbitTemplate.convertAndSend(queueName,messgae);
    }
}

可以发现队列simple.queue中 多了一条消息
在这里插入图片描述

AMOP如何发送消息?

引入amqp的starter依赖,然后我们配置RabbitMQ的地址,最后利用RabbitTemplate中的convertAndSend方法发送消息

在这里插入图片描述

消息接收:
在这里插入图片描述

1、首先进行yaml配置,RabbitMQ连接信息

2、新建一个组件,里面编写消费逻辑(利用@RabbitListener监听队列,只要队列一有消息就进行接收)

 
@Component
public class SpringRabbitListener {
 
    /**
     * @RabbitListener监听队列
     * @param msg
     */
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg){
        System.out.println("消费者接收到的simple.queue中消息为:"+msg);
    }
}

在这里插入图片描述
在这里插入图片描述

注意:消息一旦被消费就会从队列删除,RabbitMQ没有消息回溯功能

2.WorkQueue

场景:
采用多个工作队列处理消息,避免消息堆积;

在这里插入图片描述

2.1 实战:多个消费者绑定一个队列

在这里插入图片描述

1.首先配置类中把队列支棱起来,Publisher发布消息给消息队列simple.queue

@RunWith(SpringRunner.class)
@EnableRabbit
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    public void testSendMessageWorkQueue() throws InterruptedException {
        String queueName="simple.queue";
        String messgae="华为手机,遥遥领先--";
        for (int i = 0; i < 50; i++) {
            rabbitTemplate.convertAndSend(queueName,messgae+i);
            Thread.sleep(20);
        }
    }

2.consumer服务中定义两个消费者,利用@RabbitListener监听队列

@Component
public class SpringRabbitListener {
    /**
     * 我们的思想是:两个消费者,一个1s能够消费50条,一个消费1s5条,一共50条信息,能者多劳
     * 现实:默认是平均分配->造成处理消息时间过长(因为第二个消费者处理消息很慢)
     * @param msg
     * @throws InterruptedException
     */
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue(String msg) throws InterruptedException {
        System.out.println("消费者1......接收到消息为:"+msg+"当前时间:"+ LocalTime.now());
        //每s50个消息
        Thread.sleep(20);
    }
 
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2.......接收到消息为:"+msg+"当前时间:"+ LocalTime.now());
        Thread.sleep(200);
    }

在这里插入图片描述
我们可以采用prefetch设置消费者预取的消息数量,这样就不会出现平均分配消息的现象了——>而是根据每个消费者的能力来处理消息

在这里插入图片描述

3.Fanout Exchange

前言:

发布(Publish)和订阅(Subscribe)

因为一个消息只可能被一个消费者消费,消费完就会删除该消费,所以我们需要利用发布和订阅来进行处理,这样就可以将同一个消息发送给多个消费者——>
利用exchange交换机将消息发送给多个队列,然后队列是可以将消息进行储存的,发给多个消费者

在这里插入图片描述
常见的exchange类型:Fanout(广播)、Direct(路由)、Topic(话题)

注意:exchange负责消息路由,而不是储存,路由失败则会造成消息丢失(也就是说路由到哪个消息队列)。

3.1 介绍

exchange: 只能作为消息的转发,记住不能作为消息的缓存,如果路由失败消息就丢了

交换机exchange:通过FanoutExchange返回交换机

消息队列Queue:通过Queue返回消息队列

将消息队列与交换机进行绑定Bingding:可以通过Binding中的bind方法进行绑定

3.2 实战

在这里插入图片描述

1:
在这里插入图片描述
consumer服务中声明交换机和队列,将队列与交换机进行绑定(利用Binding中bind方法)

@Configuration
public class FanoutConfig {
    //itcast.fanout交换机
    @Bean
    public FanoutExchange fanoutExchange(){
       return new FanoutExchange("itcast.fanout");
    }
 
    //fanout.queue1任务队列
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
 
    //消息队列2
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
 
    //将消息队列与交换机进行绑定:类型和名称要保持一致,不然无法注入
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    }
 
    //第二个消息队列与交换机进行绑定
    @Bean
    public Binding fanouting2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    }
}

2.在这里插入图片描述

@Component
public class SpringRabbitListener {
    /**
     * 以下是利用交换机通知信息给处理消息的消费者
     */
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg){
        System.out.println("消费者接收到的fanout.queue1的消息为:"+msg);
    }
 
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg){
        System.out.println("消费者收到fanout.queue2的消息为:"+msg);
    }
}

3.在这里插入图片描述

@RunWith(SpringRunner.class)
@EnableRabbit
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    /**
     * 发送信息给交换机exchange
     */
    @Test
    public void testSendFanoutExchange(){
        //1.交换机名称
        String exchangeName="itcast.fanout";
        //2.消息
        String message="hello,every one!";
        //3.发送消息
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }
}

在这里插入图片描述

3.3 总结

在这里插入图片描述

4.Direct Exchang

DirectExchang:可以根据规则路由到指定的消息队列

4.1实战

在这里插入图片描述
1.在consumer服务中声明Exchange交换机与不同key之消息队列的绑定,利用@RabbitListener->还监听了一波消息队列

对比与之前的Fanout,它们的队列以及交换机是在一个配置类中定义并绑定的,利用了Binding,并注入容器

消费者代码
在这里插入图片描述

提供者代码

@Test
    public void testSendDirectExchange() {
        //交换机名称
        String exchangeName = "denghao.direct";
        //消息
        String message = "hello, blue!";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"blue",message);

    }

个人编写与老师资料略有不同 敬请谅解
在这里插入图片描述

Direct交换机与Fanout交换机之间的差异+@RabbitListener注解中声明队列与交换机的常见注解
在这里插入图片描述

5.TopicExchange在这里插入图片描述

为什么要用Topic交换机,因为以后有的时候,比如地区天气新闻,有“长沙的,成都的,四川的,重庆的。。。。。。 如果使用DirectExchange的话,那么会要定义很多很多的key 而用TopicExchange的话 可以直接用 #.news 表示接收所有地方的天气新闻”

5.1 实战

介绍:

两个队列:一个是中国的新闻,一个所有的新闻消息;

两个消费者分别监听这两个队列

Publisher将消息发送给交换机Exchange,然后交换机根据key(+通配符)->决定将消息路由到哪个队列中

区别:

与DirectExchange区别就是,DirectExchange没有通配符,TopicExchange有通配符:xxx.xxx

最明显的地方就是:队列与交换机进行绑定时,key不一样

在这里插入图片描述

消费者的监听器

  @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "denghao.topic",type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queue1的消息:【"+ msg +"】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "denghao.topic",type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到topic.queue2的消息:【"+ msg +"】");
    }

服务者测试方,发布消息

@Test
    public void testSendTopicExchange() {
        //交换机名称
        String exchangeName = "denghao.topic";
        //消息
        String message1 = "红色天气预报警告,马上长沙迎来大升温,明天提升六度,请注意,不要感冒,不要鼻炎!!!";
        String message2 = "重磅新闻来袭!!!";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"china.news",message1);
        rabbitTemplate.convertAndSend(exchangeName,"jp.news",message2);

    }

结果
在这里插入图片描述
在这里插入图片描述

6. 消息转换器

场景: 因为我们Publisher服务者发送消息到消息队列(将消息序列化为二进制),消息是乱码的——>因为消息类型content-type是java序列化变来的类型:缺点:消息体大、传输速度慢、安全问题、占内存;

在这里插入图片描述
处理:

1.我们定义一个bean,利用boot的自动配置思想,将默认的替换

@Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
      return new Jackson2JsonMessageConverter();
    }

2.然后导入依赖,Jacksonxxxx

  <!--序列化-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

3.Publisher发送消息给到消息队列

 //Publisher发送消息给到队列object.queue
    @Test
    public void testSendObjectQueue(){
        HashMap<Object, Object> msg = new HashMap<>();
         msg.put("姓名","柳岩");
         msg.put("年龄",38);
        //发送消息到队列
        rabbitTemplate.convertAndSend("object.queue",msg);
    }

在这里插入图片描述

4.消费者接收消息

@RabbitListener(queues = "object.queue")
    public void listenObjectQueue(Map<String,Object> msg){
        System.out.println("消费者接收到object.queue的消息:【"+ msg +"】");
    }

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1027651.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Keil 5 或者Keil 4自定义主题颜色100%成功

文章目录 步骤一:代码内容解析&#xff1a;完整文件代码一效果图&#xff1a;黑主题Keil 原主题 步骤一: 找到keil 5或者Keil 4软件安装目录下的UV4文件夹下的global.prop文件&#xff0c;然后用记事本打开该文件&#xff0c;复制下面配置替换到global.prop文件里的所有内容保…

IO口电路种类

文章目录 参考1.高速振荡电路&#xff08;时钟IO引脚&#xff09;2.与 GPIO 功能共享的低速振荡电路&#xff08;子时钟IO&#xff09;3.CMOS 滞后输入引脚4.电源输入保护电路5.A/D 转换器 ref (AVRH)带保护电路的电源输入端6.CMOS 电平输出7.CMOS 电平输出&#xff0c;带有模…

键盘失灵按什么键恢复?详细方法分享!

“我的电脑键盘莫名其妙失灵了&#xff0c;试了好多方法都无法恢复。请问遇到键盘失灵的情况&#xff0c;应该按什么键才能恢复呢&#xff1f;” 键盘是计算机的重要输入设备之一&#xff0c;但有时候它可能会出现失灵的情况&#xff0c;让用户感到困惑和不知所措。但其实&…

软件设计师笔记系列(一)

&#x1f600;前言 在日常生活和工作中&#xff0c;我们依赖于各种各样的计算机系统来完成一系列复杂的任务。计算机系统不仅仅是硬件设备的集合&#xff0c;它还包括一系列用于协调硬件工作的软件和协议。了解计算机系统的基础知识&#xff0c;包括其构造和功能&#xff0c;是…

【已解决】模糊匹配导致一门课存在多个同名教师

[已解决] 模糊匹配导致一门课存在多个同名教师 问题 LEFT JOIN jsxxb ON XSKB.RKJSXM LIKE jsxxb.JZGXM || ‘%’ 思路 利用正则表达式解决 jsxxb.JZGXM 的字段示例如 李志勇,许蕤 需以&#xff0c;作为分割点&#xff0c;只匹配逗号前面的名字&#xff0c;或者是没有逗号&a…

CSS 学习笔记(基础)

用来控制网页表现的语言&#xff0c;CSS&#xff08;Cascading Style Sheet&#xff09;&#xff1a;层叠样式表。然后我们继续看看 W3C 标准&#xff1a; 结构&#xff1a;HTML表现&#xff1a;CSS行为&#xff1a;JavaScript CSS导入方式、选择器&属性 由于网页的框架…

【C++进阶】:哈希

哈希 一.unordered_map二.底层结构1.哈希概念2.解决哈希冲突1.闭散列2.开散列 在C98中&#xff0c;STL提供了底层为红黑树结构的一系列关联式容器&#xff0c;在查询时效率可达到 l o g 2 N log_2N log2​N&#xff0c;即最差情况下需要比较红黑树的高度次&#xff0c;当树中的…

分布式/微服务---第四篇

系列文章目录 文章目录 系列文章目录一、分布式事务解决方案二、如何实现接口的幂等性一、分布式事务解决方案 XA规范:分布式事务规范,定义了分布式事务模型 四个角色:事务管理器(协调者TM)、资源管理器(参与者RM),应用程序AP,通信资源管理器CRM 全局事务:一个横跨多个数…

【数字通信原理】第三章—信源编码理论

文章目录 第三章 信源编码理论1.模拟信号的数字化概论2. 信源编码的基本原理2.1 抽样定理2.1.1 低通抽样定理2.1.2 带通抽样定理 2.2 脉冲振幅调制PAM2.2.1 自然抽样2.2.2 平顶抽样 第三章 信源编码理论 1.模拟信号的数字化概论 2. 信源编码的基本原理 2.1 抽样定理 2.1.1 低…

Win10 家庭版 - 解决应用程序无法启动,因为应用程序的并行配置不正确的问题(System Default Context”的激活上下文生成失败)

Win10 家庭版 - 解决应用程序无法启动&#xff0c;因为应用程序的并行配置不正确的问题&#xff08;System Default Context”的激活上下文生成失败&#xff09; 系统环境遇到问题试过过程解决办法 前天的时候&#xff0c;女盆友公司电脑遇到个问题&#xff1a;几乎所有的 exe …

Nginx 的优化思路有哪些?网站的防盗链如何做?附图文说明和完整代码步骤

Nginx 的优化思路有哪些?网站的防盗链如何做?实际工作中有哪些类似的安全经验?通过代码实践一步一步实现,附图文说明和完整代码步骤 实验拓扑图: 实验步骤 1、在Centos01上安装Nginx,设置网站根目录/www使用域名www.huhu.com访问 2、在Centos02上安装DNS使用域名访问Ce…

python读取.xls文件,绘制钻头外径磨损图

通过xlrd模块读取.xls文件&#xff0c;数据如下&#xff0c;总计162行16列&#xff1a; 读取与作图如下&#xff1a; from xlrd import open_workbook import matplotlib import matplotlib.pyplot as plt # 设置字体为微软雅黑&#xff0c;解决中文显示问题matplotlib.rc(&qu…

vue3项目学习三:配置登陆解决方案

配置登陆解决方案 配置环境变量封装axios封装接口请求模块封装登录请求触发登录动作本地缓存处理方案LocalStorage 登录鉴权退出登录方案主动退出被动退出 配置环境变量 在根目录创建开发模式和生产模式的两种baseURL 输入&#xff1a; ENVdevelopment# base api VUE_APP_BA…

国内首款研发领域 AI 项目管理工具发布:PingCode AI

PingCode的使命&#xff0c;始终是用技术驱动研发生产力。 过去几年&#xff0c;PingCode在研发管理领域持续引领创新&#xff0c;基于“自动化、数据化、智能化”的战略三部曲&#xff0c;先后发布了研发【自动化】引擎、【效能度量】引擎&#xff0c;而对于最后一步“智能化”…

时序数据库 IoTDB 发布端边云原生解决方案,有效优化工业互联网数据上传时效与资源消耗...

2023 年 9 月 8 日&#xff0c;由中国通信学会、福建省工业和信息化厅主办的 2023 中国国际工业互联网创新发展大会在厦门举办。大会主论坛中&#xff0c;时序数据库 IoTDB 发表其自研建立的端边云原生解决方案&#xff0c;该方案可实现端侧设备、边缘服务器、数据中心数据的协…

400电话怎么办理(申请开通)

申请开通400电话是一项相对简单的过程&#xff0c;只需按照以下步骤进行操作即可。 第一步&#xff0c;选择400电话服务提供商。在市场上有很多公司提供400电话服务&#xff0c;您可以根据自己的需求和预算选择适合的服务商。可以通过搜索引擎、咨询朋友或者查看相关论坛等方式…

开学季ipad电容笔哪款好?便宜的电容笔推荐

随着数码产品不断地更新和添加新的特性功能&#xff0c;iPad的平板已经可以和笔记本电脑相媲美了。而时至今日&#xff0c;随着技术的进步&#xff0c;ipad已经不再是一款单纯的娱乐设备&#xff0c;而是一款集学习、绘画、办公于一体的功能。为提高生产力&#xff0c;搭配上一…

公共4G广播音柱有哪些用处

公共广播音柱有哪些用处 公共广播音柱是一种用于广播音频信号的设备&#xff0c;一般安装在公共场所或街道上。它具有以下几个主要用处&#xff1a; 1. 喊话广播&#xff1a;公共广播音柱可以用于喊话广播&#xff0c;用来传达重要信息、紧急通知、警报等&#xff0c;如公共安…

基于微信小程序的实验室预约管理系统设计与实现

前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌&#x1f497; &#x1f447;&#x1f3fb;…

没有白走的路,这个结果你配得上。

作者 | 磊哥 来源 |公众号&#xff1a;Java中文社群 转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09; 几年前&#xff0c;我在看新三国时&#xff0c;有一段记忆深刻的话&#xff0c;司马懿获胜之后说&#xff1a;“我挥剑只有一次&#xff0c;却磨了十几年…