SpringCloud学习路线(9)——服务异步通讯RabbitMQ

news2024/11/24 5:58:53

一、初见MQ

(一)什么是MQ?

MQ(MessageQueue),意思是消息队列,也就是事件驱动架构中的Broker。

(二)同步调用

1、概念: 同步调用是指,某一服务需要多个服务共同参与,但多个服务之间有一定的执行顺序,当每一个服务都需要等待前面一个服务完成才能继续执行。

2、存在的问题

  • 耦合度高: 新需求需要改动原代码
  • 性能下降: 调用者需要等待服务提供者相应,如果调用链过长则响应时间等于每次调用的时间之和。
  • 资源浪费: 调用链的每个服务在等待响应过程中,不会释放请求资源,高并发场景下会浪费系统资源。
  • 级联失败: 若服务提供者出现宕机,所有调用者都会因故障而导致整个服务集群故障。

(三)异步调用

1、实现模式: 异步调用常见实现的就是事件驱动模式。

2、事件驱动的优势

  • 服务解耦: 只需要将请求交付给事件管理器进行管理即可完成服务。
  • 性能提升: 与客户交互的服务短时间就能完成,并不需要等待后续服务完成。
  • 服务弱依赖: 其它服务宕机不影响服务集群的使用
  • 流量缓冲: 事件管理器通过任务队列的方式,使得订阅的服务按照自身速度进行执行。

3、事件驱动的缺点

  • 高度依赖Broker的可靠性、安全性、吞吐能力
  • 架构复杂时,业务没有明显的流程线,不便于跟踪管理

(四)MQ常见框架

RabbitMQ(中小企业)ActiveMQRocketMQ(大型企业)Kafka
公司/社区RabbitApacheAlibabaApache
开发语言ErlangJavaJavaJava
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般极高
消息延迟微妙级毫秒级毫秒级毫秒以内
消息可靠一般一般

二、使用MQ

(一)RabbitMQ概述

RqbbitMQ是基于Erlang语言开发的开源消息通讯中间件,官方地址:https://rabbitmq.com/

(二)安装MQ

docker pull rabbitmq:3-management

在这里插入图片描述

(三)运行RabbitMQ

#配置 MQ的用户名和密码,容器名和主机名,端口,镜像名 ,注意:15672端口是MQ的控制台访问端口,5672是对外暴露的消息通信端口
docker run -e RABBITMQ_DEFAULT_USER=xxx -e RABBITMQ_DEFAULT_PASS=xxxx --name mq --hostname mq1 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

在这里插入图片描述

访问MQ的控制台

在这里插入图片描述
(4)RabbitMQ的整体结构

在这里插入图片描述

(5)RabbitMQ中的几个概念

  • channel: 操作MQ的工具
  • exchange: 路由消息到队列
  • queue: 缓存消息
  • Virtual Host: 虚拟主机,是对queue,exchange等资源进行逻辑分组

(6)常见的MQ模型

  • 基本消息队列(BasicQueue): Publisher —1:1— Queue —1:1— Customer
  • 工作消息队列(WorkQueue): Publisher —1:1— Queue —1:n— Customer
  • 发布/订阅(Publish、Subscribe): 根据交换机类型又有三种模型
    • Fanout Exchange: 广播,Publisher—1:1—Exchange—1:n—Queue—1:1—Customer
    • Direct Exchange: 路由,Publisher—1:1—Exchange—1:n—Queue—1:1—Customer
    • Topic Exchange: 主题,
  • RPC
  • 发布者确认

第一种:基本消息队列的基本使用

包含三种角色:publisherqueueconsumer

  • publisher: 消费发布者,将消息发布到队列queue
  • queue: 消息队列,负责接受并缓存消息
  • consumer: 订阅队列,处理队列中的消息

收发消息的过程: 获取连接 》 建立通信通道 》 创建消息队列 》 收发消息 》 释放资源

1、publisher和consumer引入依赖

  <dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
</dependency>

2、Publisher创建发送消息通道

@SpringBootTest
class PublisherApplicationTests {
    @Test
    void testSendMessage() throws IOException, TimeoutException {
//        1、建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
//        2、设置连接参数
        connectionFactory.setHost("192.168.92.131");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
//        3、建立连接
        Connection connection = connectionFactory.newConnection();
//        4、建立通信通道Channel
        Channel channel = connection.createChannel();
//        5、创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName,false,false,false,null);
//        6、发送信息
        String message = "hello,rabbitmq!";
        channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));
        System.out.println("发送消息成功:【"+message+"】");
//        7、关闭通道和连接
        channel.close();
        connection.close();
    }
}

2、Consumer创建订阅通道

class ConsumerApplicationTests {
    public static void main(String[] args) throws IOException, TimeoutException {
        //        1、建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //        2、设置连接参数
        connectionFactory.setHost("192.168.92.131");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
//        3、建立连接
        Connection connection = connectionFactory.newConnection();
//        4、建立通信通道Channel
        Channel channel = connection.createChannel();
//        5、创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName,false,false,false,null);
//        6、订阅消息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                7、处理消息
                String message = new String(body);
                System.out.println("接收到消息:【"+message+"】");
            }
        });
        System.out.println("等待接收消息....");
    }
}

第二种:Work Queue 工作队列

与基本队列的区别在于,它能使用多个订阅队列进行高效的处理请求。(因为一个订阅队列的处理速度是有限的)

使用过程与基本队列几乎一致,只是开启了多个订阅队列。

在使用过程中我们会发现,多个订阅队列对任务的分配是平均的,这就是预取机制

我们需要的是快速处理的订阅队列获取更多的请求,慢速处理的订阅队列获取少量的请求,它如何实现呢?

通过修改配置文件,设置一个 preFetch 值。

spring:
  rabbitmq:
    host: 192.168.92.131 #IP
    port: 5672 #端口
    virtual-host: / #虚拟主机
    username: root #用户名
    password: root #密码
    listener:
      simple:
        prefetch: 1 # 每次取 1 个请求,处理完才能取下一个。

第三种:FanoutQueue 广播消息队列

SpringAMQP提供声明交换机、队列、绑定关系的API

主要使用的是Exchange.FanoutExchange类。

实现思路:
1、在consumer服务,声明队列,交换机,并将两者绑定。

@Configuration
public class FanoutConfig{
	//交换机
	@Bean
	public FanoutExchange fanoutExchange(){
		return new FanoutExchange("com.fanout");
	}

	//队列
	@Bean
	public Queue fanoutQueue1(){
		return new Queue("com.queue1");
	}

	//绑定关系
	@Bean
	public Binding bindingQueue(Queue fanoutQueue1,FanoutExchange fanoutExchange){
		return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
	}

	//...以相同方式声明第2个队列,并完成绑定

}

2、在consumer服务,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

@Component
public class SpringRabbitListener {
	@RabbitListener(queues = "com.queue1")
	public void listenFanoutQueue1(String msg) throws InterruptedException {
		//...处理结果
	}

	@RabbitListener(queues = "com.queue2")
	public void listenFanoutQueue2(String msg) throws InterruptedException {
		//...处理结果
	}
}

3、在publisher编写测试方法,向交换机发送信息

@Test
public void sendFanoutExchange() {
	//1、交换机
	String exchangeName = "com.fanout";
	//2、消息
	String message = "Hello Fanout";
	//3、发送消息
	rabbitTemplate.covertAndSend(exchangeName, "", message);
}

第四种:路由信息队列

路由模式的流程: 即设置密钥的绑定关系,只有携带相应的密钥才能进入相应的队列

  • 每一个 QueueExchange 设置一个 BindingKey
  • 发布者发送消息时,需要指定消息的 RoutingKey
  • Exchange根据消息路由到 BindingKeyRoutingKey 一致的队列

实现思路:
1、利用 @RabbitListener 声明Exchange、Queue、RoutingKey

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "com.exchange", type = ExcahngeTypes.DIRECT), key = {"red","blue"}))
public void listenRoutingQueue1(String msg) throws InterruptedException {
	//...处理结果
}

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "com.exchange", type = ExcahngeTypes.DIRECT), key = {"red","green"}))
public void listenRoutingQueue2(String msg) throws InterruptedException {
	//...处理结果

2、发送消息实现

//指定队列处理
@Test
public void sendRoutingExchange1(){
	//交换机,消息
	String exchangeName = "com.exchange";
	String message = "Hello,RoutingMQ";
	//发送消息
	rabbitTemplate.covertAndSend(exchangeName, "blue", message);
}

//多队列处理
@Test
public void sendRoutingExchange2(){
	//交换机,消息
	String exchangeName = "com.exchange";
	String message = "Hello,RoutingMQ";
	//发送消息
	rabbitTemplate.covertAndSend(exchangeName, "red", message);
}

第五种:主题信息队列(通配key)

TopicExchange 与 DirectExchange 的区别: routingkey必须是多个单词的列表,并且以,分割。并且Queue与Exchange指定的BindingKey时可使用通配符:

  • **#:**代指 0 / n 个单词
  • *: 代指一个单词

实现思路:
1、通过 @RabbitListener 声明Exchange、Queue、RoutingKey

@RabbitListener(bingdings = @QueueBinding(exchange = @Exchange(name = "com.exchange", type = ExchangeTypes.TOPIC), queue = @Queue(name = "com.queue1"), key = {"china.#"}))
public void listenTopicQueue1(String msg) {
	//处理代码....
}

@RabbitListener(bingdings = @QueueBinding(exchange = @Exchange(name = "com.exchange", type = ExchangeTypes.TOPIC), queue = @Queue(name = "com.queue2"), key = {"#.news"}))
public void listenTopicQueue2(String msg) {
	//处理代码....
}

2、在publisher服务中,向交换机发送消息

@Test
public void sendTopicMessage(){
	//交换机,消息
	String exchangeName = "com.exchange";
	String message = "Hello,Topic";
	rabbitTemplate.convertAndSend(exchangeName,"china.call",message);
}

四、SpringAMQP

(一)概念

  • AMQP: Advanced Message Queuing Protocol 传递消息队列协议,是用于在应用程序或之间传递业务消息的开放标准。该协议与语言及平台无关,更符合为服务中独立性的要求。
  • Spring AMQP: Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。其中 spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

(二)实现基础消息队列

1、引入spring-amqp依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2、publisher服务中利用RabbitTemplate发送消息到任务队列

  • 配置mq连接信息
spring:
  rabbitmq:
    host: 192.168.92.131 #IP
    port: 5672
    virtual-host: /
    username: root
    password: root
  • 编写发送方法
	@Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendMessage(){
        String queueName = "simple.queue";
        String message = "Hello World";
        rabbitTemplate.convertAndSend(queueName,message);
    }

3、在consumer服务中编写消费逻辑,绑定simple.queue队列

  • 配置mq连接信息
spring:
  rabbitmq:
    host: 192.168.92.131 #IP
    port: 5672
    virtual-host: /
    username: root
    password: root
  • 编写发送方法1
	@Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void getMessage(){
        String queueName = "simple.queue";
        // receive 表示接收方法,接收到的信息会封装到Message,可以看receive的返回值
        Message message = rabbitTemplate.receive(queueName);
        // Message.getBody 是 byte[]
        System.out.println(new String(message.getBody()));
    }
  • 编写发送方法2
    • 创建一个监听类
// 注册成 Bean 对象
@Component
public class SpringRabbitListener {

	// 监听器注释,queues = 订阅队列,并将返回值注入参数列表中	
    @RabbitListener(queues = "simple.queue")
    public void ListenSimpleQueueMessage(String msg){
        System.out.println("Spring 消费者接收到消息:【" + msg + "】");
    }
}

(三)消息转换器

为了让我们能够自由识别consumer发送的消息,则需要使用的是消息转换器

消息转换器如何使用?

Spring对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理,默认实现的是SimpleMessageConverter,基于ObjectObjectOutputStream完成序列化。

我们只需要定义一个 MessageConverter 类型的Bean即可,推荐使用JSON序列化

1、publisher引入依赖


<!-- 接收消息需要使用jackson的转换依赖 -->
<dependency>
	<groupId>com.fasterxml.jackson.dataformat</groupId>
	<artifactId>jackson-dataformat-xml</artifactId>
	<version>2.9.10</version>
</dependency>

<!-- 发送消息需要使用jackson的核心依赖 -->
<dependency>
	<groupId>com.fasterxml.jackson.core</groupId>
	<artifactId>jackson-databind</artifactId>
</dependency>

2、publisher启动类,声明MessageConverter

@Bean
public MessageConverter jsonMessageConverter(){
	return new Jackson2JsonMessageConverter();
}

3、consumer启动类,声明MessageConverter

@Bean
public MessageConverter jsonMessageConverter(){
	return new Jackson2JsonMessageConverter();
}

4、监听队列消息

@RabbitListener(queues = "object.queue")
public void listenObjectMessage(Object msg) {
	//处理数据....
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/785700.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Mysql表的查找进阶

重点细节知识&#xff1a;NULL是表示表里这个格子是空着的&#xff0c;NULL参与各种运算都是->false&#xff0c;但是只有这个才是可以用NULL等于NULL成功的 <>。,看一下&#xff0c;下图的区别&#xff0c;下面的是连空也算上了 补充一个is 用法&#xff0c;和上面语…

ubuntu docker离线安装docker(.deb包方式)(成功)(附卸载方法)

参考文章&#xff1a;Install Docker Engine on Ubuntu 文章目录 安装步骤下载安装包拷贝到目标主机并执行安装命令 验证拉取运行容器测试build dockerfile测试持久运行容器测试主机重启后&#xff0c;docker各服务是否正常自启 卸载方法附&#xff1a;各安装包作用说明&#x…

【iPadOS 开发】打开 iPad 的开发者模式的方法

文章目录 1. 前提条件2. 具体方法 1. 前提条件 iPad 通过 Type-C 线连接到 Mac Mac上已经安装 Xcode 2. 具体方法 在 Xcode 顶栏中的 Window 中打开 Devices and Simulators &#xff0c;可以看到自己的设备&#xff1a; 接着在 iPad 上进入 设置 > 隐私与安全性 > 开…

2023年Houdini电脑配置推荐,附上10款Houdini渲染器

SideFX Houdini是一款非常强大的工具&#xff0c;旨在创建最高质量的电影效果。它需要强大的系统来实现平稳的工作流程。赞奇云工作站为 SideFX Houdini找到最佳的 CPU、GPU 和渲染器。 什么是 SideFX Houdini&#xff1f; SideFX Houdini是一款 3D 动画和视觉效果软件&#…

监狱人员定位系统:提高监狱安全性及维护社会安全的工具

如何提高监狱安全性一直是社会关注的焦点。在现代化的安全管理工具中&#xff0c;监狱人员定位系统正逐渐被广泛应用于各地监狱。通过实时定位和监控&#xff0c;这一系统能够有效提高监狱安全性和管理效率&#xff0c;维护社会的安全和稳定。 那么&#xff0c;在这篇文章中&a…

Spring,SpringBoot,Spring MVC的区别是什么

1.Spring是什么 我们通常所说的 Spring 指的是 Spring Framework&#xff08;Spring 框架&#xff09;&#xff0c;它是⼀个开源框架&#xff0c;有着活跃⽽庞⼤的社区&#xff0c;这就是它之所以能⻓久不衰的原因。Spring ⽀持⼴泛的应⽤场景&#xff0c;它可以让 Java 企业级…

Python实现Up数据信息采集 <内含JS逆向解密>

目录标题 前言环境使用:模块使用:实现基本流程:代码展示&#xff1a;尾语 前言 嗨喽~大家好呀&#xff0c;这里是魔王呐 ❤ ~! 环境使用: python 3.8 >>> 运行代码 pycharm 2021.2 >>> 辅助敲代码 模块使用: 第三方模块 需要安装 import requests >…

chapter12:SpringBoot与检索

Spring Boot与检索视频 1. 简介 我们的应用经常需要添加检索功能&#xff0c;开源的ElasticSearch是目前全文搜索引擎的首选。 他可以快速的存储、搜索和分析海量数据。SpringBoot通过整合Spring Data ElasticSearch为我们提供了非常便捷的检索功能支持。 ElasticSearch是一…

骨头的诱惑题解

样例输入1&#xff1a; 4 4 5 S.X. ..X. ..XD ....样例输出1&#xff1a; NO样例输入2&#xff1a; 3 4 5 S.X. ..X. ...D样例输出2&#xff1a; YES思路分析&#xff1a; 看到能否到达终点的题目先想 d f s dfs dfs。但这道题规定必须刚好 T T T秒到达&#xff0c;所…

记一次对Ghidra反编译的修复

前言 Ghidra是NSA在2019年开源的逆向工具&#xff0c;可以说从开源发布开始&#xff0c;就基本成了开源界唯一可以与 IDA 竞争的存在&#xff0c;其它的工具多少总是欠点意思。不过从实际情况来看&#xff0c;虽然 Ghidra一直在积极维护&#xff0c;但是现在的Bug情况跟IDA相比…

【《Azure、DevOps和微服务软件架构实战(第2版)》——教你构建并交付可满足组织业务需求的高度可扩展的企业应用程序】

本书的编写方式与很多技术书籍不同&#xff0c;作者站在架构师的视角&#xff0c;以一个项目的整个生命周期为主线&#xff0c;向读者展示了如何在云时代设计和实现一款软件&#xff0c;其内容涵盖了从软件架构设计的基本原则、需求收集、解决方案设计&#xff0c;可选技术架构…

扫码自测,全对有奖!《人月神话》知识自测卷01-共10题

自测链接 https://www.101test.com/cand/index?paperIdUH5KRN 最先答对全部题目的前三名将获赠清华大学出版社近年出版的新书。 请把全对(10分)的截屏发给微信号13811867132&#xff08;备注&#xff1a;人月兑奖&#xff09;&#xff0c;并留下相关信息。 1. [单选] 在&qu…

Vue3兄弟组件之间传值-mitt

Vue3兄弟组件之间传值-使用mitt插件 环境vue3tsvite 1.安装mitt 在终端cd到项目目录运行 npm install mitt安装成功在package.json文件会有显示 2.在main.js里面全局引用 // An highlighted block import mitt from mitt app.config.globalProperties.$mitt mitt()3.此时…

低代码开源项目汇总

低代码是基于可视化和模型驱动理念&#xff0c;结合云原生与多端体验技术&#xff0c;它能够在多数业务场景下实现大幅度的提效降本&#xff0c;为专业开发者提供了一种全新的高生产力开发范式。 不定期汇总更新一些低代码开源项目。 1、Appsmith Appsmith 是一款开源低代码…

根据UIL下载图片/视频、根据URL自动下载图片/视频、GUI自动下载想要的图片

1&#xff0c;根据UIL下载图片/视频 def downForInterface(file_path):count 1value_rows []with open(file_path, encodingUTF-8) as file:f_csv csv.reader(file)for r in f_csv:value_rows.append(r)for file_path in value_rows:cunmulu if . in file_path[0]:print(cu…

Java毕业设计-汽车出租系统【含源码、论文】

前言 汽车出租管理系统&#xff1a; 随着当今社会科学技术的高速发展&#xff0c;人民的生活水平不断的提高&#xff0c;自由行也开始盛行。有些人为了方便&#xff0c;选择汽车租赁的方式出行&#xff0c;因此汽车租赁成为一个极具市场潜力的行业。面对日趋发展的租赁市场&a…

有向图的强联通分量-SCC-Tarjan算法

有向图的强联通分量(SCC)Tarjan算法 强连通分量&#xff08;Strongly Connected Components&#xff0c;SCC&#xff09;的定义是&#xff1a;极大的强连通子图。 下图中&#xff0c;子图{1,2,3,4}为一个强连通分量&#xff0c;因为顶点1,2,3,4两两可达。{5},{6}也分别是两个强…

B2B2C多商户跨境电商购物网站搭建(后台采集功能)

如何部署开发一个B2B2C开源多语言多商户跨境外贸网站 随着全球化的发展&#xff0c;跨境外贸成为了许多企业拓展业务的重要方向。搭建一个B2B2C跨境外贸网站&#xff0c;将有助于实现企业的全球化经营。那么如何搭建一个B2B2C跨境外贸网站呢&#xff1f; 一、选择合适的开源平…

安科瑞智能照明控制系统的应用发展需求-安科瑞黄安南

【摘 要】 &#xff1a;随着电力电子技术的快速发展&#xff0c;智能照明控制技术已经成为楼宇自动化控制系统的重要组成部分&#xff0c;是绿色照明的发展方向。智能照明控制系统在照明节能上起到重要的作用。文章结合实际案例&#xff0c;探讨了智能照明控制技术在照明节能上…

OK3588的NPU加速推理resnet18—rknn_toolkit_lite2的Python语言篇

OK3588的NPU加速推理MobileNet——Python语言篇 Rknn_toolkit_lite2Miniconda安装创建虚拟环境并运行NPU加速推理代码注释 Rknn_toolkit_lite2 RKNN Toolkit Lite2 主要用于 RKNN 模型在 Rockchip NPU 上的部署。 在使用 RKNN Toolkit Lite2 之前&#xff0c;用户需要先通过 R…