MQ,RabbitMQ,SpringAMQP的原理与实操

news2025/1/16 3:51:08

MQ

同步通信

image-20240202103233412

image-20240202105021949

image-20240202105123170

异步通信

image-20240202111930461

事件驱动优势:

  • 服务解耦

  • 性能提升,吞吐量提高

    image-20240202141023030

  • 服务没有强依赖,不担心级联失败问题

    image-20240202141137606

  • 流量消峰

    image-20240202141435355

​ 小结: 大多情况对时效性要求较高,所有大多数时间用同步。而如果不需要对方的结果,且吞吐量,并发量较高则需要使用异步通信

image-20240202141921703

MQ常见框架

MQ(MessageQueue),消息队列,字面来看就是存放消息的队列,也就是事件驱动架构中的Broker

消息:就是事件,比如支付成功了这个事件,在MQ中就是一个消息

image-20240202144211395

RabbitMQ,RocketMQ 适合处理业务(若需要优化定制则选Rocket,因为用Java写的)

Kafka 适合处理日志(海量数据且对数据安全性要求不高的场景),ActiveMQ用的较少

RabbitMQ

RabbitMQ概述与安装

RabbitMQ是基于Erlang语言(面向并发的语言,天生为分布式系统而设计的)开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/

参考课前资料(链接:https://pan.baidu.com/s/1JuVKKFpUXg8TFxa_FoV3Gg
提取码:1468) 来安装RabbitMQ

image-20240202144811905

之后在浏览器输入:http://192.168.83.130:15672/ 进入RabbitMQ管理页面,按docker run中设置的账号密码进行登录

结果如下

image-20240204101726227

mq整体架构

image-20240204103735587

小结

image-20240204103835121

常见消息模型

image-20240204105108372

HelloWorld 案例

image-20240204105310538

动手实践

案例: 完成官方Demo中的hello world案例(链接:https://pan.baidu.com/s/1JuVKKFpUXg8TFxa_FoV3Gg
提取码:1468)

image-20240204105416259

打开项目,将ip调成自己的rabbitmq使用虚拟机(或电脑)的ip,再运行一次PublisherTest中的 testSendMessage() 方法

发送一条消息。再运行ConsumerTest 中main方法来接收消息。

image-20240204112803673

小结

image-20240204135103572

SpringAMQP

AMOP(Advanced Message Queuing Protocol)高级消息队列协议,大大简化消息发送和接收的代码量,且与语言无关

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

image-20240204145954201

image-20240204140927498

AMQP依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>    
	<groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在配置文件中添加mq连接信息

spring:
  rabbitmq:
   	host: 192.168.83.130 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机 
    username: itcast # 用户名
    password: 123321 # 密码

Basic Queue 简单队列模型

案例:利用SpringAMQP实现HelloWorld中的基础消息队列功能

流程如下:

1.在父工程中引入spring-amqp的依赖,以及在publisher服务中编写配置

2.在publisher服务中利用RabbitTemplate的convertAndSend方法,发送消息到simple.queue这个队列

image-20240204145734357

SpringAMQP发送消息步骤:引入依赖和设置配置---->利用RabbitTemplate的convertAndSend方法

3.在consumer中编写代码,接收消息

image-20240204151638720

SpringAMQP接收消息步骤:引入依赖和设置配置—》定义类,添加Component注解,类中声明方法添加@RabbitListener注解

Work Queue 工作队列模型

Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积

比如队列 一秒来50条消息 一个消费者一秒处理40条消息,那么需要两个消费者才能使得队列中消息被处理不丢失

image-20240204153355750

案例:实现一个队列绑定多个消费者

image-20240204153947098

问题:rabbitMQ消息预取,会将50条消息平均分给消费者1和消费者2,但消费者2处理速度慢,因此在1s内处理不完publisher发过来的50条消息

解决方案:让能者多劳,设置preFetch,控制预取消息的上限

image-20240204160513742

小结image-20240204161439493

发布、订阅模型-Fanout

image-20240204161952605

注意:exchange负责消息路由,而不是存储(queue负责存储),路由失败则消息丢失

Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue(广播)

案例:利用SpringAMQP演示FanoutExchange的使用

image-20240204163804072

step1 在consumer服务中声明Exchange、Queue、Binding(绑定关系)

image-20240204163828992

image-20240204164305716

step2 在consumer服务声明两个消费者

在consumer服务的SpringRabbitListener类中,添加两个方法,分别监听fanout.queue1和fanout.queue2:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
	System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2") 
public void listenFanoutQueue2(String msg) {
	System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

step3 在publisher服务发送消息到FanoutExchange

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testFanoutExchange() {
	// 队列名称  
	String exchangeName = "itcast.fanout"; 
	// 消息
	String message = "hello, everyone!";
	// 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息		
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

小结

image-20240205092228233

发布、订阅模型-Direct

image-20240205092356181

案例:利用SpringAMQP演示DirectExchange的使用

image-20240205092544599

步骤一 在consumer服务声明Exchange、Queue

1.在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2,

2.并利用@RabbitListener声明Exchange、Queue、RoutingKey

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
        key = {"red","blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1........接收到路由消息:【" + msg + "】" + LocalTime.now());
}


@RabbitListener(bindings = @QueueBinding(
       value = @Queue(name = "direct.queue2"),
       exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
        key = {"red","yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2........接收到消路由息:【" + msg + "】" + LocalTime.now());
}

步骤二 在publisher服务发送消息到DirectExchange

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testDirectExchange() {
    //交换机名字
    String exchangeName = "itcast.direct";
    //消息
    String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
    //发送消息,参数依次为:交换机名称,RoutingKey,消息
    rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}

从blue->yellow->red 运行三次,得到结果如下

image-20240205104021565

小结

image-20240205104321850

发布、订阅模型-Topic

image-20240205104559605

案例 利用SpringAMQP演示TopicExchange的使用

image-20240205104825731

步骤一:在consumer服务声明Exchange、Queue

1.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2,

2.并利用@RabbitListener声明Exchange、Queue、RoutingKey

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者1........接收到路由消息:【" + msg + "】" + LocalTime.now());
}


@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者2........接收到消路由息:【" + msg + "】" + LocalTime.now());
}

步骤二:在publisher服务发送消息到TopicExchange

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testTopicExchange() {
    //交换机名字
    String exchangeName = "itcast.topic";
    //消息
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    //发送消息,参数依次为:交换机名称,RoutingKey,消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

小结

image-20240205105655795

消息转化器

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:

​ 在publisher服务引入依赖

<dependency>   
	<groupId>com.fasterxml.jackson.core</groupId>   
	<artifactId>jackson-databind</artifactId>
</dependency>

​ 在publisher服务声明MessageConverter。(原本应该放到配置类中,但启动类也是配置类,所以可以放启动类中)

@Bean
public MessageConverter jsonMessageConverter(){
	return new Jackson2JsonMessageConverter(); 
}

image-20240205111950238

案例 测试发送Object类型消息

image-20240205111336946

结果如下(没有更改JDK序列化方式)

image-20240205111231469

使用json序列化器之后

image-20240205111303797

consumer接收消息过程

step1:加jackson依赖,依赖上面已经放父工程中,就不用做了

step2: 将pulisher中相同的MessageConverter放入consumer 启动类中(发送方与接收方必须相同)

@Bean
public MessageConverter jsonMessageConverter(){
	return new Jackson2JsonMessageConverter(); 
}

step3: 定义一个消费者,监听object.queue队列并消费消息

 @RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String,Object> msg){
    System.out.println("消费者........接收到对象消息:【" + msg + "】" + LocalTime.now());
}

image-20240205135854654

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1435118.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2024美赛数学建模E题:房产保险的可持续性,思路全解,代码模型分析

2024美赛数学建模E题思路全解&#xff0c;代码模型分析,完整详细内容见文末名片 添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; 保险公司应该在承保保单时考虑多种因素&#xff0c;以确保公司的长期健康和稳定性。以下是一个可能的模式&#xff0c;以确…

C# Socket通信从入门到精通(21)——Tcp客户端判断与服务器断开连接的三种方法以及C#代码实现

前言 我们开发的tcp客户端程序在连接服务器以后,经常会遇到服务器已经关闭但是作为客户端的我们不知道,这时候应该应该有一个机制我们可以实时监测客户端和服务器已经断开连接,如果已经断开了连接,我们应该及时报警提示用户客户端和服务器已经断开连接,本文介绍三种可以监…

力扣面试题 05.03. 翻转数位(前、后缀和)

Problem: 面试题 05.03. 翻转数位 文章目录 题目描述思路及解法复杂度Code 题目描述 思路及解法 1.将十进制数转换为二进制数&#xff08;每次按位与1求与&#xff0c;并且右移&#xff09;&#xff1b; 2.依次求取二进制数中每一位的前缀1的数量和&#xff0c;和后缀1的数量和…

zabbix配置主动监控

1.准备一台新的主机&#xff0c;安装相关软件包。 [rootsishi ~]# rpm -Uvh https://repo.zabbix.com/zabbix/5.0/rhel/7/x86_64/zabbix-release-5.0-1.el7.noarch.rpm [rootsishi ~]# yum -y install zabbix-agent2.修改zabbix-agent端的配置文件 [rootsishi ~]# vim /etc/z…

【并发编程】原子累加器

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;并发编程 ⛺️稳重求进&#xff0c;晒太阳 JDK8之后有专门做累加的类&#xff0c;效率比自己做快数倍以上 累加器性能比较 参数是方法 // supplier 提供者 无中生有 ()->结果// func…

3.0 Zookeeper linux 服务端集群搭建步骤

本章节将示范三台 zookeeper 服务端集群搭建步骤。 所需准备工作&#xff0c;创建三台虚拟机环境并安装好 java 开发工具包 JDK&#xff0c;可以使用 VM 或者 vagrantvirtualbox 搭建 centos/ubuntu 环境&#xff0c;本案例基于宿主机 windows10 系统同时使用 vagrantvirtualb…

负重20kg复合翼垂直起降无人机应用,复合翼无人机技术分析

主要任务应用 1.管线巡查 挂载可见光/红外二合一光电载荷和小型SAR设备&#xff0c;对既定线路进行昼夜巡视侦察&#xff0c;利用图像实时传回指挥控制中心&#xff0c;可用于石油管路、电力线路、舰艇航线及周围态势感知&#xff0c;利于依据现场实情进行战略决策和指令传达…

文心一言 VS 讯飞星火 VS chatgpt (195)-- 算法导论14.3 3题

三、用go语言&#xff0c;请给出一个有效的算法&#xff0c;对一个给定的区间i&#xff0c;返回一个与i重叠且具有最小低端点的区间;或者当这样的区间不存在时返回T.nil。 文心一言&#xff0c;去掉math&#xff0c;代码正常运行&#xff1a; 在Go语言中&#xff0c;你可以使…

栈--数据结构

初始化栈、入栈、出栈 栈&#xff1a;只允许在一端进行插入或删除操作的线性表 栈顶&#xff08;Top&#xff09; 代码实战步骤依次是初始化栈&#xff0c;判断栈是否为空&#xff0c;压栈&#xff0c;获取栈顶元素&#xff0c;弹栈。 代码 #include <stdio.h>#define …

Linux系统c/c++开发环境配置

安装LLVM全家桶及CMAKE 输入以下命令&#xff0c;安装clang&#xff0c;clangd&#xff0c;lldb及cmake。 sudo apt install clang clangd lldb cmake yukeyangDESKTOP-QFK2F47:~/myfiles/test$ sudo apt install clang clangd lldb cmake [sudo] password for yukeyang: Re…

re:从0开始的CSS学习之路 1. CSS语法规则

0. 写在前面 现在大模型卷的飞起&#xff0c;感觉做页面的活可能以后就不需要人来做了&#xff0c;不知道现在还有没有学前端的必要。。。 1. HTML和CSS结合的三种方式 在HTML中&#xff0c;我们强调HTML并不关心显示样式&#xff0c;样式是CSS的工作&#xff0c;现在就轮到C…

如何在Linux中安装新版的Python软件

一、引言 Python是目前世界上最为流行的编程语言&#xff0c;其在人工智能领域表现尤为出色。通常&#xff0c;我们为了测试github上面的一些项目&#xff0c;比如&#xff1a;chat-on-wechat&#xff0c; 我们就可以在vps上的Linux系统中安装Python&#xff0c;从而实现各种人…

聚观早报 | iOS 17.4正式版将上线;魅族21 Pro或下月发布

聚观早报每日整理最值得关注的行业重点事件&#xff0c;帮助大家及时了解最新行业动态&#xff0c;每日读报&#xff0c;就读聚观365资讯简报。 整理丨Cutie 2月5日消息 iOS 17.4正式版将上线 魅族21 Pro或下月发布 小米MIX Flip细节曝光 OPPO Find X7 Ultra卫星通信版 …

相机图像质量研究(3)图像质量测试介绍

系列文章目录 相机图像质量研究(1)Camera成像流程介绍 相机图像质量研究(2)ISP专用平台调优介绍 相机图像质量研究(3)图像质量测试介绍 相机图像质量研究(4)常见问题总结&#xff1a;光学结构对成像的影响--焦距 相机图像质量研究(5)常见问题总结&#xff1a;光学结构对成…

基于BiLSTM-CRF模型的分词、词性标注、信息抽取任务的详解,侧重模型推导细化以及LAC分词实践

基于BiLSTM-CRF模型的分词、词性标注、信息抽取任务的详解,侧重模型推导细化以及LAC分词实践 1.GRU简介 GRU(Gate Recurrent Unit)门控循环单元,是[循环神经网络](RNN)的变种种,与 LSTM 类似通过门控单元解决 RNN 中不能长期记忆和反向传播中的梯度等问题。与 LSTM 相…

Android用setRectToRect实现Bitmap基于Matrix矩阵scale缩放RectF动画,Kotlin(一)

Android用setRectToRect实现Bitmap基于Matrix矩阵scale缩放RectF动画&#xff0c;Kotlin&#xff08;一&#xff09; 基于Matrix&#xff0c;控制Bitmap的setRectToRect的目标RectF的宽高。从很小的宽高开始&#xff0c;不断迭代增加setRectToRect的目标RectF的宽高&#xff0c…

python进行批量搜索匹配替换文本文字的matlab操作实例

在进行一些数据处理时&#xff0c;可能需要抓取原文中的一些内容&#xff0c;批量替换原文另外的一些内容&#xff0c;而且事先还需要一步搜索匹配的步骤。 举个例子&#xff0c;如下matlab输出的txt文件&#xff0c;原文件有几万行数据&#xff0c;这里只摘取3行对应的 文件文…

react 之 react.forwardRef

react.forwardRef使用ref暴露DOM节点给父组件 1.使用场景 import { forwardRef, useRef } from "react"// 子组件 // function Son () { // return <input type"text" /> // }const Son forwardRef((props, ref) > {return <input type&qu…

Golang与Erlang有什么差异

Golang和Erlang是两种备受关注的编程语言&#xff0c;它们各自具有独特的特点和优势。下面我将简单的探讨一下Golang和Erlang之间的差异&#xff0c;并且分析它们在并发模型、运行环境、函数式编程和领域特性等多个方面的不同之处。 并发模型 Golang使用goroutines和channels…

分布式延时消息的另外一种选择 Redisson (推荐使用)

前言 目录 前言 基本使用 内部数据结构介绍 基本流程 发送延时消息 获取延时消息 初始化延时队列 总结 因为工作中需要用到分布式的延时队列&#xff0c;调研了一段时间&#xff0c;选择使用 Redisson DelayedQueue&#xff0c;为了搞清楚内部运行流程&#xff0c;特记…