C# .NET 中的反应式系统

news2025/1/30 16:32:22

概述:反应式系统已成为构建健壮、可扩展和响应迅速的应用程序的强大范式。这些系统被设计为更具弹性、弹性和消息驱动性,确保它们在各种条件下保持响应,包括高负载、网络延迟和故障。在本文中,我们将探讨 .NET 生态系统中反应式系统的概念,利用 (Rx.NET) 和 来说明一个复杂的用例:用于实时监控和分析股票市场数据的实时仪表板。Reactive ExtensionsActor model (Akka.NET)了解反应式系统反应式系统旨在对事件、负载、故障甚至用户做出反应。反应性宣言概述了四个关键特征:响应式:系统及时响应。弹性:系统在面对故障时保持响应。弹性:系统在不同的工作负载下保持响应。消息驱动:系反应式系统已成为构建健壮、可扩展和响应迅速的应用程序的强大范式。

这些系统被设计为更具弹性、弹性和消息驱动性,确保它们在各种条件下保持响应,包括高负载、网络延迟和故障。

在本文中,我们将探讨 .NET 生态系统中反应式系统的概念,利用 (Rx.NET) 和 来说明一个复杂的用例:用于实时监控和分析股票市场数据的实时仪表板。Reactive ExtensionsActor model (Akka.NET)

了解反应式系统

反应式系统旨在对事件、负载、故障甚至用户做出反应。反应性宣言概述了四个关键特征:

  • 响应式:系统及时响应。

  • 弹性:系统在面对故障时保持响应。

  • 弹性:系统在不同的工作负载下保持响应。

  • 消息驱动:系统依靠异步消息传递来确保松耦合、隔离和位置透明。

实时股市仪表板

想象一下这样的场景:金融机构希望为其用户提供一个实时仪表板,该仪表板显示股票市场趋势、重大股票变动警报并提供实时分析。

此应用程序需要处理大量数据,有效地处理数据,并立即更新用户界面。它是反应式系统的完美候选者。

工具:Rx.NET 和 Akka.NET

为了应对这一挑战,我们将使用两个强大的库:

  • **Rx.NET (Reactive Extensions for .NET):**一个库,用于使用可观察序列和 LINQ 样式的查询运算符编写异步和基于事件的程序。

  • Akka.NET:一个开源工具包和运行时,用于在 .NET 上构建高度并发、分布式和容错的事件驱动应用程序。

在 .NET 中构建反应式系统 — 图片来源:由 Author 创建

构建我们的应用程序

我们的应用程序由几个组件组成:

  1. 数据摄取服务:连接到模拟股票市场数据流。

  2. 处理引擎:分析重大事件(例如,价格急剧上涨)的数据。

  3. 仪表盘服务:实时更新实时仪表盘。

先决条件

  • 安装 Rx.NET 软件包 (System.Reactive)

  • 安装 Akka.NET 软件包 (Akka)

项目结构

此示例的结构如下,全部位于单个控制台应用程序项目中:

  • StockTick类来表示股票数据。

  • StockMarketSimulator类来模拟股票数据流。

  • Akka.NET actors:用于更新仪表板和处理重大动作。DashboardActorSignificantMovementActor

  • 将所有内容连接在一起的主类。Program

步骤 1:设置数据引入服务

我们需要模拟股票市场价格的实时数据馈送。为简单起见,假设我们有一个返回流的函数,其中是一个表示股票符号、价格和时间戳的类。GetStockStream()IObservable<StockTick>StockTick

我们将使用 Rx.NET 定期生成对象流。 方法每秒(或您喜欢的任何其他合理间隔)创建一个价格变动,然后将这些变动映射到具有随机生成的价格和交易品种的对象。StockTickObservable.IntervalStockTick

using System;  
using System.Reactive.Linq;  
using System.Reactive.Threading.Tasks;  
using System.Threading.Tasks;  
  
public class StockTick  
{  
    public string Symbol { get; set; }  
  
    public double Price { get; set; }  
  
    public DateTime Timestamp { get; set; }  
}  
  
public static class StockMarketSimulator  
{  
    private static readonly Random rand = new Random();  
    private static readonly string[] symbols = new[] {   
      "AAPL",   
      "MSFT",   
      "GOOGL",   
      "AMZN",   
      "FB" };  
  
    public static IObservable<StockTick> GetStockStream()  
    {  
        return Observable.Interval(TimeSpan.FromSeconds(1))  
            .Select(_ => new StockTick  
            {  
                Symbol = symbols[rand.Next(symbols.Length)],  
                Price = Math.Round(100 + (rand.NextDouble() * 1000), 2),  
                Timestamp = DateTime.Now  
            });  
    }  
}

第 2 步:使用 Rx.NET 分析数据

Rx.NET 在处理数据流方面大放异彩。我们可以订阅并使用 LINQ 查询来实时处理和分析股票即时报价。IObservable<StockTick>

让我们假设,一个重大的变动是在 5 秒缓冲窗口内价格变化超过 30%。

using System;  
using System.Linq;  
using System.Reactive.Linq;  
  
public class StockAnalysis  
{  
    public static void AnalyzeStockTicks(IActorRef significantMovementActor)  
    {  
        IObservable<StockTick> stockStream = StockMarketSimulator.GetStockStream();  
  
        return stockStream  
            .GroupBy(tick => tick.Symbol)  
            .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(30)))  
            .Select(buffer =>  
            {  
                var firstTick = buffer.First();  
                var lastTick = buffer.Last();  
  
                // Avoid potential issues if the buffer is empty  
                if (firstTick == null || lastTick == null) return null;  
  
                var priceChange = Math.Abs((lastTick.Price - firstTick.Price) / firstTick.Price);  
  
                return new  
                {  
                    Symbol = firstTick.Symbol,  
                    PriceChange = priceChange,  
                    StartPrice = firstTick.Price,  
                    EndPrice = lastTick.Price,  
                    Timestamp = DateTime.Now  
                };  
            })  
            .Where(x => x != null && x.PriceChange > 0.05); // Filter for more than 5% price change   
    }  
}

对每个股票品种的报价进行分组,缓冲 30 秒,然后进行分析以发现重大的价格变化。如果价格变化超过 5%,则认为价格变动显著,并且有关变动的信息将打印到控制台。

调用是 Rx.NET 模式的重要组成部分,其中订阅了处理的结果(重大变动)。该方法是触发可观察序列的执行,并定义如何处理每个发出的项目(重大的股价变动)。.Subscribe(movement => { ... })Subscribe

步骤 3:集成仪表板更新的 Akka.NET

我们将创建一个 Akka.NET 参与者系统,以处理我们的 Rx.NET 分析检测到的重大股票变动。此执行组件系统将由两个主要执行组件组成:

  • DashboardActor:负责接收重要的移动消息并更新仪表板。

  • SignificantMovementActor:订阅可观察对象,并在检测到重大移动时向 发送消息。significantMovementsDashboardActor

public class SignificantMovement  
{  
    public string Symbol { get; set; }  
  
    public double PriceChange { get; set; }  
  
    public double StartPrice { get; set; }  
  
    public double EndPrice { get; set; }  
  
    public DateTime Timestamp { get; set; }  
}

using Akka.Actor;  
  
// Akka.NET Actor for dashboard updates  
public class DashboardActor : ReceiveActor  
{  
    public DashboardActor()  
    {  
        Receive\<SignificantMovement>(movement =>  
        {  
            // Logic to update the dashboard with the significant movement  
            Console.WriteLine($"Dashboard updated for {movement.Symbol}: {movement.PriceChange \* 100:F2}% change, from {movement.StartPrice} to {movement.EndPrice}");  
        });  
    }  
}

using Akka.Actor;  
using System;  
using System.Reactive.Linq;  
  
// Akka.NET Actor to handle significant movements  
public class SignificantMovementActor : ReceiveActor  
{  
    private readonly IActorRef _dashboardActor;  
  
    public SignificantMovementActor(IActorRef dashboardActor, IObservable<dynamic> significantMovements)  
    {  
        this._dashboardActor = dashboardActor;  
  
        significantMovements.Subscribe(movement =>  
        {  
            var significantMovement = new SignificantMovement  
            {  
                Symbol = movement.Symbol,  
                PriceChange = movement.PriceChange,  
                StartPrice = movement.StartPrice,  
                EndPrice = movement.EndPrice,  
                Timestamp = movement.Timestamp  
            };  
  
            _dashboardActor.Tell(significantMovement);  
        });  
    }  
}

创建执行组件系统,并在主应用程序逻辑中将所有内容绑定在一起。

using Akka.Actor;  
using System;  
  
class Program  
{  
    static void Main(string[] args)  
    {  
        var system = ActorSystem.Create("StockMonitorSystem");  
        var dashboardActor = system.ActorOf<DashboardActor>("dashboardActor");  
  
        var significantMovements = StockAnalysis.AnalyzeStockTicks();  
        var props = Props.Create(() => new SignificantMovementActor(dashboardActor, significantMovements));  
        system.ActorOf(props, "significantMovementActor");  
  
        Console.WriteLine("System is running. Press any key to exit...");  
        Console.ReadLine();  
        system.Terminate().Wait();  
    }  
}

该方法设置并运行一个 Akka.NET actor 系统,该系统与 Rx.NET 集成,以处理和显示显着的库存变动。Main

让我们分解该方法的每个部分,以了解它是如何工作的,以及它如何实现对股票市场数据模拟和处理的持续监听。Main

var system = ActorSystem.Create("StockMonitorSystem");

此行初始化名为 的 Akka.NET 执行组件系统的新实例。执行组件系统是一个分层的执行组件组,它为创建执行组件、调度消息和管理执行组件生命周期提供基础结构。这是使用 Akka.NET 的入口点。StockMonitorSystem

var dashboardActor = system.ActorOf<DashboardActor>("dashboardActor");

这将在执行组件系统中创建类型的执行组件。 是用于实例化 actor 的方法,返回对新创建的 actor 的引用。 是为此执行组件实例提供的名称,可用于在系统中查找它。DashboardActorActorOfdashboardActor

var props = Props.Create(() => new SignificantMovementActor(dashboardActor, significantMovements));

Props 是一个配置类,用于描述如何创建 actor 的实例。通过使用 ,您可以指定应如何构造 ,包括其依赖项。在这里,它采用一个 lambda 表达式,该表达式构造一个新的 ,传入引用和可观察序列。Props.CreateSignificantMovementActorSignificantMovementActordashboardActorsignificantMovements

system.ActorOf(props, "significantMovementActor");

此行创建使用前面定义的 props 的实例。它类似于 的创建方式,但使用 用于更复杂的初始化。SignificantMovementActordashboardActorprops

现在,从 Rx.NET 创建的参与者和可观察序列被设置为连续处理和显示重要的库存变动。侦听可观察对象并将其收到的任何消息转发到 .反过来,将处理这些消息(在本例中,将信息打印到控制台)。SignificantMovementActorsignificantMovementsdashboardActordashboardActor

Console.ReadLine();

此行至关重要,因为它可以防止应用程序立即退出。它等待用户按下一个键,然后再继续,有效地保持应用程序(以及参与者系统)运行并能够处理传入的模拟股票数据。

system.Terminate().Wait();

system.Terminate()启动执行组件系统的关闭,停止所有执行组件并释放资源。这是一个异步操作,返回 .Task

.Wait()在终止任务上,确保应用程序在执行组件系统完全关闭之前不会退出。这对于干净和优雅的退出非常重要,确保完成所有正在进行的处理并正确释放资源。

因此,该方法建立了一个连续的、反应性的处理管道,使用 Akka.NET 参与者来处理和显示重大的股票市场走势。Main

问:我能否通过发布到队列并从队列中侦听而不是使用 Rx.NET 和 Akka.NET 来实现相同的功能?

是的,队列可用于类似的数据流和处理,但 Rx.NET 提供了更具表现力和简洁的流处理能力,而 Akka.NET 提供了一个强大的框架,用于使用基于参与者的模型构建并发和分布式系统,从而增强容错能力和系统响应能力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1600472.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Day 15 Linux网络管理

IP解析 IP地址组成&#xff1a;IP地址由4部分数字组成&#xff0c;每部分数字对应于8位二进制数字&#xff0c;各部分之间用小数点分开&#xff0c;这是点分2进制。如果换算为10进制我们称为点分10进制。 每个ip地址由两部分组成网络地址(NetID)和主机地址(HostID).网络地址表…

DataGrip数据库管理工具安装使用

DataGrip数据库管理工具安装使用 DataGrip介绍 DataGrip是jetbrains旗下的一款数据库管理工具&#xff0c;相信做过java开发的同学都知道&#xff0c;idea就是这家公司发明的。 DataGrip 是JetBrains公司开发的数据库管理客户端工具&#xff08;操作数据库的IDE&#xff0c;…

看图找LOGO,基于YOLOv8全系列【n/s/m/l/x】参数模型开发构建生活场景下的商品商标LOGO检测识别系统

日常生活中&#xff0c;我们会看到眼花缭乱的各种各样的产品logo&#xff0c;但是往往却未必能认全&#xff0c;正因为有这个想法&#xff0c;这里我花费了过去近两周的时间采集和构建了包含50种商品商标logo的数据集&#xff0c;基于YOLOv8全系列的参数模型开发构建了对应的检…

初识--Linux的虚拟地址空间

重新了解地址空间 在学习c/c语言的时候,大家一定见过以下这张图 说的是程序会加载在如图的结构上,实际上,我们真的对他很了解吗,而在Linux进程控制这,就会有一个奇怪的现象 前提提要:简要介绍一下fork函数 进程内核数据结构(PCB)自己的代码以及数据 在Linux中,fork可以从当…

什么是邮箱分身?如何快速创建30个邮箱分身?

很多人只知道微信、QQ等应用分身&#xff0c;对于邮箱分身并不是很了解。邮箱分身和他们的不同点在于我们直接在原有邮箱的基础上创立新的虚拟邮箱地址&#xff0c;并且密码一致&#xff0c;在我们需要运营多个社交媒体账号或者管理多个项目的情况下&#xff0c;邮箱分身是一个…

盲盒小程序成为收益“法宝”?盲盒线上如何发展

近年来&#xff0c;盲盒在年轻人中掀起了一股潮玩热风&#xff0c;受到了不少年轻人的青睐&#xff0c;盲盒商品更是在不断创新中&#xff0c;收藏价值逐渐提高。随着市场规模的扩大&#xff0c;越来越多的玩家和商家涌入到了市场中&#xff0c;盲盒的商业模式正在加快发展中。…

人工智能与IP代理池:解析网络数据采集的未来

前言 随着互联网的快速发展&#xff0c;数据成为了当今社会最宝贵的资源之一。然而&#xff0c;要获取大量的网络数据并进行有效的分析&#xff0c;往往需要面对诸多挑战&#xff0c;其中之一就是网络封锁与反爬虫机制。在这个背景下&#xff0c;人工智能&#xff08;AI&#x…

【CANN训练营】目标检测(YoloV5s)实践(Python实现)

样例介绍 使用多路离线视频流&#xff08;* .mp4&#xff09;作为应用程序的输入&#xff0c;基于YoloV5s模型对输入视频中的物体做实时检测&#xff0c;将推理结果信息使用imshow方式显示。 样例代码逻辑如下所示&#xff1a; 环境信息 CPU&#xff1a;Intel Xeon Gold 63…

ASP.NET基于CS应用程序平台多语种技术应用研究

摘 要 C/S应用程序平台多语种技术是一种基于C/S应用技术结构平台的关于多语种的转换和翻译技术。本设计基于Visual Studio.Net集成开发环境&#xff0c;采用SQL Server2000进行数据库后台开发。通过采用数据字典实现应用系统的静态文本转换&#xff1b;通过使用Visual Studio.…

适用于 Windows 的 10 个顶级 PDF 编辑器 [免费和付费]

曾经打开PDF文件&#xff0c;感觉自己被困在数字迷宫中吗&#xff1f;无法编辑的文本、无法调整大小的图像以及签署感觉像是一件苦差事的文档&#xff1f;好吧&#xff0c;不用再担心了&#xff01;本指南解开了在 Windows 上掌握 PDF 的秘密&#xff0c;其中包含 10 款适用于 …

LoRA:大模型的低阶自适用(使用BERT在IMDB数据集上运用LoRA微调)

文章目录 简介LoRA文章主要贡献LoRA技术模型图技术细节论文实验结果LoRA在bert的运用LoRA核心代码实战分析 简介 论文链接https://arxiv.org/pdf/2106.09685v2.pdf 本文将先介绍论文中的LoRA技术&#xff0c;然后以BERT为例在IMDB数据集上代码实现运用这项微调技术。 代码数…

OpenCV基本图像处理操作(四)——傅立叶变换

傅里叶变换的作用 高频&#xff1a;变化剧烈的灰度分量&#xff0c;例如边界 低频&#xff1a;变化缓慢的灰度分量&#xff0c;例如一片大海 滤波 低通滤波器&#xff1a;只保留低频&#xff0c;会使得图像模糊 高通滤波器&#xff1a;只保留高频&#xff0c;会使得图像细节…

【React】Ant Design自定义主题风格及主题切换

Ant Design 的自定义主题&#xff0c;对于刚入手的时候感觉真是一脸蒙圈&#xff0c;那今天给它梳理倒腾下&#xff1b; 1、自定义主题要点 整体样式变化&#xff0c;主要两个部分&#xff1a; 1.1、Design Token https://ant.design/docs/react/customize-theme-cn#theme 官…

新经济助推高质量发展“大有云钞”聚焦未来趋势

近日&#xff0c;由大有云钞科技&#xff08;北京&#xff09;有限公司主办的一场关于“新经济助力高质量发展法治研讨会”在北京国家会议中心隆重举行。此次研讨会汇聚了来自政府、企业、学术界和法律界的众多专家学者&#xff0c;共同探讨新经济背景下的法治建设和高质量发展…

0基础如何入门编程?

0基础如何进入IT行业 &#xff1f; 前言 简介&#xff1a;对于没有任何相关背景知识的人来说&#xff0c;如何才能成功进入IT行业&#xff1f;是否有一些特定的方法或技巧可以帮助他们实现这一目标&#xff1f; 主要方法有如下几点建议提供给宝子们 目录 免费视频网课学习…

static+单例模式+类的复合继承

汇编语言 汇编语言是最靠谱的验证“编程语言相关知识点”正确性的方式 汇编语言与机器语言一一对应&#xff0c;每一条机器语言都有与之对应的汇编指令 机器语言是计算机使用的语言&#xff0c;它是一串二进制数字 汇编语言可以通过汇编得到机器语言机器语言可以通过反汇编得到…

Shell循环以及条件语句使用

Shell脚本基础已经发过&#xff0c;可在主页查找&#xff0c;现在讲解case&#xff0c;for&#xff0c;while语句&#xff0c;以及语句的练习。 1.case语句 等同于C语⾔的switch-case 格式&#xff1a; case $变量 in # 判断变量的值 a) # 值是什么语句;; # 相当于break 但…

docker网路和主机通讯问题

#注 1&#xff0c;安装docker和启动容器服务的时候如果防火墙处于开启状态&#xff0c;那么重启docker里面的容器的时候必须开启防火墙&#xff0c;否则会出现iptable错误&#xff1b; 2&#xff0c;linux开启防火墙会导致主机和docker网络之间单向通讯&#xff0c;主机可以访…

文献速递:深度学习肝脏肿瘤诊断---基于深度学习的表型分类重新划分联合肝细胞胆管癌

Title 题目 Deep learning-based phenotyping reclassifies combined hepatocellular cholangiocarcinoma 基于深度学习的表型分类重新划分联合肝细胞胆管癌 01文献速递介绍 Primary liver cancer arises either from hepatocytic or biliary lineage cells, giving rise to…

会议室预约小程序开源版开发

会议室预约小程序开源版开发 支持设置免费预约和付费预约、积分兑换商城、积分签到等 会议室类目&#xff0c;提供多种类型和设施的会议室选择&#xff0c;满足不同会议需求。 预约日历&#xff0c;展示会议室预约情况&#xff0c;方便用户选择空闲时段。 预约记录&#xff0…