RabbitMQ-SpringAMQP使用介绍

news2025/1/10 18:20:38

RabbitMQ

    • 1. Spring AMQP
      • 1.1 引入依赖
      • 1.2 消息发送
      • 1.3 消息接收
      • 1.4 WorkQueue模型
        • 1.4.1 实例代码
        • 1.4.2 能者多劳
        • 1.4.3 总结
      • 1.5交换机
      • 1.6 Fanout交换机(广播)
      • 1.7 Direct交换机(订阅)
      • 1.8 Topic交换机(通配符订阅)
      • 1.9 声明队列and交换机
      • 1.10 基于注解声明
      • 1.11 消息转换器
        • 1.11.1 配置JSON转换器
        • 1.11.2 示例

RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。

1. Spring AMQP

RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

官网:https://spring.io/projects/spring-amqp/

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

1.1 引入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.2 消息发送

配置MQ地址,在publisher服务的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.1.200 # 虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: feng # 用户名
    password: 123 # 密码

然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

@SpringBootTest
public class SpringAmopTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void SimpleQueue(){
        //队列
        String queueName = "simpleQueue";
        //消息
        String message = "Hello World";
        //发送消息
        rabbitTemplate.convertAndSend(queueName,message);
    }
}

1.3 消息接收

在consumer端的application.xml配置MQ信息:

spring:
  rabbitmq:
    host: 192.168.1.200 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: feng # 用户名
    password: 123321 # 密码

编写监听器

@Component
@Slf4j
public class SpringRabbitListener {

    @RabbitListener(queues = "simpleQueue")
    public void listenSimpleQueue(String message){

        log.info("Simple Queue消息: "+message);

    }
}

重启服务,即可实现监听!

1.4 WorkQueue模型

Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。

在这里插入图片描述

1.4.1 实例代码

publisher段代码:

@Test
public void WorkQueue(){
    //队列
    String queueName = "work.queue";
    //消息
    String message = "Hello World_";
    //发送消息50次
    for (int i = 0; i < 50; i++) {
        rabbitTemplate.convertAndSend(queueName,message+i);
    }

}

consumer段消费代码:

//消费者1
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message){

    System.out.println("消费者1处理任务{:"+message+"}"+ LocalTime.now());
}
//消费者2
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String message){

    System.err.println("消费者2处理任务{:"+message+"}"+ LocalTime.now());
}

work queue模式,消息分配模式默认是平均分配,每个消费者处理的任务数量是平均的

也就是上面案例中,消息队列一共五十条消息,消费者1和消费者2,每人会处理25条。

1.4.2 能者多劳

如果想修改work模式的分配模式,比如想按性能分配,性能好的,多处理任务,那些就需要添加配置:

application.xml:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

这样,50条消息,由消费者1和消费者2 按性能快慢决定执行数量的多少,谁先执行完成当前任务,就可以去执行消息队列中的下一个任务,没有预分配机制;

1.4.3 总结

work queue模型的使用:

  • 多个消费者绑定到一个消息队列,可以加快消息处理速度;
  • 同一条消息只会被一个消费者处理;
  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳;

1.5交换机

交换机exchange,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。

在这里插入图片描述

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
  • Consumer:消费者,与以前一样,订阅队列,没有变化

交换机的作用:

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

1.6 Fanout交换机(广播)

FanoutExchange会将消息路由到每个绑定的队列。

在这里插入图片描述

  • 可以有多个队列
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机
  • 交换机把消息发送给绑定过的所有队列
  • 订阅队列的消费者都能拿到消息

1.7 Direct交换机(订阅)

基于RoutingKey(路由key)发送给订阅了消息的队列。

在这里插入图片描述

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

Direct交换机与Fanout交换机差异:

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

1.8 Topic交换机(通配符订阅)

Topic与direct相似,都是通过RoutingKey绑定,按照RoutingKey进行路由,只不过Topic类型的Exchange在绑定bandingKey时,可以使用通配符配置:

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu

Direct交换机与Topic交换机的差异:

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

1.9 声明队列and交换机

fanout交换机

package com.itheima.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {

    /**
     * 声明Fanout交换机
     * @return Fanout交换机类型
     */
    public FanoutExchange fanoutExchange() {
         return new FanoutExchange("fanoutExchange", true, false);
    }

    /**
     * 声明队列
     * @return Queue类型
     */
    public Queue queue() {
        return new Queue("queue", true, false, false);
    }

    /**
     * 队列绑定到交换机
     * @param queue
     * @param fanoutExchange
     * @return
     */
    public Binding binding(Queue queue ,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}

Direct交换机

package com.itheima.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

    /**
     * 声明交换机
     * @return Direct类型交换机
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("hmall.direct").build();
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue directQueue2(){
        return new Queue("direct.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    }
}

1.10 基于注解声明

Direct模式

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

Topic模式:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

1.11 消息转换器

Spring的消息发送代码接收的消息体是一个Object,而在数据传输时,它会把发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差
1.11.1 配置JSON转换器

publisherconsumer两个服务中都引入依赖:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

消息转换器中添加的messageId可以便于我们将来做幂等性判断。

1.11.2 示例

publisher

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void SimpleQueue(){
        //队列
        String queueName = "simpleQueue";
        //消息
        Map<String,Object> map = new HashMap<>(2);
        map.put("name", "xiaoming");
        map.put("age", 18);
//        String message = "测试消息!!!";
        //发送消息
        rabbitTemplate.convertAndSend(queueName,map);
    }

consumer

@RabbitListener(queues = "simpleQueue")
public void listenSimpleQueue(Map<String, Object> map) throws InterruptedException {

    log.info("Simple Queue消息: "+map);
    System.out.println("sout=============:"+map.toString());
}
w HashMap<>(2);
        map.put("name", "xiaoming");
        map.put("age", 18);
//        String message = "测试消息!!!";
        //发送消息
        rabbitTemplate.convertAndSend(queueName,map);
    }

consumer

@RabbitListener(queues = "simpleQueue")
public void listenSimpleQueue(Map<String, Object> map) throws InterruptedException {

    log.info("Simple Queue消息: "+map);
    System.out.println("sout=============:"+map.toString());
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2274484.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Can I Use 实战指南:优化你的前端开发流程

引言 浏览器兼容性问题一直是前端开发中最令人头疼的难题。无论是新技术的支持差异&#xff0c;还是老旧浏览器的兼容需求&#xff0c;开发者常常需要花费大量时间解决这些问题。而 Can I Use 正是为此而生的前端开发利器。它提供详尽的浏览器兼容性数据&#xff0c;帮助开发者…

会员制营销与门店业绩提升:以开源AI智能名片S2B2C商城小程序为例的深度剖析

摘要&#xff1a;在数字化时代&#xff0c;会员制营销已成为企业提升门店业绩、增强客户黏性的重要策略。然而&#xff0c;仅仅将会员制营销视为提升业绩的手段&#xff0c;显然过于笼统&#xff0c;缺乏精准性。本文基于“业绩客量客单回头次数”的公式&#xff0c;深入探讨了…

/src/utils/request.ts:axios 请求封装,适用于需要统一处理请求和响应的场景

文章目录 数据结构解释1. 核心功能2. 代码结构分析请求拦截器响应拦截器 3. 改进建议4. 总结 console.log(Intercepted Response:, JSON.stringify(response));{"data": {"code": 0,"msg": "成功","data": {"id":…

Ubuntu24.04安装AppImage报错AppImages require FUSE to run.

报错如下&#xff1a; 解决&#xff1a; sudo apt install libfuse2t64如果不行&#xff1a; sudo add-apt-repository universe sudo apt install libfuse2t64安装时又报错&#xff1a; [10354:0109/100149.571068:FATAL:setuid_sandbox_host.cc(158)] The SUID sandbox hel…

2025新春烟花代码(二)HTML5实现孔明灯和烟花效果

效果展示 源代码 <!DOCTYPE html> <html lang"en"> <script>var _hmt _hmt || [];(function () {var hm document.createElement("script");hm.src "https://hm.baidu.com/hm.js?45f95f1bfde85c7777c3d1157e8c2d34";var …

同步整流 MT6705 应用注意事项

MT6705 是用于反激式变换器的高性能45V 同步整流器。它兼容各种反激转换器类型。支持 DCM、CCM 和准谐振模式。MT6705集成了一个40V功率MOSFET&#xff0c;可以取代肖特基二极管&#xff0c;提高效率。MT6705 PCB 布局应遵循以下规则: 1、VCC 电容器: VCC 引脚必须放置电容,电容…

LLaMA-Factory web微调大模型并导出大模型

LLaMA-Factory 开源大模型如LLaMA&#xff0c;Qwen&#xff0c;Baichuan等主要都是使用通用数据进行训练而来&#xff0c;其对于不同下游的使用场景和垂直领域的效果有待进一步提升&#xff0c;衍生出了微调训练相关的需求&#xff0c;包含预训练&#xff08;pt&#xff09;&am…

Jenkins内修改allure报告名称

背景&#xff1a; 最近使用Jenkins搭建自动化测试环境时&#xff0c;使用Jenkins的allure插件生成的报告&#xff0c;一直显示默认ALLURE REPORT&#xff0c;想自定义成与项目关联的名称&#xff0c;如图所示&#xff0c;很明显自定义名称显得高大上些&#xff0c;之前…

RK3588平台开发系列讲解(系统篇)Linux Kconfig的语法

文章目录 一、什么是Kconfig二、config模块三、menuconfig四、menu 和 endmenu五、choice 和 endchoice六、source七、depends on八、default九、help十、逻辑表达式沉淀、分享、成长,让自己和他人都能有所收获!😄 一、什么是Kconfig Kconfig的语法及代码结构非常简单。本博…

Android原生开发同一局域网内利用socket通信进行数据传输

1、数据接收端代码如下&#xff0c;注意&#xff1a;socket 接收信息需要异步运行&#xff1a; // port 端口号自定义一个值&#xff0c;比如 8888&#xff0c;但需和发送端使用的端口号保持一致 ServerSocket serverSocket new ServerSocket(port); while (true) {//这里为了…

Elasticsearch学习(1) : 简介、索引库操作、文档操作、RestAPI、RestClient操作

目录 1.elasticsearch简介1.1.了解es1.2.倒排索引正向索引和倒排索引 1.3.es的一些概念:文档和字段&#xff1b;索引和映射&#xff1b;Mysql与ES1.4.安装es、kibana部署单点es部署kibanaIK分词器安装IK分词器与测试扩展与停用词词典总结 部署es集群 2.索引库操作2.1.mapping映…

Clickhouse基础(一)

操作命令&#xff1a; sudo clickhouse start sudo clickhouse restart sudo clickhouse status进入clickhouse clickhouse-client -mCREATE TABLE db_13.t_assist (modelId UInt64,taskId UInt64,testNo String,tdId UInt64,eventDay String,eventDaytime UInt64,eventBatch …

记录将springboot的jar包和lib分离,使用docker-compose部署

本文讲诉如何把jar里的lib依赖包独立出来&#xff0c;方便更新服务时&#xff0c;缩小jar的体积&#xff0c;下面以若依的system服务为例&#xff0c;配置中的路径请酌情修改&#xff0c;主要提供大致配置逻辑 第一步&#xff1a;修改项目的pom.xml&#xff0c;调整build的配…

【对象存储】-- s3:\\、s3n:\\、s3a:\\ 简介

目录 1. s3:\ 2. s3n:\ 3. s3a:\ 区别对比 总结 在 Hadoop 和大数据处理领域&#xff0c;s3:\\、s3n:\\ 和 s3a:\\ 是访问 Amazon S3 的不同文件系统实现方式。以下是它们的简要介绍、区别及应用场景&#xff1a; 1. s3:\ 全称&#xff1a;Hadoop S3 Native FileSystem。…

Springboot3.x工程创建及必要引用(基础篇)

下面从环境的安装和配置开始&#xff0c;到Springboot3.x工程创建&#xff0c;记录一下让完全没有基础的小白用户也能够开始自己的第一个项目。 准备 安装JDK环境&#xff08;这里最好安装JDK17及以上版本&#xff09;安装IntelliJ IDEA Ultimate工具&#xff08;可以从官网下…

腾讯云AI代码助手-公司职位分析AI助手

作品简介 腾讯云AI代码助手是一款智能工具&#xff0c;专注于为公司提供职位分析服务。通过自然语言处理和机器学习技术&#xff0c;它能快速解析职位描述&#xff0c;提取关键信息&#xff0c;并提供数据驱动的洞察&#xff0c;帮助公司优化招聘流程和职位设计。 技术架构 …

QML学习(八) Quick中的基础组件:Item,Rectangle,MouseArea说明及使用场景和使用方法

上一篇中我们从设计器里可以看到Qt Quick-Base中有几大基础组件&#xff0c;如下图&#xff0c;这篇文章先介绍下Item&#xff0c;Rectangle&#xff0c;MouseArea这三个的说明及使用场景和使用方法 Item Item 是 QML 中所有可视元素的基类&#xff0c;是一个非常基础和通用的…

宇航用VIRTEX5系列FPGA的动态刷新方法及实现

SRAM型FPGA在宇航领域有广泛的应用&#xff0c;为解决FPGA在空间环境中的单粒子翻转问题&#xff0c;增强设计的可靠性&#xff0c;本文介绍一种低成本的抗辐照解决方案。该方案从外置高可靠存储器中读取配置数据&#xff0c;通过定时刷新结合三模冗余的方式消除单粒子影响&…

03.MPLS静态LSP配置实验

MPLS静态LSP配置实验 1、实验环境2、基础配置开启全局mpls接口下开启mpls配置静态LSP配置FEC从1.1.1.1到3.3.3.3配置FEC从3.3.3.3到1.1.1.13、信息查看查看LFIB表(标签转发信息表)查看FIB表(转发信息表)查看详细FFIB表tracert lsp iptracert -vping lsp ip4、抓包验证1、实…

el-table表格合并某一列

需求&#xff1a;按照下图完成单元格合并&#xff0c;数据展示 可以看到科室列是需要合并的 并加背景色展示&#xff1b;具体代码如下&#xff1a; <el-tableref"tableA":data"tableDataList":header-cell-style"{ backgroundColor: #f2dcdb, col…