MQ基础知识

news2025/1/10 20:34:12

MQ基础

1.认识MQ

同步调用

我们现在基于OpenFeign的调用都属于是同步调用,那么这种方式存在哪些问题呢?

支付业务执行流程是这样的:

  1. 支付服务需要先调用用户服务完成余额扣减
  2. 然后支付服务自己要更新支付流水单的状态
  3. 然后支付服务调用交易服务,更新业务订单状态为已支付

在第二步时,业务就完成了,但是后面扩展业务如果失败对这个整个业务就会回滚

异步调用

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方
  • 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器
  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。即使后面业务失败或者扩展业务就会很方便

技术选型

消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ.

目比较常见的MQ实现:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

几种常见MQ的对比:

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

2.RabbitMQ

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:

https://www.rabbitmq.com/

安装

基于docker

docker run \
 -e RABBITMQ_DEFAULT_USER=itheima \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 --network hm-net\
 -d \
 rabbitmq:3.8-management

可以看到在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口
  • 5672:RabbitMQ的消息发送处理接口

安装完成后,我们访问 http://192.168.150.101:15672即可看到管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

其中包含几个概念:

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

数据隔离

用户管理

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

这里的用户都是RabbitMQ的管理或运维人员。目前只有安装RabbitMQ时添加的itheima这个用户。仔细观察用户表格中的字段,如下:

  • Nameitheima,也就是用户名
  • Tagsadministrator,说明itheima用户是超级管理员,拥有所有权限
  • Can access virtual host/,可以访问的virtual host,这里的/是默认的virtual host

对于小型企业而言,出于成本考虑,我们通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用virtual host的隔离特性,将不同项目隔离。一般会做两件事情:

  • 给每个项目创建独立的运维账号,将管理权限分离。
  • 给每个项目创建不同的virtual host,将每个项目的数据隔离
virtual Host

类似于容器的确概念

3.SpringAMQP

快速入门+最佳实践

依赖

  <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置文件

spring:
  rabbitmq:
    host: 192.168.150.101 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码

WorkQueues模型

Work queues,任务模型。简单来说就是多个消费者绑定到一个队列,共同消费队列中的消息。此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

实现能者多劳

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

交换机类型

Fanout交换机

每个队列绑定的这个交换机都会受到消息(默认的交换机是一条消息一个队列处理)

Direct交换机

但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
Topic交换机

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu

声明队列和交换机

代码实现

在consumer中创建一个类,声明队列和交换机:

Fanout举例:

package com.itheima.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("hmall.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}
注解实现
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
消息发送
@Test
public void testSendDirectExchange() {
    // 交换机名称
    String exchangeName = "hmall.direct";
    // 消息
    String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
消息接收

我们在consumer服务中定义一个新的消费者,publisher是用Map发送,那么消费者也一定要用Map接收

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

消息转换器

默认是JDK序列化方式,我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

publisherconsumer两个服务中都引入依赖:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean即可

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

blic MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2131652.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

信息安全数学基础(10)素数定理

前言 信息安全数学基础中的素数定理&#xff08;Prime Number Theorem&#xff09;是数论中一个非常重要的定理&#xff0c;它给出了小于或等于某个正整数x的素数的近似数量。这个定理在密码学、信息安全等领域有着广泛的应用&#xff0c;尤其是在设计加密算法时&#xff0c;对…

C++ —— 关于string类

目录 1. auto和范围for 1.1 auto关键字 1.2 范围for 2. string的三种遍历方式 3. string类的常用接口说明 3.1 成员函数 3.2 Iterators:&#xff08;迭代器&#xff09; 3.2.1正向迭代器和反向迭代器 3.3 Capacity&#xff08;容量&#xff09; 3.4 Modifiers&#x…

大模型微调十诫:关于将微调模型部署到生产环境的十条建议

转自;NLP工程化 大模型微调十诫&#xff1a;关于将微调模型部署到生产环境的十条建议&#xff1a; &#xff08;1&#xff09;不要盲目微调模型&#xff0c;先尝试使用提示的方式满足需求。只有当提示无法达到质量、性能或成本目标时&#xff0c;才考虑微调。 &#xff08;2…

ubuntu20.04 GLIBC从2.35降级到2.31

ubuntu20.04默认的GLIBC版本是2.31&#xff0c;因为某些库的依赖问题&#xff0c;脑子一抽把GLIBC升级到2.35&#xff0c;GLIBC升级参考一下另外一位博主的文章Ubuntu20.04更新GLIBC到2.35版本_glibc-2.35-CSDN博客 但当我想把GLIBC回退到2.31版本&#xff0c;参考网上的办法&a…

浅谈基于负荷时空均衡和弹性响应的电动汽车快充电价定价策略

摘要&#xff1a;为了引导电动汽车有序充电&#xff0c;提出了一种考虑负荷时空均衡和弹性响应的电动汽车快充电价定价策略。引入交通流理论描述交通路网&#xff0c;建立电动汽车快充负荷时空分布模型&#xff1b;考虑配电网调度和电动汽车快充负荷的弹性需求&#xff0c;构建…

【Python】从基础到进阶(七):深入理解Python中的异常处理与调试技巧

&#x1f525; 个人主页&#xff1a;空白诗 文章目录 一、引言二、异常处理概述1. 什么是异常&#xff1f;2. 异常的捕获与处理 三、常见的异常类型四、自定义异常五、调试与日志记录1. 使用assert进行调试2. 使用日志记录 六、案例&#xff1a;文件操作与异常处理1. 需求分析2…

【经验技巧】瞬态信号仿真中的码型选择问题

工程师在进行通道信号仿真时&#xff0c;经常会遇到信号码型选择的问题&#xff0c;通常的码型选择有两种&#xff1a;连续周期变化、随机变化&#xff0c;那么&#xff0c;不同的码型会对结果产生截然不同的影响&#xff0c;以设计中一路差分通道为例&#xff0c;搭载信号传输…

51.【C语言】字符函数和字符串函数(strcpy函数)

承接50.【C语言】字符函数和字符串函数(上) 点我跳转 5.strcpy函数 *简单使用 cplusplus的介绍 点我跳转 strcpy:string copy 翻译: 复制字符串 复制由source指向的C字符串到由destionation指向的数组中,包括\0(终止0字符)(在\0那里停止复制) 为了防止溢出,由destionation指…

深入内核分析BindException异常原因

一、前言 前段时间公司内的站点发布时经常遇到Tomcat使用的8080端口被占用&#xff0c;导致启动报错BindException的情况。笔者参与了该问题的排查和修复&#xff0c;本文将深入Tomcat、OpenJDK、Linux内核等源码为大家讲解问题的原因以及排查过程。 报错信息 Caused by: java…

收到了大厂中秋礼盒,哪家赢了?

大家好&#xff0c;我是鸭鸭&#xff01; 中秋节越来越近啦&#xff0c;大家都收到放假通知和中秋月饼了吗&#xff1f; 各大互联网品牌大厂的中秋创意礼盒也来啦&#xff01; 字节 今年字节的中秋礼盒&#xff0c;除了广州酒家的月饼之外&#xff0c;还发了一床2m*2.3m的四…

INDEMIND:扫地机器人,仍然不够“香”

不仅需要“新花样”&#xff0c;还要搞好“基本功”。 行业祛魅&#xff0c;重啃技术战 正如所有人都知道市场会发生变化&#xff0c;但扫地机器人的陡然降温还是给大多数人上了一课。尽管到了2023年&#xff0c;市场有所复苏&#xff0c;但零售量的增长也仅为4%。一时间&…

医药|基于springboot的医药管理系统设计与实现(附项目源码+论文+数据库)

私信或留言即免费送开题报告和任务书&#xff08;可指定任意题目&#xff09; 目录 一、摘要 二、相关技术 三、系统设计 四、数据库设计 五、核心代码 六、论文参考 七、源码获取 一、摘要 计算机网络发展到现在已经好几十年了&#xff0c;在理论上面已…

基于vue框架的宠物管理平台的设计与实现f3193(程序+源码+数据库+调试部署+开发环境)系统界面在最后面。

系统程序文件列表 项目功能&#xff1a;用户,医院简介,养宠知识,宠物分类,医生,预约医生,医嘱记录,宠物用品,用品分类,购买记录,供应商,宠物信息 开题报告内容 基于Vue框架的宠物管理平台的设计与实现开题报告 一、引言 随着宠物经济的兴起&#xff0c;宠物管理成为了一个日…

Win11+Ubuntu20.04双系统安装教程(避坑版)

Win11Ubuntu20.04双系统安装教程&#xff08;避坑版&#xff09; 前言系统盘制作安装Rufus系统盘制作 Windows磁盘配置移动分区&#xff08;磁盘分区时出现不连续的未分配空间需要用到&#xff0c;如果是连续的未分配空间即无需操作&#xff09;安装分区助手移动分区 安装Ubunt…

Redis的IO模型

Redis IO模型 Redis IO模型 使用的是基于 Reactor 模式的 I/O 多路复用模型。这个模型通过单线程事件循环来处理所有的客户端请求和响应。 基本模式 1. Reactor 模式 Reactor 模式是一种用于处理并发 I/O 操作的设计模式。它包含以下几个组件&#xff1a; 多路复用器&…

构建高效入学审核系统:Spring Boot解决方案

1系统概述 1.1 研究背景 随着计算机技术的发展以及计算机网络的逐渐普及&#xff0c;互联网成为人们查找信息的重要场所&#xff0c;二十一世纪是信息的时代&#xff0c;所以信息的管理显得特别重要。因此&#xff0c;使用计算机来管理大学生入学审核系统的相关信息成为必然。开…

redis常见的数据类型?

参考&#xff1a;一文读懂Redis五种数据类型及应用场景 - 知乎 (zhihu.com) String 类型 String 类型&#xff1a;Redis 最基本的数据类型&#xff0c;它是二进制安全的&#xff0c;意味着你可以用它来存储任何类型的数据&#xff0c;如图片、序列化对象等。使用场景&#xff…

OceanBase 运维管理工具 OCP 4.x 升级:聚焦高可用、易用性及可观测性

可视化的管控平台&#xff0c;对 OceanBase 这类的分布式数据库及大规模数据的运维管理来说&#xff0c;是提升运维效率与数据库管理水平的重要工具。OceanBase 运维管理工具 OCP 作为专为OceanBase数据库设计的企业级全生命周期管理平台&#xff0c;为用户提供了全面的数据库可…

句子成分——每日一划(六)

顺手简答一划&#xff1a;And&#xff1a;连词 you&#xff1a;主语 my friend&#xff1a;插入语 you&#xff1a;对主语起强调作用 are&#xff1a;系动词 the real hero&#xff1a;表语 目录 一、原句 二、独立成分&#xff0c;状语(Adverbial Phrase) 三、条件状语从…

Leetcode面试经典150题-82.删除排序链表中的重复元素II

之前写过这个题的基础第83题&#xff0c;看本文之前一定要先看懂这个Leetcode面试经典150题-82.删除排序链表中的重复元素II前序-83.删除排序链表中的重复元素_删除链表中重复的元素-CSDN博客 直接上代码了&#xff0c;解法都在代码里&#xff0c;不懂就留言或者私信 /*** De…