基于消息中间件的异步通信机制在系统解耦中的优化与实现

news2024/11/15 17:17:35


✨✨谢谢大家捧场,祝屏幕前的小伙伴们每天都有好运相伴左右,一定要天天开心哦!✨✨ 
🎈🎈作者主页: 喔的嘛呀🎈🎈
✨✨ 帅哥美女们,我们共同加油!一起进步!✨✨ 

目录

引言

一. 选择合适的消息中间件

二. 定义消息格式和通信协议

1. 定义消息格式

消息头

消息体

2. 定义通信协议

发送消息

接收消息

消息处理

3. 示例代码

定义消息格式

发送消息

接收消息

三、发布-订阅模式

1. 定义发布-订阅模式

2. 示例代码

发布消息

订阅消息

3. 运行示例

4. 异步处理消息

5. 解耦系统

6. 实现步骤

7. 实例场景

实例场景:电商系统订单处理

场景描述

实现步骤

示例代码

订单服务发送消息

库存服务接收消息

物流服务接收消息


引言

在现代分布式系统中,异步通信和解耦是非常重要的设计原则。通过使用消息中间件,可以实现系统间的异步通信和解耦,提高系统的可扩展性和可靠性。本文将介绍如何使用消息中间件来实现系统间的异步通信和解耦,并通过一个实际场景来演示。

一. 选择合适的消息中间件

选择合适的消息中间件需要考虑多个因素,包括项目需求、性能要求、可靠性、社区支持等。常见的消息中间件包括 RabbitMQ、Kafka、ActiveMQ、Redis 等,下面针对不同的需求给出一些选择建议:

  1. 消息传递模式

    • 点对点:适合使用 RabbitMQ、ActiveMQ 等传统消息中间件。
    • 发布-订阅:适合使用 RabbitMQ、Kafka 等支持广播消息的中间件。
  2. 可靠性

    • 如果对消息的可靠性要求较高,需要确保消息不会丢失,可以考虑使用 RabbitMQ、Kafka 等提供消息持久化和高可靠性的中间件。
  3. 性能

    • 如果需要处理大量的消息并且需要低延迟,可以考虑使用 Kafka,它是一个高吞吐量的消息中间件,适合大数据场景。
    • 如果对延迟要求较低,可以选择 RabbitMQ、ActiveMQ 等传统消息中间件。
  4. 社区支持和生态系统

    • 考虑选择一个有活跃社区支持和完善生态系统的消息中间件,这样可以更容易地解决问题和扩展功能。
  5. 技术栈兼容性

    • 考虑选择一个与你的技术栈兼容的消息中间件,避免出现集成上的问题。

综合考虑以上因素,可以选择最适合项目需求的消息中间件。

二. 定义消息格式和通信协议

定义消息格式和通信协议是使用消息中间件的关键步骤之一,它涉及到消息的结构、内容和交互方式。下面以 RabbitMQ 为例,演示如何定义消息格式和通信协议。

1. 定义消息格式

在 RabbitMQ 中,消息通常由两部分组成:消息头和消息体。消息头包含一些元数据信息,如消息的类型、路由键等;消息体包含实际的业务数据。

消息头
  • Content-Type:消息体的类型,如 application/jsontext/plain 等。
  • DeliveryMode:消息持久性标志,标识消息是否需要持久化存储,可选值为 1(持久化)和 2(非持久化)。
  • CorrelationId:消息关联标识,用于关联一组相关消息。
  • 其他自定义的消息头字段,根据业务需求定义。
消息体
  • 消息体可以是任意格式的数据,如 JSON、XML、文本等,根据业务需求定义。

2. 定义通信协议

通信协议定义了消息的交互方式,包括消息的发送、接收和处理流程。通信协议可以包括以下几个方面:

发送消息
  • 客户端向消息队列发送消息,包括指定交换机(Exchange)、路由键(Routing Key)和消息体。
接收消息
  • 服务端从消息队列接收消息,根据消息的交换机和路由键接收对应的消息。
消息处理
  • 客户端接收到消息后,根据消息的内容执行相应的业务逻辑。

3. 示例代码

定义消息格式
public class Message {
    private String content;
    private String contentType;
    private int deliveryMode;
    private String correlationId;

    // 省略getter和setter方法
}
发送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class SendMessage {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            Message message = new Message();
            message.setContent("Hello, RabbitMQ!");
            message.setContentType("text/plain");
            message.setDeliveryMode(1); // 持久化
            message.setCorrelationId("123456");
            String messageJson = toJson(message);
            channel.basicPublish("", QUEUE_NAME, null, messageJson.getBytes());
            System.out.println(" [x] Sent '" + messageJson + "'");
        }
    }

    private static String toJson(Message message) {
        // 将 message 对象转换成 JSON 格式的字符串
        return "{ \"content\": \"" + message.getContent() + "\", \"contentType\": \"" + message.getContentType() + "\", \"deliveryMode\": " + message.getDeliveryMode() + ", \"correlationId\": \"" + message.getCorrelationId() + "\" }";
    }
}
接收消息
import com.rabbitmq.client.*;

public class ReceiveMessage {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press Ctrl+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String messageJson = new String(delivery.getBody(), "UTF-8");
                Message message = fromJson(messageJson, Message.class);
                System.out.println(" [x] Received '" + messageJson + "'");
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        }
    }

    private static <T> T fromJson(String json, Class<T> clazz) {
        // 将 JSON 格式的字符串转换成指定类型的对象
        // 这里可以使用 JSON 框架(如 Jackson、Gson)来实现
        return null;
    }
}

通过以上步骤,可以定义消息格式和通信协议,并使用 RabbitMQ 实现消息的发送和接收。

三、发布-订阅模式

发布-订阅模式是一种常见的消息传递模式,用于实现消息的广播和订阅。在发布-订阅模式中,消息发布者将消息发布到一个主题(Topic),而消息订阅者可以订阅感兴趣的主题,从而接收到相关消息。下面以 RabbitMQ 为例,演示如何使用发布-订阅模式。

1. 定义发布-订阅模式

在发布-订阅模式中,有一个交换机(Exchange)用来接收发布者发布的消息,并根据订阅者的绑定关系将消息路由到对应的队列。订阅者可以创建自己的队列,并将队列绑定到交换机上,从而接收到发布者发布的消息。

2. 示例代码

发布消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Publisher {

    private final static String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String message = "Hello, subscribers!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
订阅消息
import com.rabbitmq.client.*;

public class Subscriber {

    private final static String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, "");

            System.out.println(" [*] Waiting for messages. To exit press Ctrl+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
            });
        }
    }
}

3. 运行示例

  1. 先运行订阅者 Subscriber,它会创建一个队列并绑定到交换机上,开始监听消息。
  2. 然后运行发布者 Publisher,它会向交换机发布一条消息。
  3. 订阅者会接收到发布者发布的消息,并输出到控制台。

通过以上步骤,可以实现基于 RabbitMQ 的发布-订阅模式。

4. 异步处理消息

通过消息中间件实现异步处理消息,即发送消息后不需要立即等待结果,而是继续执行其他任务。这样可以提高系统的响应速度和吞吐量。

5. 解耦系统

通过消息中间件,系统之间的通信变成了基于消息的方式,系统不再直接依赖于对方的接口和实现细节,从而实现了系统之间的解耦。

6. 实现步骤

  • 定义消息格式和通信协议:确定消息的格式和通信协议,包括消息的内容结构、消息的生命周期等。
  • 配置消息中间件:在系统中配置和启动消息中间件,确保消息中间件正常运行。
  • 消息的发布和订阅:编写代码实现消息的发布和订阅逻辑,将消息发布到指定的主题,并订阅感兴趣的主题。
  • 处理接收到的消息:编写代码处理接收到的消息,根据消息的内容执行相应的业务逻辑。
  • 测试和验证:对系统进行测试和验证,确保消息的发布、订阅和处理功能正常运行。

7. 实例场景

实例场景:电商系统订单处理
场景描述

假设有一个电商系统,包含订单服务、库存服务和物流服务。当用户下单时,订单服务需要通知库存服务减少库存,通知物流服务发货。为了提高系统的可扩展性和可靠性,我们可以使用消息中间件来实现订单处理的异步通信和解耦。

实现步骤
  1. 定义消息格式和通信协议:定义订单消息的格式,包括订单号、商品信息等,并确定消息的交换机和队列名称。

  2. 配置消息中间件:在消息中间件中配置交换机和队列,并确保消息的持久化。

  3. 订单服务发送消息:订单服务在用户下单后,将订单消息发送到消息队列中。

  4. 库存服务订阅消息:库存服务订阅订单消息队列,接收并处理订单消息,减少库存。

  5. 物流服务订阅消息:物流服务也订阅订单消息队列,接收并处理订单消息,进行发货。

示例代码
订单服务发送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class OrderService {

    private static final String EXCHANGE_NAME = "orders";
    private static final String QUEUE_NAME = "order_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "New order placed";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
库存服务接收消息
import com.rabbitmq.client.*;

public class InventoryService {

    private static final String EXCHANGE_NAME = "orders";
    private static final String QUEUE_NAME = "order_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

            System.out.println(" [*] Waiting for orders. To exit press Ctrl+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                // 处理订单消息,减少库存
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }
    }
}
物流服务接收消息
import com.rabbitmq.client.*;

public class LogisticsService {

    private static final String EXCHANGE_NAME = "orders";
    private static final String QUEUE_NAME = "order_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

            System.out.println(" [*] Waiting for orders. To exit press Ctrl+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                // 处理订单消息,发货
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }
    }
}

通过以上步骤的简单演示,订单服务可以异步发送订单消息,库存服务和物流服务可以订阅订单消息并处理,实现了订单处理的异步通信和解耦。

通过以上步骤,可以使用消息中间件实现系统间的异步通信和解耦,提高系统的可扩展性和可维护性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1684892.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

重生之我要精通JAVA--第五周笔记

文章目录 APIJDK7时间Date时间类CalendarSimpleDateFormat 类SimpleDateFormat 类作用 JDK8时间Zoneld时区 包装类Integer成员方法 Arrays Lambda表达式标准格式注意点好处省略写法 集合进阶Collection迭代器遍历Collection集合获取迭代器Iterator中的常用方法细节注意点 增强f…

【软考中级 软件设计师】计算机网络和安全

计算机网络和安全是软件设计师&#xff08;软考中级&#xff09;考试中的重要组成部分&#xff0c;它涵盖了网络基础、网络协议、网络架构、网络安全等多个方面。以下是一些核心概念和要点&#xff0c; 计算机网络基础 OSI七层模型&#xff1a;物理层、数据链路层、网络层、传…

【20天拿下Pytorch:Day 7】Dataset和DataLoader

文章目录 1. Dataset和DataLoader概述1.1 概要1.2 获取一个batch数据的步骤1.3 Dataset和DataLoader的功能分工1.4 Dataset和DataLoader的主要接口 2. 使用Dataset创建数据集2.1 根据Tensor创建数据集2.2 根据图片目录创建图片数据集2.3 创建自定义数据集 3. 使用DataLoader加载…

C++ TCP发送Socket数据

DEVC需要加入ws2_32库 #include <iostream> #include <winsock2.h>#pragma comment(lib, "ws2_32.lib")void sendData(const char* ip, int port, const char* data) {WSADATA wsaData;SOCKET sockfd;struct sockaddr_in server_addr;// 初始化Winsock…

打气球小游戏

1.气球往上飘 我们声明两个符号常量来作为窗体的长和宽,接着就是常规操作 #define WINDOW_WIDTH 800 #define WINDOW_HEIGHT 600#include<easyx.h> #include<stdio.h> int main() {initgraph(WINDOW_WIDTH, WINDOW_HEIGHT);setbkcolor(WHITE);cleardevice();get…

php部分特性漏洞学习

php部分函数漏洞学习 简单总结一些我遇到的ctf中的php的一些函数或特性的漏洞&#xff0c;我刷题还是太少了&#xff0c;所以很多例子来自ctfshow&#xff0c;以后遇到相关赛题再更新 1.MD5和其他hash 弱类型比较 php中&#xff0c;有两中判断相等的符号&#xff0c;和&…

【Linux】进程信号及相关函数/系统调用的简单认识与使用

文章目录 前言一、相关函数/系统调用1. signal2. kill3. abort (库函数)4. raise (库函数)5. alarm 前言 现实生活中, 存在着诸多信号, 比如红绿灯, 上下课铃声…我们在接收到信号时, 就会做出相应的动作. 对于进程也是如此的, 进程也会收到来自 OS 发出的信号, 根据信号的不同…

树莓派 Raspberry Pi M.2 HAT+ 现已发售!原理图流出!

​Raspberry Pi M.2 HAT 使您能够将 M.2 M-key 外设&#xff08;如 NVMe 驱动器和人工智能加速器&#xff09;连接到 Raspberry Pi 5。它能够提供与这些外设之间的快数据传输&#xff08;高达 500 MB/s&#xff09;&#xff0c;现在就可以从树莓派的授权经销商网络购买&#xf…

智能网关和交换机在智慧路灯杆上的用途差别

智慧路灯杆是智能城市建设中的一个重要组成部分&#xff0c;它整合了智能照明、视频监控、交通管理、环境监测、网络覆盖、信息发布、一键告警等多种功能。针对智慧路灯杆的使用场景&#xff0c;智能网关和交换机各自发挥着不同的作用&#xff0c;并且拥有各自的优缺点&#xf…

5.14.3 UNETR:用于 3D 医学图像分割的 Transformers

具有收缩和扩展路径的全卷积神经网络 (FCNN) 在大多数医学图像分割应用中表现出了突出的作用。在 FCNN 中&#xff0c;编码器通过学习全局和局部特征以及上下文表示来发挥不可或缺的作用&#xff0c;这些特征和上下文表示可用于解码器的语义输出预测。 在FCNN中&#xff0c;收缩…

包装类..

定义&#xff1a;基本数据类型对应的对象。 如何获取包装类&#xff1a;直接赋值即可&#xff1b;Integer i10; 其中的一个成员方法&#xff1a; public static int parseInt(String s)——把字符串类型的整数转成int 类型的整数。 8种包装类中&#xff0c;除了character都…

力扣刷题---1748.唯一元素的和【简单】

题目描述 给你一个整数数组 nums 。数组中唯一元素是那些只出现 恰好一次 的元素。 请你返回 nums 中唯一元素的 和 。 示例 1&#xff1a; 输入&#xff1a;nums [1,2,3,2] 输出&#xff1a;4 解释&#xff1a;唯一元素为 [1,3] &#xff0c;和为 4 。 示例 2&#xff1a;…

单词可交互的弧形文本

在一个项目中&#xff0c;要求把少儿读本做成电子教材呈现出来&#xff0c;电子书的排版要求跟纸质书一致。其中&#xff0c;英语书有个需求&#xff1a;书中有些不规则排版的文本&#xff08;如下图所示&#xff09;&#xff0c;当随书音频播放时&#xff0c;被读到的文本要求…

注意力机制篇 | MSFE:即插即用的多尺度滑窗注意力(附源码实现)

前言:Hello大家好,我是小哥谈。多尺度滑窗注意力(Multi-Scale Sliding Window Attention,MSFE)是一种用于处理图像的深度学习模型。它通过引入多尺度特征提取和滑窗注意力机制来提高图像识别的准确性。在MSFE中,模型采用多尺度卷积神经网络来提取图像的特征,然后使用滑窗…

同旺科技 FLUKE ADPT 隔离版发布 ---- 2

所需设备&#xff1a; 1、FLUKE ADPT 隔离版 内附链接&#xff1b; 应用于&#xff1a;福禄克Fluke 12E / 15BMax / 17B Max / 101 / 106 / 107 应用于&#xff1a;福禄克Fluke 15B / 17B / 18B 正面&#xff1a; 反面&#xff1a; 侧面&#xff1a; 开孔位置&#xff08;可…

一种综合评价及决策方法:层次分析法AHP

大家好&#xff0c;层次分析法(Analytic Hierarchy Process&#xff0c;AHP)是一种多准则决策方法&#xff0c;它帮助决策者处理复杂的决策问题&#xff0c;将其分解成层次结构&#xff0c;然后通过两两比较来确定各个层次的因素之间的相对重要性。这种分析方式允许决策者对问题…

抖店,今年入场还有机会吗?从客观角度分析!

大家好&#xff0c;我是电商小V 伴随着短视频平台的兴起&#xff0c;也慢慢的步入了电商市场&#xff0c;成为了一个新的电商模式&#xff0c;抖音小店就是依靠着短视频达人带货的流量模式&#xff0c;可以说一直处于红利期&#xff0c;享受着这个短视频风口&#xff0c;也是吸…

前端更改线上请求地址

由于后台接口更改 , 线上请求地址需从 /api/api/ 改成 /api/ , 需实现的效果如下图 1 在原本的vite.config.js中将前端做的端口转发内容更改 , 更改一行即可 import { defineConfig } from vite import react from vitejs/plugin-react import path from path import * as fs …

C语言指针相关知识(第四篇章)(非常详细版)

文章目录 前言一、什么是回调函数二、qsort函数的介绍(默认升序排序)三、qsort函数的模拟实现&#xff08;通过冒泡排序&#xff09;总结 前言 本文介绍了回调函数&#xff0c;qsort函数的使用&#xff0c;以用冒泡排序来模拟实现qsort函数 提示&#xff1a;以下是本篇文章正文…

大语言模型量化方法对比:GPTQ、GGUF、AWQ 包括显存和速度

GPTQ: Post-Training Quantization for GPT Models GPTQ是一种4位量化的训练后量化(PTQ)方法&#xff0c;主要关注GPU推理和性能。 该方法背后的思想是&#xff0c;尝试通过最小化该权重的均方误差将所有权重压缩到4位。在推理过程中&#xff0c;它将动态地将其权重去量化为f…