『笔记』可扩展架构设计之消息队列

news2025/1/10 13:30:38

前言

众所周知,开发低耦合系统是软件开发的终极目标之一。低耦合的系统更加容易扩展,低耦合的模块更加容易复用,更易于维护和管理。我们知道,消息队列的主要功能就是收发消息,但是它的作用不仅仅只是解决应用之间的通信问题这么简单。消息队列作为常用的中间件,经常被用来对系统解耦,对模块解耦。增强系统的可扩展性和模块的可复用性。

除了对用于对系统、模块解耦,消息队列还有以下几种通途:

  • 服务异步处理
  • 流量控制
  • 作为发布 / 订阅系统实现一个微服务级系统间的观察者模式
  • 连接流计算任务和数据
  • 用于将消息广播给大量接收者

事物的存在总会有对立的一面,引入消息队列可能会带来延迟问题、产生数据不一致的问题、增加系统复杂度的问题等等。

EDA 架构之生产者与消费者模式

事件驱动架构(Event Driven Architecture, EDA)

EDA 架构原理

事件驱动架构由事件发起者和事件使用者组成。事件的发起者检测或感知事件,并以消息的形式来表示事件。它并不知道事件的使用者或事件引起的结果。

检测到事件后,系统会通过事件通道从事件发起者传输给事件使用者,而事件处理平台则会在该通道中以异步方式处理事件。事件发生时,需要通知事件使用者。他们可能会处理事件,也可能只是受事件的影响。

事件处理平台将对事件做出正确响应,并将活动下发给相应的事件使用者。通过这种下发活动,我们就可以看到事件的结果。

检测到事件后,系统会通过事件通道从事件发起者传输给事件使用者,而事件处理平台则会在该通道中以异步方式处理事件。事件发生时,需要通知事件使用者。他们可能会处理事件,也可能只是受事件的影响。

事件处理平台将对事件做出正确响应,并将活动下发给相应的事件使用者。通过这种下发活动,我们就可以看到事件的结果。

生产者-消费者模型

操作系统中常见的 EDA 架构就是生产者-消费者模型。消息队列常用来作为生产者和消费者之间的缓冲带,平衡生产者和消费者的处理能同时对服务进行解耦。有了这层缓冲带,生产者和消费者可能都不知道对方的存在。

生产者与消费者模式

以下为生产者-消费者模型的简单实现,(内存消息队列)

import time

from queue import Queue
from random import randint
from threading import Thread

class Producer(Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            productA = randint(0, 10)
            productB = randint(90, 100)
            print('Produce A「number」: {}, Produce B「number」: {}'.format(productA, productB))
            self.queue.put((productA, productB))
            time.sleep(2)

class Consumer(Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            # block=True, if queue is empty, block(阻塞)
            products_tuple = self.queue.get(block=True)
            print(f'Consume products: {products_tuple[0]} & {products_tuple[1]}')
            time.sleep(randint(0, 10))

def main():
    queue = Queue()
    producer = Producer(queue)
    consumer = Consumer(queue)

    producer.start()
    consumer.start()

main()
"""
Produce A「number」: 8, Produce B「number」: 95
Consume products: 8 & 95
Produce A「number」: 4, Produce B「number」: 92
Consume products: 4 & 92
Produce A「number」: 9, Produce B「number」: 90
... """

基于ZeroMQ PubSub模式的观察者模式实例

ZeroMQ

PubSub模式

# publisher1.py
import time
import zmq

def publisher1():
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5555")

    while True:
        count = 99

        while True:
            time.sleep(1)
            socket.send_string('publisher1 pushes event %d' % count)
            print('push event %d' % count)
            count += 1

if __name__ == "__main__":
    publisher1()
# publisher2.py

import time
import zmq

def publisher2():
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5556")

    while True:
        count = 1

        while True:
            time.sleep(1)
            socket.send_string('publisher2 pushes event %d' % count)
            print('push event %d' % count)
            count += 1

if __name__ == "__main__":
    publisher2()
# subscriber1.py

import zmq

def subscriber1():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect('tcp://127.0.0.1:5555')
    socket.connect('tcp://127.0.0.1:5556')
    socket.setsockopt_string(zmq.SUBSCRIBE, '')

    while True:
        message = socket.recv()
        print('message: %s' % message)

if __name__ == "__main__":
    subscriber1()
# subscriber2.py

import zmq

def subscriber2():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect('tcp://127.0.0.1:5555')
    socket.connect('tcp://127.0.0.1:5556')
    socket.setsockopt_string(zmq.SUBSCRIBE, '')

    while True:
        message = socket.recv()
        print('message: %s' % message)

if __name__ == "__main__":
    subscriber2()

秒杀系统的架构设计与消息队列

某秒杀系统的主要处理步骤如下:

  • 风险控制
  • 库存锁定
  • 生成订单
  • 短信通知
  • 更新统计数据

使用消息队列进行异步处理

由于秒杀成功的关键取决于风险控制、库存锁定这两步骤,所以 server 端处理了这两步之后可以给 client 端返回结果了,后续的步骤可放入消息队列中异步执行。不一定要在秒杀请求中完成。集中资源处理关键步骤(同步),碎片时间(全部秒杀请求处理结束)处理次要步骤(异步)。

demo

使用消息队列进行流量控制(削峰)

秒杀开始后,将超过 server 端处理上限(短时间内)的秒杀请求放入消息队列中,后续有能力处理时再对消息队列中消费请求进行处理。对于超时的请求可以直接丢弃(秒杀失败)。

demo

参考

  • 大型网站技术架构
  • 什么是事件驱动架构
  • 为什么需要消息队列-极客时间
  • ZeroMQ
  • pyzmq

本文由博客群发一文多发等运营工具平台 OpenWrite 发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1544796.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java一和零(力扣Leetcode474)

一和零 力扣原题 给定一个二进制字符串数组 strs 和两个整数 m 和 n,请你找出并返回 strs 的最大子集的长度,该子集中最多有 m 个 0 和 n 个 1。 示例 1: 输入:strs [“10”, “0001”, “111001”, “1”, “0”], m 5, n …

基于GA优化的CNN-LSTM-Attention的时间序列回归预测matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1卷积神经网络(CNN)在时间序列中的应用 4.2 长短时记忆网络(LSTM)处理序列依赖关系 4.3 注意力机制(Attention) 5…

基于BP神经网络的城市空气质量数据预测matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1 BP神经网络结构 4.2 神经元模型与激活函数 4.3 前向传播过程 4.4反向传播算法及其误差函数 4.5 权重更新规则 4.6 迭代训练 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软…

Qt_day4:2024/3/25

作业1: 完善对话框,点击登录对话框,如果账号和密码匹配,则弹出信息对话框,给出提示”登录成功“,提供一个Ok按钮,用户点击Ok后,关闭登录界面,跳转到其他界面 如果账号和…

一篇文章,告别Flutter状态管理争论,问题和解决

起因 每隔一段时间,都会出现一个新的状态管理框架,最近在YouTube上也发现了有人在推signals, 一个起源于React的状态管理框架,人们总是乐此不疲的发明各种好用或者为了解决特定问题而产生的方案,比如Bloc, 工具会推陈出新&#x…

qt table 简易封装,样式美化,以及 合并表格和颜色的区分 已解决

在需求中&#xff0c; 难免会使用 table 进行渲染窗口&#xff0c;做一个简单的封装。美化表格最终效果&#xff01;&#xff01;&#xff01; 代码部分 // 显示 20行 20列CCendDetailsInfoTableWidget* table new CCendDetailsInfoTableWidget(20,10);for (int i 0; i < …

ESCTF-逆向赛题WP

ESCTF_reverse题解 逆吧腻吧babypybabypolyreeasy_rere1你是个好孩子完结撒花 Q_W_Q 逆吧腻吧 下载副本后无壳&#xff0c;直接拖入ida分析分析函数逻辑&#xff1a;ida打开如下&#xff1a;提取出全局变量res的数据后&#xff0c;编写异或脚本进行解密&#xff1a; a[0xBF, …

enscan自动化主域名信息收集

enscan下载 Releases wgpsec/ENScan_GO (github.com) 能查的分类 实操&#xff1a; 首先打开linux 的虚拟机、 然后把下面这个粘贴到虚拟机中 解压后打开命令行 初始化 ./enscan-0.0.16-linux-amd64 -v 命令参数如下 oppo信息收集 运行下面代码时 先去配置文件把coo…

JavaEE企业开发新技术3

目录 2.11 Method的基本操作-1 文字性概念描述 代码&#xff1a; 2.12 Method的基本操作-2 2.13 Method的基本操作-3 2.14 数组的反射操作-1 文字性概念&#xff1a; 代码&#xff1a; 2.15 数组的反射操作-2 学习内容 2.11 Method的基本操作-1 文字性概念描述 Me…

io的学习4

打印流 分类&#xff1a;打印流一般是指&#xff1a;PrintStream、PrintWriter两个类 特点&#xff1a; 1.打印流只操作文件目的地&#xff0c;不操作数据源 2.特有的写出方法可以实现&#xff0c;数据原样写出 3.特有的写出方法&#xff0c;可以实现自动刷新&#xff0c;…

仅用一个月,游卡完成从MySQL到上线OceanBase的实践

编者按&#xff1a;自2023年9月起&#xff0c;游卡——国内最早卡牌游戏研发者之一&#xff0c;开始测试OceanBase&#xff0c;并在短短两个月内成功将三个核心业务应用迁移至OceanBase上。究竟是何因素促使游卡放弃游戏行业普遍采用的MySQL方案&#xff0c;转而大胆选择OceanB…

荟萃分析R Meta-Analyses 3 Effect Sizes

总结 效应量是荟萃分析的基石。为了进行荟萃分析&#xff0c;我们至少需要估计效应大小及其标准误差。 效应大小的标准误差代表研究对效应估计的精确程度。荟萃分析以更高的精度和更高的权重给出效应量&#xff0c;因为它们可以更好地估计真实效应。 我们可以在荟萃分析中使用…

一文整合工厂模式、模板模式、策略模式

为什么使用设计模式 今天终于有时间系统的整理一下这几个设计模式了&#xff0c; 这几个真是最常用的&#xff0c;用好了它们&#xff0c;你就在也不用一大堆的if else 了。能更好的处理大量的代码冗余问题。 在我们的实际开发中&#xff0c;肯定会有这样的场景&#xff1a;我…

【C语言基础】:内存操作函数

文章目录 一、memcpy函数的使用和模拟实现1.1 memcpy函数的使用1.2 memcpy函数的模拟实现 二、memmove函数的使用和模拟实现2.1 memmove函数的使用2.2 memmove函数的模拟实现 三、memset函数的使用3.1 menset函数的使用 四、memcmp函数的使用4.1 memcmp函数的使用 学海无涯苦作…

Qt与编码

ASCII码:一个字节&#xff0c;256个字符。 Unicode:字母&#xff0c;汉字都占用两个字节。 utf-8:字母一个字节&#xff0c;汉字3个字节。 gbk:字母一个字节&#xff0c;汉字2个字节。 gb2312:可以表示汉字&#xff0c;gb2312<gbk。 编码查看&#xff1a; https://www.…

钡铼技术R40路由器助力构建无人值守的智能化污水处理厂

钡铼技术R40路由器作为智能化污水处理厂的关键网络设备&#xff0c;发挥着至关重要的作用&#xff0c;助力构建无人值守的智能化污水处理系统。在现代社会&#xff0c;污水处理是城市环境保护和可持续发展的重要组成部分&#xff0c;而智能化污水处理厂借助先进的技术和设备&am…

C语言数据结构易错知识点(5)(插入排序、选择排序)

插入排序&#xff1a;直接插入排序、希尔排序 选择排序&#xff1a;直接选择排序、堆排序 上述排序都是需要掌握的&#xff0c;但原理不会讲解&#xff0c;网上有很多详尽地解释&#xff0c;本文章主要分享一下代码实现上应当注意的事项 1.直接插入排序&#xff1a; 代码实…

Kevin的128纪念日

上面这个是我在三天前做的一个开场白一样的封面。在设计的时候我的想法很简单&#xff0c;把自己给展现出来。我没有去过多的加其他花花绿绿的东西&#xff0c;我想把我本身的状态和形象给凸显出来。 哈哈~看到这里有人就想问&#xff0c;这个躺在沙发上吃零食的懒猫就是你的个…

利用瑞士军刀netcat建立连接并实现文件上传

实验环境&#xff1a; Kali:192.168.117.129 Windows10:192.168.135.142 第一步&#xff1a;建立连接 在Windows上下载netcat(官网搜索) 下载好之后在netcat目录打开cmd进入小黑屏 实验一&#xff1a;建立虚拟机与主机的连接 命令&#xff1a; Kali:nc 192.168.135.144…

FastAPI+React全栈开发05 React前端框架概述

Chapter01 Web Development and the FARM Stack 05 The frontend React FastAPIReact全栈开发05 React前端框架概述 Let’s start with a bit of context here. Perhaps the changes in the world of the web are most visible when we talk about the frontend, the part o…