【SpringCloud】RabbitMQ——五种方式实现发送和接收消息

news2025/1/11 16:49:56

SpringAMQP

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配。

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

1.Basic Queue 简单队列模型

1.1引入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.2消息发送

在publisher服务的application.yml中添加配置

spring:
  rabbitmq:
    host: 192.168.150.101 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: haojiale # 用户名
    password: 123321 # 密码

在publisher服务中编写测试类,并利用RabbitTemplate实现消息发送

package com.example.mq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

1.3消息接收

在consumer服务的application.yml中添加的配置内容同消息发送一样

在consumer服务中编写监听器,消费消息

package com.example.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

2.WorkQueue

WorkQueue也被称为任务模型。就是让多个消费者绑定到一个队列,共同消费队列中的消息
在这里插入图片描述

当消息处理比较耗时的时候,可能生产消息的速度远远大于消息的消费速度,消息就会堆积越来越多,无法及时处理,使用work模型,多个消费者共同处理消息提升消费速度

2.1消息发送

循环发送消息,模拟大量消息堆积现象

在publisher服务中添加一个测试方法:

/**
     * workQueue
     * 向队列中不停发送消息,模拟消息堆积。
     */
@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

2.2消息接收

模拟多个消费者绑定同一个队列,在consumer服务中添加2个新的方法:

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

执行测试方法结果是消费者1很快处理完了自己的25条消息,消费者2却在缓慢处理自己的25条消息,也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力是存在问题的。

解决

修改consumer服务的application.yml文件配置,通过设置prefetch来控制消费者预取的消息数量

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

3.发布/订阅

发布订阅的模型图如图:
在这里插入图片描述

Exchange只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

3.1 Fanout

在这里插入图片描述

3.1.1声明队列和交换机

在这里插入图片描述

在consumer服务中声明队列和交换机

package com.example.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("example.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

3.1.2消息发送

在publisher服务中编写测试方法发送消息

@Test
public void testFanoutExchange() {
    // 队列名称
    String exchangeName = "example.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

3.1.3消息接收

在consumer服务中添加两个方法,作为消费者:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

3.1.4总结

交换机的作用:

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败情况下消息丢失
  • FanoutExchange会将消息路由到每个绑定的队列

声明队列、交换机、绑定关系的Bean是什么?

  • Queue
  • FanoutExchange
  • Binding

3.2 Direct

在这里插入图片描述

在Direct模型下:

  • 队列与交换机的绑定不能是任意绑定,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
  • Exchange不会再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RountingKey与消息的RoutingKey完全一致,才会收到消息

3.2.1消息发送

在这里插入图片描述

在publisher服务中添加测试方法:

@Test
public void testSendDirectExchange() {
    // 交换机名称
    String exchangeName = "itcast.direct";
    // 消息
    String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

3.2.2消息接收——基于注解声明交换机和队列

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}

3.2.3总结

Direct交换机与Fanout交换机的区别?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

3.3 Topic

Topic类型的Exchang可以让队列在绑定Routing key的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词
在这里插入图片描述

举例:

item.#:能够匹配item.spu.insert 或者 item.spu

item.*:只能匹配item.spu

图示:

3.3.1消息发送

在这里插入图片描述

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "itcast.topic";
    // 消息
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

3.3.2消息接收

在consumer服务中添加方法:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2040367.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Docker基本语法

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、更新yum镜像仓库&#xff08;一&#xff09;查看本地yum镜像源地址&#xff08;二&#xff09;设置docker的镜像仓库&#xff08;1&#xff09;安装必要工具…

安卓相关环境配置

安卓相关环境配置 偶尔更新。。。 JEB&#xff08;动态调试好用&#xff09; JEB动态调试Smali-真机/模拟器&#xff08;详细&#xff0c;新手必看&#xff09; 夜步城 JADX官网&#xff08;静态分析&#xff09; https://github.com/skylot/jadx/releases/tag/v1.5.0 雷…

Upload-Lab第3关:如何巧妙应对黑名单文件后缀检测?

关卡介绍 在Pass03中,我们面临的挑战是绕过文件上传功能的黑名单检测机制。黑名单检测是一种常见的安全措施,它通过检查上传文件的后缀来阻止特定类型的文件(如 .php, .exe)被上传。在这一关,我们需要找到一种方法,上传一个可以执行的恶意文件,同时绕过黑名单检测。 …

Vue3学习 Day01

创建第一个vue项目 1.安装node.js cmd输入node查看是否安装成功 2.vscode开启一个终端&#xff0c;配置淘宝镜像 # 修改为淘宝镜像源 npm config set registry https://registry.npmmirror.com 3.下载依赖&#xff0c;启动项目 访问5173端口 第一个Vue项目的目录结构 我们先打…

C++ | Leetcode C++题解之第336题回文对

题目&#xff1a; 题解&#xff1a; //字典树节点 class TrieNode { private:bool isEnd;//单词结束标记int index;//单词序号vector<TrieNode*> children;//子节点 public://构造TrieNode():index(-1),isEnd(false),children(26,nullptr){}//析构~TrieNode(){for(int i…

php连接sphinx的长连接事宜以及sphinx的排除查询以及关于sphinx里使用SetSelect进行复杂的条件过滤或复杂查询

一、php连接sphinx的长连接事宜以及sphinx的排除查询 在使用php连接sphinx时&#xff0c;默认的sphinx连接非长连接&#xff0c;于是在想php连接sphinx能否进行一些优化 publish:January 9, 2018 -Tuesday: 方法&#xff1a;public bool SphinxClient::open ( void ) — 建立到…

24/8/15算法笔记 复习_决策回归树

from sklearn.tree import DecisionTreeRegressor from sklearn import tree import numpy as np import matplotlib.pyplot as plt#创建数据 X_train np.linspace(0,2*np.pi,40).reshape(-1,1)#训练数据就是符合要求的二维数据 #二维&#xff1a;[[样本一].[样本二]&#xff…

Elasticsearch、Easy-es 快速入门 SearchAfterPage分页 若依前后端分离 Ruoyi-Vue SpringBoot

一、环境安装 Elasticsearch ik分词器 1.1 下载解压Elasticsearch-7.x版本&#xff0c;越高越好&#xff0c;低版本有Log4j漏洞&#xff0c;Easy-es目前支持7.x 1.2 IK中文分词器 将对应Elasticsearch版本IK放进文件夹&#xff0c;Elasticsearch-7.6.1&#xff0c;ik对应版…

GPT-SoVITS

文章目录 model archS1 ModelS2 model model arch S1 model: AR model–ssl tokensS2 model: VITS&#xff0c;ssl 已经是mel 长度线性相关&#xff0c;MRTE(ssl_codes_embs, text, global_mel_emb)模块&#xff0c;将文本加强相关&#xff0c;学到一个参考结果 S1 Model cla…

Lora 全文翻译

作者&#xff1a; 地点&#xff1a;hby 来源&#xff1a;https://arxiv.org/pdf/2106.09685 工具&#xff1a;文心 LORA: LOW-RANK ADAPTATION OF LARGE LANGUAGE MODELS 摘要 自然语言处理的一个重要范式包括在通用领域数据上进行大规模预训练&#xff0c;并适应特定任务或…

Qt自定义控件:关于大佬“飞扬青云“的自定义UI控件的使用教程(MinGw,MSVC)

前言 最近在搞自定义控件&#xff0c;无意间发现大佬飞扬青云的开源项目&#xff0c;Qt/C编写超精美自定义控件 这里先贴出大佬项目地址和博客 码云&#xff1a;wwlzq5/qucsdk (gitee.com)&#xff08;旧版下载地址Qt4.7到Qt5.14&#xff09; github&#xff1a;https://git…

攻克面试:高频面试题与常见算法深度剖析

干货分享&#xff0c;感谢您的阅读&#xff01; &#xff08;暂存篇---后续会删除&#xff0c;完整版和持续更新见高频面试题基本总结回顾&#xff08;含笔试高频算法整理&#xff09;&#xff09; 备注&#xff1a;引用请标注出处&#xff0c;同时存在的问题请在相关博客留言…

第1章 大模型的概念、发展历程和应用领域

大模型&#xff1a;塑造未来的智能力量 目录 引言&#xff1a;大模型的定义与影响大模型的发展历程 早期探索&#xff1a;深度学习的起步中期发展&#xff1a;算法的革新与计算能力的提升当代突破&#xff1a;大模型的崛起 大模型的影响与未来展望 引言&#xff1a;大模型的定…

【设计模式】六大原则之依赖倒置原则(Dependency Inversion Principle,‌DIP)

设计模式是对相关问题提出的解决方案。 一般而言&#xff0c;一个模式有四个基本要素&#xff1a; 模式名称 &#xff08;pattern name&#xff09; 一个助记名&#xff0c;它用一两个词语来描述模式问题、解决方案和效果。问题&#xff08;problem&#xff09;描述了应该在何…

Unity中对Spine动画播放、暂停、事件处理管理类

Unity中对Spine动画播放、暂停、事件处理管理类 介绍Spine的事件处理动画师制作沟通Unity前端使用事件 Unity中动画播放Unity中动画暂定和继续Unity中停止动画Unity中动画转向Unity中获取骨骼和设置插槽附件完整管理类分享总结 介绍 最近在做设计spine动画的抖音小程序&#x…

RecyclerView的缓存机制(面试常客)

在构建滚动列表时&#xff0c;我们常首选RecyclerView&#xff0c;出于它优秀的缓存复用机制。 核心机制 RecyclerView的缓存机制又称回收复用机制&#xff0c;RecyclerView构建列表视图分为以下三步&#xff1a; 第一步的创建ViewHolder是RecyclerView构建视图时最耗时的操作…

鸿蒙(API 12 Beta3版)【使用通话设备切换组件】使用投播组件

基本概念 系统不再提供音频输出设备切换的API&#xff0c;如果需要应用内切换音频输出设备&#xff0c;请实现AVCastPicker组件&#xff0c;相关参数可参考[ohos.multimedia.avCastPicker]和 [ohos.multimedia.avCastPickerParam]。 本文将主要介绍AVCastPicker组件接入&…

【论文笔记】:PVswin-YOLOv8s:基于无人机的行人和车辆检测,使用改进的YOLOv8在智能城市中进行交通管理

摘要 在智慧城市中&#xff0c;有效的交通拥堵管理取决于熟练的行人和车辆检测。无人机 &#xff08;UAV&#xff09; 提供了一种具有移动性、成本效益和宽视野的解决方案&#xff0c;然而&#xff0c;优化识别模型对于克服小型和遮挡物体带来的挑战至关重要。为了解决这些问题…

推出 SAM 2:适用于视频和图像的下一代 Meta Segment Anything 模型

继图像元分割模型(SAM) 取得成功之后&#xff0c;我们发布了SAM 2&#xff0c;这是一个用于在图像和视频中实时提示对象分割的统一模型&#xff0c;可实现最先进的性能。 为了秉承我们的开放科学方针&#xff0c;我们通过宽松的 Apache 2.0 许可证共享代码和模型权重。 我们还…

嵌入式linux系统镜像制作day2

点击上方"蓝字"关注我们 01、前言 嵌入式linux系统镜像制作day1这一节先了解,后面实操 02、Yocto项目快速启动 Yocto项目通过OpenEmbedded构建系统为各种平台(包括x86-64和仿真平台)提供了一个针对ARM、MIPS、PowerPC和x86架构的开源开发环境。您可以使用Yocto项…