Spring AMQP

news2025/1/15 17:12:33

大家好我是苏麟 今天说一说spring aqmp。

SpringAMQP

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起 来非常方便。

官方 : Spring AMQP

 依赖

        <!--AMQP 包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>${amqp.version}</version>
        </dependency>
        <dependency>
            <groupId>com.github.luues</groupId>
            <artifactId>spring-boot-starter-rabbitmq</artifactId>
            <version>1.3.0.5.RELEASE</version>
        </dependency>

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

Basic Queue 简单队列模型

在父工程mq-demo中引入依赖

        <!--AMQP 包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

消息发送

首先配置MQ地址,在publisher服务的application.yml中添加配置:

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1 # 主机名
    username: guest # 用户名
    password: guest # 密码
    virtual-host: / # 虚拟主机

然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

package com.sl;

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * springAMQP
 */
@RunWith(SpringRunner.class)
@SpringBootTest
class PublisherApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Simple 简单队列
     */
    @Test
    void testSimple() {
        //队列名
        String queues = "ty";
        //消息
        String message = "hello ty";
        System.out.println("666");
        //发送信息
        rabbitTemplate.convertAndSend(queues, message);
    }

}

消息接收

首先配置MQ地址,在consumer服务的application.yml中添加配置:

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
    virtual-host: /

然后在consumer服务新建一个类SpringRabbitListener,代码如下:

package com.sl.spring;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;



@Component
public class SpringRabbitMQListener {

    @RabbitListener(queues = "ty")
    public void listenerSimpleQueue(String msg) {
        System.out.println("消费者接收到的消息 : " + msg);
    }
}

测试

启动consumer服务,然后在publisher服务中运行测试代码,发送MQ消息 .

 我们发送消息之前需要自己创建一个队列

 接受消息

WorkQueue 工作队列

Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列, 共同消费队列中的消息。

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型,多个消费者共同处理消息处理,速度就能大大提高了。

消息发送

这次我们循环发送,模拟大量消息堆积现象。

在publisher服务中的SpringAmqpTest类中添加一个测试方法:

package com.sl;

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * springAMQP
 */
@RunWith(SpringRunner.class)
@SpringBootTest
class PublisherApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    /**
     * Work 工作队列
     */
    @Test
    void testWork() {
        String queues = "ty";

        for (int i = 0; i <= 50; i++) {
            String message = "hello rabbitmq__";
            rabbitTemplate.convertAndSend(queues, message + i);
        }
    }

}

消息接收

要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方 法:

package com.sl.spring;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

/**
 * @className: SpringRabbitMQListence
 * @author: TianYuan ShowTime
 * @date: 2023/7/29-9:30
 **/

@Component
public class SpringRabbitMQListener {

    /**
     * @RabbitListener 声明队列名称
     */

    @RabbitListener(queues = "ty")
    public void listenerWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1......接收到的消息: " + msg + LocalDateTime.now());
        Thread.sleep(20);
    }

    @RabbitListener(queues = "ty")
    public void listenerWorkQueue2(String msg) throws InterruptedException {
        System.out.println("消费者2......接收到的消息" + msg + LocalDateTime.now());
        Thread.sleep(200);
    }

}

测试 :

启动ConsumerApplication后,在执行publisher服务中刚刚编写的发送测试方法testWorkQueue。

这会耗时

能者多劳

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件, 添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

节省了好几秒的时间

总结 : 

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

发布/订阅

发布订阅的模型如图:

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息, 例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange 的类型。Exchange有以下3种类型:
  1. Fanout:广播,将消息交给所有绑定到交换机的队列
  2. Direct:定向,把消息交给符合指定routing key 的队列
  3. Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Consumer:消费者,与以前一样,订阅队列,没有变化 Queue:消息队列也与以前一样,接收消息、缓存消息。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

Fanout 

Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。

在广播模式下,消息发送流程是这样的:

  • 1) 可以有多个队列
  • 2) 每个队列都要绑定到Exchange(交换机)
  • 3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
  • 4) 交换机把消息发送给绑定过的所有队列
  • 5) 订阅队列的消费者都能拿到消息

我们的计划是这样的:

  • 创建一个交换机 itcast.fanout,类型是Fanout
  • 创建两个队列fanout.queue1和fanout.queue2,绑定到交换机itcast.fanout

声明队列和交换机

Spring提供了一个接口Exchange,来表示所有不同类型的交换机:

在consumer中创建一个类,声明队列和交换机:

package com.sl.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class AMQPConfig {

    /**
     * 创建交换机
     *
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("sl.exchange");
    }

    /**
     * 创建队列
     *
     * @return
     */
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanoutQueue1");
    }

    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanoutQueue2");
    }

    /**
     * 创建连接
     *
     * @return
     */
    @Bean
    public Binding fanoutBinding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    @Bean
    public Binding fanoutBinding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }

}

消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

package com.sl;

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * springAMQP
 */
@RunWith(SpringRunner.class)
@SpringBootTest
class PublisherApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    /**
     * 发布订阅模型
     * <p>
     * Fanout 广播
     */
    @Test
    void testFanout() {
        String exchangeName = "sl.exchange";

        /**
         *  ctrl + p  查看参数
         */
        rabbitTemplate.convertAndSend(exchangeName, "", "中南海");


    }

}

消息接收

在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:

package com.sl.spring;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;



@Component
public class SpringRabbitMQListener {


    /**
     * Fanout 广播
     * @RabbitListener 声明队列名称
     * @param msg
     */
    @RabbitListener(queues = "fanoutQueue1")
    public void listenerFanoutQueue1(String msg) {
        System.out.println("消费者1111接收到的消息" + msg);
    }

    @RabbitListener(queues = "fanoutQueue2")
    public void listenerFanoutQueue2(String msg) {
        System.out.println("消费者2222接收到的消息" + msg);
    }
}

总结

交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

声明队列、交换机、绑定关系的Bean是什么?

  • Queue
  • FanoutExchange
  • Binding

Direct

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息 被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey (路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey 。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列 的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息

案例需求如下:

1. 利用@RabbitListener声明Exchange、Queue、RoutingKey

2. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

3. 在publisher中编写测试方法,向itcast. direct发送消息

消息接收

基于注解声明队列和交换机

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。 在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:

package com.sl.spring;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class SpringRabbitMQListener {


    /**
     * Direct 路由
     *
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("jwd1"),
            exchange = @Exchange("yk.sl"),
            key = {"blue", "red"})
    )
    public void listenerDirectQueue1(String msg) {
        System.out.println("苏麟打死杨科小弟 1号 :" + msg);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue("jwd2"),
            exchange = @Exchange("yk.sl"),
            key = {"yellow", "red"})
    )
    public void listenerDirectQueue2(String msg) {
        System.out.println("苏麟打死杨科小弟 2号 :" + msg);
    }
}

消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

package com.sl;

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * springAMQP
 */
@RunWith(SpringRunner.class)
@SpringBootTest
class PublisherApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;



    /**
     * 发布订阅模型
     * <p>
     * Direct 路由
     */
    @Test
    void testDirect() {
        String exchangeName = "yk.sl";

        /**
         *  ctrl + p  查看参数
         */
        rabbitTemplate.convertAndSend(exchangeName, "yellow", "奥特曼");

    }
}

 测试 : 

 修改BindingKey

  修改BindingKey和消息

总结

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

基于@RabbitListener注解声明队列和交换机有哪些常见注解?

  • @Queue
  • @Exchange

这期就到这里下期见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1108290.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

三款经典的轮式/轮足机器人讲解,以及学习EG2133产生A/B/C驱动电机。个人机器人学习和开发路线(推荐)

1&#xff0c;灯哥开源&#xff08;有使用指南&#xff0c;适合刚入门新手&#xff09; 机械部分&#xff1a;2个foc无刷电机 硬件和软件部分&#xff1a;没有驱动板子。只有驱动器&#xff0c;主控板esp32和驱动器通过pwm直接通讯。驱动器板子上有蓝色电机接口&#xff0c;直…

常见的 NoSQL 数据库有哪些?

前言 今天我们来介绍一下工作开发中常见的一些NoSQL数据库及其基本特点。欢迎在评论区留下文章中没有介绍且好用的NOSQL数据库&#x1f91e;。 什么是&#xff08;NOSQL&#xff09;非关系型数据库 非关系型数据库又被称为 NoSQL&#xff08;Not Only SQL )&#xff0c;意为不…

第八章动态规划+第九章同余【算法zxd】

算法设计过程&#xff1a; ①问题分析 ②算法策略 / 建立计算模型 ③算法设计与描述 ④算法分析 [ 算法选择 ] ⑤算法实现 ⑥测试与结果分析 ⑦文档编制 常用结论&#xff1a; 对数低于多项式&#xff1b;多项式低于指数 常用公式&#xff1a; 定理2.5 第八章&#xff1…

软件工程与计算总结(十九)软件测试

目录 ​编辑 一.引言 1.验证与确认 2.目标 3.测试用例 4.桩与驱动 5.缺陷、错误与失败 二.测试层次 1.测试层次的划分 2.单元测试 3.集成测试 4.系统测试 三.测试技术 1.测试用例的选择 2.随机测试 3.基于规格的技术&#xff08;黑盒测试&#xff09; 4.基于代…

你的DOT即将解锁,请注意以下事项

作者&#xff1a; David 还记得两年前Polkadot平行链卡槽拍卖质押吗&#xff1f; 参与平行链众贷&#xff0c;质押DOT两年&#xff0c;选择投票的项目方&#xff0c;获得相应token奖励。当年质押的DOT即将解锁&#xff0c;就在十月底&#xff0c;10月24日请注意。 第一批解锁…

【C语言刷题】模拟实现offsetof宏

本篇文章目录 1. 宏offsetof的作用2. 分析该如何模拟实现3.模拟实现 1. 宏offsetof的作用 在www.cplusplus.com中对offsetof宏的功能描述&#xff1a; 这个宏的作用就是传入一个结构体类型和一个成员名&#xff0c;返回这个成员相对比这个结构体起始位置的偏移量&#xff08…

深度学习零基础教程

代码运行软件安装&#xff1a; anaconda:一个管理环境的软件–>https://blog.csdn.net/scorn_/article/details/106591160&#xff08;可选装&#xff09; pycharm&#xff1a;一个深度学习运行环境–>https://blog.csdn.net/scorn_/article/details/106591160&#xf…

c语言内功修炼--深度剖析数据的存储

前言&#xff1a; 我们知道在c语言中的几种基本内置数据类型&#xff0c;分别是&#xff1a; char //字符数据类型 short //短整型 int //整形 long //长整型 long long //更长的整形 float //单精度浮点数 double //双精度浮点数 在…

【】02-02序列求和----二分检索

数列求和公式&#xff1a; 例子&#xff1a; 二分检索算法:&#xff08;有序数列&#xff09; 输入的分析结果&#xff1a; 比较t次的输入个数&#xff1a; 比较K次&#xff0c;是还需要加上间隙处的情况。 如k2;下标 1 2 3 比较k次时 low1,high3 while 第1次循环 &…

卷积神经网络手写字符识别 - 深度学习 计算机竞赛

文章目录 0 前言1 简介2 LeNet-5 模型的介绍2.1 结构解析2.2 C1层2.3 S2层S2层和C3层连接 2.4 F6与C5层 3 写数字识别算法模型的构建3.1 输入层设计3.2 激活函数的选取3.3 卷积层设计3.4 降采样层3.5 输出层设计 4 网络模型的总体结构5 部分实现代码6 在线手写识别7 最后 0 前言…

PLC 学习day02 硬件输入/输入的知识

1.资料来源 1.链接&#xff1a;三菱PLC视频教程全集之FX3U基本单元输入接线_哔哩哔哩_bilibili 2. 链接&#xff1a; 三菱plc视频教程全集之FX3U基本单元输出接线_哔哩哔哩_bilibili 2. PLC 的输入部分器件连接。 2.1 PLC输入部分的硬件知识 1. 一般输入部分是PLC获取信息的地…

界面组件DevExpress WPF v23.1 - 全面升级文档处理功能

DevExpress WPF拥有120个控件和库&#xff0c;将帮助您交付满足甚至超出企业需求的高性能业务应用程序。通过DevExpress WPF能创建有着强大互动功能的XAML基础应用程序&#xff0c;这些应用程序专注于当代客户的需求和构建未来新一代支持触摸的解决方案。 无论是Office办公软件…

分享一个简单容易上手的CSS框架:Pure.Css

雅虎&#xff08;Yahoo!&#xff09;创建了一个简单的CSS框架&#xff0c;被称为Pure.css&#xff08;https://purecss.io/&#xff09;&#xff0c;以提供一套基础样式集&#xff0c;可作为网页开发的起点。Pure.css旨在轻量、模块化和响应式&#xff0c;使构建快速加载、适用…

Python 创建或读取 Excel 文件

Excel是一种常用的电子表格软件&#xff0c;广泛应用于金融、商业和教育等领域。它提供了强大的数据处理和分析功能&#xff0c;可进行各种计算和公式运算&#xff0c;并能创建各种类型的图表和可视化数据。Excel的灵活性使其成为处理和管理数据的重要工具。本文将介绍如何使用…

【整理】旅行商问题(traveling salesman problem,TSP)

旅行商 一个旅行商由某市出发&#xff0c;经过所有给定的n个城市后&#xff0c;再 回到出发的城市。除了出发的城市外&#xff0c;其它城市只经过一 回。这样的回路可能有多个&#xff0c;求其中路径成本最小的回路。 蛮力【穷举】 【例4-4】旅行商问题——排列树 计算模型…

CSS3选择器详解 前端开发入门笔记(六)

CSS3选择器是一种用于定位HTML元素的方式&#xff0c;它们可以使样式表更加精确地应用到特定的元素。下面是一些常用的CSS3选择器&#xff1a; 元素选择器&#xff08;Element Selector&#xff09;&#xff1a;使用元素名称作为选择器&#xff0c;匹配对应名称的所有元素。例如…

esp32 arduino使用多个串口时如何查看serial1,serial2所对应的引脚定义

如上图所示可以通HardwareSerial.cpp找到起对应的引脚去连接设备即可

5.12.webrtc接口调用过程

嗨&#xff0c;大家好&#xff0c;我是李超&#xff0c;在上节课中呢&#xff0c;我向你介绍了外接口的设计以及我们红接口展开之后的样子&#xff0c;对吧&#xff1f;那今天呢&#xff1f;我们再来看看整个接口调用过程。那整个这个调用过程啊&#xff0c;非常的复杂&#xf…

域控操作二:设置域用户使用简单密码

过程太多简单 直接写出路径更改即可 组策略—计算机配置----策略—Windows设置–安全设置----账户策略–密码策略 按自己想法改就行了 注意一点&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; 要么自己设置策略&#xff0c;要么从默认策略改&#xff01;&am…

数学分析:傅里叶级数

卓里奇书好的一点就是&#xff0c;不是直接引出公式&#xff0c;而是告诉你理由。先引出正交的概念&#xff0c;然后在函数空间中&#xff0c;也有正交&#xff0c;只不过是无限维的空间。 这里要注意&#xff0c;明确说明了是有限个。 在函数空间里面&#xff0c;内积是指进行…