04MQ消息队列

news2025/1/10 3:11:41

一、同步异步通讯

1.同步通讯和异步通讯

同步通讯:当A服务调用B服务时,一直等待B服务执行完毕,A才继续往下走

优点:时效性强,可以立即得到结果。

缺点:

①耦合度高,加入新的需求就要修改原来代码

②性能下降,调用者需要等待服务提供者的响应,如果调用链过长则响应时间是每次调用时间之和。

③资源浪费, 每个调用者在等待服务响应的过程中,不能释放资源,高并发会浪费系统资源

④级联失败,如果提供者出现问题,所有的调用都会出现问题。

异步通讯: 当A服务调用B服务时,不需要等待B服务执行完毕,A就可以往下走(事件驱动模式)

优点:

①服务解耦

②性能提升,吞吐量提高。

③服务没有强依赖,不担心级联失败问题

④流量削峰

缺点:

①依赖Broker的可靠性、安全性、吞吐能力

②架构复杂,业务没有明显的流程线,不好追踪管理

2.什么是MQ(MessageQueue)

存放消息的队列,事件驱动架构的Broker

二、RabbitMQ快速入门

1.RabbitMQ概述

RabbitMQ是开源消息通信中间件

生产者-》交换机-》信道-》队列-》消费者

①Publisher:消息的发送者,把消息发送到exchange交换机

②exchange:交换机,负责路由,把消息投递到queue队列

③queue:队列,负责缓存消息

②consumer:消息的消费者,从队列获取消息,处理消息。

总结

RabbitMQ中的几个概念:

channel:操作MQ的工具

exchange:路由消息到队列中

queue:缓存消息

virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

2.运行RabbitMQ命令

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

①端口15672是MQ管理平台的端口

②端口5672是消息通信的端口

3.常见消息模型

①基本消息队列(BasicQueue)

②工作消息队列(WorkQueue)

③发布订阅(Publish,Subscribe),根据交换机分为三种类型

Fanout Exchange:广播

Direct Exchange:路由

Topic Exchange:主题

4.HelloWorld案例

Publisher:消息发布者,将消息发送到队列queue

Queue:消息队列,负责接受并缓存消息

Consumer:订阅队列,处理队列中的消息

发送流程

  • 建立connection
  • 创建channel
  • 利用channel声明队列
  • 利用channel向队列发送消息

接收流程

  • 建立connection
  • 创建channel
  • 利用channel声明队列
  • 定义consumer的消费行为handleDelivery()
  • 利用channel将消费者与队列绑定

三、SpringAMOP

1.什么是SpringAMOP

SpringAMOP是基于AMOP协议定义的一套API规范,提供模板接收发送消息。包括两部分,spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

2.案例:利用SpringAMQP实现HelloWorld中的基础消息队列功能

发送

  • 引入amqp的starter依赖
  • 配置RabbitMQ地址
  • 利用RabbitTemplate的convertAndSend方法

①引入AMQP依赖:子模块都需要amop,所以在把依赖写在父工程

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

②在publisher服务中编写配置文件application.yml,连接mq信息

spring:
  rabbitmq:
    host: 192.168.150.101 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机 
    username: itcast # 用户名
    password: 123321 # 密码

③测试类

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() { 
        String queueName = "simple.queue";
        String message = "hello, spring amqp!";
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

接收

  • 引入amqp的starter依赖
  • 配置RabbitMQ地址
  • 定义类,添加@Component注解
  • 类中声明方法,添加@RabbitListener注解,方法参数就时消息

①②跟前面一样,编写③消费逻辑类

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息 :【" + msg + "】");
    }
}

3.WorkQueue工作队列

Work queue工作队列,可以提高消息处理速度,避免队列消息堆积

案例:模拟WorkQueue,实现一个队列绑定多个消费者

基本思路如下:

  1. 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
  2. 在consumer服务中定义两个消息监听者,都监听simple.queue队列
  3. 消费者1每秒处理50条消息,消费者2每秒处理10条消息

①修改publisher,1秒发送50条消息

@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message__";
    for (int i = 0; i < 50; i++) {
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        // 避免发送太快
        Thread.sleep(20);
    }
}

②两个消费者监听simple.queue。消费者1每秒处理50条,消费者2每秒处理10条。

消费者1处理效率高。

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
    System.out.println("spring 消费者1接收到消息:【" + msg + "】");
    Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue") 
public void listenSimpleQueueMessage2(String msg) throws InterruptedException {
    System.err.println("spring 消费者2接收到消息:【" + msg + "】");
    Thread.sleep(100);
}

此时的两个消费者的消息数是相同的【平均分配】,但两个消费者处理消息的效率不一样。

prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息,能者多劳。

总结

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

4.发布Publish,订阅Subscribe

发布订阅模式跟之前案例区别是允许将同一个消息发送给多个消费者。实现方式是加入了exchange交换机。

①常见的交换机exchange类型

Fanout广播

Direct路由

Topic话题

 注意exchange负责消息路由(转发),而不是存储,路由失败则消息丢失

发布订阅-Fanout Exchange

Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue

案例:利用SpringAMQP演示FanoutExchange的使用

实现思路:

①在consumer服务中,利用代码声明队列,交换机,并将两者绑定。

②在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

③在publisher中编写测试方法,向itcast.fanout发送消息

步骤1:在consumer服务声明Exchange、Queue、Binding

@Configuration
public class FanoutConfig {
   // itcast.fanout
   @Bean
   public FanoutExchange fanoutExchange(){
      return new FanoutExchange("itcast.fanout");
   }

   // fanout.queue1
   @Bean
   public Queue fanoutQueue1(){
      return new Queue("fanout.queue1");
   }

   // 绑定队列1到交换机
   @Bean
   public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
      return BindingBuilder
            .bind(fanoutQueue1)
            .to(fanoutExchange);
   }

   // fanout.queue2
   @Bean
   public Queue fanoutQueue2(){
      return new Queue("fanout.queue2");
   }

   // 绑定队列2到交换机
   @Bean
   public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
      return BindingBuilder
            .bind(fanoutQueue2)
            .to(fanoutExchange);
   }
}

 步骤2:在consumer服务声明两个消费者

@Component
public class SpringListener {
   @RabbitListener(queues = "fanout.queue1")
   public void listenFanoutQueue1(String msg) {
      System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
   }
   @RabbitListener(queues = "fanout.queue2")
   public void listenFanoutQueue2(String msg) {
      System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
   }
}

步骤3:在publisher服务发送消息到FanoutExchange

@RunWith(SpringRunner.class) // 开启注解
@SpringBootTest
public class PublisherApplicationTests {
   @Autowired
   private RabbitTemplate rabbitTemplate;

   @Test
   public void testSendFanoutExchange() {
      // 交换机名称
      String exchangeName = "itcast.fanout";
      // 消息
      String message = "hello, every one!";
      // 发送消息
      rabbitTemplate.convertAndSend(exchangeName, "", message);
   }
}

总结:

交换机的作用是什么?

①接收publisher发送的消息

②将消息按照规则路由到绑定的队列

③不能缓存消息,路由失败,消息丢失

④FanoutExchange将消息路由到每个绑定的队列

声明队列,交换机,绑定关系的Bean是什么?

Queue,FanoutExchange,Binding

发布订阅-DirectExchange

DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式

①每一个Queue都与Exchange设置一个BindingKey

②发布者发送消息时,指定消息的RoutingKey

③Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

案例:利用SpringAMQP演示DirectExchange的使用

实现思路:

①利用@RabbitListener声明Exchange,Queue,RoutingKey

②在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

③在publisher中编写测试方法,向itcast. direct发送消息

步骤1:在consumer服务声明Exchange、Queue

@Component
public class SpringListener {
   @RabbitListener(bindings=@QueueBinding(
         value = @Queue(name = "direct.queue1"),
         exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
         key = {"red","blue"}
   ))
   public void listenFanoutQueue1(String msg) {
      System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
   }
   @RabbitListener(bindings=@QueueBinding(
         value = @Queue(name = "direct.queue2"),
         exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
         key = {"red","yellow"}
   ))
   public void listenFanoutQueue2(String msg) {
      System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
   }
}

步骤2:在publisher服务发送消息到DirectExchange

@RunWith(SpringRunner.class) // 开启注解
@SpringBootTest
public class PublisherApplicationTests {
   @Autowired
   private RabbitTemplate rabbitTemplate;

   @Test
   public void testSendFanoutExchange() {
      // 交换机名称
      String exchangeName = "itcast.direct";
      // 消息
      String message = "hello, every one!!!";
      // 发送消息
      rabbitTemplate.convertAndSend(exchangeName, "yellow", message);
   }
}

发布订阅-TopicExchange

TopicExchangeDirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。

QueueExchange指定BindingKey时可以使用通配符:

#:代指0个或多个单词

*:代指一个单词

利用SpringAMQP演示TopicExchange的使用,匹配使用

@Component
public class SpringListener {
   @RabbitListener(bindings=@QueueBinding(
         value = @Queue(name = "direct.queue1"),
         exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
         key = "china.#"
   ))
   public void listenFanoutQueue1(String msg) {
      System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
   }
   @RabbitListener(bindings=@QueueBinding(
         value = @Queue(name = "direct.queue2"),
         exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
         key = "#.news"
   ))
   public void listenFanoutQueue2(String msg) {
      System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
   }
}

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1137978.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机网络【CN】TCP报文段格式【20B】

序号&#xff1a;本报文段所发送的数据的第一个字节的序号确认号&#xff1a;期望收到对方下一个报文段的第一个数据字节的序号。 重要控制位&#xff1a; 紧急位URG&#xff1a;URG1时&#xff0c;标明此报文段中有紧急数据&#xff0c;是高优先级的数据&#xff0c;应尽快传送…

卡巴斯基2009杀毒软件

下载地址&#xff1a;https://user.qzone.qq.com/512526231/main https://user.qzone.qq.com/3503787372/main

冯诺依曼体系结构与操作系统

文章目录 1. 冯诺依曼体系结构概念理解冯诺依曼体系结构的优势 2. 操作系统概念设计OS的目的和定位如何理解管理 1. 冯诺依曼体系结构 概念 我们常见的计算机&#xff0c;如笔记本。我们不常见的计算机&#xff0c;如服务器&#xff0c;大部分都遵守冯诺依曼体系。 对于各个部…

java--跳转关键字和随机数

1.跳转关键字 break&#xff1a;跳出并结束当前所在循环的执行 continue&#xff1a;用于跳出当前循环的当次执行&#xff0c;直接进入循环的下一次执行 2.注意事项 break&#xff1a;只能用于结束所在循环&#xff0c;或结束所在switch语句的执行 continue&#xff1a;只能…

工业交换机常用功能有哪些?

工业交换机&#xff0c;又称工业以太网交换机&#xff0c;是一种在OSI参考模型的第二层&#xff08;数据链路层&#xff09;上工作的网络设备。它基于MAC地址识别&#xff0c;并能够封装和转发数据包。那么&#xff0c;工业交换机的常见常用功能有哪些呢&#xff1f; 工业交换…

JavaWeb——关于servlet种mapping地址映射的一些问题

6、Servlet 6.4、Mapping问题 一个Servlet可以指定一个映射路径 <servlet-mapping><servlet-name>hello</servlet-name><url-pattern>/hello</url-pattern> </servlet-mapping>一个Servlet可以指定多个映射路径 <servlet-mapping>&…

Redis和Memcached网络模型详解

1. Redis单线程单Reactor网络模型 1.1 redis单线程里不能执行十分耗时的流程&#xff0c;不然会客户端响应不及时 解决方法一&#xff1a; beforesleep里删除过期键操作若存在大量过期键时&#xff0c;会耗费大量时间&#xff0c;redis采用的策略之一就是采用timelimit方案超过…

2011-2021年“第四期”数字普惠金融与上市公司匹配(根据城市匹配)/上市公司数字普惠金融指数匹配数据

2011-2021年“第四期”数字普惠金融与上市公司匹配&#xff08;根据城市匹配&#xff09;/上市公司数字普惠金融指数匹配数据 1、时间&#xff1a;2011-2021年 指标&#xff1a;指标&#xff1a;股票代码、年份、行政区划代码、行业名称、行业代码、所属省份、所属城市、数字…

澳大利亚专线的清关时效

随着全球贸易的不断发展&#xff0c;越来越多的企业和个人开始选择通过海运、空运等物流方式将货物运送到世界各地。而在这些运输方式中&#xff0c;澳大利亚专线以其快速、高效的特性受到了广大客户的青睐。本文将为您详细介绍澳大利亚专线的清关时效。 一、澳大利亚专线的定义…

Vben开源添加本地路由(不用显示在菜单的路由)

1.在src\router\routes\modules下创建一个article.ts import type { AppRouteModule } from //router/types;import { LAYOUT } from //router/constant; import { t } from //hooks/web/useI18n;export const article: AppRouteModule {path: /article,name: Article,compon…

疫情集中隔离

系列文章目录 进阶的卡莎C++_睡觉觉觉得的博客-CSDN博客数1的个数_睡觉觉觉得的博客-CSDN博客双精度浮点数的输入输出_睡觉觉觉得的博客-CSDN博客足球联赛积分_睡觉觉觉得的博客-CSDN博客大减价(一级)_睡觉觉觉得的博客-CSDN博客小写字母的判断_睡觉觉觉得的博客-CSDN博客纸币(…

数字人成品牌营销流量密码?看数字人花样赋能品牌

在第十二届李曼中国养猪大会暨世界猪业博览会上&#xff0c;嘉吉动物营养推出了首个数字人产品星推官&#xff0c;在现场数字人结合动捕设备&#xff0c;向观众实时介绍品牌的全明星产品&#xff0c;同时与观众进行实时互动&#xff0c;吸引了众多参展观众的驻足&#xff0c;为…

K8s安装部署-----二进制安装部署

目录 前言 一、操作系统初始化配置 二、部署 etcd 集群 1、在master01节点操作 2、在 node01 节点上操作 3、在 node02 节点上操作 4、查看集群状态 二、部署docker引擎 三、部署 Master 组件 1、准备证书 2、准备二进制文件、token 3、启动kube-apiserver服务 4、…

0基础学习PyFlink——用户自定义函数之UDTF

大纲 表值函数完整代码 在《0基础学习PyFlink——用户自定义函数之UDF》中&#xff0c;我们讲解了UDF。本节我们将讲解表值函数——UDTF 表值函数 我们对比下UDF和UDTF def udf(f: Union[Callable, ScalarFunction, Type] None,input_types: Union[List[DataType], DataTy…

macOS Sonoma 14.1正式版发布 改善Apple Music界面 新增保修状态显示

10月26日消息&#xff0c;苹果今天为 macOS Sonoma 推出了 14.1 版本更新&#xff0c;本更新主要改善了 Apple Music 界面&#xff0c;设置中新增保修状态&#xff0c;并修复了多项错误内容。 经过几周的用户测试&#xff0c;Apple 正式向所有 Mac 用户发布了 macOS Sonoma 14.…

【代码随想录01】数组总结

抄去吧&#xff0c;保存去吧&#xff01;

p5.js 3D图形-立方体

本文简介 带尬猴&#xff0c;我嗨德育处主任 前面写了几篇 p5.js 文章 都还没涉及到3D图形&#xff0c;但其实 p5.js 是提供了基础的3D图形的。 本文就从最简单的立方体讲起&#xff0c;并做几个小demo和各位工友一起掌握立方体的用法。 立方体的基础用法 在 p5.js 里使用 b…

智慧矿山AI算法助力煤矿安全:人员越界识别精准迅速

煤矿作为一种危险性较高的工业领域&#xff0c;安全管理一直是煤矿企业的重要任务。传统煤矿安全管理主要依靠人工巡逻及视频监控等手段&#xff0c;但这些方法往往存在人力不足、盲区多等问题&#xff0c;无法实时监控和预警&#xff0c;难以有效避免事故的发生。随着人工智能…

如何在linux服务器上安装Anaconda与pytorch,以及pytorch卸载

如何在linux服务器上安装Anaconda与pytorch&#xff0c;以及pytorch卸载 1&#xff0c;安装anaconda1.1 下载anaconda安装包1.2 安装anaconda1.3 设计环境变量1.4 安装完成验证 2 Anaconda安装pytorch2.1 创建虚拟环境2.2 查看现存环境2.3 激活环境2.4 选择合适的pytorch版本下…

FL Studio水果2023体验版如何破解?

FL Studio是一款非常专业的水果工具&#xff0c;软件功能齐全&#xff0c;拥有编曲、剪辑、录音、混音等功能&#xff0c;可以满足用户的各种音乐制作需求。软件已经成功破解&#xff0c;全中文的软件界面&#xff0c;去除了试用时间限制&#xff0c;有需要的快来下载吧&#x…