Java分布式微服务4——异步服务通讯(RabbitMQ)中间件

news2024/9/25 19:21:48

文章目录

  • 微服务的远程异步调用
  • MQ介绍
  • RabbitMQ
    • RabbitMQ结构
    • RabbitMQ的单机部署
      • 1.下载镜像
      • 2.安装MQ
    • RabbitMQ入门
    • 常见消息模型
  • SpringAMQP
    • SpringAMQP实现基础消息队列
    • SpringAMQP实现工作队列
    • SpringAMQP实现发布订阅
      • 1. Fanout Exchange 广播模式
      • 2. Direct Exchange 路由模式
      • 3. Topic Exchange 话题
    • 消息转换器

微服务的远程异步调用

为什么需要异步调用?
在这里插入图片描述
在这里插入图片描述

  1. 故障隔离:支付服务不负责调用其他三个服务,只负责通知Broker支付成功这个事件,然后就返回结果,后面的服务故障了和前面发布事件的服务无关,前面的服务发布完事件就结束了
  2. 吞吐量提升:Broker将支付成功的事件广播给订阅了这个事件的那些服务,服务们各自并发进行接下来的工作,吞吐量变高,性能提升
  3. 耦合度低:有新服务加入只要让它订阅就行,耦合度低
  4. 流量削峰:broker可以起到缓冲作用,把大量事件存着给后面慢慢处理

在这里插入图片描述
异步调用的缺点:

  1. 依赖于Broker的可靠性、安全性、吞吐能力
  2. 架构复杂,业务流程不清晰,难以追踪

MQ介绍

MQ(MessageQueue)消息队列,就是上文事件驱动架构中的Broker
在这里插入图片描述

RabbitMQ

RabbitMQ结构

在这里插入图片描述

  • VirtualHost: 一般属于不同用户,互相隔离,是对exchange、queue等资源的逻辑分组
  • channel: 操作MQ的工具
  • queue: 缓存消息队列
  • exchange: 路由消息到队列中

RabbitMQ的单机部署

基于Erlang语言设计
RabbitMQ文档
在Centos7虚拟机中使用Docker来安装。

1.下载镜像

方式一:在线拉取

docker pull rabbitmq:3-management

方式二:从本地加载

.tar镜像包上传到虚拟机中后,使用命令加载镜像即可:

docker load -i mq.tar

2.安装MQ

执行下面的命令来运行MQ容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \ # 15672是管理平台的端口
 -p 5672:5672 \ # 5672消息通信的端口
 -d \
 rabbitmq:3-management

在浏览器使用虚拟机地址:15672就能看到管理平台
ps.一般情况下每个用户要独享一个虚拟主机
管理后台介绍

RabbitMQ入门

官方入门示例

常见消息模型

  1. 基本消息队列basic queue
  2. 工作消息队列work queue
  3. 发布订阅:广播fanout
  4. 发布订阅:路由direct
  5. 发布订阅:主题topic

SpringAMQP

在这里插入图片描述

  • 监听器容器,异步处理入栈消息
  • 发送和接收消息的RabbitTemplate
  • RabbitAdmin用于自动声明队列,交换和绑定

exchange、queue这种东西,如果没有提前创建好,在使用的时候也会自动创建

SpringAMQP实现基础消息队列

  1. 父工程引入spring-amqp依赖
        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  1. 在publisher服务中利用RabbitTemplate发送消息到simple.queue队列
    在publisher的application.yml中添加mq的连接信息
spring:
  rabbitmq:
    host: 192.168.36.128 # 主机名
    port: 5672
    virtual-host: / # 虚拟主机
    username: itcast
    password: 123456

在测试类中编写一个测试方法,注入RabbitTemplate对象
别忘了加注解让spring boot启动,要不然没东西注入报空指针

//@RunWith(SpringRunner.class)
@SpringBootTest
public class PublisherTest {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue(){
        String queueName = "simple.queue";
        String message = "hello, spring amqp!";
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

运行测试方法,在管理平台中就能看到队列中有一个消息了

  1. 在consumer服务中编写消费逻辑,绑定simple.queue队列

同样,添加依赖,然后在配置文件中添加AMQP信息

消费者只需要新建一个类(为了被Springboot找到并内部注入需要添加@Component),定义一个监听方法(用@RabbitListener修饰)即可:

@Component
public class SpringRabbitListener {
    
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String message){
        System.out.println("spring消费者接收到消息:"+message);
    }
}

启动consumer微服务以后,它会自动监听simple.listener队列中有没有消息,如果有就直接拿过来
在这里插入图片描述

SpringAMQP实现工作队列

在这里插入图片描述
绑定两个consumer可以提高消息处理的速度,避免消息堆积

假设publisher共发送了50条消息,那设置两个consumer的监听者:

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueueMessage1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:"+msg);
        Thread.sleep(50);
    }

    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueueMessage2(String msg) throws InterruptedException {
        System.err.println("消费者2接收到消息:"+msg);
        Thread.sleep(10);
    }
}

事实上,50条消息被平均分配给了两个consumer监听器,消费者1接收完25条以后还要慢慢等消费者2接收完它的25条,并不会抢消息
这是消息预取机制造成的问题,两个消费者是在消费前就把消息分配好了

在配置文件中,可以设置消息预取的上限simple.prefetch(默认为无限),设置为1的时候就是一条一条取,以达到能者多劳,总体速度变快的效果。

spring:
  rabbitmq:
    host: 192.168.36.128
    port: 5672
    virtual-host: /
    username: itcast
    password: 123456
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才拿下一条

SpringAMQP实现发布订阅

发布订阅模式与先前案例的区别是,允许同一消息被群发给多个消费者,而不是一个消费者消费完就删除。实现方法靠exchange(交换机)

常见的场景也是一个事件的完成会调动很多后续的服务
在这里插入图片描述
消息发布者现在只需要把消息交给交换机,不需要知道给哪些队列,交换机会帮助转发
交换机三种类型:

  • Fanout广播
  • Direct路由
  • Topic话题

1. Fanout Exchange 广播模式

  1. 在consumer中声明队列、交换机,并且把队列绑定到交换机(下面还有使用@RabbitListener注解声明和绑定的方法)
@Configuration
public class FanoutConfig {
    @Bean // 声明交换机
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }
    @Bean // 声明队列
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    @Bean // 绑定
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }//以相同方式声明第2个队列并绑定
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

这些Bean会被springboot自动装配,被AMQP使用
2. 在consumer中编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueueMessage1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:"+msg);
        Thread.sleep(50);
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueueMessage2(String msg) throws InterruptedException {
        System.err.println("消费者2接收到消息:"+msg);
        Thread.sleep(10);
    }
}
  1. 在publisher中编写测试方法,向itcast.fanout发送消息
@Test
public void testSendFanoutExchange(){
    // 交换机名称
    String exchangeName = "itcast.fanout";
    // 消息
    String msg = "it's a broadcast";
    // 发送
    rabbitTemplate.convertAndSend(exchangeName, "", msg);
}

在这里插入图片描述

2. Direct Exchange 路由模式

DIrect Exchange会根据规则把消息路由(routes)到指定队列
在这里插入图片描述
实现思路如下:

  1. 利用@RabbitListener注解声明Exchange、Queue、RoutingKey,之前那种自定义Bean的方式比较繁琐
@Component
public class SpringRabbitListener {
	@RabbitListener(bindings = @QueueBinding(
	    	value = @Queue(name = "direct.queue1"),
	        exchange = @Exchange(name = "itcast.direct", type = "direct"),
	        key = {"red", "blue"}
	))
	public void listenDirectQueue1(String msg){
	    System.out.println("消费者接收到来自direct.queue1的消息:"+msg);
	}
	
	@RabbitListener(bindings = @QueueBinding(
	     	value = @Queue(name = "direct.queue2"),
	        exchange = @Exchange(name = "itcast.direct", type = "direct"),
	        key = {"red", "yellow"}
	))
	public void listenDirectQueue2(String msg){
	    System.out.println("消费者接收到来自direct.queue2的消息:"+msg);
	}
}
  1. 发送消息,携带routingKey
@SpringBootTest
public class PublisherTest {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendDirectExchange(){
        // 交换机名称
        String exchangeName = "itcast.direct";
        // 消息
        String msg = "Hello Blue";
        // 发送
        rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
    }
}

3. Topic Exchange 话题

在这里插入图片描述
Topic中的BindingKey支持通配符(注意是BindingKey):
#: 0或多个单词
*: 1个单词
在这里插入图片描述

实现思路:
在@RabbitListener中

  1. 声明bindingKey的时候使用通配符
  2. Exchange的type指定为"topic"
@RabbitListener(bindings = @QueueBinding(
    	value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "itcast.topic", type = "topic"),
        key = {"china.#"}
))

这样routingKey只要符合bingdingKey的模式,就会把消息分发给它

消息转换器

rabbitTemplate.convertAndSend发送的信息是Object类型的,所以可以传任意对象,会自动序列化
默认使用的序列化方式是java提供的序列化,类会被序列化成字节串,有许多缺点

我们可以采用别的序列化方式,比如JSON序列化方式,把MessageConverter类型的容器中的对象顶掉就行

在这里插入图片描述
记得发送端和接收端的对象类型要一致或者兼容,用的序列化MessageConverter也要一致
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/854089.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Add-in Express for Microsoft Office and Delphi Crack

Add-in Express for Microsoft Office and Delphi Crack 适用于Microsoft Office和Delphi VCL的Add-in Express使您能够在几次点击中为Microsoft Office开发专业插件。它生成基于COM的项目&#xff0c;这些项目包含Microsoft Office外接程序或智能标记的所有必要功能&#xff0…

3分钟创建新生分班查询二维码,无需技术、0成本

作为教师&#xff0c;我们深知分班是一项极其重要的任务&#xff0c;需要综合考虑学生的性格、能力和兴趣等多个方面&#xff0c;以确保每个学生都能够获得最佳的学习环境和成绩。在本文中&#xff0c;我将分享一种便捷的方式来告知家长有关分班录取情况的方法。 通常&#xf…

深度学习关键要素:数据集汇总与分享

引言 在深度学习的应用中&#xff0c;数据被认为是最重要的因素之一。因此&#xff0c;选择一个好的数据集对于深度学习的成功至关重要。在选择数据集时&#xff0c;不仅需要关注数据量的大小、多样性以及质量&#xff0c;还要考虑数据集是否代表了所研究问题的真实情况。本文…

分布式应用:Zabbix代理服务器与SNMP监控

目录 一、理论 1.分布式监控 2.Zabbix代理服务器部署 3.配置 agent 使用 proxy 4.设置 Zabbix-SNMP 监控 二、实验 1.Zabbix代理服务器部署 2.配置 agent 使用 proxy 3.设置 Zabbix-SNMP 监控 三、总结 一、理论 1.分布式监控 &#xff08;1&#xff09;作用&#x…

基于Java开发的企业级数字化采购系统

一、项目介绍 一款全源码可二开&#xff0c;可基于云部署、私有部署的企业级数字化采购管理系统&#xff0c;供应商全生命周期管控&#xff0c;公开询价管理&#xff0c;招标&#xff0c;定标&#xff0c;评审&#xff0c;生成订单&#xff0c;送货&#xff0c;收货全流程管理…

Linux 上安装部署Nacos

标题&#xff1a;在Linux上安装和部署Nacos Nacos是一个开源的分布式服务发现和配置管理平台&#xff0c;它可以帮助开发人员实现微服务架构中的服务注册、发现和动态配置管理。 步骤1&#xff1a;准备工作 在开始安装Nacos之前&#xff0c;确保您已经具备以下条件&#xff1…

VS通过TCPIP与visionpro通讯

效果图 服务器端 visionpro配置服务器端&#xff0c;配置端口号、需要发送的数据等 客户端 vs编写代码接收数据 主要是复制的例程&#xff0c;到时候编写的时候可以借鉴。 using System; using System.Collections.Generic; using System.ComponentModel; using System.Dat…

使用 `nmcli` 在 CentOS 8 上添加永久路由

CentOS 8 使用 NetworkManager 作为默认的网络管理工具&#xff0c;因此我们可以使用 nmcli 工具来实现相同的目标。使用 nmcli 可以更加直观地管理路由&#xff0c;并且更符合 CentOS 8 的默认网络管理方式。 以下是使用 nmcli 在 CentOS 8 上添加永久路由的步骤&#xff1a;…

类加载机制——双亲委派机制

类加载器分类 类加载器 类加载器&#xff08;英文&#xff1a;ClassLoader&#xff09;负责加载 .class 字节码文件&#xff0c;.class 字节码文件在文件开头有特定的文件标识。ClassLoader 只负责 .class 字节码文件的加载&#xff0c;至于它是否可以运行&#xff0c;则由 E…

【论文阅读】基于深度学习的时序异常检测——TimesNet

系列文章链接 参考数据集讲解&#xff1a;数据基础&#xff1a;多维时序数据集简介 论文一&#xff1a;2022 Anomaly Transformer&#xff1a;异常分数预测 论文二&#xff1a;2022 TransAD&#xff1a;异常分数预测 论文三&#xff1a;2023 TimesNet&#xff1a;基于卷积的多任…

《乡村振兴战略下传统村落文化旅游设计》浙江新华书店已售罄!

《乡村振兴战略下传统村落文化旅游设计》浙江新华书店已售罄&#xff01;

CTF Stegano练习之隐写初探

今天要介绍的是CTF练习中的Stegano隐写题型 。做隐写题的时候&#xff0c;工具是很重要的&#xff0c;接下来介绍一些工具。 1、TrID TrID是一款根据文件二进制数据特征进行判断的文件类型识别工具。虽然也有类似的文件类型识别工具&#xff0c;但是大多数都是使用硬编码的识…

11个点告诉你 如何用Docker+jenkins 运行 python 自动化

一、实现思路 在 Linux 服务器安装 docker 创建 jenkins 容器 根据自动化项目依赖包构建 python 镜像(构建自动化 python 环境) 运行新的 python 容器&#xff0c;执行 jenkins 从仓库中拉下来的自动化项目 执行完成之后删除容器 二、环境准备 Linux 服务器一台(我的是 C…

面向数据科学家和分析师的统计基础

推荐&#xff1a;使用 NSDT场景编辑器助你快速搭建可编辑的3D应用场景 “统计学是科学的语法。 卡尔皮尔逊 统计学在数据科学和数据分析中的重要性不容低估。统计提供了查找结构和提供更深入数据见解的工具和方法。统计学和数学都喜欢事实&#xff0c;讨厌猜测。了解这两个重要…

网络安全(黑客)零基础入门

导语 什么是 Web 安全&#xff1f;我又该如何入门学习它呢&#xff1f;学习过程中又应注意哪些问题呢&#xff1f;... 或许你的心中有着这样的疑问、不过别着急&#xff0c;本文会为你一一解答这些问题。 正文 定义 Web 安全&#xff0c;顾名思义便是由保障 Web 应用能够持续…

微信私域更好玩了

之前分享过&#xff0c;“小绿书”“公众号文章转音频”等内测中或悄悄已升级的功能。 其实&#xff0c;微信还在内测很多新功能&#xff0c;只是没公开 今天&#xff0c;小编又发现新升级 就是『附近』功能 增加了一个本地生活的入口&#xff0c;这里面是短视频和图文 展示…

IoTDB 1.x 开启外部访问

对于部署的IoTDB数据库&#xff0c;如果需要局域网内其他设备进行访问的处理。 1、防火墙开放端口 无论windows还是liunx都需要你将6667默认的端口加入防火墙中&#xff0c;否则肯定是无法访问端口 2、修改配置文件 对conf/iotdb-datanode.properties文件中的 修改为本机的…

Oracle DB 安全性 : TDE HSM TCPS Wallet Imperva

• 配置口令文件以使用区分大小写的口令 • 对表空间进行加密 • 配置对网络服务的细粒度访问 TCPS 安全口令支持 Oracle Database 11g中的口令&#xff1a; • 区分大小写 • 包含更多的字符 • 使用更安全的散列算法 • 在散列算法中使用salt 用户名仍是Oracle 标识…

Python入门02

0目录 1.容器操作&#xff08;序列操作&#xff09; 2.函数 3.模块 1.容器操作&#xff08;序列操作&#xff09; 列表的基本操作 定义一个列表[] 访问列表&#xff08;打印或者通过下标和索引&#xff09; 新增元素 Append(在末尾) 指定位置新增元素 Insert 删除&…

【算法篇C++实现】算法的时间、空间复杂度

文章目录 &#x1f680;一、算法的概念&#x1f680;二、算法的特征1.可行性2.确定性3.有穷性4.输入5.输出 &#x1f680;三、算法的评价1.正确性2.可读性3.健壮性 &#x1f680;四、算法的复杂度⛳&#xff08;一&#xff09;时间复杂度1、时间复杂度的概念2、大O的渐进表示法…