如何兼顾性能+实时性处理缓冲数据?

news2024/11/8 16:42:12

我们经常会遇到这样的数据处理应用场景:我们利用一个组件实时收集外部交付给它的数据,并由它转发给一个外部处理程序进行处理。考虑到性能,它会将数据存储在本地缓冲区,等累积到指定的数量后打包发送;考虑到实时性,数据不能在缓冲区存太长的时间,必须设置一个延时时间,一旦超过这个时间,缓冲的数据必须立即发出去。看似简单的需求,如果需要综合考虑性能、线程安全、内存分配,要实现起来还真有点麻烦。这个问题有不同的解法,本文提供一种实现方案。

一、实例演示

我们先来看看最终达成的效果。在如下这段代码中,我们使用一个Batcher<string>对象来接收应用分发给它的数据,该对象最终会在适当的时机处理它们。 调用Batcher<string>构造函数的三个参数分别表示:

  • processor:批量处理数据的委托对象,它指向的Process方法会将当前时间和处理的数据量输出到控制台上;
  • batchSize:单次处理的数据量,当缓冲的数据累积到这个阈值时会触发数据的自动处理。我们将这个阈值设置为10
  • interval:两次处理处理的最长间隔,我们设置为5秒
var batcher = new Batcher<string>(
    processor:Process,
    batchSize:10,
    interval: TimeSpan.FromSeconds(5));
var random = new Random();
while (true)
{
    var count = random.Next(1, 4);
    for (var i = 0; i < count; i++)
    {
        batcher.Add(Guid.NewGuid().ToString());
    }
    await Task.Delay(1000);
}

static void Process(Batch<string> batch)
{
    using (batch)
    {
        Console.WriteLine($"[{DateTimeOffset.Now}]{batch.Count} items are delivered.");
    }
}

如上面的代码片段所示,在一个循环中,我们每隔1秒钟随机添加1-3个数据项。从下图中可以看出,Process方法的调用具有两种触发条件,一是累积的数据量达到设置的阈值10,另一个则是当前时间与上一次处理时间间隔超过5秒。

二、待处理的批量数据:Batch<T>

除了上面实例涉及的Batcher<T>,该解决方案还涉及两个额外的类型,如下这个Batch<T>类型表示最终发送的批量数据。为了避免缓冲数据带来的内存分配,我们使用了一个单独的ArrayPool<T>对象来创建池化的数组,这个功能体现在静态方法CreatePooledArray方法上。由于构建Batch<T>对象提供的数组来源于对象池,在处理完毕后必须回归对象池,所以我们让这个类型实现了IDisposable接口,并将这一操作实现在Dispose方法种。在调用ArrayPool<T>对象的Return方法时,我们特意将数组清空。由于提供的数组来源于对象池,所以并不能保证每个数据元素都承载了有效的数据,实现的迭代器和返回数量的Count属性对此作了相应的处理。

public sealed class Batch<T> : IEnumerable<T>, IDisposable where T : class
{
    private bool _isDisposed;
    private int? _count;
    private readonly T[] _data;
    private static readonly ArrayPool<T> _pool = ArrayPool<T>.Create();

    public int Count
    {
        get
        {
            if (_isDisposed) throw new ObjectDisposedException(nameof(Batch<T>));
            if(_count.HasValue) return _count.Value;
            var count = 0;
            for (int index = 0; index < _data.Length; index++)
            {
                if (_data[index] is  null)
                {
                    break;
                }
                count++;
            }
            return (_count = count).Value;
        }
    }
    public Batch(T[] data) => _data = data ?? throw new ArgumentNullException(nameof(data));
    public void Dispose()
    {
        _pool.Return(_data, clearArray: true);
        _isDisposed = true;
    }
    public IEnumerator<T> GetEnumerator() => new Enumerator(this);
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    public static T[] CreatePooledArray(int batchSize) => _pool.Rent(batchSize);
    private void EnsureNotDisposed()
    {
        if (_isDisposed) throw new ObjectDisposedException(nameof(Batch<T>));
    }

    private sealed class Enumerator : IEnumerator<T>
    {
        private readonly Batch<T> _batch;
        private readonly T[] _data;
        private int _index = -1;
        public Enumerator(Batch<T> batch)
        {
            _batch = batch;
            _data = batch._data;
        }
        public T Current
        {
            get { _batch.EnsureNotDisposed(); return _data[_index]; }
        }
        object IEnumerator.Current => Current;
        public void Dispose() { }
        public bool MoveNext()
        {
            _batch.EnsureNotDisposed();
            return ++_index < _data.Length && _data[_index] is not null;
        }
        public void Reset()
        {
            _batch.EnsureNotDisposed();
            _index = -1;
        }
    }
}

三、感知数据处理的时机:BatchChangeToken

Batcher具有两个触发数据处理的设置:缓冲的数据量和两次数据处理之间的最长间隔。当累积的数据量或者当前时间与上一次处理的间隔达到阈值,缓冲的数据将自动被处理。.NET Core经常利用一个IChangeToken作为通知的令牌,为此我们定义了如下这个实现了该接口的BatchChangeToken类型。如下面的代码片段所示,上述两个触发条件体现在两个CancellationToken对象上,我们利用它们创建了对应的CancellationChangeToken对象,最后利用这两个CancellationChangeToken创建了一个CompositeChangeToken对象。这个CompositeChangeToken对象最终被用来实现了IChangeToken接口的三个成员。

internal sealed class BatchChangeToken : IChangeToken
{
    private readonly IChangeToken _innerToken;
    private readonly int _countThreshold;
    private readonly CancellationTokenSource _expirationTokenSource;
    private readonly CancellationTokenSource _countTokenSource;
    private int _counter;

    public BatchChangeToken(int countThreshold, TimeSpan timeThreshold)
    {
        _countThreshold = countThreshold;
        _countTokenSource = new CancellationTokenSource();
        _expirationTokenSource = new CancellationTokenSource(timeThreshold);
        var countToken = new CancellationChangeToken(_countTokenSource.Token);
        var expirationToken = new CancellationChangeToken(_expirationTokenSource.Token);
        _innerToken = new CompositeChangeToken(new IChangeToken[] { countToken, expirationToken });
    }

    public bool HasChanged => _innerToken.HasChanged;
    public bool ActiveChangeCallbacks => _innerToken.ActiveChangeCallbacks;
    public IDisposable RegisterChangeCallback(Action<object?> callback, object? state) => _innerToken.RegisterChangeCallback(s =>
    {
        callback(s);
        _countTokenSource.Dispose();
        _expirationTokenSource.Dispose();
    }, state);
    public void Increase()
    {
        Interlocked.Increment(ref _counter);
        if (_counter >= _countThreshold)
        {
            _countTokenSource.Cancel();
        }
    }
}

上述两个CancellationToken来源于对应的CancellationTokenSource,对应的字段为_countTokenSource和_expirationTokenSource。_expirationTokenSource根据设置的数据处理时间间隔创建而成。为了确定缓冲的数据量,我们提供了一个计数器,并利用Increase方法进行计数。在超过设置的数据量时,该方法会调用_expirationTokenSource的Cancel方法。在实现的ActiveChangeCallbacks方法种,我们将针对这两个CancellationTokenSource的释放放在注册的回调中。

四、接收、缓冲、打包和处理数据:Batcher<T>

最终用于打包的Batcher类型定义如下。在构造函数中,我们除了提供上述两个阈值外,还提供了一个Action<Batch<T>>委托完成针对打包数据的处理。通过Add方法接收的数据存储在_data字段返回的数组上,它时通过Batch<T>的静态方法CreatePooledArray提供的。我们使用字段_index表示添加数据在_data数组中存储的位置,并使用InterLocked.Increase方法解决并发问题。

public sealed class Batcher<T> : IDisposable where T : class
{
    private readonly Action<Batch<T>> _processor;
    private T[] _data;
    private BatchChangeToken _changeToken = default!;
    private readonly int _batchSize;
    private int _index = -1;
    private readonly IDisposable _scheduler;

    public Batcher(Action<Batch<T>> processor, int batchSize, TimeSpan interval)
    {
        _processor = processor ?? throw new ArgumentNullException(nameof(processor));
        _batchSize = batchSize;
        _data = Batch<T>.CreatePooledArray(batchSize);
        _scheduler = ChangeToken.OnChange(() => _changeToken = new BatchChangeToken(_batchSize, interval), OnChange);

        void OnChange()
        {
            var data = Interlocked.Exchange(ref _data, Batch<T>.CreatePooledArray(batchSize));
            if (data[0] is not null)
            {
                Interlocked.Exchange(ref _index, -1);
                _ = Task.Run(() => _processor.Invoke(new Batch<T>(data)));
            }
        }
    }

    public void Add(T item)
    {
        if (item is null) throw new ArgumentNullException(nameof(item));
        var index = Interlocked.Increment(ref _index);
        if (index >= _batchSize)
        {
            SpinWait.SpinUntil(() => _index < _batchSize - 1);
            Add(item);
        }
        _data[index] = item;
        _changeToken.Increase();
    }

    public void Dispose() => _scheduler.Dispose();
}

在构造函数中,我们调用了ChangeToken的静态方法OnChange将数据处理操作绑定到创建的BatchChangeToken对象上,并确保每次发送“数据处理”后将重新创建的BatchChangeToken对象赋值到_changeToken字段上,因为Add放到需要调用它的Increase增加计数。当接收到数据处理通知后,我们会调用Batch<T>的静态方法CreatePooledArray构建一个数组将字段 ­_data引用的数组替换下来,并将其封装成Batch<T>对象进行处理(如果数据存在)。于此同时,表示添加数据存储索引的_index恢复成-1。Add方法在对_index做自增操作后,如果发现累积的数据量达到阈值,需要等待数据处理完毕。由于数据处理以异步的方式处理,这里的耗时时很低的,所以我们这里选择了自旋的方式等待它完成。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/635964.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Golang每日一练(leetDay0094) H 指数 I\II H Index

目录 274. H 指数 H Index &#x1f31f;&#x1f31f; 275. H 指数 II H Index ii &#x1f31f;&#x1f31f; &#x1f31f; 每日一练刷题专栏 &#x1f31f; Rust每日一练 专栏 Golang每日一练 专栏 Python每日一练 专栏 C/C每日一练 专栏 Java每日一练 专栏 27…

sqlserver数据库学习感悟(1)----关于group by

以下含编写过程&#xff0c;如果嫌啰嗦&#xff0c;最后有总结哒&#xff01; 例题&#xff1a;有heat表和eatables两张表&#xff0c;分别为&#xff1a; eatables heat&#xff1a;protein&#xff08;蛋白质&#xff09;&#xff0c;fat&#xff08;脂肪&#xff09;&…

架构演变过程

单体架构 分布式架构&#xff1a;业务拆分 微服务 分布式架构的一种。独立开发和部署&#xff0c;一个业务多个服务支持 特点&#xff1a; 单一职责&#xff1a;拆分粒度更小&#xff0c;一个服务一个业务&#xff0c;避免重复开发&#xff08;面向对象角度来说符合最少知道原…

软考A计划-系统架构师-学习笔记-第五弹

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例点击跳转>软考全系列 &#x1f449;关于作者 专注于Android/Unity和各种游戏开发技巧&#xff…

Swift——如何将某一进制的数字按另一种进制输出(比如十进制输出打印成十六进制,八进制打印输出成二进制)

最近由于需要阅读比较老的文档&#xff0c;老文档里内存地址是用八进制而不是十六进制&#xff0c;所以需要写一个小工具&#xff0c;用来转换进制进制。虽然自带的计算器可以&#xff0c;但是数量一多比较麻烦。 一开始我想费劲吧啦写十二个转换函数&#xff0c;虽然有些函数可…

chatgpt赋能python:Python字符串:如何定义一个空字符串

Python字符串&#xff1a;如何定义一个空字符串 在Python中&#xff0c;字符串是一种常见的数据类型&#xff0c;通常用于存储文本信息。定义一个空字符串在Python中非常简单&#xff0c;本文将介绍如何定义一个空字符串以及在Python中使用字符串的一些常见操作。 定义一个空…

基于Java+SpringBoot+Vue前后端分离摄影分享网站平台系统

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…

python课程设计:实现一个飞行训练成绩管理微信息系统,对飞行员的某门飞行课目成绩进行质量评定

一、编程题目 在日常飞行训练中,飞行训练成绩是 飞行员进行综合等级评定的重要依据。 假设飞行训练团在某次飞行训练结束后,要对飞行员的某门飞行课目成绩进行质量评定。 请编程实现一个飞行训练成绩管理系统。要求完成以下功能: 1录入成绩 2计算平均分 3计算最高分 4查…

零入门kubernetes网络实战-35->vxlan简介以及原理介绍(vxlan报文结构介绍)

《零入门kubernetes网络实战》视频专栏地址 https://www.ixigua.com/7193641905282875942 本篇文章视频地址(稍后上传) 本篇文章开始介绍vxlan虚拟设备。 主要介绍vxlan的协议报文结构&#xff0c; 1、总结 主要涉及到以下方面&#xff1a; overlay跟vxlan的关系如何理解v…

Excel电子表格的PHP类库:PHP_XLSXWriter(大数据量报表、后台运行、浏览器下载)

Excel电子表格的PHP类库&#xff1a;PHP_XLSXWriter 一、PHP_XLSXWriter与PHPExcel的区别二、PHP_XLSXWriter的使用1.使用步骤2.后台下载3.浏览器下载4.封装函数 PHP_XLSXWriter 是一个用于生成 Microsoft Excel 2007 xlsx 文件的 PHP 库。XLSX 是一种用基于 XML 的开放式文件标…

chatgpt赋能python:Python多行缩进——提高代码可读性和效率的关键

Python多行缩进——提高代码可读性和效率的关键 众所周知&#xff0c;Python是一种缩进敏感的编程语言&#xff0c;它鼓励程序员使用缩进来表示代码块&#xff0c;而非传统的大括号或关键字。在Python中&#xff0c;缩进通过使用空格或制表符来实现&#xff0c;而且空格和制表…

深度剖析整形数据在内存中的存储

&#x1f4d5;博主介绍&#xff1a;目前大一正在学习c语言&#xff0c;数据结构&#xff0c;计算机网络。 c语言学习&#xff0c;是为了更好的学习其他的编程语言&#xff0c;C语言是母体语言&#xff0c;是人机交互接近底层的桥梁。 本章来学习数据的存储。 让我们开启c语言学…

Linux命令(31)之watch

Linux命令之watch 1.watch介绍 linux命令watch是周期性的用来执行某命令&#xff0c;并把某命令执行结果输出到屏幕上。使用watch命令&#xff0c;可以周期性的监测并输出某命令的执行结果到屏幕上&#xff0c;省得手动一遍一遍运行某命令&#xff0c;提高工作效率。 2.watc…

PHP伪协议filter详解,php://filter协议过滤器

「作者主页」&#xff1a;士别三日wyx 「作者简介」&#xff1a;CSDN top100、阿里云博客专家、华为云享专家、网络安全领域优质创作者 「推荐专栏」&#xff1a;对网络安全感兴趣的小伙伴可以关注专栏《网络安全入门到精通》 php://filter 一、访问数据流二、过滤数据流三、多…

[第一章 web入门]SQL注入-2

拿到题目后看提示&#xff0c;要自主访问两个页面 访问login.php后&#xff0c;是一个登录界面&#xff0c;直接测试注入类型 第一件事还是输入常用账户名admin&#xff0c;密码随便输入 回显账号或者密码错误 这种登录界面一般都是字符型注入&#xff0c;所以测试一下闭合符&a…

【力扣刷题 | 第七天】

前言&#xff1a; 今天我们将会进入栈与队列的刷题篇章&#xff0c;二者都是经典的数据结构&#xff0c;熟练的掌握栈与队列实现可以巧妙的解决有些问题。 232. 用栈实现队列 - 力扣&#xff08;LeetCode&#xff09; 请你仅使用两个栈实现先入先出队列。队列应当支持一般队…

野火Renesas R4M2 UDS诊断bootloader 升级MCU

基于can总线的UDS软件升级 最近学习UDS诊断协议&#xff08;ISO14229&#xff09;&#xff0c;是一项国际标准&#xff0c;为汽车电子系统中的诊断通信定义了统一的协议和服务。它规定了与诊断相关的服务需求&#xff0c;并没有设计通信机制。ISO14229仅对应用层和会话层做出了…

从零开始Vue项目中使用MapboxGL开发三维地图教程(一)MapboxGL介绍以及前期vue项目的搭建

MapboxGL介绍以及前期vue项目的搭建 1、Mapbox-gl简介2、搭建vue项目2.1、创建vue项目2.2、注册mapbox官网2.3、mapbox-gl入门案例 3、Mapbox-gl地图主要配置参数说明 1、Mapbox-gl简介 Mapbox-gl是一个开源、基于webgl技术的前端地图类库。 地图数据渲染和可视化这块我们经常用…

chatgpt赋能python:如何培训Python?-从入门到专业化

如何培训Python&#xff1f;- 从入门到专业化 Python是一种高级编程语言&#xff0c;已被广泛应用于各种领域以及各种应用程序的开发中。如果你也想成为一名Python开发人员&#xff0c;有以下几种培训方法帮助你一步步实现你的目标。 1.自学Python 学习Python的最简单方法是…

【前端 - CSS】第 13 课 - CSS 应用案例 - 体育新闻

欢迎来到博主 Apeiron 的博客&#xff0c;祝您旅程愉快 &#xff01; 时止则止&#xff0c;时行则行。动静不失其时&#xff0c;其道光明。 目录 1、缘起 2、示例代码 3、总结 1、缘起 最近学习完了 CSS 的 引入方式、选择器 和 字体修饰属性 的相关知识点&#xff0c;然后运…