消息队列——spring和springboot整合rabbitmq

news2025/1/26 15:43:56

目录

spring整合rabbitmq——生产者

rabbitmq配置文件信息

倒入生产者工程的相关代码

简单工作模式

spring整合rabbitmq——消费者

spring整合rabbitmq——配置详解

SpringBoot整合RabbitMQ——生产者

 SpringBoot整合RabbitMQ——消费者


 

spring整合rabbitmq——生产者

使用原生amqp来写应该已经没有这样的公司了

创建两个工程,一个生产者一个消费者,分别倒入如下依赖

    <dependencies>
        <!--上下文-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
        <!--spring整合amqp-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>
        <!--单元测试-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!--编译插件-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

rabbitmq配置文件信息

rabbitmq.properties文件如下

rabbitmq.host=172.16.98.133
rabbitmq.port=5672
rabbitmq.username=heima
rabbitmq.password=heima
rabbitmq.virtual-host=/itcast

倒入生产者工程的相关代码

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:/rabbitmq.properties"/>

    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>
    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机
    默认交换机类型为direct,名字为:"",路由键为队列的名称
    -->
    <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>

    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播;所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>

    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>

    <!--定义广播类型交换机;并绑定上述两个队列-->
    <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding queue="spring_fanout_queue_1"/>
            <rabbit:binding queue="spring_fanout_queue_2"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符;*匹配一个单词,#匹配多个单词 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>

    <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding pattern="heima.*" queue="spring_topic_queue_star"/>
            <rabbit:binding pattern="heima.#" queue="spring_topic_queue_well"/>
            <rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>

 上面这个配置文件准备了三种工作模式需要的队列和交换机。

简单工作模式

在测试类中加载配置文件并发送消息


@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
    //1.注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testHelloWorld(){
        //2.发送消息
        rabbitTemplate.convertAndSend("spring_queue","hello-yhy");
    }

    /**
     * 发送fanout
     */
    @Test
    public void testFaonut(){
        //2.发送消息
        rabbitTemplate.convertAndSend("spring_fanout_exchange","","spring fanout....");
    }

    /**
     * 发送topic消息
     */
    @Test
    public void testTopic(){
        //2.发送消息
        rabbitTemplate.convertAndSend("spring_topic_exchange","heima.hehe.haha","spring topic....");
    }


}

运行上三个测试方法过后管理端如下,出现了新的队列和交换机和信息

   

spring整合rabbitmq——消费者

导入消费者的XML配置文件

消费者中还要创建对应的监听器的类,不然配置文件爆红

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>

    <bean id="springQueueListener" class="com.yhy.rabbitmq.listener.SpringQueueListener"/>
<!--    <bean id="fanoutListener1" class="com.yhy.rabbitmq.listener.FanoutListener1"/>-->
<!--    <bean id="fanoutListener2" class="com.yhy.rabbitmq.listener.FanoutListener2"/>-->
<!--    <bean id="topicListenerStar" class="com.yhy.rabbitmq.listener.TopicListenerStar"/>-->
<!--    <bean id="topicListenerWell" class="com.yhy.rabbitmq.listener.TopicListenerWell"/>-->
<!--    <bean id="topicListenerWell2" class="com.yhy.rabbitmq.listener.TopicListenerWell2"/>-->

    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
        <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
<!--        <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>-->
<!--        <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>-->
<!--        <rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>-->
<!--        <rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>-->
<!--        <rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>-->
    </rabbit:listener-container>
</beans>

然后创建一个简单工作模式需要的对应类

public class SpringQueueListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        /**
         * 打印消息
         */
        System.out.println(new String(message.getBody()));
    }
}

在测试类中弄个方法用来加载配置文件,配置文件一加载,上面的监听器就会自动执行的。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {
    @Test
    public void test1(){
        while(true){
            
        }
    }
}

其余的都是一模一样的写法。

spring整合rabbitmq——配置详解

队列声明的参数 

广播类型的交换机和队列绑定时不需要指定路由key,direct和topic都要指定路由key.

SpringBoot整合RabbitMQ——生产者

引入如下依赖

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

 再在resources目录下写一个配置文件类

# 配置RabbitMQ的基本信息 ip 端口 username password ...
spring:
  rabbitmq:
    host: 
    post: 5672
    username: guest
    password: guest
    virtual-host: /

创建启动类

package com.yhy;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class);
    }
}

准备一个配置类

package com.yhy.rabbit.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitConfig {
    public static final String EXCHANGE_NAME="boot_topic_exchange";
    public static final String QUEUE_NAME="boot_queue";

    //1.交换机
    @Bean("bootExchange")
    public Exchange bootExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }
    //2.Queue队列
    @Bean("bootQueue")
    public Queue bootQueue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }
    //3.队列和交换机绑定关系,Binding
    /**
     * 1.知道哪个队列
     * 2.知道哪个交换机
     * 3.routing key
     */
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }
}

在测试类中准备如下测试方法

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
    //1.注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSend(){
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"boot.haha","boot mq hello");
    }
}

运行后可以看见出现有新队列和消息

 SpringBoot整合RabbitMQ——消费者

 在性工程创建一个监听类如下,加上@Component注解之后就可以自动执行一次了

@Component
public class RabbitMQListener {
    @RabbitListener(queues="boot_queue")
    public void ListenerQueue(Message message){
        System.out.println(message);
    }
}

输出如下,成功获取到上面生产者发出的消息

   

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/763437.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分布式应用之存储(Ceph)

分布式应用之存储&#xff08;Ceph) 一、数据存储类型 存储类型说明典型代表块存储一对一&#xff0c;只能被一个主机挂载使用数据以块为单位进行存储硬盘文件存储一对多&#xff0c;能同时被多个主机挂载/传输使用&#xff0c;数据以文件的形式存储&#xff08;元数据和实际…

Appium+python自动化(十)- 元素定位秘籍助你打通任督二脉 - 上卷(超详解)

1、 常用定位方法讲解 对象定位是自动化测试中很关键的一步&#xff0c;也可以说是最关键的一步&#xff0c;毕竟你对象都没定位那么你想操作也不行。所以本章节的知识宏哥希望小伙伴们多动手去操作和实战&#xff0c;不要仅仅只是书本上的知识&#xff0c;毕竟这个我只能够举例…

AtcoderABC301场

A - Order Something Else A - Order Something Else 题目大意 计算 Takahashi 最少需要支付多少钱才能获得 AtCoder Drink。AtCoder Drink 可以按照原价 P 日元购买&#xff0c;也可以使用折扣券以 Q 日元的价格购买&#xff0c;但必须再额外购买 N 道菜品中的一道才能使用折…

Navicat代码片段存储位置

1、在Navicat的主界面中&#xff0c;选择“工具”——》“选项”——》文件位置&#xff0c;如下图 配置文件就是存放自动保存、代码片段等文件的位置&#xff0c;其中snippets&#xff08;片段&#xff09;就是自定义片段的存储位置了

【Android】在某个model中找不到自己的R资源的原因

背景 在某个新建的model为lib包的时候&#xff0c;我想在这个model内的activity引用R.string 等等资源&#xff0c;但是Android studio找不到。 解决 原来我之前误删了这个manifest中的 补齐包名即可。

Triton_server部署学习笔记

下载镜像 docker pill http://nvcr.io/nvidia/tritonserver:22.07-py3 docker run --gpus all -itd -p8000:8000 -p8001:8001 -p8002:8002 -v /home/ai-developer/server/docs/examples/model_repository/:/models nvcr.io/nvidia/tritonserver:22.07-py3 docker exec -it a5…

使用shell监控应用运行状态通过企业微信接收监控通知

目的&#xff1a;编写shell脚本来监控应用服务运行状态&#xff0c;若是应用异常则自动重启应用通过企业微信接收监控告警通知 知识要点&#xff1a; 使用shell脚本监控应用服务使用shell脚本自动恢复异常服务通过企业微信通知接收监控结果shell脚本使用数组知识&#xff0c;…

[黑苹果EFI]Lenovo ThinkPad T490电脑 Hackintosh 黑苹果引导文件

原文来源于黑果魏叔官网&#xff0c;转载需注明出处。&#xff08;下载请直接百度黑果魏叔&#xff09; 硬件型号驱动情况 主板Lenovo ThinkPad T490 处理器Intel Intel Core i5 8265U (Quad Core)已驱动 内存16 GB:8 GB Samsung DDR 4 2666 Mhz *2已驱动 硬盘PC SN520 NVM…

maven项目使用java命令行运行类的main方式示例

因为需要测试一个东西,本地测试无问题,测试环境一直有问题,就想在测试环境测试下 直接写了个测试类,main方法直接运行测试逻辑 测试类写好,发现自己不会使用命令行运行 运行测试类一直报"错误: 找不到或无法加载主类" 折腾好久,终于找到两个帖子 记录下来,避免自己下…

Docker容器常用命令大全:熟练掌握使容器优化更加高效

&#x1f337;&#x1f341; 博主 libin9iOak带您 Go to New World.✨&#x1f341; &#x1f984; 个人主页——libin9iOak的博客&#x1f390; &#x1f433; 《面试题大全》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33…

mysql查询当天/昨天/近7天/近30天/本月/上个月/本季度/上季度/本年/上一年 数据

查询当天数据 select * from tab where FROM_UNIXTIME(fabutime, %Y%m%d) 20230717; mysql TO_DAYS(date) 函数 TO_DAYS(date) 给定一个日期date, 返回一个天数 (从年份0开始的天数 )。 mysql> SELECT TO_DAYS(950501); -> 728779 mysql查询今天、昨天、7天、近30天…

【Linux后端服务器开发】TCP协议

目录 一、TCP报头结构 二、确认应答机制 三、超时重传机制 四、连接管理机制 五、滑动窗口 六、拥塞控制 七、应答策略 一、TCP报头结构 TCP全称为传输控制协议&#xff08;Transmission Control Protocol&#xff09;&#xff0c;数据在传输过程需要严格的控制 TCP协议…

CONNECT BY 介绍以及用法

CONNECT BY 介绍以及用法 CONNECT BY作用是&#xff0c;NNECT BY用来查询树形数据&#xff0c; CONNECT BY 语句的用法 语句格式&#xff1a; start with 条件A connect by prior orgid parentorgid 用法 情况1&#xff1a; start with 条件A connect by…

【树链】CF1702 G

Problem - G2 - Codeforces 题意&#xff1a; 思路&#xff1a; 首先&#xff0c;一条树链可以被分为两部分&#xff1a;左半部分和右半部分 我们可以把所有可能是链上的点排序&#xff0c;把深度最大的点默认成起点st&#xff0c;接下来去找终点ed ed在和st不同的链上 且 …

MSA【1】:Segment Anything Model for Medical Image Analysis: an Experimental Study

文章目录 前言1. Abstraction & Introduction1.1. Abstraction1.2. Introduction1.2.1. What is SAM?1.2.2. How to segment medical images with SAM? 2. Methodology2.1. SAM is used in the process of segmentation of medical images2.1.1. Semi-automated annotati…

【压力传感器】LPS22DFTR、LPS33KTR 绝对 压力,ADP5131 排气式压力计 50kPa 6-DIP

LPS22DFTR MEMS纳米压力传感器是一款超紧凑型压阻式绝对压力传感器&#xff0c;可用作数字输出气压计。LPS22DF的功耗更低&#xff0c;与上一代产品相比压力噪声更低。该器件包括一个传感元件和一个IC接口&#xff0c;通过I2C、MIPI I3CSM或SPI接口从传感元件向应用程序进行通信…

蒙德里安的梦想

题目 求把 NM 的棋盘分割成若干个 12 的长方形&#xff0c;有多少种方案。 例如当 N2&#xff0c;M4 时&#xff0c;共有 5 种方案。当 N2&#xff0c;M3 时&#xff0c;共有 3 种方案。 如下图所示&#xff1a; 输入格式 输入包含多组测试用例。 每组测试用例占一行&…

GO语言基础-04-数据类型-04-map(map的排序)

文章目录 1. 按value排序1.1 思路1.2 语法1.3 完整示例 2. 按key排序2.1 思路2.2 语法示例2.3 完整示例2.4 完整示例 1. 按value排序 1.1 思路 map本身的顺序不可控&#xff0c;我们考虑如下方法实现排队&#xff1a; 思路一&#xff1a;通过一个切片来接收拍好队的map成员思…

多线程应用场景

文章目录 前言一、CountDownLatch倒计时锁二、如何控制线程并发数&#xff1f;三、浅聊ThreadLocal1.ThreadLocal定义2.ThreadLocal源码解读3.关于ThreadLocal的一个案例 总结 前言 本篇介绍多线程中的应用场景&#xff0c;比如倒计时锁CountDownLatch、信号量Semaphore、以及…

【雕爷学编程】Arduino动手做(163)---大尺寸8x8LED方格屏模块5

37款传感器与模块的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&#x…