五种服务异步通信(MQ)-详解、代码案例

news2025/1/24 5:39:48

简介:本篇文章主要是介绍了常用的异步通信原理,主要是RabbitMQ技术

目录

1、初始MQ(异步通讯)

1.1 同步通讯

1.2 异步通讯

1.3 MQ常见框架

2、RabbitMQ快速入门

2.1 RabbitMQ概述和安装

2.2 常见消息模型

2.3 快速入门

3、SpringAMQP

3.1 什么是SpringAMQP

3.2 SimpleQueue案例

3.3 SpringAMQP(发布、订阅模式)

3.3.1 广播模式

3.3.2 路由模式代码演示

3.3.3 话题模式

4、SpringAMQP-消息转换器

5、总结


1、初始MQ(异步通讯)

1.1 同步通讯

图 1.1-1 同步通讯存在的问题
上图中展示的就是同步通讯的问题

1.2 异步通讯

图 1.2-1 异步通讯优缺点

异步通信的优点:

  • 耦合度地
  • 吞吐量提升
  • 故障隔离
  • 流量削峰

异步通信的缺点:

  • 依赖于Broker的可靠性、安全性、吞吐能力
  • 架构复杂了、业务没有明显的流程线、不好追踪管理
上图中展示的就是异步通信的优缺点

1.3 MQ常见框架

图 1.3-1  MQ产品
上图中展示的便是四款常见的MQ产品,他们之间的优势性能也有清晰地比对

2、RabbitMQ快速入门

2.1 RabbitMQ概述和安装

图 2.1-1 RabbitMQ安装
所需要的安装包、详细记录安装步骤的MD文件,因为内容过多,我放在网盘里面了
百度网盘地址:https://pan.baidu.com/s/1FZtWCWMl_QpZEIcGNnpwKA 
提取码:6666
图 2.1-2 RabbitMQ概述
上图中展示的便是RabbitMQ的内部流程、逻辑,即消息发送者发送消息后传递给交换机,交换机将其消息存储到queue队列中,等待消息接受者获取

2.2 常见消息模型

图 2.2-1 五种消息模型
上图中展示的就是常用的五种消息队列模型,其官网地址:RabbitMQ Tutorials | RabbitMQ

2.3 快速入门

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}
package cn.itcast.mq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

3、SpringAMQP

3.1 什么是SpringAMQP

图 3.1-1 SpringAMQP介绍
上图中展示的是关于SpringAMQP的消息发送和接收的标准

3.2 SimpleQueue案例

图 3.2-1 消息发送者
上图中展示的是消息发送者的代码案例:即配置连接信息、编写测试代码

3.3 SpringAMQP(发布、订阅模式)

图 3.3-1 发布、订阅模式
上图中展示的是三种通过路由器转发消息的模型,即广播模式、路由模式、话题模式
3.3.1 广播模式

1、消息发送者代码

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testFanoutExchange(){
        // 交换机名称
        String exchangeName = "itcast.fanout";

        // 消息
        String message = "hello,everyone!";

        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }
}

2、交换机、队列配置类代码

package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    // 1.声明广播交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
    // 2.交换机绑定队列一
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    }
    // 3.交换机绑定队列二
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    }
}

3、消息接受者代码

package cn.itcast.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalTime;
@Component
public class SpringRabbitListener {

/*    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException{
        System.out.println("消费者1接收到消息: 【" + msg + "】" + LocalTime.now());
    }*/

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg){
        System.out.println("消费者1接收到消息: 【" + msg + "】" + LocalTime.now());
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg){
        System.out.println("消费者1接收到消息: 【" + msg + "】" + LocalTime.now());
    }
}
3.3.2 路由模式代码演示

1、消息发送者代码

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testFanoutExchange(){
        // 交换机名称
        String exchangeName = "itcast.direct";

        // 消息
        String message = "hello,everyone!";

        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "red", message);
    }
}

2、消息接受者代码

package cn.itcast.mq.listener;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;

@Component
public class SpringRabbitListener {

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"}))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息: 【" + msg + "】");
    }
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"}))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者接收到direct.queue1的消息: 【" + msg + "】");
    }
}
3.3.3 话题模式

1、消息发送者代码

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
/*
    @Test
    public void testFanoutExchange(){
        // 交换机名称
        String exchangeName = "itcast.direct";

        // 消息
        String message = "hello,everyone!";

        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "red", message);
    }*/

    @Test
    public void testTopicExchange(){
        // 交换机名称
        String exchangeName = "itcast.topic";

        // 消息
        String message = "hello,everyone!";

        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
    }
}

2、消息接受者代码

package cn.itcast.mq.listener;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;

@Component
public class SpringRabbitListener {
    /**
     * 话题路由器
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息: 【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到direct.queue1的消息: 【" + msg + "】");
    }
/*    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"}))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息: 【" + msg + "】");
    }
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"}))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者接收到direct.queue1的消息: 【" + msg + "】");
    }*/
}

4、SpringAMQP-消息转换器

图 4-1 SpringAMQP的作用
我们知道 RabbitTemplate 传递的参数中,消息对象是以字节数组传递的,经过序列化(默认是通过JDK实现的)后显示为正常的数据,但是如果传递的是Map,List集合这种数据,SpringCloud自带的序列化就会出现异常,为了解决这一问题,我们需要引入SpringAMQP-消息转换器
图 4-1 项目的总pom文件
在项目的总pom文件中添加相对应的依赖
图 4-3 消息发送端、接收端
在项目的消息发送端、接收端的启动类中创建Bean对象

5、总结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1616119.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【华为 ICT HCIA eNSP 习题汇总】——题目集18

1、SSH默认工作使用的TCP端口号是()。 A、20 B、21 C、22 D、23 考点:①传输层 ②应用层 解析:(C) SSH为建立在应用层和传输层上的安全协议,是对TCP/IP协议的传输层以上的SSH会话流程进行加密的…

数字时代的社交王者:探索Facebook的社交帝国

引言:社交媒体的霸主 在数字化浪潮席卷全球的当下,社交媒体已然成为人们日常生活中不可或缺的一部分,而Facebook则是这个领域的不二之选。作为全球最大的社交网络,Facebook不仅拥有庞大的用户群体,更在技术创新、社会…

System Dashboard for Mac:强大的系统监控与管理工具

System Dashboard for Mac是一款专为苹果电脑设计的系统监控与管理工具,以其直观易用的界面和全面的功能,深受用户喜爱。 System Dashboard for Mac v1.10.11激活版下载 这款软件能够实时监测系统的重要参数,包括CPU使用率、内存利用率、硬盘…

Acer宏碁掠夺者战斧300笔记本电脑PH315-52工厂模式原装Win10系统安装包 恢复出厂开箱状态 带恢复重置

宏碁掠夺者PH315-52原厂Windows10工厂包镜像下载,预装oem系统 链接:https://pan.baidu.com/s/1grmJzz6nW1GOaImY_ymXGw?pwdi286 提取码:i286 原厂W10系统自带所有驱动、PredatorSense风扇键盘控制中心、Office办公软件、出厂主题壁纸、系统…

OpenHarmony开发实例:【电话簿联系人Contacts】

样例简介 Contacts应用是基于OpenHarmony SDK开发的安装在润和HiSpark Taurus AI Camera(Hi3516d)开发板标准系统上的应用;应用主要功能是展示联系人列表,并点击某一列弹出联系人详细信息; 运行效果 样例原理 样例主要有一个list组件和dia…

unknown option ‘--variant=xxx‘

进行react native开发,启动项目时,出现如上报错时,试着用‘mode’代替variant即可 详细说明: 修改scripts命令,加入–variantdevDebug 终端运行对应命令,报如下的错 error: unknown option --variantdevDebug如图&am…

个人网站的SEO优化系列——如何实现搜索引擎的收录

如果你自己做了一个网站,并且想让更多的人知道你的网站,那么无非就是两种途径 一、自己进行宣传,或者花钱宣传 二、使用搜索引擎的自然流量 而如果搜索引擎都没有收录你的站点,别说是自然流量,就算是使用特定语句【sit…

在誉天学习云计算HCIE,担心考试考不过?

誉天定制化课程内容覆盖了所有考试重点,可以系统地掌握理论与实践知识。 对于笔试,类似于备考驾照理论学习阶段,誉天为大家提供在线模拟测试系统,帮助大家掌握云计算笔试考点。笔试通过后,18个月内(一年半…

函数模版实例化

目录 一、前言 二、 什么是C模板 💦泛型编程的思想 💦C模板的分类 三、函数模板 💦函数模板概念 💦函数模板格式 💦函数模板的原理 💦函数模板的实例化 🍎隐式实例化 🍉显式实例化 …

Pytorch下张量的形状操作(详细)

目录 一、基本操作函数 二、分类:维度改变,张量变形,维度重排 2.1维度改变 2.2张量变形 2.3维度重排 三、实例 一、基本操作函数 在PyTorch中,对张量的形状进行操作是常见的需求,因为它允许我们重新组织、选择和…

53、图论-课程表

思路: 其实就是图的拓扑排序,我们可以构建一个图形结构,比如[0,1]表示1->0,对于0来说入度为1。 遍历结束后,从入度为0的开始遍历。引文只有入度为0的节点没有先决条件。然后依次减少1。直到所有节点入度都为0.然后…

Ai-WB2 系列模组SDK接入亚马逊云

文章目录 前言一、准备二、亚马逊云物模型建立1. 注册亚马逊账号,登录AWS IoT控制台,[注册地址](https://aws.amazon.com/cn/)2. 创建好之后点击登录3. 创建物品以及下载证书 三、连接亚马逊云demo获取以及配置1. 下载源码2. 按照顺序执行下面指令3. 修改…

Tensorflow AutoGraph 的作用和功能

🍉 CSDN 叶庭云:https://yetingyun.blog.csdn.net/ TensorFlow AutoGraph 是 TensorFlow 中的一个重要特性,它允许开发者使用普通的 Python 语法编写高效的 TensorFlow 图(graph)。这意味着开发者可以利用 Python 的易…

【51单片机项目】基于51单片机自制多功能小键盘/模拟USB键盘【附源码】(STC89C52RC+CH9328)

目录 一、效果展示 二、创作灵感 三、硬件电路 注意事项 工作原理 四、源码 main.c 五、附录 CH9328工作原理 CH9328的模式选择 ​编辑 全键盘键码值表 参考链接 一、效果展示 该小键盘具有三种功能: 1、自动输入开机密码 2、每隔一段时间自动按下ct…

在Mac M1笔记本上跑大语言模型llama3的4个步骤?(install、pull、run、ask)

要点 Ollama一个功能强大的本地大语言模型LLM运行工具,支持很多模型,并且操作极其简单快速回忆步骤: 下载ollama工具:https://ollama.com/download 下载模型:ollama pull llama3 #根据libs列表直接指定名字 运行模型…

【软考】设计模式之适配器模式

目录 1. 说明2. 应用场景3. 结构图4. 构成5. 优缺点5.1 优点5.2 缺点 6. 适用性7. java示例7.1 类适配器模式7.2 对象适配器模式 1. 说明 1.Adapter(适配器)。2.将一个类的接口转换成客户希望的另外一个接口。3.Adapter模式使得原本由于接口不兼容而不能…

visionTransformer window平台下报错

错误: KeyError: Transformer/encoderblock_0/MlpBlock_3/Dense_0kernel is not a file in the archive解决方法: 修改这个函数即可,主要原因是Linux系统与window系统路径分隔符不一样导致 def load_from(self, weights, n_block):ROOT f&…

物理隔离条件下的数据安全导入导出方案,哪种最安全可控?

数据安全在当今信息化社会中扮演着至关重要的角色,尤其像政府、军工等单位,有比较多的核心数据要保护,一旦出现数据泄漏,将造成不可估量的后果。因此为了保护数据安全,政府、军工等单位一般会采取纯物理隔离&#xff0…

day07 51单片机-串口通信

51 单片机-串口通信 1 串口通信 1.1 需求描述 本案例讲解如何通过串口和PC以9600波特率,无校验位、1停止位通信。最终实现PC向单片机发送字符串,单片机回复PC。本案例中采用串口1通信。 1.2 硬件设计 1.2.1 串口工作原理 串口是将数据按照比特逐一发送的通信接口。在串…

视频教程下载:ChatGPT驱动的SEO、网络营销、生产力提升

用户遇到的一个常见问题是在ChatGPT对话过程中难以保持清晰的目的和专注。这可能导致互动无效和浪费时间。这门课程将教给各种创意人士——艺术家、制造者、博主、讲师和内容创作者——如何制定理想的提示配方,从而产生更有成效的对话和更高的回报。 这是一门关于如…