微服务设计模式 - 发布订阅模式(Publisher Subscriber Pattern)

news2024/11/23 8:30:29

微服务设计模式 - 发布订阅模式(Publisher Subscriber Pattern)

Publisher-Subscriber-Pattern3.

定义

发布-订阅模式(Publisher-Subscriber Pattern)是一种常见的设计模式,被广泛用于云计算和分布式系统中,以实现松散耦合的组件间通信。发布-订阅模式本质上是一种消息传递模式,其中消息发布者(Publisher)不会将消息直接发送给特定的接收者(Subscriber),而是将消息发布到一个中介(如消息通道或事件总线),订阅者通过订阅中介来接收感兴趣的消息。通过这种方式,发布者和订阅者之间实现了松耦合。

结构

发布-订阅模式主要包括以下几个组件:

  1. 发布者:生产并发布消息的源头。
  2. 中介:用于发布和传递消息的消息通道或事件总线。
  3. 订阅者:接收并处理消息的终点。
+-----------+             +---------+             +---------+
|  发布者   |  ---发布--> |  中介   | ---广播-->  |  订阅者  |
+-----------+             +---------+             +---------+
                                ^                     |
                                |                     |
+-----------+                   |        订阅         |
|  订阅者   | ------------------+
+-----------+

工作原理

Publisher Subscriber Pattern Sequence

发布-订阅模式的工作流程如下:

  1. 消息发布:发布者生成消息并将其发布到中介(如消息队列或事件总线)。
  2. 消息存储与管理:中介接收并存储消息,同时管理订阅者信息。
  3. 消息广播:中介将消息广播给所有订阅了该消息的订阅者。
  4. 消息接收与处理:订阅者接收消息并进行处理。

优势与应用场景

发布-订阅模式(Publisher-Subscriber Pattern)在分布式系统和微服务架构中被广泛应用,主要优势如下:

1. 松耦合

定义和实现独立:发布者和订阅者相互独立,发布者不需要知道订阅者的具体信息,反之亦然。各个服务之间互不依赖,使得系统模块可以独立开发、测试和部署。

  • 应用情景:一个电商平台上,订单服务和库存服务可以独立存在。订单创建后,库存减少操作不需要紧耦合在一起,可以通过消息队列来解耦。

2. 扩展性

服务扩展变得简单:可以方便地添加新的订阅者,而无需修改已有的发布者或者其他订阅者。新订阅者只需要订阅相应的消息即可。

  • 应用情景:在已有用户服务和通知服务的基础上,可以快速添加一个新的分析服务来订阅用户注册消息以进行数据分析。

3. 灵活性和适应性

动态消息路由:通过路由键和主题交换机,可以实现复杂的消息路由和选择,使得系统更具灵活性和适应性。

  • 应用情景:广告系统通过特定兴趣标签路由消息到不同的消费者,实现个性化推荐。

4. 提高性能和可靠性

异步处理提升性能:发布者可以立即发布消息而不必等待消息被处理,从而提高系统的响应速度。订阅者可以按自己的节奏处理消息,减少系统的耦合和脆弱性。

  • 应用情景:支付系统可以在用户付款后立即返回结果,后续的账单生成和通知可以异步处理。

RabbitMQ

RabbitMQ 作为一个高性能、高可靠性的消息队列系统,与发布-订阅模式非常契合。它提供了丰富的功能,使得在实际开发中实现发布-订阅模式变得相对简单和高效。

RabbitMQ 如何支持发布-订阅模式

  1. 交换机(Exchange):RabbitMQ 使用交换机来接收并路由消息。发布者将消息发送到交换机,交换机再根据路由键将消息转发到绑定的队列。不同类型的交换机(如 fanoutdirecttopicheaders)可以实现各种不同的路由策略。
  2. 队列(Queue):队列是存储消息的地方。消费者从队列中提取消息进行处理。通过将队列绑定到交换机,可以实现消息的广播和分发。
  3. 绑定(Binding):绑定定义了交换机如何将消息路由到队列。绑定一般会指定一个路由键或者模式,从而实现消息过滤和选择。

交换机类型

  • Direct Exchange:直接交换机将消息路由到绑定键匹配的队列上。

  • Fanout Exchange:扇出交换机将消息广播到所有绑定的队列上,不考虑路由键。

  • Topic Exchange:主题交换机通过路由键模式(如 user.*order.#)路由消息,允许更灵活的路由策略。

  • Headers Exchange:头部交换机通过消息的头部属性来路由消息。

示例代码

在微服务架构中,发布-订阅模式常用于事件驱动的通信。以下是一个利用RabbitMQ实现发布-订阅模式的示例。

+-----------+                  +---------------+                  +--------------+
|            |                 |               |                 |              |
|  发布者    | ---发布消息---> |     主题交换机 | --路由键--->  |     用户队列  | 
|            |                 |               | (user.*)        |              |
+-----------+                  +---------------+                  +--------------+
                                                                |              |
                                                                |  订阅者一    | <---处理消息--
                                                                +--------------+

+-----------+                  +---------------+                  +--------------+
|            |                 |               |                 |              |
|  发布者    | ---发布消息---> |     主题交换机 | --路由键--->  |     订单队列  | 
|            |                 |               | (order.*)       |              |
+-----------+                  +---------------+                  +--------------+
                                                                |              |
                                                                |  订阅者二    | <---处理消息--
                                                                +--------------+

项目说明

为了让消费者仅消费特定的事件,我们可以通过多种方式实现。常见的方法包括:

  1. 消息过滤:在发送消息时添加特定的标签或属性,消费者根据这些标签或属性进行过滤。
  2. 不同的队列或主题:将不同类型的事件发布到不同的消息队列或主题上,消费者订阅他们感兴趣的队列或主题。

在Spring Boot中,使用RabbitMQ时,我们可以利用路由键(Routing Key)和基于主题的交换机(Topic Exchange)实现这种功能,具体实现步骤:

  1. 配置RabbitMQ的交换机、队列和绑定关系。

  2. 发布者在发布消息时指定路由键。

  3. 消费者通过 @RabbitListener 注解订阅特定的路由键。

项目结构

event-driven-system/
│
├── src/main/java/com/example/event/
│   ├── EventApplication.java
│   ├── config/
│   │   └── RabbitConfig.java
│   ├── publisher/
│   │   └── MessagePublisher.java
│   ├── subscriber/
│   │   ├── UserSubscriber.java
│   │   └── OrderSubscriber.java
└── src/main/resources/
    ├── application.yml

代码流程

  • 发布者调用 publishUserEvent 方法,向 eventExchange 发布用户创建消息,消息被根据路由键 user.create 发送到 userQueue 中,然后 User Subscriber 消费者从 userQueue 队列中接收消息并处理。
  • 发布者调用 publishOrderEvent 方法,向 eventExchange 发布订单创建消息,消息被根据路由键 order.create 发送到 orderQueue 中,然后 Order Subscriber 消费者从 orderQueue 队列中接收消息并处理。

Publisher Subscriber Pattern Flow

这个设计确保了不同类型的消费者(用户订阅者和订单订阅者)能够接收和处理他们各自关注的消息,从而实现了消费者只消费特定事件的需求。

相关源代码

EventApplication.java

主程序启动类。

package com.example.event;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class EventApplication {

    public static void main(String[] args) {
        SpringApplication.run(EventApplication.class, args);
    }
}
RabbitConfig.java

在配置类中配置基于主题的交换机、队列及其绑定关系。我们可以在配置类中配置 RabbitListenerContainerFactory,并在使用 @RabbitListener 注解时自动使用这个工厂。这能让我们自定义一些与监听器相关的配置,比如并发消费者的数量、消息的确认模式等。

package com.example.event.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public Queue userQueue() {
        return new Queue("userQueue");
    }

    @Bean
    public Queue orderQueue() {
        return new Queue("orderQueue");
    }

    @Bean
    public TopicExchange eventExchange() {
        return new TopicExchange("eventExchange");
    }

    @Bean
    public Binding userBinding(Queue userQueue, TopicExchange eventExchange) {
        return BindingBuilder.bind(userQueue).to(eventExchange).with("user.*");
    }

    @Bean
    public Binding orderBinding(Queue orderQueue, TopicExchange eventExchange) {
        return BindingBuilder.bind(orderQueue).to(eventExchange).with("order.*");
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 设置消费者的并发数量,等配置
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        return factory;
    }
}
MessagePublisher.java

发布者在发布消息时指定路由键,以便消费者能按需求过滤消息。

package com.example.event.publisher;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessagePublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void publishUserEvent(String message) {
        rabbitTemplate.convertAndSend("eventExchange", "user.create", message);
        System.out.printf("Published user event: %s%n", message);
    }

    public void publishOrderEvent(String message) {
        rabbitTemplate.convertAndSend("eventExchange", "order.create", message);
        System.out.printf("Published order event: %s%n", message);
    }
}
UserSubscriber.java

用户订阅者只会接收指定的“user”事件,通过 @RabbitListener 注解明确指定使用 rabbitListenerContainerFactory,从而确保我们自定义的容器配置能够被应用。

package com.example.event.subscriber;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class UserSubscriber {
    @RabbitListener(queues = "userQueue", containerFactory = "rabbitListenerContainerFactory")
    public void handleUserMessage(String message) {
        System.out.printf("Received user event message: %s%n", message);
    }
}
OrderSubscriber.java

订单订阅者只会接收指定的“order”事件。

package com.example.event.subscriber;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class OrderSubscriber {
    @RabbitListener(queues = "orderQueue", containerFactory = "rabbitListenerContainerFactory")
    public void handleOrderMessage(String message) {
        System.out.printf("Received order event message: %s%n", message);
    }
}
application.yml

rabbitmq配置。

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

总结

Robert-C-Martin-Quote-All-race-conditions-deadlock-conditions-and

发布-订阅模式是实现松耦合系统的强大工具,在云计算和分布式系统中应用广泛。通过使用如RabbitMQ的消息中间件,我们可以很容易地在Spring Boot项目中实现这一模式。通过本文的示例,我们展示了如何利用 RabbitListenerContainerFactory 配置消费者行为,并通过Spring Boot注解 @RabbitListener 进行订阅,从而实现发布-订阅模式的精细化控制和一个完整的发布-订阅模式。这种模式不仅提高了系统的扩展性和灵活性,还大大简化了开发过程中的依赖管理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2230910.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

00-开发环境 MPLAB IDE 配置

MPLAB IDE V8.83 File 菜单简介 New (CtrlN)&#xff1a; 创建一个新文件&#xff0c;用于编写新的代码。 Add New File to Project...&#xff1a; 将新文件添加到当前项目中。 Open... (CtrlO)&#xff1a; 打开现有文件。 Close (CtrlE)&#xff1a; 关闭当前打开的文件。 …

Pytorch猴痘病识别

Pytorch猴痘病识别 &#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 电脑系统&#xff1a;Windows11 显卡型号&#xff1a;NVIDIA Quadro P620 语言环境&#xff1a;python 3.9.7 编译器&#xff1a;jupyte…

GA/T1400视图库平台EasyCVR视频分析设备平台微信H5小程序:智能视频监控的新篇章

GA/T1400视图库平台EasyCVR是一款综合性的视频管理工具&#xff0c;它兼容Windows、Linux&#xff08;包括CentOS和Ubuntu&#xff09;以及国产操作系统。这个平台不仅能够接入多种协议&#xff0c;还能将不同格式的视频数据统一转换为标准化的视频流&#xff0c;通过无需插件的…

Kafka相关知识点(上)

为什么要使用消息队列&#xff1f; 使用消息队列的主要目的主要记住这几个关键词:解耦、异步、削峰填谷。 解耦: 在一个复杂的系统中&#xff0c;不同的模块或服务之间可能需要相互依赖&#xff0c;如果直接使用函数调用或者 API 调用的方式&#xff0c;会造成模块之间的耦合…

qt QTextEdit详解

QTextEdit是Qt框架中的一个文本编辑控件类&#xff0c;它提供了丰富的功能用于编辑和显示纯文本以及富文本。 重要方法 setPlainText(const QString &text)&#xff1a;设置纯文本内容。toPlainText()&#xff1a;获取纯文本内容。setHtml(const QString &text)&#…

大学城水电管理系统开发:Spring Boot指南

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统&#xff0c;它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等&#xff0c;非常…

Github 2024-11-02 Rust开源项目日报 Top10

根据Github Trendings的统计,今日(2024-11-02统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Rust项目10Python项目2Dart项目1RustDesk: 用Rust编写的开源远程桌面软件 创建周期:1218 天开发语言:Rust, Dart协议类型:GNU Affero Genera…

【android12】【AHandler】【4.AHandler原理篇ALooper类方法全解】

AHandler系列 【android12】【AHandler】【1.AHandler异步无回复消息原理篇】-CSDN博客 【android12】【AHandler】【2.AHandler异步回复消息原理篇】-CSDN博客 【android12】【AHandler】【3.AHandler原理篇AHandler类方法全解】-CSDN博客 其他系列 本人系列文章-CSDN博客…

【深度学习】CrossEntropyLoss需要手动softmax吗?

【深度学习】CrossEntropyLoss需要手动softmax吗&#xff1f; 问题&#xff1a;CrossEntropyLoss需要手动softmax吗&#xff1f;答案&#xff1a;不需要官方文档代码解释 问题&#xff1a;CrossEntropyLoss需要手动softmax吗&#xff1f; 之前用 pytorch 实现自己的网络时&…

EtherCAT转ModbusTCP相关技术

EtherCAT/Ethernet/IP/Profinet/ModbusTCP协议互转工业串口网关https://item.taobao.com/item.htm?ftt&id822721028899 MS-GW15 概述 MS-GW15 是 EtherCAT 和 Modbus TCP 协议转换网关&#xff0c;为用户提供一种 PLC 扩展的集成解决方案&#xff0c;可以轻松容易将 Modbu…

使用Centos搭建Rocket.Chat教程

本章教程,主要介绍如何在CentOS系统上搭建Rocket.Cha。 一、Rocket.Chat是什么? Rocket.Chat 是一个开源的团队协作和通讯平台,类似于 Slack 或 Microsoft Teams。它提供了即时消息、视频会议、文件共享、以及与其他服务的集成等功能。用户可以在自己的服务器上部署 Rocket.…

jenkins 构建报错 mvn: command not found

首先安装过 maven&#xff0c;并且配置过环境变量 win r ,输入 cmd 键入 mvn -v 出现上图输出&#xff0c;则证明安装成功。 原因 jenkins 没有 maven 配置全局属性, 导致无法找到 mvn 命令。 解决方案 找到全局属性&#xff0c;点击新增&#xff0c;配置 MAVEN_HOME 路…

C++入门基础知识134—【关于C 库函数 - gmtime()】

成长路上不孤单&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a; 【14后&#x1f60a;///C爱好者&#x1f60a;///持续分享所学&#x1f60a;///如有需要欢迎收藏转发///&#x1f60a;】 今日分享关于C 库函数 - gmtime()的相关内容&#xf…

如何开发一个摄影行业小程序?开发一个摄影行业小程序需要哪些功能?南昌各行业小程序开发

如何开发一个摄影行业小程序&#xff1f;开发一个摄影行业小程序需要以下步骤&#xff1a; 1、需求分析&#xff1a;明确小程序的定位和功能需求&#xff0c;例如拍照、修图、分享、预约摄影师等。 2、设计界面&#xff1a;根据需求分析&#xff0c;设计小程序的用户界面&…

虚幻引擎5(UE5)学习教程

虚幻引擎5&#xff08;UE5&#xff09;学习教程 引言 虚幻引擎5&#xff08;Unreal Engine 5&#xff0c;简称UE5&#xff09;是Epic Games开发的一款强大的游戏引擎&#xff0c;广泛应用于游戏开发、影视制作、建筑可视化等多个领域。UE5引入了许多先进的技术&#xff0c;如…

linux线程的认识

1.虚拟地址空间 管理进程的pcb结构体task_struct中有一个mm_struct指针&#xff0c;指向虚拟地址空间&#xff0c;而编译好的代码中有地址&#xff0c;但是是虚拟地址&#xff0c;那么虚拟地址是怎么转化成真实的物理地址呢&#xff1f;通过页表转化 我们知道&#xff0c;代码…

05 Django 框架模型介绍(一)

文章目录 1、Django 模型简介2、Django 中创建并使用模型&#xff08;1&#xff09;新加一个名为 myapp 的应用&#xff08;2&#xff09;定义模型类&#xff08;2&#xff09;激活模型类&#xff08;3&#xff09;创建数据库迁移文件&#xff08;4&#xff09;应用迁移文件 3、…

Autosar CP中的I/O硬件抽象层功能与接口使用导读

规范的主要内容 I/O硬件抽象层&#xff08;I/O Hardware Abstraction Layer&#xff0c;简称IoHwAb&#xff09;的主要功能包括以下几点&#xff1a; 提供硬件访问接口&#xff1a;I/O硬件抽象层为上层软件组件&#xff08;如应用层软件&#xff09;提供访问微控制器硬件&…

【毫米波雷达(三)】汽车控制器启动流程——BootLoader

汽车控制器启动流程——BootLoader 一、什么是Bootloader(BT)&#xff1f;二、FBL、PBL、SBL、ESS的区别三、MCU的 A/B分区的实现 一、什么是Bootloader(BT)&#xff1f; BT就是一段程序&#xff0c;一段引导程序。它包含了启动代码、中断、主程序等。 雷达启动需要由BT跳转到…

初始JavaEE篇——多线程(8):JUC的组件

找往期文章包括但不限于本期文章中不懂的知识点&#xff1a; 个人主页&#xff1a;我要学编程程(ಥ_ಥ)-CSDN博客 所属专栏&#xff1a;JavaEE 目录 Callable接口 ReentrantLock synchronized 与 ReentrantLock的区别 信号量&#xff08;Semaphore&#xff09; CountDown…