SpringAMQP消息队列(SpringBoot集成RabbitMQ)

news2025/1/10 23:57:13

一、初始配置

1、导入maven坐标

<!--rabbitmq-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

2、yml配置

spring:
    rabbitmq:
        host: 你的rabbitmq的ip
        port: 5672
        username: guest
        password: guest

二、基本消息队列

1、创建队列

访问接口:http://localhost:15672,账号密码都为guest

进入后左下角有Add queue添加队列,我已添加队列为MqTest1

2、发布消息

@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
        String queue="MqTest1";
        String message="message1";
        rabbitTemplate.convertAndSend(queue,message);
    }

}

此时可以看到队列有一个消息

3、接受消息

package com.rabbitmqdemoconsumer.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitLeistener {

    @RabbitListener(queues = "MqTest1")
    public void listenSimpleQueueMessage(String msg){
        System.out.println("接收到的消息:"+msg);
    }
}

此时控制台输出接收到的消息

三、工作消息队列(Work Queue)

可以提高消息处理速度,避免队列消息堆积

1、发布消息

@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
        String queue="MqTest1";
        String message="message1";
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(queue,message);
        }
    }

}

此时队列有10条消息

2、接受消息

package com.rabbitmqdemoconsumer.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitLeistener {

    @RabbitListener(queues = "MqTest1")
    public void listenSimpleQueueMessage1(String msg){
        System.out.println("consume1接收到的消息:"+msg);
    }
    @RabbitListener(queues = "MqTest1")
    public void listenSimpleQueueMessage2(String msg){
        System.out.println("consume2接收到的消息:"+msg);
    }
}

控制台输出结果

consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1

4、消息预取问题

但是此时有一个问题就是消息预取,比如队列有10条消息,两个消费者各自直接先预取5个消息,如果一个消费者接受消息的速度慢,一个快,就会导致一个消费者已经完成工作,另一个还在慢慢处理,会造成消息堆积消费者身上,要解决这个问题需要在yml文件配置相关配置

  rabbitmq:
    host: 43.140.244.236
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        prefetch: 1 #每次只能取一个,处理完才能取下一个消息

这样可以避免消息预取导致堆积

四、发布订阅模式

exchange是交换机,负责消息路由,但不存储消息,路由失败则消息丢失

五、发布订阅模式之广播模式(Fanout)

1、Fanout配置类(@Bean声明)

package com.rabbitmqdemoconsumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanountConfig {
    //交换机声明
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("FanountExchange");
    }
    //声明队列1
    @Bean
    public Queue Fanount_Qeueue1(){
        return new Queue("Fanount_Qeueue1");
    }
    //声明队列2
    @Bean
    public Queue Fanount_Qeueue2(){
        return new Queue("Fanount_Qeueue2");
    }
    //绑定交换机和队列
    @Bean
    public Binding bindingFanount_Qeueue1(Queue Fanount_Qeueue1,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(Fanount_Qeueue1).to(fanoutExchange);
    }
    @Bean
    public Binding bindingFanount_Qeueue2(Queue Fanount_Qeueue2,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(Fanount_Qeueue2).to(fanoutExchange);
    }
}

可以看到声明的队列

已经声明的交换机(第一个)

绑定关系

2、发送消息

首先发送10条消息,经过交换机转发到队列

@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads2() {
        String exchange="FanountExchange";
        String message="message";
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(exchange,"",message);
        }
    }

}

此时可以看到两个队列各自有十条消息

3、接受消息

 //监听交换机Fanount_Qeueue1
    @RabbitListener(queues = "Fanount_Qeueue1")
    public void listenFanountQeueue1(String msg){
        System.out.println("Fanount_Qeueue1接收到的消息:"+msg);
    }
    //监听交换机Fanount_Qeueue2
    @RabbitListener(queues = "Fanount_Qeueue2")
    public void listenFanountQeueue2(String msg){
        System.out.println("Fanount_Qeueue2接收到的消息:"+msg);
    }

控制台结果如下(共发送20条,每个队列10条)

Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue2接收到的消息:message

六、发布订阅模式之路由模式(Direct)

会将消息根据规则路由到指定的队列

1、声明(基于@RabbitListener声明)

package com.rabbitmqdemoconsumer.rabbitmq;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitLeistener {

    /**
     * 绑定交换机和队列,并为key赋值
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "DirectQueue1"),
        exchange = @Exchange(name = "DirectExchange",type = ExchangeTypes.DIRECT),
        key = {"red","blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("listenDirectQueue1接收到的消息:"+msg);
    }


    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "DirectQueue2"),
        exchange = @Exchange(name = "DirectExchange",type = ExchangeTypes.DIRECT),
        key = {"red","yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("listenDirectQueue2接收到的消息:"+msg);
    }
}

此时可以看到声明的队列

声明的交换机(第一个)

绑定关系

2、发送给blue

发送消息


@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads2() {
        String exchange="DirectExchange";
        String message="HelloWorld";
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(exchange,"blue",message);
        }
    }

}

接收消息

listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld

3、发送给red

发送消息

@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads2() {
        String exchange="DirectExchange";
        String message="HelloWorld";
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(exchange,"blue",message);
        }
    }

}

接收消息

listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld

七、发布订阅模式之广播模式(Topic)

Queue与Exchange指定BindingKey可以使用通配符:

#:代指0个或多个单词

*:代指一个单词

比如:

bindingkey: china.# ->中国的所有消息

bindingkey: #.weather ->所以国家的天气

1、声明

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "TopicQueue1"),
        exchange = @Exchange(name = "TopicExchange",type = ExchangeTypes.TOPIC),
        key = {"china.#"}
    ))
public void listenTopicQueue1(String msg){
    System.out.println("listenTopicQueue1接收到的消息:"+msg);
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "TopicQueue2"),
    exchange = @Exchange(name = "TopicExchange",type = ExchangeTypes.TOPIC),
    key = {"#.news"}
))
public void listenTopicQueue2(String msg){
    System.out.println("listenTopicQueue2接收到的消息:"+msg);
}

队列

交换机(第四个)

绑定关系

2、发送消息(测试1)

package com.rabbitmqdemo;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads2() {
        String exchange="TopicExchange";
        String message="HelloWorld";
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(exchange,"china.news",message);
        }
    }

}

接收消息

TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld

3、发送消息(测试2)

发送消息

package com.rabbitmqdemo;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads2() {
        String exchange="TopicExchange";
        String message="HelloWorld";
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(exchange,"china.weather",message);
        }
    }

}

接收消息

TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/355427.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

4G模块DTU网关远程抄表方案(三):水表188协议

4G模块DTU网关远程抄表方案&#xff08;三&#xff09;&#xff1a;水气电表188协议 1 CTJ 188协议简介 CJ/T188协议规定了户用计量仪表(以下简称仪表)&#xff0c;包括水表、燃气表、热量表等仪表数据传输的基本原则&#xff0c;接口形式及物理性能、数据链路、数据标识及数…

目标检测回归损失函数简介:SmoothL1/IoU/GIoU/DIoU/CIoU Loss

目标检测 回归损失函数1、Smooth L1 Loss2、 IoU Loss3、 GIoU Loss &#xff08;Generalized-IoU Loss&#xff09;4、 DIoU Loss &#xff08;Distance-IoU Loss&#xff09;5、 CIoU Loss &#xff08;Complete-IoU Loss&#xff09;总结&#xff1a;目标检测任务的损失函数…

【计算机网络】数据链路层(下)

文章目录媒体接入控制媒体接入控制-静态划分信道随机接入 CSMACD协议随机接入 CSMACA协议MAC地址MAC地址作用MAC地址格式MAC地址种类MAC地址的发送顺序单播MAC地址广播MAC地址多播MAC地址随机MAC地址IP地址区分网络编号IP地址与MAC地址的封装位置转发过程中IP地址与MAC地址的变…

1.1 硬件与micropython固件烧录及自编译固件

1.ESP32硬件和固件 淘宝搜ESP32模块,20-50元都有,自带usb口,即插即用. 固件下载地址:MicroPython - Python for microcontrollers 2.烧录方法 为简化入门难度,建议此处先使用带GUI的开发工具THonny,记得不是给你理发的tony老师. 烧录的入口是: 后期通过脚本一次型生成和烧…

[软件工程导论(第六版)]第3章 需求分析(课后习题详解)

文章目录1. 为什么要进行需求分析&#xff1f;通常对软件系统有哪些需求&#xff1f;2. 怎样与用户有效地沟通以获取用户的真实需求&#xff1f;3. 银行计算机储蓄系统的工作过程大致如下&#xff1a;储户填写的存款单或取款单由业务员输入系统&#xff0c;如果是存款则系统记录…

C语言经典编程题100例(81~100)

目录81、习题7-7 字符串替换82、习题8-10 输出学生成绩83、习题8-2 在数组中查找指定元素84、习题8-3 数组循环右移85、题8-9 分类统计各类字符个数86、习题9-2 计算两个复数之积87、习题9-6 按等级统计学生成绩88、习题11-1 输出月份英文名89、习题11-2 查找星期90、练习10-1 …

分享113个HTML娱乐休闲模板,总有一款适合您

分享113个HTML娱乐休闲模板&#xff0c;总有一款适合您 113个HTML娱乐休闲模板下载链接&#xff1a;https://pan.baidu.com/s/1aWYO2j2pSTjyqlQPHa0-Jw?pwdbium 提取码&#xff1a;bium Python采集代码下载链接&#xff1a;采集代码.zip - 蓝奏云 海上的沤鸟HTML网页模板…

(三十六)Vue解决Ajax跨域问题

文章目录环境准备vue的跨域问题vue跨域问题解决方案方式一方式二上一篇&#xff1a;&#xff08;三十五&#xff09;Vue之过渡与动画 环境准备 首先我们要借助axios发送Ajax&#xff0c;axios安装命令&#xff1a;npm i axios 其次准备两台服务器&#xff0c;这里使用node.j…

Linux | 网络通信 | 序列化和反序列化的讲解与实现

文章目录为什么要序列化&#xff1f;协议的实现服务端与客户端代码实现为什么要序列化&#xff1f; 由于默认对齐数的不同&#xff0c;不同的平台对相同数据进行内存对齐后&#xff0c;可能得到不同的数据。如果直接将这些数据进行网络传输&#xff0c;对方很可能无法正确的获…

【数据结构】单链表的接口实现(附图解和源码)

单链表的接口实现&#xff08;附图解和源码&#xff09; 文章目录单链表的接口实现&#xff08;附图解和源码&#xff09;前言一、定义结构体二、接口实现&#xff08;附图解源码&#xff09;1.开辟新空间2.头插数据3.头删数据4.打印整个单链表5.尾删数据6.查找单链表中的数据7…

Linux 磁盘挂载

目录 Linux硬盘分区 硬盘设备的文件名 /dev/sd[a-z] 硬盘分区 识别硬盘的文件名 Linux文件系统 文件系统类型 Linux如何保存文件 VFS虚拟文件系统 磁盘挂载命令 lsblk 查看系统的磁盘使用情况 fdisk 硬盘分区 mkfs 格式化文件系统 mount 挂载命令 df 显示磁盘空间…

Java中的链表实现介绍

Java中的链表实现介绍 学习数据结构的的链表和树时&#xff0c;会遇到节点&#xff08;node&#xff09;和链表&#xff08;linked list&#xff09;这两个术语&#xff0c;节点是处理数据结构的链表和树的基础。节点是一种数据元素&#xff0c;包括两个部分&#xff1a;一个是…

pytest总结

这里写目录标题一、pytest的命名规则二、界面化配置符合命名规则的方法前面会有运行标记三、pytest的用例结构三部分组成四、pytest的用例断言断言写法&#xff1a;五、pytest测试框架结构六、pytest参数化用例1、pytest参数化实现方式2、单参数&#xff1a;每一条测试数据都会…

第五十七章 树状数组(二)

第五十七章 树状数组&#xff08;二&#xff09;一、差分的缺陷二、树状数组与差分三、例题题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1提示样例 1 解释&#xff1a;数据规模与约定代码一、差分的缺陷 差分的作用是能够在O(1)的时间内给一段区间加上相同的数字&am…

【计算机网络】数据链路层(上)

文章目录数据链路层概述封装成帧透明传输差错检测奇偶校验循环冗余校验CRC可靠传输可靠传输基本概念实现机制 — 停止-等待协议实现机制 — 回退N帧协议实现机制 — 选择重传协议点对点协议PPP数据链路层概述 首先我蛮来看看数据链路层在网络体系结构中的地位。如图所示主机h1…

key的作用原理与列表的遍历、追加、搜索、排序

目录 一、key的作用原理 二、实现列表遍历并对在列表最前方进行追加元素 三、实现列表过滤搜索 1、用computed计算属性来实现 2、用watch监听输入值的变化来实现 四、按年龄排序输出列表 一、key的作用原理 1. 虚拟DOM中key的作用&#xff1a; key是虚拟DOM对象的标识&a…

博彩公司 BetMGM 发生数据泄露,“赌徒”面临网络风险

Bleeping Computer 网站披露&#xff0c;著名体育博彩公司 BetMGM 发生一起数据泄露事件&#xff0c;一名威胁攻击者成功窃取其大量用户个人信息。 据悉&#xff0c;BetMGM 数据泄漏事件中&#xff0c;攻击者盗取了包括用户姓名、联系信息&#xff08;如邮政地址、电子邮件地址…

Unity如何实现3D物体拆解组装

一.前言 最近有一个需求,是做一个发动机的拆卸和安装功能,其实是一个很简单的功能,但是其中有一个点我觉的非常有意思,就是拖拽组装时,物体如何精准拖到目标位置,思路有了,但是我一直找不到实现方式,早晨刷牙时无意间想到了叉乘,我才有了解决方案。就凭这一次的灵光乍…

AutoJs7、8版本快速接通vscode进行调试脚本

AutoJs7、8版本快速接通vscode进行调试脚本 作者:虚坏叔叔 博客:https://xuhss.com 早餐店不会开到晚上,想吃的人早就来了!😄 # AutoJs7、8快速接通vscode进行调试脚本一、下载AutoJs并安装 https://download.csdn.net/download/huangbangqing12/87449177 下载完成后,…

【图神经网络】图拉普拉斯滤波器如何实现全通、低通、高通滤波

【图神经网络】图拉普拉斯滤波器如何实现全通、低通、高通滤波 文章目录【图神经网络】图拉普拉斯滤波器如何实现全通、低通、高通滤波1. 前言2. 符号说明3. 三种滤波3.1 全通滤波3.2 低通滤波3.2.1 平滑信号分析3.2.2 广义拉普拉斯平滑滤波器3.3 高通滤波4. 总结1. 前言 GCN&…