【软件架构】流水线设计模式

news2024/12/30 3:49:14

流水线模式

流水线模式是一种软件设计模式,它提供了构建和执行一系列操作的能力。

在这里插入图片描述

此模式最好与插件模式结合使用,以便在应用程序启动时动态构建流水线。

顺序

流水线的最基本实现是一个简单的操作序列。

    public interface IOperation<T>
    {
        void Invoke(T data);
    }

可以调用操作的接口来处理数据。

public class Pipeline<T> : IOperation<T>
    {
        private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

        // add operation at the end of the pipeline
        public void Register(IOperation<T> operation)
        {
            operations.Add(operation);
        }

        // invoke every operations
        public void Invoke(T data)
        {
            foreach (var operation in operations) operation.Invoke(data);
        }
    }

流水线一个一个地处理每个操作。流水线类还实现了 IOperation 接口,因此它们可以组合在一起

public class ReverseOperation : IOperation<string>
{
    public void Invoke(string data) => Console.WriteLine($" The string is reversed : {data.Reverse()}");
}

该操作可以写在专用中。

public class Operation<T> : IOperation<T>
{
    private readonly Action<T> action;

    public Operation(Action<T> action)
    {
        this.action = action;
    }

    public void Invoke(T data) => action(data);
}

或者使用包装器从lambda自动创建操作。

// build
var pipeline = new Pipeline<string>();

// lambda
pipeline.Register(new Operation<string>(str =>
{
    Console.WriteLine($"The string {str} contains {str.Length} characters.");
}));

// class
pipeline.Register(new ReverseOperation());

// execute
pipeline.Invoke("apple");

应在调用流水线之前注册流水线操作。

断路器

您要添加到流水线中的第一个功能是添加断路器

public interface IOperation<T>
{
    bool Invoke(T data);
}

每个操作都会返回结果:失败成功

class Pipeline<T> : IOperation<T>
{
    // invoke every operations
    public bool Invoke(T data)
    {
        foreach (var operation in operations)
        {
            if (!operation.Invoke(data))
            {
                Console.WriteLine("Aborting pipeline..");
                return false;
            }
        }

        return true;
    }
}

如果操作失败,流水线执行应该停止

异步

另一个要求可能是拥有一个可以处理异步操作的流水线。

public interface IOperation<T>
{
    void SetNext(IOperation<T> next);
    void Invoke(T data);
}

在完成数据处理后,每个操作现在都必须调用流水线中的下一个操作。

class Pipeline<T> : IOperation<T>
{
    // not the best
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();
    private readonly IOperation<T> terminate;

    public Pipeline()
    {
        terminate = new Operation<T>(data => {}); // not the best
    }

    void IOperation<T>.SetNext(IOperation<T> next)
    {
        terminate.SetNext(next);
    }

    // append an operation at the end of the pipeline
    public void RegisterOperation(IOperation<T> operation)
    {
        // when the operation is finished, it will call terminate
        operation.SetNext(terminate);
        if (operations.Any())
        {
            // link the last registered operation with the newly registered one
            operations.Last().SetNext(operation);
        }
        operations.Add(operation);
    }

    public void Invoke(T data)
    {
        var operation = operations.Any() ? operations.First() : terminate;
        operation.Invoke(data);
    }
}

流水线稍微复杂一点,因为它需要在注册新操作时设置一个操作。另一种解决方案是使用构建器

public class WriteOperation : IOperation<string>
{
    private IOperation<string> next;

    public void SetNext(IOperation<string> next)
    {
        this.next = next;
    }

    public void Invoke(string data)
    {
        Task.Run(() =>
        {
            Console.WriteLine("Writing data to the disk...");
            Thread.Sleep(100); // just kidding !
            Console.WriteLine("Data successfully written to the disk !");
            next?.Invoke(data);
        });
    }
}

此操作是异步的,将在专用线程中运行,当时间到时,它将调用下一个操作以继续流水线。

class Operation<T> : IOperation<T>
{
    private readonly Func<T, bool> action;
    private IOperation<T> next;

    public Operation(Func<T, bool> action)
    {
        this.action = action;
    }

    public Operation(Action<T> action)
    {
        this.action = data =>
        {
            action(data);
            return true;
        };
    }

    public void SetNext(IOperation<T> next)
    {
        this.next = next;
    }

    public void Invoke(T data)
    {
        if (action(data)) next.Invoke(data);
    }
}

通用操作既可以与简单操作一起使用,也可以在使用函数时使用内置断路器

// the main pipeline
var pipeline = new Pipeline<string>();
pipeline.RegisterOperation(new Operation<string>(data =>
{
    Console.WriteLine($"Everyone likes {data} !");
    return true;
}));
pipeline.RegisterOperation(new WriteOperation());
pipeline.RegisterOperation(new Operation<string>(data =>
{
    if (data == "banana")
    {
        Console.WriteLine("This banana made the pipeline abort...");
        return false;
    }

    return true;
}));
pipeline.RegisterOperation(new Operation<string>(data => Console.WriteLine("This operation should not be called !")));

// a verbose pipeline to wrap the main pipeline
var verbose = new Pipeline<string>();
verbose.RegisterOperation(new Operation<string>(data => Console.WriteLine("Beginning of the pipeline...")));
verbose.RegisterOperation(pipeline);
verbose.RegisterOperation(new Operation<string>(data => Console.WriteLine("End of the pipeline...")));
verbose.Invoke("banana");

Console.WriteLine("The pipeline is asynchronous, so we should have more messages after this one : ");

这个简单的例子使用了我们实现的几个特性

在这里插入图片描述
现在你知道如何让你的流水线异步了
在这里插入图片描述

如果对之前的操作有另一个回调**,那就更好了,这样您就可以让结果逆流通过流水线。

插入

使用流水线设计模式的主要原因通常是需要能够添加插件,这些插件可以将操作附加到现有流水线或在其中挂钩操作。

public class Pipeline: IOperation
{
    // exposes operations for hooking
    public readonly LinkedList<IOperation> Operations = new LinkedList<IOperation>();

    // add operation at the end of the pipeline
    public void Register(IOperation operation)
    {
        Operations.AddLast(operation);
    }

    // invoke every operations
    public void Invoke()
    {
        foreach (var operation in Operations) operation.Invoke();
    }
}

流水线确实很基础,但这一次,操作暴露了

class Application
{
    internal abstract class Plugin
    {
        public abstract void Initialize(Application application);
    }

    private readonly List<Plugin> plugins = new List<Plugin>();
    public readonly Pipeline Pipeline = new Pipeline();

    public void RegisterPlugin(Plugin plugin)
    {
        plugins.Add(plugin);
    }

    public void Initialize()
    {
        Pipeline.Register(new Operation(() => Console.WriteLine("Step 1")));
        Pipeline.Register(new Operation(() => Console.WriteLine("Step 2")));
        Pipeline.Register(new Operation(() => Console.WriteLine("Step 3")));

        foreach (var plugin in plugins) plugin.Initialize(this);
    }

    public void Run()
    {
        Pipeline.Invoke();
    }
}

让我们来看一个带有流水线的简单应用程序,它只会在控制台中显示 3 个步骤。此应用程序还支持插件来修改流水线。

class HookPlugin : Application.Plugin
{
    public override void Initialize(Application application)
    {
        var operations = application.Pipeline.Operations;
        operations.AddAfter(operations.First, new Operation(() => Console.WriteLine("I really want to be second !")));
    }
}

第一个插件将在流水线的第二个插槽中挂接另一个操作。

class LatePlugin : Application.Plugin
{
    public override void Initialize(Application application)
    {
        application.Pipeline.Register(new Operation(() => Console.WriteLine("Sorry guys, I am late...")));
    }
}

第二个插件将在流水线末尾附加一个新操作。

var application = new Application();
application.RegisterPlugin(new HookPlugin());
application.RegisterPlugin(new LatePlugin());
application.Initialize();
application.Run();

应用程序和插件放在一起,我们可以调用流水线。
在这里插入图片描述

另一个有用的功能是能够在与单个项目相同的流水线中处理批处理数据。

class BatchPipeline<T> : IOperation<T[]>
{
    private readonly Pipeline<T> pipeline;

    public BatchPipeline(Pipeline<T> pipeline)
    {
        this.pipeline = pipeline;
    }

    // invoke each operation on each item
    public bool Invoke(T[] data)
    {
        // wrap items
        var items = data.Select(item => new Result<T>(item)).ToArray();

        foreach (var operation in pipeline.Operations)
        {
            // detects when every operation failed
            var failed = true;

            foreach (var item in items)
            {
                if(!item.Success) continue;
                if (!operation.Invoke(item.Data)) item.Fail();
                else failed = false;
            }

            // circuit breaker
            if (failed) return false;
            Console.WriteLine("----------------------");
        }

        return true;
    }
}

批处理流水线包装流水线并调用每个项目的每个操作。

class Result<T>
{
    public readonly T Data;
    public bool Success { get; private set; } = true;
    public void Fail() => Success = false;

    public Result(T data)
    {
        Data = data;
    }
}

每个项目都被包裹起来,所以我们可以记住断路器的结果。

public class CheckMagnitudeOperation : IOperation<int>
{
    private readonly int threshold;

    public CheckMagnitudeOperation(int magnitude)
    {
        threshold = (int) Math.Pow(10, magnitude);
    }

    public bool Invoke(int data)
    {
        if (data < threshold)
        {
            Console.WriteLine($"{data} < {threshold} -> ko");
            return false;
        }

        Console.WriteLine($"{data} >= {threshold} -> ok");
        return true;
    }
}

此操作检查整数是否具有所需的数量级

// base pipeline
var pipeline = new Pipeline<int>();
pipeline.Register(new CheckMagnitudeOperation(1));
pipeline.Register(new CheckMagnitudeOperation(2));
pipeline.Register(new CheckMagnitudeOperation(3));
pipeline.Register(new CheckMagnitudeOperation(4));

// batch pipeline
var batch = new BatchPipeline<int>(pipeline);
batch.Invoke(new []{ 12, 345, 6789 });

流水线将检查一批整数数量级。
在这里插入图片描述流水线只为没有失败的项目调用下一个操作。

高性能流水线

流水线设计模式也可以指更具体和以性能为导向的软件架构。

在这里插入图片描述
一些项目使用流水线通过在专用线程中运行流水线的每个操作来优化大量数据的处理。

abstract class Processor<T> : IOperation<T>
{
    private readonly BlockingCollection<T> queue = new BlockingCollection<T>();

    public IOperation<T> Next { private get; set; }
    public IOperation<T> Terminate { private get; set; }
    void IOperation<T>.Invoke(T data) => queue.Add(data);

    protected Processor()
    {
        Task.Run(Run);
    }

    private void Run()
    {
        Console.WriteLine($"Thread {GetType().Name} Started !");
        while (true)
        {
            var data = queue.Take();
            var operation = Process(data) ? Next : Terminate;
            operation.Invoke(data);
            Sleep(); // hack to have random output ;)
        }
    }

    protected abstract bool Pr

每个线程都将从充当缓冲区的并发队列中消费生成数据

public interface IOperation<T>
{
    IOperation<T> Next { set; }
    IOperation<T> Terminate { set; }
    void Invoke(T data);
}

这一次,我们将使用带有断路器的异步操作。

class Operation<T> : IOperation<T>
{
    private readonly Func<T, bool> action;

    public Operation(Func<T, bool> action)
    {
        this.action = action;
    }

    public IOperation<T> Next { private get; set; }
    public IOperation<T> Terminate { private get; set; }

    public void Invoke(T data)
    {
        var operation = action(data) ? Next : Terminate;
        operation?.Invoke(data);
    }
}

如果操作成功,它应该调用next,如果失败则终止

class Pipeline<T> : IOperation<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public IOperation<T> Next { private get; set; }
    public IOperation<T> Terminate { private get; set; }

    private readonly IOperation<T> success;
    private readonly IOperation<T> fail;

    public Pipeline()
    {
        success = new Operation<T>(Success);
        fail = new Operation<T>(Fail);
    }

    // append an operation at the end of the pipeline
    public void RegisterOperation(IOperation<T> operation)
    {
        // when the operation is finished, it will call either call success or fail
        operation.Next = success;
        operation.Terminate = fail;

        if (operations.Any())
        {
            // link the last registered operation with the newly registered one
            operations.Last().Next = operation;
        }
        operations.Add(operation);
    }

    public void Invoke(T data)
    {
        var operation = operations.Any() ? operations.First() : fail;
        operation.Invoke(data);
    }

    private bool Success(T data)
    {
        Continue(data);
        return true;
    }

    private bool Fail(T data)
    {
        // we decide to bypass the circuit breaker
        Continue(data);
        return false;
    }

    private void Continue(T data)
    {
        Next?.Invoke(data);
    }
}

流水线被设计成绕过断路器。Success或Fail,它总是会继续流水线序列并调用下一个操作。

// build
var pipeline = new Pipeline<Order>();
pipeline.RegisterOperation(new CreateOrderProcessor());
pipeline.RegisterOperation(new PriceOrderProcessor());
pipeline.RegisterOperation(new PaymentOrderProcessor(User.Users.ToDictionary(user => user.Id, user => user.InitialBalance)));
pipeline.RegisterOperation(new DeliverOrderProcessor());

var monitor = new Pipeline<Order>();
monitor.RegisterOperation(pipeline);
monitor.RegisterOperation(new Operation<Order>(order =>
{
    var report = order.Status == OrderStatus.Delivered ? "Success" : "Fail";
    Console.WriteLine($"[IMPORTANT] Order {order.OrderId} Finished Processing : {report}");
    return true;
}));

// process
foreach (var product in GetOrders()) monitor.Invoke(product);

这个场景有点复杂,所以我不会解释一切。这个想法是让不同的线程处理传入的订单。订单处理完成后,我们会检查订单的状态。

class CreateOrderProcessor : Processor<Order>
{
    private readonly List<Order> orders = new List<Order>();

    protected override bool Process(Order order)
    {
        order.OrderId = orders.Count;
        order.CreationTime = DateTime.UtcNow;
        order.Status = OrderStatus.Created;
        orders.Add(order);
        Console.WriteLine($"Create Order {order.OrderId}");
        return true;
    }
}

每个订单处理器都隔离在一个专用线程中,因此您可以优化存储数据的方式并在不使用锁的情况下直接访问内存。

class PaymentOrderProcessor : Processor<Order>
{
    protected override bool Process(Order order)
    {
        var balance = GetBalance(order.UserId);
        var expected = balance - order.TotalPrice;

        if (expected >= 0)
        {
            Console.WriteLine($"Payment User {order.UserId} Order {order.OrderId} : {order.TotalPrice} USD | Balance {balance} -> {expected}");
            SetBalance(order.UserId, expected);
            order.Status = OrderStatus.Payed;
            return true;
        }
        else
        {
            Console.WriteLine($"Insufficient Balance : User {order.UserId} Balance {balance} USD | Order {order.OrderId} : {order.TotalPrice} USD");
            order.Status = OrderStatus.Canceled;
            return false;
        }
    }
}

支付订单处理器是唯一可以访问用户余额的线程。它可以获取或更新任何余额,而无需担心并发问题。

在这里插入图片描述

流水线正在尽可能快地处理操作序列。

结论

流水线设计模式有很多非常不同的实现方式,从简单的命令链到更复杂的工作流

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/677372.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【学习笔记】关于transformer

1.Embedding 一文读懂Embedding的概念&#xff0c;以及它和深度学习的关系 - 知乎 one-hot编码当矩阵过于稀疏时计算开销大&#xff0c;于是加上Embedding层&#xff0c;通过Embedding层&#xff08;矩阵乘法&#xff09;实现降维。 Embedding层将一个一个词&#xff08;词源…

Spring Boot 如何使用 Spring Security 进行认证和授权

Spring Boot 如何使用 Spring Security 进行认证和授权 在 Web 应用程序中&#xff0c;认证和授权是非常重要的功能。Spring Security 是一个基于 Spring 框架的强大的安全框架&#xff0c;它提供了完整的认证和授权解决方案&#xff0c;并且可以轻松地集成到 Spring Boot 应用…

gtk_table_attch与gtk_grid_attach的区别

gtk_table_attch与gtk_grid_attach的区别 button gtk_button_new_with_label (“Short fat button”); gtk_table_attach (GTK_TABLE (table), button, 0, 2, 3, 4, xoptions, yoptions, 0, 0); 0—2–3—4 左 右 上 下 /* 横线从左边的0移到右边的2&#xff0c;竖线从上边的…

3 python进阶篇

文章目录 面向对象类属性和类方法类属性类方法静态方法 单例模式__new__ 方法类实现单例模式 异常 、模块和包异常自定义异常 模块和包模块的搜索顺序包的init文件发布模块&#xff08;了解&#xff09; 文件seek文件/目录的常用管理操作eval函数 补充性知识位运算小技巧 参考我…

软考A计划-系统集成项目管理工程师-一般补充知识-中

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例点击跳转>软考全系列 &#x1f449;关于作者 专注于Android/Unity和各种游戏开发技巧&#xff…

【LeetCode热题100】打卡第27天:二叉树的前序、中序、后序遍历

文章目录 【LeetCode热题100】打卡第27天&#xff1a;二叉树的前序、中序、后序遍历⛅前言&#x1f4d5;二叉树的前序遍历&#x1f512;题目&#x1f511;题解 &#x1f4d5;二叉树的中序遍历&#x1f512;题目&#x1f511;题解 &#x1f4d5;二叉树的后序遍历&#x1f512;题…

(万字长文)React 18 源码与原理解读 —— 看这一篇就够了

写在专栏开头&#xff08;叠甲&#xff09; 作者并不是前端技术专家&#xff0c;也只是一名喜欢学习新东西的前端技术小白&#xff0c;想要学习源码只是为了应付急转直下的前端行情和找工作的需要&#xff0c;这篇专栏是作者学习的过程中自己的思考和体会&#xff0c;也有很多参…

django中模板的使用

django中模板的使用 第一步 创建模板文件夹第二步 把模板存放进去第三步 把模板路径 加入到setting.py第四步 在视图函数处理第五步 路由挂载第六步 网页访问 第一步 创建模板文件夹 在项目的同层级下 新建模板文件夹 第二步 把模板存放进去 index.html <!DOCTYPE html&…

【Docker】一文了解Docker

文章目录 什么是Docker?为什么要使用Docker&#xff1f;与虚拟机的比较Docker架构Docker使用场景Docker安装阿里云镜像加速器1、登录阿里云2、找到镜像加速器3、配置使用 如今Docker的使用已经非常普遍&#xff0c;特别在一线互联网公司。使用Docker技术可以帮助企业快速水平扩…

C++ 自己动手实现简单的文件操作 (2023.6.23)

C 自己动手实现简单的文件操作 2023.6.23 引言1、文件简介2、各式各样的文件格式2.1 不同类型文件的扩展名2.1.1 文本文件2.1.2 数据文件2.1.3 音频文件2.1.4 视频文件2.1.5 电子书文件2.1.6 3D图像文件2.1.7 位图文件2.1.8 矢量图文件2.1.9 相机原始文件2.1.10 页面布局文件2.…

自监督对比学习框架SimCLR原理

目录 一、前言 人工智能发展近况 对比学习 二、数据集介绍 STL-10数据集 三、无监督图像表征对比学习 SimCLR SimCLR算法基本原理 数据增强与正负样本匹配 编码器 损失函数 对比学习全过程 四、有监督的图像下游任务迁移 替换下游任务网络层 有监督训练 五、实…

环境配置 | Git的安装及配置[图文详情]

Git是一个开源的分布式版本控制系统&#xff0c;可以有效、高速地处理从小到大的项目版本管理。下面介绍了基础概念及详细的用图文形式介绍一下git安装过程. 目录 1.Git基础概念 2.Git的下载及安装 3.常见的git命令 Git高级技巧 Git与团队协作 1.Git基础概念 仓库&#…

Charm-Crypto在Anaconda虚拟环境下的安装教程--基于Ubuntu20.04

第零步 VMware虚拟机设置和安装Anaconda虚拟环境 因为后面要编译源码&#xff0c;所以最好把CPU设置为最大&#xff0c;例如我的电脑是4核8线程&#xff0c;则&#xff1a; 关于Anaconda虚拟环境&#xff0c;这里不再赘述&#xff0c;后面都假设已经安装好虚拟环境&#xff0c;…

包装类--Math 类--Arrays 类--System 类

包装类–Math 类–Arrays 类–System 类 包装类 包装类的分类 包装类和基本数据的转换 演示包装类和基本数据类型的相互转换&#xff0c;这里以int和Integer演示。 1&#xff09;jdk5前的手动装箱和拆箱方式&#xff0c;装箱&#xff1a;基本类型&#xff0d;>包装类型&…

OpenAI收费标准,ChatGPT调用须知!

OpenAI收费标准&#xff0c;ChatGPT调用须知&#xff01; 免费镜像站价格说明GPT4GPT3.5图片模型如何付费 免费镜像站 ChatGPT有很多镜像站&#xff0c;需要输入API-KEY才可以使用&#xff0c;镜像站不会进行收费&#xff0c;而是OpenAI会对您进行收费。本文主要说明OpenAI的收…

【好书精读】网络是怎样连接的 —— IP 与以太网的包收发操作

&#xff08; 该图由AI制作 &#xff09; 目录 包的基本知识 包收发操作概览 生成包含接收方 IP 地址的 IP 头部 生成以太网用的 MAC 头部 通过 ARP 查询目标路由器的 MAC 地址 以太网的基本知识 将 IP 包转换成电或光信号发送出去 给网络包再加 3 个控制数据 向集线…

代码随想录算法训练营第四十一天 | 背包问题(一维、二维)、416. 分割等和子集

01背包&#xff1a;n种物品&#xff0c;每种物品只有1个&#xff0c;有相应的重量和价值 最多只能装m的重量&#xff0c;最多价值为多少&#xff1f; dp[i][j] : [0, i]物品任取放进容量为j的背包里 不放物品i&#xff1a;dp[i-1][j] 放物品i&#xff1a;dp[i-1][j-weight[…

如何系统性的学习Python语言

零基础同学的福音来了&#xff0c;如果你对Python语言的学习感兴趣&#xff0c;接下来可以由浅入深的了解下Python语言&#xff0c;哪怕你是零基础的小白也完全可以学会的&#xff0c;最后也会给大家放出学习和实例相结合的教程及方法&#xff0c;给到各位同学系统性的教学&…

ES-索引管理

前言 数据类型 ​ 搜索引擎是对数据的检索&#xff0c;所以我们先从生活中的数据说起。我们生活中的数据总体分为两种&#xff1a; 结构化数据非结构化数据 结构化数据&#xff1a; 也称作行数据&#xff0c;是由二维表结构来逻辑表达和实现的数据&#xff0c;严格地遵循数…

<C语言> 数组

1.一维数组的创建和初始化。 1.1 数组的创建 数组是一组相同类型元素的集合。 使用以下方式声明一个一维数组&#xff1a; type arrayName[arraySize];type是数组中元素的类型&#xff0c;arrayName是数组的名称&#xff0c;arraySize是数组的大小&#xff08;即元素的个数&a…