RabbitMQ基本使用

news2025/4/7 15:15:45

先会用,知道mq是干嘛的,怎么用RabbitMQ  再去考虑一系列深入东西

 一:什么是MQ

MQ消息队列详解、四大MQ的优缺点分析_从百草园杀到三味书屋&的博客-CSDN博客

理解什么是MQ  MQ在企业/程序中的作用是什么?

 MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。

消息队列中间件(MQ)是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题实现高性能,高可用,可伸缩和最终一致性[架构] 

好处:

        1.吞吐量提升,无需等待订阅者处理完成,给客户端响应更快

        2.耦合度降低,每个服务都可以灵活拔插,可替换

        3.流量晓峰,不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

        ...

缺点:

        1.架构复杂了,业务线没有明显的流程线,不好管理

        2.需要依赖于Broker的可靠、安全、性能

比较常见的MQ(ActiveMQ  RabbitMQ  RocketMQ  Kafka )

几种常见MQ的对比:

RabbitMQActiveMQ(一般不用了)RocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

二:RabbitMQ

2.1安装RabbitMQ

linux(docker)安装:docker安装RabbitMQ_落后挨打的博客-CSDN博客_docker 安装rabbitmq 

 docker search rabbitmq   搜寻镜像版本

docker pull rabbitmq        拉取镜像到宿主机

将镜像生成容器
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq

      
docker ps  查看运行的容器 
docker exec -it 镜像ID /bin/bash     进入容器内部
rabbitmq-plugins enable rabbitmq_management  运行
访问地址: http://linuxip地址:15672   (用户guest,密码guest)

 注意:  

        RabbitMQ启动成功后暴露两个端口

        java应用端连接:192.168.136.160:5672

        管理后台地址 :地址: 15672

        管理员账号/密码 guest/guest

三:RabbitMQ消息模型(队列消息 发布/订阅消息)

 四:Springboot演示RabbitMQ

1.导依赖  amqp

<dependencies>
    <!--spring amqp-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

   <!--springboot单元测试-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

  <!--json格式转换-->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>

2.添加配置文件:

#配置RabbitMQ的基本信息
spring:
  rabbitmq:
    host: 192.168.136.160
    port: 5672
    virtual-host: tjtc
    username: hua
    password: hua

 首先介绍两种:SimpleQueue        WorkQueue  (没有交换机)

 SimpleQueue:一个生产者,一个消费者

 

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序

  • C:消费者:消息的接受者,会一直等待消息到来。

  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

 1.定义发送消息(生产者)

@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitTemplateTest {

    /**
     * 使用SpringAMQP发送消息:
     *    1、依赖
     *    2、在配置文件中加入rabbitmq的相关配置
     *    3、注入对象RabbitTemplate,调用convertAndSend方法发送消息
     */
    @Autowired
    private RabbitTemplate template;
    
    /**
     * 发送基本(简单)队列消息
     * springamqp发送队列消息,需要在后台系统中手动创建一个队列
     */
    @Test
    public void testSendBaseMessage() {
        String ququeName="simple.queue";
        String message = "hahhah";
        template.convertAndSend(ququeName,message); //队列名称,消息内容
    }
}

 消费者:

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 自定义的消息监听器:
 *    1、此类交给容器管理
 *    2、编写方法:监听消息队列(队列中一旦有最新的消息,自动执行方法)
 */
@Component
public class MessageListener {

    /**
     * 消费者的监听器方法:获取消息并消费(完成业务逻辑)
     * 1、方法没有返回值
     * 2、方法有参数(参数=发送的消息内容)
     *      参数类型和发送的消息类型一致
     * 3、在方法上通过@RabbitListener绑定队列
     */
     @RabbitListener(queues = "simple.queue")   //有队列就是用 没队列就报错
    //@RabbitListener(queuesToDeclare ={@Queue("abc")} ) //有对象就使用 没队列就创建一个队列
    public void recBaseMessage(String message) {
        System.out.println("消费者1获取消息:"+message);
    }
}

 注意:   

    @RabbitListener(queues = "simple.queue")   //有队列就是用 没队列就报错
    @RabbitListener(queuesToDeclare ={@Queue("abc")} ) //有队列就使用 没队列就创建一个队列

WorkQueue:让多个消费者绑定到一个队列,共同消费队列中的消息

 

 当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型,多个消费者共同处理消息处理,速度就能大大提高了。

 消息发送:

  /**
     * 发送工作队列消息
     *   发送多次
     */
    @Test
    public void testSendWorkMessage() throws InterruptedException {
        for (int i = 0; i < 50; i++) {
            String message = "凡凡有难,八方点赞,点赞数:"+i;
            String queueName = "simple.queue";
            rabbitTemplate.convertAndSend(queueName,message); //队列,消息
            Thread.sleep(10);
        }
    }

 消费者:

    /**
     * 消费者1
     */
    @RabbitListener(queues="simple.queue")
    public void recBaseMessage(String message) throws InterruptedException {
        //性能较高
        System.out.println("1号消费者获取消息:"+message);
        Thread.sleep(50);
    }

    /**
     * 消费者2
     */
    @RabbitListener(queues="simple.queue")
    public void recBaseMessage(String message) throws InterruptedException {
        //性能较低
        System.out.println("2号消费者获取消息:"+message);
        Thread.sleep(1000);
    }

 注意: RabbitMQ默认的是,平均分配给每个消费者,并没有考虑到消费者的处理能力

那么可以通过配置解决:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

上边这两种消息队列  

发送者:

        rabbitTemplate.convertAndSend(queueName,message); //队列名称,消息

消费者: (定义一个方法,放一个参数)

         @RabbitListener(queues="simple.queue")

Fanout

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:

    • Fanout:广播,将消息交给所有绑定到交换机的队列

    • Direct:定向,把消息交给符合指定routing key 的队列

    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

  • C:消费者,与以前一样,订阅队列,没有变化

  • Queue:消息队列也与以前一样,接收消息、缓存消息。

 消息发送:

  /**
     * 测试发送Fanout模式消息(广播模式)
     */
    @Test
    public void testSendFanoutMessage() {
        String exchangeName = "heima165fanout";
        String message = "凡凡有难,八方点赞";
        template.convertAndSend(exchangeName,"",message); //交换机名称,空,消息
    }

 消息消费者:

    /**
     * 监听Fanout模式消息
     *      @RabbitListener:
     *          bindings :绑定队列和交换机
     *          value:
     *              @Queue : 指定队列
     *                  value:队列名称
     *          exchange:指定交换机
     *              @Exchange
     *                  value:交换机名称
     *                  type:类型
     */
    /**
     * 获取Fanout模式消息(广播)
     */
package cn.itcast.mq.fanoutqueue;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerFanOut {

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(
                            value = "fanout.queue1"
                    ),
                    exchange = @Exchange(
                            value = "fanout",
                            type = ExchangeTypes.FANOUT
                    )
            )
    )
    public void recBaseMessage(String message) {
//        Thread.sleep(1000);
        System.out.println("消费者1获取消息:"+message);
    }
}

 2

package cn.itcast.mq.fanoutqueue;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerFanOut2 {

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(
                            value = "fanout.queue2"
                    ),
                    exchange = @Exchange(
                            value = "fanout",
                            type = ExchangeTypes.FANOUT
                    )
            )
    )
    public void recBaseMessage(String message) throws InterruptedException {
        Thread.sleep(1000);
        System.out.println("消费者1获取消息:"+message);
    }
}

注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

Direct

 

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

发送者:

   @Test
    public void testSendDirectMessage() {
        for (int i = 0; i < 10; i++) {
            String exchangeName = "Direct";
            String routingKey = "red";
            String message = "今天太冷了" + i;
            rabbitTemplate.convertAndSend(exchangeName,routingKey,message); //交换机,routingkey(类型),消息
        }
    }

消费者:


    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(
                            value = "direct.queue"
                    ),
                    exchange = @Exchange(
                            value = "Direct",
                            type = ExchangeTypes.DIRECT
                    ),
                    key = {"blue"}  // key = {"blue","red","yellow"}
            )
    )
    public void recBaseMessage(String message) throws InterruptedException {
        Thread.sleep(1000);
        System.out.println("消费者1获取消息:"+message);
    }
 @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(
                            value = "direct.queue2"
                    ),
                    exchange = @Exchange(
                            value = "Direct",
                            type = ExchangeTypes.DIRECT
                    ),
//                    key = { "red"}
                      key = {"blue","red","yellow"}
            )
    )
    public void recBaseMessage(String message) throws InterruptedException {
        Thread.sleep(1000);
        System.out.println("消费者1获取消息:"+message);
    }

 Topic  模式

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

 发送者:

    /**
     * 发送topic消息(主题模式)
     */
    @Test
    public void testSendTopicMessage() {
        String exchangeName = "Topic";
        String routingKey = "china.weather";
        String message = "恭喜EDG";
        template.convertAndSend(exchangeName,routingKey,message); //交换机,routingkey(类型),消息
    }

消费者:

    /**
     * 获取主题模式消息
     *   工程,可以支持所有以china开头的消息
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(
                            value = "topic.queue1"
                    ),
                    exchange = @Exchange(
                            value = "Topic",
                            type = ExchangeTypes.TOPIC
                    ),
                    key = "china.*"
            )
    )
    public void recTopicMessage(String message) {
        System.out.println("消费者1获取topic消息:"+message);
    }
  /**
     * 获取主题模式消息
     *   工程,可以支持所有以news结尾的内容
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(
                            value = "topic.queue2"
                    ),
                    exchange = @Exchange(
                            value = "Topic",
                            type = ExchangeTypes.TOPIC
                    ),
                    key = "*.news"
            )
    )
    public void recTopicMessage(String message) {
        System.out.println("消费者2获取topic消息:"+message);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/71465.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Fluent的porous jump边界条件

1 porous jump简介 多孔介质指内部具有连通的空洞&#xff0c;可使流体穿过的固体。多孔介质具有以下特点&#xff1a; 流道极复杂&#xff0c;活性炭过滤器等随机排布的多孔介质甚至不存在可通过曲面和实体表征的确定流道多孔介质只是流场的一部分&#xff0c;通常不关注多孔…

DataSphere Studio数据应用开发管理集成框架【DSS基础】

https://github.com/WeBankFinTech/DataSphereStudio/https://gitee.com/WeBank/DataSphereStudio 基于插拔式的集成框架设计&#xff0c;及计算中间件 Linkis &#xff0c;可轻松接入上层各种数据应用系统&#xff0c;让数据开发变得简洁又易用。在统一的 UI 下&#xff0c;Da…

[附源码]Python计算机毕业设计Django中小学课后延时服务管理系统

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

前端_Vue_2.创建一个Vue应用、模板语法

文章目录一、创建一个Vue应用1.1. 应用实例1.2. 根组件1.3. 挂载应用1.3.1. DOM中的根组件模板1.4. 应用配置1.5. 多个应用实例二、模板语法2.1. 文本插值2.2. 原始HTML2.3. Attribute绑定2.3.1. 简写2.3.2. 布尔型 Attribute2.3.3. 动态绑定多个值2.4. 使用JavaScript表达式2.…

记录一次Sql性能优化

场景&#xff1a; 主业务表 contract&#xff08;合同表&#xff09;&#xff0c;对于不同主体&#xff08;人员&#xff09;&#xff0c;能查看的合同是不一样的。系统企业业务用到了&#xff0c;系统资源表 PERMISSION_RESOURCE 、员工对于资源关系表&#xff1a;ENTRY_JOIN…

物联卡批发为什么那么火爆?

2022年物联网行业开始爆发&#xff0c;针对于企业设备联网的物联卡就显得格外重要了&#xff0c;而共享单车&#xff0c;移动支付&#xff0c;智慧城市&#xff0c;自动售卖机等企业采购物联卡会面临着各种问题&#xff0c;低价陷阱&#xff0c;流量虚假&#xff0c;管理混乱&a…

[Python图像处理] 合成微缩效果

合成微缩效果前言图像微缩效果原理实现图像微缩效果相关链接前言 图像中的模糊效果可以强烈影响被拍摄场景的感知&#xff0c;模糊在传达所需的尺寸和距离感方面起着重要作用。合成微缩 (miniature faking) 是一个使真实大小物体照片看起来像微缩模型照片的过程&#xff0c;也…

java必背综合知识点总结

一、JDK常用的包 java.lang&#xff1a; 这个是系统的基础类&#xff0c;比如String、Math、Integer、System和Thread&#xff0c;提供常用功能。 java.io: 这里面是所有输入输出有关的类&#xff0c;比如文件操作等 java.net: 这里面是与网络有关的类&#xff0c;比如URL,U…

寻找适配网红很迷茫?最全秘笈来了

根据《2022年全球数字概览》报告显示&#xff0c;全球社交媒体用户超过46.2亿&#xff0c;相当于全球总人口的58.4%。全球用户每天在社交媒体上平均花费近2.5个小时&#xff0c;并且每天以2分钟的速度增长。 社交媒体成为全球网民生活中不可或缺的一部分&#xff0c;而对于跨境…

【深度学习】Yolov5训练意外中断后如何接续训练详解;yolov5中断后继续训练

0. 前言 目标检测是计算机视觉上的一个重要任务,下面这篇文章主要给大家介绍了关于Yolov5训练意外中断后如何接续训练的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下 1. 配置环境 操作系统&#xff1a;Ubuntu20.04 CUDA版本&#xff1a;11.4 Pytorch版本…

Excel - 数据分析师所需的最常用公式。

“先打好基础,再细化细节——克里斯安德森” 这将是我正在撰写的关于必须具备数据分析技能的第 4 个也是最后一个“像你 5 岁一样解释”系列。(请观看其他的——Power BI、Python 和 SQL)。现在,我们将具备所需的所有基本技能,然后可以进入数据分析领域的下一阶段 使用 E…

Java实现大乐透不重复数字随机号码生成方案

大乐透攻略Java实现&#xff08;仅供参考学习&#xff09; 购票方式 每期最低购票数&#xff1a;7 张 最低消费&#xff1a;14 元 方案介绍&#xff1a;后区12个数中随机分成6组&#xff0c;且数字不重复。前区35个数随机分成7组&#xff0c;且数字不重复。前区需要7组才能够…

【目标检测】IoU、GIoU、DIoU、CIoU、EIoU 5大评价指标

目录 一、简介 二、IoU&#xff08;Intersection over Union&#xff09; 三、GIoU&#xff08;Generalized IoU&#xff09; 四、DIoU&#xff08;Distance-IoU&#xff09; 五、CIoU(Complete-IoU) 六、EIoU(Efficient-IoU) 七、pytorch代码实现 七、总结 一、简介 在目标检测…

即时通讯开发之如何测试实时语音通话质量

实时语音聊天开发&#xff0c;对于一般的开发者来说比较神秘&#xff0c;很多朋友不太清楚如何全面的评估一个音频引擎。很多朋友还停留在这样的初级阶段&#xff1a;把demo调通&#xff0c;找几个人喂喂喂......凭自己优异的听觉感受一下&#xff0c;整个测试过程就完成了。 但…

【嵌入式硬件芯片开发笔记】EEPROM芯片M24C32配置流程

【嵌入式硬件芯片开发笔记】EEPROM芯片M24C32配置流程 32-Kbit serial IC bus EEPROM - 105C operation 适用于M24C32/M24C32-DRE 读取存储的从机地址为&#xff1a;0x50 读取标识页面的从机地址为&#xff1a;0x58 WC引脚接地&#xff0c;存储可以进行写操作 地址长度为16位 存…

117. 填充每个节点的下一个右侧节点指针 II

文章目录1. 背2. 题目3. 答案1. 背 这道题本来可以很简答&#xff0c;一个队列&#xff0c;存储指针和它的行数就OK了&#xff0c;但是这道题的难点在于不用额外空间复杂度。 横向看一下&#xff0c;这一行是不是就是一个链表呢&#xff1f; 多加一个变量&#xff0c;用来存储第…

C++入门教程||C++ 判断||C++ 日期 时间

判断结构要求程序员指定一个或多个要评估或测试的条件&#xff0c;以及条件为真时要执行的语句&#xff08;必需的&#xff09;和条件为假时要执行的语句&#xff08;可选的&#xff09;。 下面是大多数编程语言中典型的判断结构的一般形式&#xff1a; 判断语句 C 编程语言提…

three.js实战 -自定义剪切器

1. 前言 这是我在github上看到大佬的一个作品&#xff0c;当时感觉很有意思&#xff0c;决定分享出来&#xff0c;不知道取这个名字是否正确&#xff0c;废话不多说看下面效果。 2.demo效果 3.需要掌握的知识 矩阵的基本运算&#xff0c;能够认是到一些基本变换用到的矩阵(…

晶圆级倒装装备及控制系统

晶圆级倒装装备主要由晶圆盘进料模块、晶圆盘工作台模块、覆晶模块、焊头模块、基板工作台模块、点胶模块、视觉模块和基板进出料模块组成&#xff0c;如图 2-2 所示。 晶圆级倒装装备控制系统结构晶圆级倒装装备的运控系统主要由工控机、运动控制卡、驱动器、反馈装置和直线电…

QA:observable and Subject

概念区别和常见的错误理解辩证&#xff1a; 通俗理解一下 1. Observable 是一条 "水管蓝图" ,每次打开水龙头&#xff0c;水流会按照设计好的路线流向终点。起点和终点一一对应。每次打开水流&#xff0c;都是新的流&#xff0c;水流之间互不影响。一次一管。 2. …