SpringAMQP使用

news2024/11/26 0:27:55

说明:SpringAMQP(官网:https://spring.io/projects/spring-amqp)是基于RabbitMQ封装的一套模板,并利用了SpringBoot对其实现了自动装配,使用起来非常方便。安装和原始使用参考:http://t.csdn.cn/51qyD

基础操作

创建两个模块,一个用于发送消息(sender),一个用于接收消息(receiver),两个模块拥有共同的父模块

第一步:添加依赖

在父模块的pom.xml文件中,添加依赖,如下:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.9.RELEASE</version>
        <relativePath/>
    </parent>

    <dependencies>
        <!--lombok依赖,用于生成set、get、toString方法-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!--单元测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

第二步:创建配置文件

配置文件(application.yml)内容如下,两个模块的内容一样

spring:
  rabbitmq:
    # MQ ip地址
    host: XXX.XXX.XXX.XXX
    # MQ的端口号
    port: 5672
    # 虚拟主机 每个用户单独对应一个 不同用户之间无法访问彼此的虚拟主机
    virtual-host: /
    # 用户名
    username: root
    # 密码
    password: 123456

第三步:创建Listener类

在接收方,创建监听类,用来接收消息,如下:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitListenerDemo {

    @RabbitListener(queues = "demo.queue")
    public void listenDemoQueueMessage(String msg){
        System.out.println("msg = " + msg);
    }
}

第四步:编写发送端代码

在发送方的测试类中,写测试代码,发消息给接收方,其中RunWith()注解用于构建程序运行的上下文环境;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SenderTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSender() {
        rabbitTemplate.convertAndSend("demo.queue","hello rabbit mq!");
    }
}

第五步:启动

先启动接收方(这是因为,如果队列在RabbitMQ管理平台上不存在的话,先启动发送方会造成消息丢失,而先启动接收方,RabbitMQ会根据队列名先创建出队列),再启动发送方;

可以看到,测试完成,接收方可以接收到消息

在这里插入图片描述

工作队列

实际的业务情况是一个发送方,可能会有多个接收方来接收,而且接收方处理效率可能各不相同。这样,接收方的代码可以写成这样,使用线程休眠模拟接收方执行的效率,再设置变量用于统计各个接收方执行的次数:

(RabbitListenerDemo.java)

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitListenerDemo {
    
    private static int count1 = 0;
    private static int count2 = 0;
    private static int count3 = 0;

    @RabbitListener(queues = "demo.queue")
    public void listenDemoQueueMessage1(String msg) throws InterruptedException {
        System.out.println("msg1 = " + msg + "======= count1 =" + (++count1));
        Thread.sleep(10);
    }

    @RabbitListener(queues = "demo.queue")
    public void listenDemoQueueMessage2(String msg) throws InterruptedException {
        System.out.println("msg2 = " + msg + "======= count2 =" + (++count2));
        Thread.sleep(20);
    }

    @RabbitListener(queues = "demo.queue")
    public void listenDemoQueueMessage3(String msg) throws InterruptedException {
        System.out.println("msg3 = " + msg + "======= count3 =" + (++count3));
        Thread.sleep(50);
    }
}

(SenderTest:循环发送200次,休眠10毫秒)

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SenderTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSender() throws InterruptedException {
        for (int i = 0; i < 200; i++) {
            rabbitTemplate.convertAndSend("demo.queue","hello rabbit mq!======>" + i);
            Thread.sleep(10);
        }
    }
}

启动,可以看到执行效率最低的3号,也和1号、2号接收到了等量的消息量,

在这里插入图片描述

这是因为RabbitMQ有默认的分配策略,使每个接收方都可以接收到等量的消息量,而不是处理越快的处理越多。可以在接收方的配置文件中,添加这个配置,表示每个接收方只能一个消息一个消息处理(可以推测默认是先按照接收方数量,把请求都平均分配好之后,再让它们各自处理的);

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1

重启测试,可以看到,达到了“能者多劳”的效果

在这里插入图片描述

发布/订阅

发布/订阅,是指在消息发给队列前,对消息所绑定的队列信息做判断,然后按照绑定的队列对消息进行分发;

在这里插入图片描述

根据分发的情况,可分为以下三种:

  • 广播(Fanout):消息分发给所有队列;

  • 路由(Direct):消息只分发给拥有关键字(RoutingKey)的队列;

  • 主题(Topic):消息只分发给符合条件的队列;

Fanout(广播)

创建一个广播配置类,用于绑定队列与广播交换机(FanoutExchange);

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * 广播配置类
 */
@Configuration
public class FanoutConfig {

    /**
     * 声明交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("essay.fanout");
    }

    /**
     * 生成第一个队列
     * @return
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     *
     * @return
     */
    @Bean
    public Binding bindingQueue1(){
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }

    /**
     * 生成第二个队列
     * @return
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     *
     * @return
     */
    @Bean
    public Binding bindingQueue2(){
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }
}

接收方代码

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg){
        System.out.println("接收者1接收到了消息:" + msg);
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg){
        System.out.println("接收者2接收到了消息:" + msg);
    }

发送方代码:消息并不直接发送给队列,而是发送个交换机;

    @Test
    public void fanoutExchangeTest(){
    	// 第二个参数是routeKey(路由转发关键字)不能不加,可以为空字符串
        rabbitTemplate.convertAndSend("essay.fanout","", "hello everyone!");
    }

测试结果,每个队列都接收到了消息,并发给各自的接收方

在这里插入图片描述

Direct(路由)

在接收方的接收方法上,创建对应的队列、路由交换机,并设置routeKey(路由关键字),接收者1号(group1, group2),接收者2号(group1, group3)

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "essay.direct",type = ExchangeTypes.DIRECT),
            key = {"group1", "group2"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("接收者1号接收到了消息:" + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "essay.direct",type = ExchangeTypes.DIRECT),
            key = {"group1", "group3"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("接收者2号接收到了消息:" + msg);
    }

发送方发送消息,routeKey = group1

	rabbitTemplate.convertAndSend("essay.direct","group1", "hello group1!");

在这里插入图片描述


发送方发送消息,routeKey = group2

	rabbitTemplate.convertAndSend("essay.direct","group2", "hello group2!");

只有接收者1号拥有group2,故只有接收者1号接收到消息

在这里插入图片描述

Topic(主题)

与路由类似,不同的是ExchangeTypes的类型key的组成,key由通配符和关键字组成

  • #:表示一个或多个字符;

  • *:表示一个字符;

如下面的三个key分别表示:

  • group.#:表示以“group”开头的消息都发过来;

  • #.class:表示以“class”结尾的消息都发过来;

  • *.person:表示两个字符,并以“person”结尾的消息都发过来;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "essay.topic",type = ExchangeTypes.TOPIC),
            key = "group.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("接收者1号接收到了消息:" + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "essay.topic",type = ExchangeTypes.TOPIC),
            key = "#.class"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("接收者2号接收到了消息:" + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue3"),
            exchange = @Exchange(name = "essay.topic",type = ExchangeTypes.TOPIC),
            key = "*.person"
    ))
    public void listenTopicQueue3(String msg){
        System.out.println("接收者3号接收到了消息:" + msg);
    }

发送方测试

	// 1号接收
	rabbitTemplate.convertAndSend("essay.topic","group.b.c.d", "hello NO.1!");
	
	// 2号接收
	rabbitTemplate.convertAndSend("essay.topic","b.c.d.class", "hello NO.2!");
	
	// 3号接收
	rabbitTemplate.convertAndSend("essay.topic","b.person", "hello NO.3!");

启动,测试结果如下,可以看到达到了预期结果

在这里插入图片描述

总结

RabbitMQ是一门异步通信的技术,SpringAMQP是基于RabbitMQ的模版,可以省去原始操作RabbitMQ的繁琐(建立连接、设置连接参数、创建通道、创建队列、发送消息/接收消息)。

另外,可以使用SpringAMQP建立工作队列、发布/订阅等模式,其中工作队列可设置spring.rabbitmq.listener.simple.prefetch=1,达到“能者多劳”的效果;

而发布/订阅模式又分为广播、路由和主题,广播模式需要手动建立队列和路由交换机的关联,路由与主题的区别在于路由交换机的类型和路由关键字的格式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/771499.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python(二十二)运算符——算术运算符

❤️ 专栏简介&#xff1a;本专栏记录了我个人从零开始学习Python编程的过程。在这个专栏中&#xff0c;我将分享我在学习Python的过程中的学习笔记、学习路线以及各个知识点。 ☀️ 专栏适用人群 &#xff1a;本专栏适用于希望学习Python编程的初学者和有一定编程基础的人。无…

centos7.6下安装mysql

1.下载yum源&#xff1a; wget https://dev.mysql.com/get/mysql80-community-release-el7-5.noarch.rpm2.执行安装&#xff1a; rpm -ivh mysql80-community-release-el7-5.noarch.rpm3.开始安装 yum install -y mysql-server4.启动mysql服务 systemctl start mysqld5.查看…

JavaWeb课程设计项目实战(03)——开发准备工作

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl 在正式进入项目开发之前请先完成以下准备工作。 数据库语句 请创建数据库和表并完成数据初始化工作。 初始化数据库 请在MySQL数据库中创建名为studentinformationmanag…

Vue-组件基础(下)

一、目标 能够知道如何对props进行验证能够知道如何使用计算属性能够知道如何为组件自定义事件能够知道如何在组件上使用v-model 二、目录 props验证计算属性自定义事件组件上的v-model任务列表案例 props验证 1.什么是props验证 指的是&#xff1a;在封装组件时对外界传递…

关于GPT、AI绘画、AI提词器等AI技术的探讨

目前的AI潮流非常火热&#xff0c;CHATGPT可谓是目前大模型人工智能的代表&#xff0c;刚开始听说chatGPT可以写代码&#xff0c;写作&#xff0c;写方案&#xff0c;无所不能。还有AI绘画也很&#xff2e;&#xff22;作为一个程序员&#xff0c;为了体验这些&#xff21;&…

医院检验科LIS系统源码 检验申请、标本编号、联机采集、中文报告单的生成与打印、质控图的绘制和数据的检索与备份

LIS通过将所有仪器自身提供的端口与科室LIS系统中的工作站点连接&#xff0c;通过LIS实现与医院HIS系统的联网。是一套符合医院检验科实际需要的管理系统&#xff0c;实现检验业务全流程的计算机管理。从检验申请、标本编号、联机采集、中文报告单的生成与打印、质控图的绘制和…

基于微信小程序的求职招聘系统设计与实现(Java+spring boot+MySQL+微信小程序)

获取源码或者论文请私信博主 演示视频&#xff1a; 基于微信小程序的求职招聘系统设计与实现&#xff08;Javaspring bootMySQL微信小程序&#xff09; 使用技术&#xff1a; 前端&#xff1a;html css javascript jQuery ajax thymeleaf 微信小程序 后端&#xff1a;Java s…

Shikra:新一代多模态大语言模型,理解指向,说出坐标

“ Shikra&#xff1a;解锁多模态语言模型参考对话的魔法” Shikra和用户的对话案例 在人类的日常交流中&#xff0c;经常会关注场景中的不同区域或物体&#xff0c;双方都可以通过说话并指向这些区域来进行高效的信息交换。我们将这种对话模式称为参考对话&#xff08;Referen…

关系型数据库设计规则

目录 1.1 表、记录、字段 1.2 表的关联关系 1.2.1 一对一关联&#xff08;one-to-one&#xff09; 1.2.2 一对多关系&#xff08;one-to-many&#xff09; 1.2.3 多对多&#xff08;many-to-many&#xff09; 1.2.4 自我引用&#xff08;Self reference&#xff09; 关系…

第13讲:剖析 Trace 在 SkyWalking 中的落地实现方案(下)

TraceSegmentRef TraceSegment 中除了 Span 之外&#xff0c;还有另一个需要介绍的重要依赖 —— TraceSegmentRef&#xff0c;TraceSegment 通过 refs 集合记录父 TraceSegment 的信息&#xff0c;它的核心字段大概可以分为 3 类&#xff1a; 父 Span 信息traceSegmentId&am…

嵌入式工程师常用的软件工具推荐

前言&#xff1a;常言道&#xff1a;工欲善其事&#xff0c;必先利其器。作为一名合格的嵌入式工程师&#xff0c;日常可能需要接触和处理各种奇奇怪怪的问题&#xff0c;这时候一款高适配性的工具将会令工作效率大大提升。作者根据个人的实际使用情况与粉丝的客观感受&#xf…

在第二代SpringCloud中配置网关组件

我们接着上次的微服务的项目继续搭建网关组件: 搭建微服务项目 前提准备: 1.打开nacos服务注册中心,在浏览器通过这地址访问 http://10.48.185.7:8848/nacos/index.html 2.启动page和product的微服务 1.新建一个网关的项目 2.导入pom依赖 <!-- Spring Boot父启动器…

mysql中的Innodb_buffer_pool_reads和Innodb_buffer_pool_read_requests

Innodb_buffer_pool_reads和Innodb_buffer_pool_read_requests是什么&#xff1f; mysql服务器维护了很多状态变量&#xff08;status variables),这些变量提供了其相关操作的信息。 我们可以通过SHOW [GLOBAL | SESSION] STATUS 查看这些变量以及变量值。这些变量有很多&…

window 命令笔记

1.查看端口 输入“netstat -ano”并回车可以获得所有网络连接活动的列表&#xff0c;在表中&#xff0c;本地地址IP地址后方冒号之后的即是端口号&#xff1a; 如果想要查找特定的端口可以输入命令“netstat -aon|findstr “端口号””&#xff0c;例如“netstat -aon|findstr…

基于IPC-CFX的点对点通信C#

IPC-CFX有两种主要的通信方式&#xff0c;可以通过RabbitMQ发布和订阅&#xff0c;也可以通过request和response进行点对点的通信&#xff0c;本文主要讲的是点对点的通信方式。 在vscode里建立新的dotnet项目&#xff0c;可以通过终端输入dotnet new console来建立&#xff0c…

Spring Cloud 2022 发布,这几个组件要移除了!

继SpringBoot 3.0和SpringFramework 6.0之后&#xff0c;Spring Cloud 终于也推出了新版本——2022.0.0&#xff0c;官网把这个版本命名为Kilburn。 目前在Maven仓库中已经可以下载使用了&#xff0c;通过POM文件即可依赖到项目中&#xff1a; <dependencyManagement>&l…

阿里云声音复刻

阿里云声音复刻 个性化人声定制 阿里云个性化人声定制是智能语音交互产品自学习平台下的一部分 使用方式&#xff1a;https://help.aliyun.com/document_detail/456006.html 方式一&#xff1a;控制台界面定制使用方式 方式二&#xff1a;通过OpenAPI定制&#xff1a;在该页…

微服务保护——Sentinel【实战篇】

一、限流规则&#x1f349; 1.簇点链路&#x1f95d; 簇点链路&#xff1a;就是项目内的调用链路&#xff0c;链路中被监控的每个接口就是一个资源。默认情况下sentinel会监控SpringMVC的每一个端点&#xff08;Endpoint&#xff09;&#xff0c;因此SpringMVC的每一个端点&a…

CS162 11-12 调度与死锁

调度 overview 1.FCFS 可以利用好cache缓存&#xff0c;减少上下文切换。 2.很直观&#xff0c;贪心&#xff0c;可以减少平均的响应时间 3 4. 5.等待调度的时间是平均的 6.优先级翻转&#xff0c;和优先级捐赠 解决 cfs中的调度 死锁 四个必要不充分条件 银行家算法&…

基于 ChatGPT 的 helm 入门

1. 写在最前面 公司最近在推业务上云&#xff08;底层为 k8s 管理&#xff09;&#xff0c;平台侧为了简化业务侧部署的复杂度&#xff0c;基于 helm 、chart 等提供了一个发布平台。 发布平台的使用使业务侧在不了解 helm 、chart 等工具的时候&#xff0c;「只要点点」就可…