兼顾性能+实时性处理缓冲数据解决方案

news2025/4/7 4:36:18

我们经常会遇到这样的数据处理应用场景:我们利用一个组件实时收集外部交付给它的数据,并由它转发给一个外部处理程序进行处理。考虑到性能,它会将数据存储在本地缓冲区,等累积到指定的数量后打包发送;考虑到实时性,数据不能在缓冲区存太长的时间,必须设置一个延时时间,一旦超过这个时间,缓冲的数据必须立即发出去。看似简单的需求,如果需要综合考虑性能、线程安全、内存分配,要实现起来还真有点麻烦。这个问题有不同的解法,本文提供一种实现方案。

一、实例演示

我们先来看看最终达成的效果。在如下这段代码中,我们使用一个Batcher<string>对象来接收应用分发给它的数据,该对象最终会在适当的时机处理它们。 调用Batcher<string>构造函数的三个参数分别表示:

  • processor:批量处理数据的委托对象,它指向的Process方法会将当前时间和处理的数据量输出到控制台上;
  • batchSize:单次处理的数据量,当缓冲的数据累积到这个阈值时会触发数据的自动处理。我们将这个阈值设置为10
  • interval:两次处理处理的最长间隔,我们设置为5秒
var batcher = new Batcher<string>(
    processor:Process,
    batchSize:10,
    interval: TimeSpan.FromSeconds(5));
var random = new Random();
while (true)
{
    var count = random.Next(1, 4);
    for (var i = 0; i < count; i++)
    {
        batcher.Add(Guid.NewGuid().ToString());
    }
    await Task.Delay(1000);
}

static void Process(Batch<string> batch)
{
    using (batch)
    {
        Console.WriteLine($"[{DateTimeOffset.Now}]{batch.Count} items are delivered.");
    }
}

如上面的代码片段所示,在一个循环中,我们每隔1秒钟随机添加1-3个数据项。从下图中可以看出,Process方法的调用具有两种触发条件,一是累积的数据量达到设置的阈值10,另一个则是当前时间与上一次处理时间间隔超过5秒。

二、待处理的批量数据:Batch<T>

除了上面实例涉及的Batcher<T>,该解决方案还涉及两个额外的类型,如下这个Batch<T>类型表示最终发送的批量数据。为了避免缓冲数据带来的内存分配,我们使用了一个单独的ArrayPool<T>对象来创建池化的数组,这个功能体现在静态方法CreatePooledArray方法上。由于构建Batch<T>对象提供的数组来源于对象池,在处理完毕后必须回归对象池,所以我们让这个类型实现了IDisposable接口,并将这一操作实现在Dispose方法种。在调用ArrayPool<T>对象的Return方法时,我们特意将数组清空。由于提供的数组来源于对象池,所以并不能保证每个数据元素都承载了有效的数据,实现的迭代器和返回数量的Count属性对此作了相应的处理。

public sealed class Batch<T> : IEnumerable<T>, IDisposable where T : class
{
    private bool _isDisposed;
    private int? _count;
    private readonly T[] _data;
    private static readonly ArrayPool<T> _pool = ArrayPool<T>.Create();

    public int Count
    {
        get
        {
            if (_isDisposed) throw new ObjectDisposedException(nameof(Batch<T>));
            if(_count.HasValue) return _count.Value;
            var count = 0;
            for (int index = 0; index < _data.Length; index++)
            {
                if (_data[index] is  null)
                {
                    break;
                }
                count++;
            }
            return (_count = count).Value;
        }
    }
    public Batch(T[] data) => _data = data ?? throw new ArgumentNullException(nameof(data));
    public void Dispose()
    {
        _pool.Return(_data, clearArray: true);
        _isDisposed = true;
    }
    public IEnumerator<T> GetEnumerator() => new Enumerator(this);
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    public static T[] CreatePooledArray(int batchSize) => _pool.Rent(batchSize);
    private void EnsureNotDisposed()
    {
        if (_isDisposed) throw new ObjectDisposedException(nameof(Batch<T>));
    }

    private sealed class Enumerator : IEnumerator<T>
    {
        private readonly Batch<T> _batch;
        private readonly T[] _data;
        private int _index = -1;
        public Enumerator(Batch<T> batch)
        {
            _batch = batch;
            _data = batch._data;
        }
        public T Current
        {
            get { _batch.EnsureNotDisposed(); return _data[_index]; }
        }
        object IEnumerator.Current => Current;
        public void Dispose() { }
        public bool MoveNext()
        {
            _batch.EnsureNotDisposed();
            return ++_index < _data.Length && _data[_index] is not null;
        }
        public void Reset()
        {
            _batch.EnsureNotDisposed();
            _index = -1;
        }
    }
}

三、感知数据处理的时机:BatchChangeToken

Batcher具有两个触发数据处理的设置:缓冲的数据量和两次数据处理之间的最长间隔。当累积的数据量或者当前时间与上一次处理的间隔达到阈值,缓冲的数据将自动被处理。.NET Core经常利用一个IChangeToken作为通知的令牌,为此我们定义了如下这个实现了该接口的BatchChangeToken类型。如下面的代码片段所示,上述两个触发条件体现在两个CancellationToken对象上,我们利用它们创建了对应的CancellationChangeToken对象,最后利用这两个CancellationChangeToken创建了一个CompositeChangeToken对象。这个CompositeChangeToken对象最终被用来实现了IChangeToken接口的三个成员。

internal sealed class BatchChangeToken : IChangeToken
{
    private readonly IChangeToken _innerToken;
    private readonly int _countThreshold;
    private readonly CancellationTokenSource _expirationTokenSource;
    private readonly CancellationTokenSource _countTokenSource;
    private int _counter;

    public BatchChangeToken(int countThreshold, TimeSpan timeThreshold)
    {
        _countThreshold = countThreshold;
        _countTokenSource = new CancellationTokenSource();
        _expirationTokenSource = new CancellationTokenSource(timeThreshold);
        var countToken = new CancellationChangeToken(_countTokenSource.Token);
        var expirationToken = new CancellationChangeToken(_expirationTokenSource.Token);
        _innerToken = new CompositeChangeToken(new IChangeToken[] { countToken, expirationToken });
    }

    public bool HasChanged => _innerToken.HasChanged;
    public bool ActiveChangeCallbacks => _innerToken.ActiveChangeCallbacks;
    public IDisposable RegisterChangeCallback(Action<object?> callback, object? state) => _innerToken.RegisterChangeCallback(s =>
    {
        callback(s);
        _countTokenSource.Dispose();
        _expirationTokenSource.Dispose();
    }, state);
    public void Increase()
    {
        Interlocked.Increment(ref _counter);
        if (_counter >= _countThreshold)
        {
            _countTokenSource.Cancel();
        }
    }
}

上述两个CancellationToken来源于对应的CancellationTokenSource,对应的字段为_countTokenSource和_expirationTokenSource。_expirationTokenSource根据设置的数据处理时间间隔创建而成。为了确定缓冲的数据量,我们提供了一个计数器,并利用Increase方法进行计数。在超过设置的数据量时,该方法会调用_expirationTokenSource的Cancel方法。在实现的ActiveChangeCallbacks方法种,我们将针对这两个CancellationTokenSource的释放放在注册的回调中。

四、接收、缓冲、打包和处理数据:Batcher<T>

最终用于打包的Batcher类型定义如下。在构造函数中,我们除了提供上述两个阈值外,还提供了一个Action<Batch<T>>委托完成针对打包数据的处理。通过Add方法接收的数据存储在_data字段返回的数组上,它时通过Batch<T>的静态方法CreatePooledArray提供的。我们使用字段_index表示添加数据在_data数组中存储的位置,并使用InterLocked.Increase方法解决并发问题。

public sealed class Batcher<T> : IDisposable where T : class
{
    private readonly Action<Batch<T>> _processor;
    private T[] _data;
    private BatchChangeToken _changeToken = default!;
    private readonly int _batchSize;
    private int _index = -1;
    private readonly IDisposable _scheduler;

    public Batcher(Action<Batch<T>> processor, int batchSize, TimeSpan interval)
    {
        _processor = processor ?? throw new ArgumentNullException(nameof(processor));
        _batchSize = batchSize;
        _data = Batch<T>.CreatePooledArray(batchSize);
        _scheduler = ChangeToken.OnChange(() => _changeToken = new BatchChangeToken(_batchSize, interval), OnChange);

        void OnChange()
        {
            var data = Interlocked.Exchange(ref _data, Batch<T>.CreatePooledArray(batchSize));
            if (data[0] is not null)
            {
                Interlocked.Exchange(ref _index, -1);
                _ = Task.Run(() => _processor.Invoke(new Batch<T>(data)));
            }
        }
    }

    public void Add(T item)
    {
        if (item is null) throw new ArgumentNullException(nameof(item));
        var index = Interlocked.Increment(ref _index);
        if (index >= _batchSize)
        {
            SpinWait.SpinUntil(() => _index < _batchSize - 1);
            Add(item);
        }
        _data[index] = item;
        _changeToken.Increase();
    }

    public void Dispose() => _scheduler.Dispose();
}

在构造函数中,我们调用了ChangeToken的静态方法OnChange将数据处理操作绑定到创建的BatchChangeToken对象上,并确保每次发送“数据处理”后将重新创建的BatchChangeToken对象赋值到_changeToken字段上,因为Add放到需要调用它的Increase增加计数。当接收到数据处理通知后,我们会调用Batch<T>的静态方法CreatePooledArray构建一个数组将字段 ­_data引用的数组替换下来,并将其封装成Batch<T>对象进行处理(如果数据存在)。于此同时,表示添加数据存储索引的_index恢复成-1。Add方法在对_index做自增操作后,如果发现累积的数据量达到阈值,需要等待数据处理完毕。由于数据处理以异步的方式处理,这里的耗时时很低的,所以我们这里选择了自旋的方式等待它完成。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/607484.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ChatGPT与软件架构(3) - 软件架构提示工程

高效利用ChatGPT辅助研发的关键是在研发生命周期的不同阶段采用对应提示获取有益的帮助。原文: Leveraging Prompt Engineering in Software Architecture with ChatGPT 软件架构开发生命周期转型。 Beth Smith Unsplash 简介 作为解决方案架构师&#xff0c;有必要掌握软件架构…

【分布式架构】资源与事务:可观测性的基本二重性

西格曼&#xff1a;我叫本西格曼。我是Lightstep的联合创始人兼首席执行官。我在这里讨论的是资源和事务&#xff0c;这是可观察性的一个基本的二元性。我职业生涯的大部分时间都在研究可观察性。在我职业生涯之初&#xff0c;我在谷歌工作了九年&#xff0c;致力于谷歌的分布式…

SLAM实战项目(1) — ORB-SLAM2稠密地图重建

目录 1 整体思路 2 功能实现 3 结果运行 (1) TUM数据集下载 (2) associate.py用于RGB和Depth匹配 (3) 运行数据集 4 CMakeLists.txt文件修改 5 完整PointCloudMapping.h和PointCloudMapping.cc 6 报错分析 7 思考扩展 文章参考部分开源代码和报错文章 1 整体思路 利…

【微信小程序开发】第 3 节 - 安装开发者工具

欢迎来到博主 Apeiron 的博客&#xff0c;祝您旅程愉快 &#xff01; 时止则止&#xff0c;时行则行。动静不失其时&#xff0c;其道光明。 目录 1、缘起 2、微信开发者工具 3、下载 4、安装 5、扫码登录 6、设置外观和代理 7、总结 1、缘起 开发微信小程序从大的方…

吊打面试官的16000字JVM专属秘籍,又一个Java面试神器!终于可在简历写上精通JVM了!

前言 吊打面试官的16000字JVM专属秘籍&#xff0c;总共包含三部分的内容&#xff0c;从基础到进阶带大家一步步深入理解JVM&#xff01; 学完就可以在简历上面直接写上精通JVM&#xff01; 因为篇幅限制这里只给大家做简单的一个介绍&#xff0c;也就是进行一个大点的梳理&a…

记录--手把手教你Vue+ECharts+高德地图API实现天气预报数据可视化

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 前言 所谓数据可视化&#xff0c;我们可以理解为从宏观角度来看一眼就能看出来整个数据的占比&#xff0c;走向。对于数据可视化&#xff0c;很多互联网公司是很看重这一块的&#xff0c;包括大厂&…

基于wireshark打造安全分析师工具--解析suricata中的分析结果

从本篇文章开始&#xff0c;我将通过若干篇文章陆续介绍在实际安全运营的过程中&#xff0c;基于wireshark打造安全分析师趁手的流量威胁分析工具&#xff0c;帮助安全分析人员在面对网络数据包取证和分析时候达到事半功倍的效果。本篇文件介绍使用在使用iwreshark分析数据包事…

21天学会C++:Day7----auto关键字

CSDN的uu们&#xff0c;大家好。这里是C入门的第七讲。 座右铭&#xff1a;前路坎坷&#xff0c;披荆斩棘&#xff0c;扶摇直上。 博客主页&#xff1a; 姬如祎 收录专栏&#xff1a;C专题 目录 1. 知识引入 2. auto的使用 2.1 auto与指针和引用结合起来使用 2.2 在同一…

区分序列/UIO/特征集示例

区分序列/UIO/特征集示例 从确定性有限状态机进行测试&#xff1a;检查状态 概述 让我们假设我们有一个状态集 S 的 FSM M。还假设我们知道通过转换 t 达到的当前状态是 s 或 s0。 我们如何确定 t 到达了哪个状态&#xff1f; 分离状态 输入序列 w 将两个状态 s 和 s0 分开&…

C++进阶之继承

文章目录 前言一、继承的概念及定义1.继承概念2.继承格式与访问限定符3.继承基类与派生类的访问关系变化4.总结 二、基类和派生类对象赋值转换基本概念与规则 三、继承中的作用域四、派生类的默认成员函数五、继承与友元六、继承与静态成员六、复杂的菱形继承及菱形虚拟继承七、…

图论试题2020

n-m 2 16 Pk(Kn)k(k-1)…(k-n1)。 C&#xff1a;A2对角线元素aii2等于对应顶点vi的度数&#xff0c;所以对角线元素之和等于边数的两倍。 A的所有特征值的平方和等于A2的对角线元素之和。 B 完全图没有顶点隔&#xff0c;实际上也只有以完全图为生成子图的图没有顶点隔。 连通…

Qt6 C++基础入门1 定时器与QTimer

定时器 定时器图片流水灯案例 实现效果&#xff1a;构建一个界面&#xff0c;点击开始按钮轮流播放文件夹下图片&#xff0c;点击停止按钮停止播放 构建页面&#xff0c;上部是一个没有内容的 label 下面是开始和暂停按钮&#xff0c;各自的名称分别为 startBtn 和 stopBtn 先保…

6.事件绑定

目录 1 事件对象的属性 2 事件绑定方式 3 在事件中赋值 4 事件传参 1 事件对象的属性 target是触发该事件源头的组件&#xff0c;currentTarget是当前事件所绑定的组件&#xff0c;比如现在有一个父组件包着子组件&#xff0c;你给父组件绑定事件&#xff0c;由于事件…

ps磨皮插件专用智能磨皮插件Portraiture4

Portraiture是一款智能磨皮插件&#xff0c;为Photoshop和Lightroom添加一键磨皮美化功能&#xff0c;快速对照片中皮肤、头发、眉毛等部位进行美化&#xff0c;无需手动调整&#xff0c;大大提高P图效率。全新4版本&#xff0c;升级AI算法&#xff0c;并独家支持多人及全身模式…

从0到1深入剖析微服务架构,阿里人十年经验浓缩成一份笔记

前言 数字化经济的快速发展和云计算给底层IT系统带来的巨大变革正是当下微服务架构快速发展的时代背景。Gartner预计&#xff0c;从2018年到2022年&#xff0c;PaaS将成为未来的主流平台交付模式&#xff0c;而PaaS平台需要更加灵活的云原生应用架构做技术支撑&#xff0c;微服…

图论与算法(3)图的深度优先遍历

1. 遍历的意义 1.1 图的遍历 图的遍历是指按照一定规则访问图中的所有顶点&#xff0c;以便获取图的信息或执行特定操作。常见的图遍历算法包括深度优先搜索&#xff08;DFS&#xff09;和广度优先搜索&#xff08;BFS&#xff09;。 深度优先搜索&#xff08;DFS&#xff0…

UART串口通信实验

不管是单片机开发还是嵌入式 Linux 开发&#xff0c;串口都是最常用到的外设。 可以通过串口将开发板与电脑相连&#xff0c;然后在电脑上通过串口调试助手来调试程序。 还有很多模块&#xff0c;比如蓝牙、GPS、GPRS等都使用串口与主控进行通信。 UART简介 串口全称串行接口…

vb6 Webview2微软Edge Chromium内核执行JS取网页数据测速

微软Edge Chromium内核执行JS获取网页数据测试 ExcuteScript eval(document.body.innerHTML) from : https://www.163.com 采集的网页HTM字符串占用字节空间1.2MB ExcuteScript回调事件中取得JS执行结果&#xff0c;用时 54 毫秒 其中JSON转字符13.5209毫秒 jSON数据长度: 增…

ChatGPT更新说明(20230524)

原文传送门&#xff1a;ChatGPT — Release Notes 更新说明&#xff08;5月24日&#xff09; 简要&#xff1a;iOS应用在更多国家可用&#xff0c;Alpha测试中的共享链接&#xff0c;Bing插件&#xff0c;iOS上的历史记录禁用 ChatGPT iOS应用在更多国家可用 好消息&#xf…

Elasticsearch:如何使用集群级别的分片分配过滤(不包括节点)安全地停用节点

当你想停用 Elasticsearch 中的节点时&#xff0c;通常的过程不是直接销毁节点。 如果你这样做&#xff0c;那么你就有数据丢失的风险&#xff0c;这不是你想要对应该是可靠的数据库做的事情。 这样做的问题是&#xff0c;节点很可能会通过 Elasticsearch 处理的恰当命名的分片…