MQ基础:RabbitMQ真面目

news2025/1/19 11:16:40

同步调用方式,指的是发送方直接发送给接收方的形式。而这种方式在某些情况下可能出现问题,比如当业务逻辑变得复杂,同步的方式需要等待上一条指令被接收后才会继续,对性能的影响很大。

异步的方式,增加了一个消息代理的角色,可以管理、暂存、转发消息。这样消息提供者只需要把消息发送给消息代理,就能够继续进行下一个业务。而消息代理只需要把消息发送给消息接受者,就完成了它的任务。

异步的方式,有很多优点:

  • 能够解除消息发送者和消息接受者之间的耦合关系,拓展性强
  • 消息发送者无需等待消息是否发送给消息接受者,性能好
  • 当消息接受者出现故障时,只需要消息代理把消息发送给修复好的消息接受者就代表完成
  • 当消息短时间突然增多时,消息代理可以缓存消息,削峰填谷,减少服务器的压力

MQ(MessageQueue),中文是消息队列,就是存放消息的队列,是异步调用中的Broke。

目录

安装RabbitMQ

Java客户端

Work模型

交换机

Fanout

Direct

Topic

声明队列和交换机

消息转换器


安装RabbitMQ

基于docker安装:

把mq.tar镜像拷贝到linux中,再读取:

docker load -i mq.tar 

通过一串命令就可以把RabbitMQ启动起来:

docker run \
  -e RABBITMQ_DEFAULT_USER=root \
  -e RABBITMQ_DEFAULT_PASS=123 \
  -v mq-plugins:/plugins \
  --name mq \
  --hostname mq \
  -p 15672:15672 \
  -p 5672:5672 \
  --network demo_network \
  -d \
  rabbitmq:3.8-management

RabbitMQ整体架构:

  • publisher:消息发送者
  • consumer:消息消费者
  • queue:队列,存储消息
  • exchange:交换机,负责路由消息

在这其中,交换机必须要和队列绑定,才能够把消息路由给队列。

数据隔离

在Rabbitmq的网页中,有Virtual host这一栏。这代表着虚拟主机,不同的虚拟主机,数据是不同的,队列什么的也不通用。

想要创建虚拟主机,只需要在Virtual中新建一个就可以了。这样两个虚拟主机数据是分离开的。

Java客户端

由于Rabbitmq官方提供的API、比较复杂,于是我们使用AMQP来处理MQ。

Spring AMQP是基于AMQP协议定义的一套API规范,提供了板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。我们就通过Spring AMQP来完成对于Rabbitmq的应用。

引入AMQP的依赖:

SpringAMQP如何收发消息?

  1. 引入spring-boot-starter-amqp依赖
  2. 配置rabbitmq服务端信息
  3. 利用RabbitTemplate发送消息
  4. 利用@RabbitListener注解声明要监听的队列,监听消息

利用控制台创建队列simole.queue:

在publisher服务中,利用SpringAMQP直接向simple.queue发送消息:

配置好配置文件,

为了方便测试,我们在test中定义好方法:

在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列

Work模型

实现一个队列绑定多个消费者:

  1. 在RabbitMQ的控制台创建一个队列,名为work.queue
  2. 在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
  3. 在consumer服务中定义两个消息监听者,都监听work.queue队列
  4. 消费者1每秒处理50条消息,消费者2每秒处理5条消息

先新建一个队列作为work.queue

此时再在代码中修改两个消息监听者:

默认情况下,多个RabbitListener接收消息是通过轮询的方式,并且分配很均匀。当50条消息分配给两个消费者,会被均匀的分配给这两个消费者。

但是要是消费者1和消费者2的消息处理能力不同,会发生什么呢?

我们让消费者1每隔20ms才接受一次消息,消费者2每隔200ms才接收一次消息:

这显然不符合我们的要求。既然消费者1的性能要好一些,就应该让它多接受一些消息,而不是像现在这样仍然是每个消费者接收25条消息。

消费者消息推送限制
默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。

因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1

这样的分配机制,就充分利用了消费者1的性能,实现“能者多劳”。

交换机

生产环境中都会经过exchange交换机,通过交换机路由到队列。交换机的类型有三种:

  • Fanout  广播
  • Direct    定向
  • Topic     话题

Fanout

这个交换机会把消息广播到每一个跟其绑定的queue。

实现思路:

  1. 在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
  2. 在RabbitMQ控制台中,声明交换机fanout,将两个队列与其绑定
  3. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
  4. 在publisher中编写测试方法,向fanout发送消息

测试结果是两个消费者都可以接收到消息

Direct

实际生产环境中,交换机和消费者可能会存在复杂的关系,如果按照Fanout的方式来路由,可能会出现精细度不够的情况。Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

跟Fanout大体上一样,需要在Routing key中绑定key:

代码中,在生产者方只需要指定routingkey就完成了。

Topic

TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以 . 分割。

并且Topic还支持通配符, # 代表0个或者多个单词, * 代表一个单词。这就意味着Topic可以完成Fanout和Direct的功能,更加精细化的控制交换机。

代码中,也只需要更改routingkey即可完成匹配。

声明队列和交换机

在之前演示的过程中,就已经手动创建了很多个交换机和队列。实际生产环境中,只会比这个更多,所以用代码来创建队列和交换机就显得尤为重要。

SpringAMOP提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

fanout是最简单的,如果是direct和topic,都需要在绑定关系中加上routingkey参数:

public Binding directQueue1Binding(Queue directQueue1, DirectExchange directExchange) {
    return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}

除了这种方式,还可以用注解的方式。基于@RabbitListener注解来声明队列和交换机的方式:

注意,都是在消费者中创建。

消息转换器

当我们发送的消息是一个Map类型时

接收端接收到的是这样子的:

Payload采用的是JDK的序列化方式,明明msg中只有简单的jack,但是却占用了这么大的空间,并且这样的序列化方式还不安全,容易被篡改。

解决方式就是,我们采用Json序列化。

在publisher和consumer中都引入jackson依赖,并且配置MessageConverter:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
@Bean
public MessageConverter jacksonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}
//消费者生产者都需要配置这个,配置到启动类即可

此处就可以看到消息转换器发挥了作用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2175892.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网络编程(12)——完善粘包处理操作(id字段)

十二、day12 之前的粘包处理是基于消息头包含的消息体长度进行对应的切包操作&#xff0c;但并不完整。一般来说&#xff0c;消息头仅包含数据域的长度&#xff0c;但是如果要进行逻辑处理&#xff0c;就需要传递一个id字段表示要处理的消息id&#xff0c;当然可以不在包头传i…

【Godot4.3】简单物理模拟之圆粒子碰撞检测

概述 最近开始研究游戏物理的内容&#xff0c;研究运动、速度、加速度之类的内容。也开始模仿一些简单的粒子模拟。这些都是一些基础、简单且古老的算法&#xff0c;但是对于理解游戏内的物理模拟很有帮助。甚至你可以在js、Python或者其他程序语言中实现它们。 图形的碰撞检…

Linux操作系统中SpringGateway

1、SpringGateway简介 核心功能有三个&#xff1a; 路由&#xff1a;用于设置转发地址的 断言&#xff1a;用来判断真实应该请求什么地址 过滤器&#xff1a;可以过滤地址和处理参数 1、什么是网关 网关是后台服务的统一入口&#xff0c;类似于平时网络里提到的网关。 2…

ppt压缩有什么简单方法?压缩PPT文件的几种方法

ppt压缩有什么简单方法&#xff1f;许多用户常常面临文件过大的问题&#xff0c;尤其在需要通过电子邮件发送或上传至网络平台时&#xff0c;大文件会带来诸多麻烦。此外&#xff0c;较大的文件可能导致软件响应缓慢&#xff0c;从而影响整体的演示体验。因此&#xff0c;寻找有…

ESP8266/01s模块烧录MQTT AT固件篇

&#xff08;代码完美实现&#xff09;stm32 新版 onenet mqtt物联网(保姆级教程) 地址&#xff1a; &#xff08;代码完美实现&#xff09;stm32 新版 onenet mqtt物联网(保姆级教程)https://blog.csdn.net/Wang2869902214/article/details/142501323 乐鑫ESP8266/安信可…

Python项目Flask框架整合Mysql

一、在配置类中编写Mysql配置信息 二、实现Mysql配置类 import pymysql from config.config import MYSQL_HOST, MYSQL_USER, MYSQL_PASSWD, MYSQL_PROT, MYSQL_DB, MYSQL_CHARSETclass MysqlDB():def __init__(self, MYSQL_HOST, MYSQL_USER, MYSQL_PASSWD, MYSQL_PROT, MYS…

time命令:轻松测量Linux命令执行时间!

一、命令简介 用途&#xff1a; 用于测量 Linux 命令执行的时间&#xff0c;包括实际时间、用户 CPU 时间和系统 CPU 时间。刚开始以为是用来“看现在几点钟”的 &#x1f972;。标签&#xff1a; 实用工具&#xff0c;性能分析。 ‍ 二、命令参数 2.1 命令格式 time [选项…

COSCon'24 第九届中国开源年会议题征集正式启动

一年一度的开源盛会&#xff0c;COSCon24 第九届中国开源年会暨开源社十周年嘉年华将于2024年11月2-3日在中关村国家自主创新示范区会议中心举办。在为期2天的大会中&#xff0c;我们将为大家带来精彩纷呈的 Keynote 主题演讲&#xff08;上午&#xff09;&#xff0c;和百花齐…

【初阶数据结构】排序——选择排序

目录 前言选择排序堆排序 前言 对于常见的排序算法有以下几种&#xff1a; 下面这节我们来看选择排序算法。 选择排序 基本思想&#xff1a;   每一次从待排序的数据元素中遍历选出最大&#xff08;或最小&#xff09;的元素放在序列的起始位置&#xff0c;直到全部待排序…

2024前端技术发展概况

当前前端技术呈现出多方面的发展态势&#xff0c;以下是详细介绍&#xff1a; 前端框架不断演进&#xff1a; React&#xff1a;作为流行的前端框架之一&#xff0c;React 依旧保持着强大的生命力。它具有高效的虚拟 DOM 机制&#xff0c;能够快速更新和渲染页面&#xff0c;极…

如何创建一个docker,给它命名,且下次重新打开它

1.创建一个新的docker并同时命名 docker run -it --name one ubuntu:18.04 /bin/bash 这时候我们已经创建了一个docker,并且命名为"one" 2.关闭当前docker exit 3.这时docker已经终止了&#xff0c;我们需要使用它要重新启动 docker start one 4.现在可以重新打…

多线程篇八

多线程篇八 如笔者理解有误欢迎指正交流&#x1f338;&#x1f338;&#x1f338; 线程池 什么是线程池&#xff1f; 顾名思义&#xff0c;线程池是一个存放了很多线程的池子.既然有很多线程&#xff0c;那一定很方便调用对吧&#xff0c;有很多线程那大家一定喜欢一起玩吧&…

【计算机网络】Tcp报文的组成,Tcp是如何实现可靠传输的?

Tcp的报文组成 TCP&#xff08;传输控制协议&#xff09;是计算机网络中一种重要的传输协议&#xff0c;其报文组成包括多个字段&#xff0c;每个字段具有特定的含义。以下是TCP报文头的主要组成部分&#xff1a; 源端口号&#xff08;Source Port&#xff09;&#xff1a;占用…

c语言中例题:打印出杨辉三角

杨辉三角是一个经典的数学模型 从顶部的单个1开始&#xff0c;下面每一行的数字都是其正上方的两个数之和 每行数字左右对称&#xff0c;且由1开始逐渐变大 1 1 1 1 2 1 1 3 3 1 由以上规律可以看出arr[i][j] arr[i-1][j] ar…

Android使用RecyclerView仿美团分类界面

RecyclerView目前来说对大家可能不陌生了。由于在公司的项目中&#xff0c;我们一直用的listview和gridview。某天产品设计仿照美团的分类界面设计了一个界面&#xff0c;我发现用gridview不能实现这样的效果&#xff0c;所以就想到了RecyclerView&#xff0c;确实是一个很好的…

对话总结:Scale AI的创始人兼CEO Alex Wang

AI的三大支柱 计算:主要由大公司如NVIDIA推动。算法:顶尖实验室如OpenAI主导。数据:Scale致力于推动数据进展。前沿数据的重要性 与人类智能相比较,前沿数据是AI发展的关键。互联网数据是机器与人类合作的结果。语言模型的发展 第一阶段:原始的Transformer论文和GPT的小规…

PHP爬虫淘宝商品SKU详细信息获取指南

在电子商务领域&#xff0c;获取商品的SKU&#xff08;Stock Keeping Unit&#xff0c;库存单位&#xff09;详细信息对于商家进行库存管理、订单处理和客户服务至关重要。淘宝作为中国最大的电商平台之一&#xff0c;提供了丰富的API接口&#xff0c;使得开发者能够通过PHP爬虫…

AI在教育行业应用的启发和未来的方向

大家好&#xff0c;我是Shelly&#xff0c;一个专注于输出AI工具和科技前沿内容的AI应用教练&#xff0c;体验过300款以上的AI应用工具。关注科技及大模型领域对社会的影响10年。关注我一起驾驭AI工具&#xff0c;拥抱AI时代的到来。 shelly已经给大家分享了很多AI的工具&#…

ThinkPHP一对多的关联模型运用

一、序言 最近在写ThinkPHP关联模型的时候一些用法总忘&#xff0c;我就想通过写博客的方式复习和整理下一些用法。 具体版本&#xff1a; topthink/framework&#xff1a;6.1.4topthink/think-orm&#xff1a;2.0.61 二、实例应用 1、一对多的关联 本文案例&#xff1a;一个用…

MySQL - 单表查询

DQL (数据查询语言)是用来查询数据库表中的记录的操作。在实际的业务系统中&#xff0c;查询操作的频率远远高于增删改。常见的查询操作包括条件查询、排序、分组等。 1. DQL 语法 SELECT 字段列表 FROM 表名列表 [WHERE 条件列表] [GROUP BY 分组字段] [HAVING 分组后条件]…