十七、SpringAMQP

news2025/1/11 6:10:47

目录

一、SpringAMQP的介绍:

二、利用SpringAMQP实现HelloWorld中的基础消息队列功能

1、因为publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程mq-demo中

2、编写yml文件

3、编写测试类,并进行测试

三、在consumer中编写消费逻辑,监听simple.queue

1、导入依赖,刚才在父工程中已经导入了,所以省略

2、编写yml文件

3、新建类,实现消费逻辑

4、运行并测试

四、模拟WorkQueue,实现一个队列绑定多个消费者

1、编写生产者(生产50个消息)

2、编写消费者(一个消费者更快,一个消费者更慢)

3、测试

4、消费预取的修改

5、重新测试

五、发布和订阅

(一)利用SpringAMQP演示FanoutExchange的使用

1、新建config类,声明交换机和队列

2、启动项目,查看配置

3、编写消费者代码

4、编写生产者代码

5、运行代码,观察输出

(二)交换机的作用

(三)声明队列、交换机、绑定关系的Bean是什么?

(四)DirectExchange

1、编写消费者代码

2、编写生产者代码

(五)Direct交换机与Fanout交换机的差异

(六)TopicExchange

1、编写消费者代码

2、编写生产者代码

3、运行测试

4、描述下Direct交换机与Topic交换机的差异

(七)测试发送Object类型信息

1、新增队列

2、发送对象

3、查看

4、优化(使用jackson进行序列化)

5、接收消息


一、SpringAMQP的介绍:

  1. AMQP是一种高级消息队列协议。

  2. SpringAMQP是基于Spring Framework的AMQP扩展,提供了一个抽象层,使得使用AMQP进行消息传递变得更加简单。

  3. SpringAMQP支持多种消息传递模式,包括点对点、发布/订阅和请求/响应等。

  4. SpringAMQP提供了许多高级功能,例如队列管理、消息确认、事务和消息过滤等。

  5. SpringAMQP提供了集成测试工具和基于Spring Boot的自动配置,使得集成AMQP变得更加容易。

  6. 总之,SpringAMQP是一个灵活、可扩展的AMQP实现,它使得使用消息队列时变得更加容易和高效。

二、利用SpringAMQP实现HelloWorld中的基础消息队列功能

1、因为publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程mq-demo中

<!--        AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2、编写yml文件

logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 192.168.248.152
    port: 5672
    virtual-host: /
    username: itcast
    password: 123456

3、编写测试类,并进行测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMassage(){
        String queue = "simple.queue";
        String massage="aaaaaaa";
        rabbitTemplate.convertAndSend(queue,massage);
    }
}

三、在consumer中编写消费逻辑,监听simple.queue

1、导入依赖,刚才在父工程中已经导入了,所以省略

2、编写yml文件

logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 192.168.248.152
    port: 5672
    virtual-host: /
    username: itcast
    password: 123456

3、新建类,实现消费逻辑

package cn.itcast.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg){
        System.out.println("消费者接收到消息:"+msg);
    }
}

4、运行并测试

注意:

消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能

四、模拟WorkQueue,实现一个队列绑定多个消费者

1、编写生产者(生产50个消息)

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMassage(){
        String queue = "simple.queue";
        String massage="HelloWorld";

        for (int i = 0; i < 50; i++) {
            rabbitTemplate.convertAndSend(queue,massage);
        }
    }
}

2、编写消费者(一个消费者更快,一个消费者更慢)

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg){
        System.out.println("消费者0接收到消息:"+msg+ LocalTime.now());
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue1(String msg){
        System.err.println("消费者1接收到消息__________-:"+msg+ LocalTime.now());
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

3、测试

我们发现,虽然消费者0更快,但是它并没有承担更多的工作量;

这是因为消费预取机制会让消费者事先分配好要处理的消息,而不是按能力分配;

4、消费预取的修改

可以在yml文件中修改

    listener:
      simple:
        prefetch: 1 #表示预取上限为1

5、重新测试

五、发布和订阅

(一)利用SpringAMQP演示FanoutExchange的使用

1、新建config类,声明交换机和队列
@Configuration
public class FanoutConfig {
    ///1
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }

    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    @Bean
    public Binding bindingQueue1(FanoutExchange exchange,Queue fanoutQueue1){
        return BindingBuilder.bind(fanoutQueue1).to(exchange);
    }

    ///2
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    @Bean
    public Binding bindingQueue2(FanoutExchange exchange,Queue fanoutQueue2){
        return BindingBuilder.bind(fanoutQueue2).to(exchange);
    }
}
2、启动项目,查看配置

绑定成功

3、编写消费者代码
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg){
        System.err.println("消费者1接收到消息__________-:"+msg+ LocalTime.now());
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg){
        System.err.println("消费者2接收到消息__________-:"+msg+ LocalTime.now());
    }
4、编写生产者代码
    @Test
    public void sendFanoutMassage(){
        String exchangeName = "itcast.fanout";
        String message = "Hello Every One";
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }
5、运行代码,观察输出

发现两个消费者都接收到了消息

(二)交换机的作用

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

(三)声明队列、交换机、绑定关系的Bean是什么?

  • Queue
  • FanoutExchange
  • Binding
     

(四)DirectExchange

实现:

1、编写消费者代码
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.err.println("消费者1接收到消息__________-:"+msg+ LocalTime.now());
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者2接收到消息__________-:"+msg+ LocalTime.now());
    }
2、编写生产者代码
    @Test
    public void sendDirectMassage(){
        String exchangeName = "itcast.direct";
        String message = "Hello Every One1111";
        rabbitTemplate.convertAndSend(exchangeName,"blue",message);
    }
    @Test
    public void sendDirectMassage(){
        String exchangeName = "itcast.direct";
        String message = "Hello Every One1111";
        rabbitTemplate.convertAndSend(exchangeName,"red",message);
    }

(五)Direct交换机与Fanout交换机的差异

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
     

基于@RabbitListener注解声明队列和交换机有哪些常见注解

  • @Queue
  • @Exchange
     

(六)TopicExchange

利用SpringAMQP演示TopicExchange的使用

1、编写消费者代码
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者1接收到消息aaaaaa__-:"+msg+ LocalTime.now());
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.err.println("消费者2接收到消息a__-:"+msg+ LocalTime.now());
    }
2、编写生产者代码
    @Test
    public void sendTopicMassage(){
        String exchangeName = "itcast.topic";
        String message = "Hello Every One12222";
        rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
    }
3、运行测试

4、描述下Direct交换机与Topic交换机的差异

(七)测试发送Object类型信息

1、新增队列
    @Bean
    public Queue objectQueue(){
        return new Queue("object.queue");
    }
2、发送对象
    @Test
    public void sendObjectMassage(){
        Map<String ,Object> message = new HashMap<>();
        message.put("name","11");
        message.put("age","22");
        rabbitTemplate.convertAndSend("object.queue",message);
    }
3、查看

对象被序列化了,这种方式性能差,不安全(容易被注入)

4、优化(使用jackson进行序列化)

引入依赖 

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

添加配置Bean

    @Bean
    public Jackson2JsonMessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

5、接收消息

编写配置Bean

    @Bean
    public Jackson2JsonMessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

编写消费者代码

@RabbitListener(queues = "object.queue")
public void listenObjectQueue1(Map<String,Object> msg){
    System.err.println("消费者接收到消息___da_______-:"+msg+ LocalTime.now());
}

验证

注意:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1236389.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

c++|内联函数

一、概念 以inline修饰的函数叫做内联函数&#xff0c;编译时c编译器会在调用函数的地方展开&#xff0c;而不会建立栈帧&#xff0c;提升了程序运行的效率 例子&#xff1a; #include <iostream> using namespace std;int Add(int left, int right) {return left - ri…

【qsort学习及改造冒泡排序能排序任何数】

qsort学习及改造冒泡排序能排序任何数 qsort的使用 qsort的使用 这个函数也不是很复杂&#xff01;&#xff01;&#xff01; qsort(void*base,size_t num,size_t width,int(int (__cdecl *compare )(const void *elem1, const void *elem2 )))  void * base,为数组的基地…

人工智能:科技之光,生活之美

在科技飞速发展的今天&#xff0c;人工智能已经深入到我们的生活中&#xff0c;它如同一束璀璨的科技之光&#xff0c;照亮我们生活的每一个角落&#xff0c;使我们的生活更加美好。下面我将从人工智能的领域、应用以及对人工智能的看法三个方面来谈谈它对我们生活的影响。 一、…

基于单片机设计的气压与海拔高度检测计(采用MPL3115A2芯片实现)

一、前言 随着科技的不断发展&#xff0c;在许多领域中&#xff0c;对气压与海拔高度的测量变得越来越重要。例如&#xff0c;对于航空和航天工业、气象预报、气候研究等领域&#xff0c;都需要高精度、可靠的气压与海拔高度检测装置。针对这一需求&#xff0c;基于单片机设计…

Java语言的特点||运算符

Java语言的特点||运算符 1&#xff1a;2&#xff1a;JDK, JRE&#xff0c;JVM知识&#xff1a;3&#xff1a;注释4&#xff1a;标识符5&#xff1a; Java编译过程&#xff1a;6&#xff1a;赋值7&#xff1a;switch8:布尔表达式9&#xff1a;判定素数10&#xff1a;打印 1 - 10…

前端学习--React(1)

一、React简介 React由Meta公司研发&#xff0c;是一个用于 构建Web和原生交互界面的库 优势&#xff1a;组件化开发、不错的性能、丰富生态&#xff08;所有框架中最好&#xff09;、跨平台&#xff08;web、ios、安卓&#xff09; 开发环境搭建 打开相应文件夹 新建终端并…

Java引用类型String源码解析

目录 String解析 final的作用 String是否有长度限制 StringBuffer解析 StringBuilder解析 关键字、操作类相关 引用数据类型非常多大致包括&#xff1a;类、 接口类型、 数组类型、 枚举类型、 注解类型、 字符串型。String类型就是引用类型。 String解析 JVM运行时会分…

Flutter:多线程Isolate的简单使用

在flutter中如果要使用线程&#xff0c;需要借助Isolate来实现。 简介 在Flutter中&#xff0c;Isolate是一种轻量级的线程解决方案&#xff0c;用于在应用程序中执行并发任务。Isolate可以被认为是独立于主线程的工作单元&#xff0c;它们可以在后台执行任务而不会阻塞应用程…

8.Gin 自定义控制器

8.Gin 自定义控制器 前言 在上一篇路由文件抽离的过程中&#xff0c;我们发现接口的业务逻辑还写在路由配置中&#xff0c;如下&#xff1a; 1696385129126 但是如果业务逻辑比较多&#xff0c;如果写在路由之中&#xff0c;肯定不合适。 我们可以将业务逻辑抽离&#xff0c;单…

【C++进阶之路】第八篇:智能指针

文章目录 一、为什么需要智能指针&#xff1f;二、内存泄漏1.什么是内存泄漏&#xff0c;内存泄漏的危害2.内存泄漏分类&#xff08;了解&#xff09;3.如何检测内存泄漏&#xff08;了解&#xff09;4.如何避免内存泄漏 三、智能指针的使用及原理1.RAII2.智能指针的原理3.std:…

【兔子王赠书第8期】AI短视频制作一本通: 文本生成视频+图片生成视频+视频生成视频

文章目录 写在前面推荐图书关键点内容简介作者简介推荐理由写在后面 写在前面 1本书精通AI短视频制作&#xff0c;文本生成视频图片生成视频视频生成视频AI短视频应用&#xff01;高效视频制作技巧&#xff0c;助你快速成长为行业大咖&#xff01; 推荐图书 《AI短视频制作一…

Stable Diffusion XL网络结构-超详细原创

强烈推荐先看本人的这篇 Stable Diffusion1.5网络结构-超详细原创-CSDN博客 1 Unet 1.1 详细整体结构 1.2 缩小版整体结构 以生成图像1024x1024为例&#xff0c;与SD1.5的3个CrossAttnDownBlock2D和CrossAttnUpBlock2D相比&#xff0c;SDXL只有2个&#xff0c;但SDXL的Cros…

springboot集成nacos作配置中心,动态配置不生效

总体概要 springboot3.0&#xff0c;nacos&#xff0c;jdk17使用nacos配置中心&#xff0c;热更新&#xff0c;使配置动态生效 本文主要介绍springboot怎么集成nacos作为配置中心&#xff0c;使其配置在不重启服务的情况下&#xff0c;怎么生效的。 所用组件及其版本 组件版本…

【ArcGIS Pro微课1000例】0034:矢量数据几何校正案例(Spatial Adjustment)

本案例讲解矢量数据几何校正&#xff0c;根据一个矢量数据去校正另外一个矢量数据。 文章目录 一、加载实验数据二、空间校正三、注意事项 一、加载实验数据 在ArcGIS Pro中加载数据效果如下&#xff1a; design&#xff1a;需要校正的数据图层planroadcenter&#xff1a;目标…

118.184.158.111德迅云安全浅谈如何避免网络钓鱼攻击

随着互联网的不断发展&#xff0c;网络钓鱼攻击也越来越猖獗&#xff0c;给个人和企业带来了巨大的经济损失和安全威胁。本文对如何防范网络钓鱼攻击提出的一些小建议 希望对大家有所帮助。 1.防止XSS&#xff08;跨站脚本攻击&#xff09;攻击 XSS攻击指的是攻击者在网站中注入…

每日一练:组合不重复的四位数字

问题&#xff1a;有四个数字“1、2、3、4”&#xff0c;能组成多少个互不相同且无重复数字的四位数&#xff1f;各是多少&#xff1f; 程序分析 可填在千位、百位、十位、个位的数字都是1、2、3、4。组成所有的排列后再去掉不满足条件的排列。 实现方法1 1&#xff09;使用四…

909-2014-T1

文章目录 1.原题2.算法思想3.关键代码4.完整代码5.运行结果 1.原题 为带表头的单链表类Chain编写一个成员函数Reverse&#xff0c;该函数对链表进行逆序操作&#xff08;将链表中的结点按与原序相反的顺序连接&#xff09;&#xff0c;要求逆序操作就地进行&#xff0c;不分配…

Linux学习第44天:Linux 多点电容触摸屏实验:难忘记第一次牵你手的温存

Linux版本号4.1.15 芯片I.MX6ULL 大叔学Linux 品人间百味 思文短情长 本章的思维导图内容如下&#xff1a; 二、硬件原理图分析 三、实验程序编写 1、修改设备树 1&#xff09;、添加FT5426所使用的IO 一个复位 IO、一个中断 IO、…

【C++进阶之路】第五篇:哈希

文章目录 一、unordered系列关联式容器1.unordered_map&#xff08;1&#xff09;unordered_map的介绍&#xff08;2&#xff09;unordered_map的接口说明 2. unordered_set3.性能对比 二、底层结构1.哈希概念2.哈希冲突3.哈希函数4.哈希冲突解决&#xff08;1&#xff09;闭散…

【Python3】【力扣题】338. 比特位计数

【力扣题】题目描述&#xff1a; 题解&#xff1a;从0到n的整数&#xff0c;逐一统计二进制中1的个数&#xff0c;记录在一个新列表中。 【Python3】代码&#xff1a; 1、解题思路&#xff1a;Python函数。 知识点&#xff1a;bin(...)&#xff1a;转为二进制字符串&#xff…