C# 基于共享内存实现跨进程队列

news2024/11/16 9:45:18

C# 进程通信系列

第一章 共享内存
第二章 共享队列(本章)


文章目录

  • C# 进程通信系列
  • 前言
  • 一、实现原理
    • 1、用到的主要对象
    • 2、创建共享内存
    • 3、头部信息
    • 4、入队
    • 5、出队
    • 6、释放资源
  • 二、完整代码
  • 三、使用示例
    • 1、传输byte[]数据
    • 2、传输字符串
    • 3、传输对象
  • 总结


前言

进程通信一般情况下比较少用,但是也有一些使用场景,有些做视频传输的似乎会用多进程来实现,还有在子进程中调用特定的库来避免内存泄漏,笔者最近也遇到了需要使用多进程的场景。多进程的使用最主要的就是进程间的通信,本文参考了go语言的ipc库,实现了一个基于共享内存的跨进程队列。


一、实现原理

1、用到的主要对象

//共享内存管理对象
MemoryMappedFile _mmf;
//跨进程的互斥变量
Mutex _mtx;
//入队信号量
Semaphore _semaEq;
//出队信号量
Semaphore _semaDq;

2、创建共享内存

创建共享内存需要使用MemoryMappedFile.CreateFromFile实现跨平台。CreateNew只能创建无法打开第二个,OpenExisting只支持windows。

string name="共享内存标识名称";
_shmPath="共享内存文件路径"+name;
//通过文件路径创建共享内存
_mmf = MemoryMappedFile.CreateFromFile(File.Open(_shmPath, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite), null, (_QueuetHeaderSize + (elementBodyMaxSize + _ElementHeaderSize) * capacity), MemoryMappedFileAccess.ReadWrite, HandleInheritability.Inheritable, false);
//创建互斥变量
_mtx = new Mutex(false, Name + ".mx");
//创建入队信号量,capacity为队列元素个数容量
_semaEq = new Semaphore(0, (int)capacity, name+ ".eq");
//创建出队信号量,capacity为队列元素个数容量
_semaDq = new Semaphore((int)capacity, (int)capacity, name + ".dq");

获取读写对象

_mmva = _mmf.CreateViewAccessor();

值类型数组方式写入

T[] obj;
_mmva.WriteArray<T>(position , obj, 0, obj.Length);

值类型数组方式读取

T[] arr=new T[n];
_mmva.ReadArray(position, arr, 0, arr.ArrayLength);

3、头部信息

采用循环队列方式实现,判断队空队满通过count、capacity的方式(参考了C#的Queue源码),避免占用多一个空间。

struct QueueHeader
{
    //元素大小
    public nint ElementSize;
    //队列容量
    public nint Capacity;
    //当前元素个数
    public nint Count;
    //队列头
    public nint Front;
    //队列尾
    public nint Rear;
}

队列头信息需要存储在共享内存中。

QueueHeader Header
{
    get
    {
        QueueHeader header;
        _mmva.Read(0, out header);
        return header;
    }
    set
    {
        _mmva.Write(0, ref value);
    }
}

4、入队

示例如下

bool Enqueuee<T>(T[] obj) where T : struct
{    
    //共享内存中读取header
    var header = Header;
    //队列满返回
    if (header.Count == header.Capacity) return false;
    //计算写入的位置,头部长度+队尾*元素大小
    nint position = _QueuetHeaderSize + header.Rear * header.ElementSize;
    //写入共享内存
    _mmva.WriteArray<T>(position, obj, 0, obj.Length);
    //更新队尾
    header.Rear = (header.Rear + 1) % header.Capacity;
    //更新长度
    header.Count++;
    //更新头部信息到共享内存
    Header=header;
    return true;
}

同步

 //等待出队信号量(如果队列满则会等待)
 if (!_semaDq.WaitOne(timeout)) return false;
 //进入互斥锁
 if (!_mtx.WaitOne(timeout)) return false;
 try
 {   
     //入队
     Enqueue(obj);
 }
 finally
 {
     //通知入队信号量
     _semaEq.Release();
     //释放互斥锁
     _mtx.ReleaseMutex();
 }
 return true;

5、出队

object Dequeue()
{   
    //共享内存中读取header
    var header = Header;
    //队列空则返回
    if (header.Count == 0) return null;
    //计算读取的位置,头部长度+队头*元素大小
    long position = _QueuetHeaderSize + header.Front * header.ElementSize;
    //创建数据用于装载数据
    Array arr = Array.CreateInstance(readType, msg.Header.ArrayLength);
    //将泛型转type调用。
    var readArray = _ReadArrayGeneric.MakeGenericMethod(readType);
    //读取共享内存的数据
    readArray.Invoke(_mmva, [position , arr, 0, arr.Length]);
    //更新队头
    header.Front = (header.Front + 1) % header.Capacity;
    //更新长度
    header.Count--;
    //更新头部信息到共享内存
    Header = header;
    return msg;
}

同步

 //等待入队信号量(如果队列空则会等待)
if (!_semaEq.WaitOne(timeout)) return null;
 //进入互斥锁
if (!_mtx.WaitOne(timeout!)) return null;
try
{ 
   //出队
    return  Dequeue();
}
finally
{
     //通知入队信号量
    _semaDq.Release();
     //释放互斥锁
    _mtx.ReleaseMutex();
}

6、释放资源

/// <summary>
/// 销毁队列,只会销毁当前实例,如果多个队列打开同个名称,其他队列不受影响
/// </summary>
public void Dispose()
{
    _mmf.Dispose();
    _mmva.Dispose();
    _mtx.Dispose();
    _semaEq.Dispose();
    _semaDq.Dispose();
}

二、完整代码

类的定义

/// <summary>
/// 共享队列
/// 基于共享内存实现
/// </summary>
class SharedQueue : IDisposable
{
    /// <summary>
    /// 名称
    /// </summary>
    public string Name { get; private set; }
    /// <summary>
    /// 元素最大大小
    /// </summary>
    public long ElementMaxSize { get; private set; }
    /// <summary>
    /// 队列容量
    /// </summary>
    public long Capacity { get; private set; }
    /// <summary>
    /// 表示是否新创建,是则是创建,否则是打开已存在的。
    /// </summary>
    public bool IsNewCreate { get; private set; }
    /// <summary>
    /// 构造方法
    /// </summary>
    /// <param name="name">唯一名称,系统级别,不同进程创建相同名称的本对象,就是同一个队列,可以进行数据传输。</param>
    /// <param name="capacity">队列容量,元素个数总量</param>
    /// <param name="elementBodyMaxSize">队列元素最大大小,此大小需要考虑传输数据Type.FullName长度</param>
    public SharedQueue(string name, nint capacity = 1, nint elementBodyMaxSize = 3145728)
    /// <summary>
    /// 发送数据
    /// </summary>
    /// <param name="obj">发送的对象,支持值类型(元类型、结构体)、值类型数组、可json序列化的任意对象(实体类、数组、List、字典等等),无法序列化会产生异常。
    /// 会根据类型自动判断传输方式,值类型以及值类型数组会直接内存拷贝,引用类型会进行序列化。
    /// 此方法队列满了会阻塞,直到发送成功才返回。
    /// </param>
    /// <param name="isForceSerialize">是否强制序列化,结构体不含引用的情况下会直接复制数据性能较高,但是如果结构体成员变量有引用类型则会引发异常,此时可以强制序列化。</param>
    public void Send(object obj, bool isForceSerialize = false);
    /// <summary>
    /// 接收数据
    /// 此方法队列空会阻塞,直到有数据才返回。
    /// </summary>
    /// <returns>接收的数据,与send的数据类型对应。可以通过type或is判断,或者提前知道类型直接转换</returns>
    public object Receive();
    /// <summary>
    /// 发送数据超时
    /// </summary>
    /// <param name="obj">发送的对象,支持值类型(元类型、结构体)、值类型数组、可json序列化的任意对象(实体类、数组、List、字典等等),无法序列化会产生异常。
    /// 会根据类型自动判断传输方式,值类型以及值类型数组会直接内存拷贝,引用类型会进行序列化。</param>
    /// <param name="timeout">超时时长</param>
    /// <param name="isForceSerialize">是否强制序列化,结构体不含引用的情况下会直接复制数据性能较高,但是如果结构体成员变量有引用类型则会引发异常,此时可以强制序列化。</param>
    /// <returns>true发送成功,false超时</returns>
    public bool SendTimeout(object obj, TimeSpan timeout, bool isForceSerialize = false);
    /// <summary>
    /// 接收超时
    /// </summary>
    /// <param name="timeout">超时时长</param>
    /// <returns>接收的数据,与send的数据类型对应。可以通过type或is判断,或者提前知道类型直接转换。
    /// 超时返回null。
    /// </returns>
    public object? ReceiveTimeout(TimeSpan timeout);
    /// <summary>
    /// 销毁队列,只会销毁当前实例,如果多个队列打开同个名称,其他队列不受影响
    /// </summary>
    public void Dispose();
}

项目
vs2022 .net 8.0 控制台项目
https://download.csdn.net/download/u013113678/89544650


三、使用示例

1、传输byte[]数据

进程a

SharedQueue shq= new SharedQueue("shq1", 10);
byte[] a = new byte[5] { 1, 2, 3, 4, 5 };
//发送数据
shq.send(a);

进程b

SharedQueue shq= new SharedQueue("shq1", 10);
//接收数据
var a=shq.Receive() as byte[];
Console.Write("receive: ");
foreach (var i in a)
{
    Console.Write(i);
}

在这里插入图片描述

2、传输字符串

进程a

SharedQueue shq= new SharedQueue("shq1", 10,64);
shq.send("123456");

进程b

SharedQueue shq= new SharedQueue("shq1", 10,64);
var a=shq.Receive() as string;
Console.WriteLine("receive: " + a);

在这里插入图片描述

3、传输对象

class A
{
    public string Name;
    public int Number;
}

进程a

SharedQueue shq= new SharedQueue("shq1", 10,64);
sq.Send(new A() { Name = "Tommy", Number = 102185784 });

进程b

SharedQueue shq= new SharedQueue("shq1", 10,64);
var a=shq.Receive() as A;
Console.WriteLine("receive: " + a.Name + " " + a.Number);

在这里插入图片描述


总结

以上就是今天要讲的内容,实现这样的一个对象,虽然代码量不多,但还是有一点难度的,很多细节需要处理,比如泛型转type以统一接口,信号量实现队列和条件变量是有差异的,用CreateFromFile才能实现跨平台。总的来说,有了这样的一个队列,跨线程通信就变的比较方便且高效了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1928430.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HarmonyOS 屏幕适配设计

1. armonyOS 屏幕适配设计 1.1. 像素单位 &#xff08;1&#xff09;px (Pixels)   px代表屏幕上的像素点&#xff0c;是手机屏幕分辨率的单位&#xff0c;即屏幕物理像素单位。 &#xff08;2&#xff09;vp (Viewport Percentage)   vp是视口百分比单位&#xff0c;基于…

Java学习之SPI、JDBC、SpringFactoriesLoader、Dubbo

概述 SPI&#xff0c;Service Provider Interface&#xff0c;一种服务发现机制&#xff0c;指一些提供给你继承、扩展&#xff0c;完成自定义功能的类、接口或方法。 在SPI机制中&#xff0c;服务提供者为某个接口实现具体的类&#xff0c;而在运行时通过SPI机制&#xff0c…

Facebook未来展望:数字社交平台的进化之路

在信息技术日新月异的时代&#xff0c;社交媒体平台不仅是人们交流沟通的重要工具&#xff0c;更是推动社会进步和变革的重要力量。作为全球最大的社交媒体平台之一&#xff0c;Facebook在过去十多年里&#xff0c;不断创新和发展&#xff0c;改变了数十亿用户的社交方式。展望…

构建企业多层社会传播网络:以AI智能名片S2B2C商城小程序为例

摘要&#xff1a;在数字化转型的浪潮中&#xff0c;企业如何有效构建并优化其社会传播网络&#xff0c;已成为提升市场竞争力、深化用户关系及实现价值转化的关键。本文以AI智能名片S2B2C商城小程序为例&#xff0c;深入探讨如何通过一系列精细化的策略与技术创新&#xff0c;构…

IP地址知识点

一、IP地址组成 把一个IP地址分成两部分&#xff1a;网络号&#xff08;标识了一个局域网&#xff09;主机号&#xff08;标识了一个局域网中的设备&#xff09; 下图是通过一个路由器连接的两个局域网&#xff08;两个相邻的局域网&#xff09;&#xff0c;网络号不相同&…

AI绘画入门实践|Midjourney 的模型版本

模型分类 Midjourney 的模型主要分为2大类&#xff1a; 默认模型&#xff1a;目前包括&#xff1a;V1, V2, V3, V4, V5.0, V5.1, V5.2, V6 NIJI模型&#xff1a;目前包括&#xff1a;NIJI V4, NIJI V5, NIJI V6 模型切换 你在服务器输入框中输入 /settings&#xff1a; 回车后…

Mac电脑清理软件有哪些 MacBooster和CleanMyMac哪个好用 苹果电脑清理垃圾软件推荐 cleanmymac和柠檬清理

对于苹果电脑用户来说&#xff0c;‌选择合适的清理软件可以帮助优化电脑性能&#xff0c;‌释放存储空间&#xff0c;‌并确保系统安全。一款好用的苹果电脑清理软件&#xff0c;能让Mac系统保持良好的运行状态&#xff0c;避免系统和应用程序卡顿的产生。有关Mac电脑清理软件…

什么是MOW,以bitget钱包为例

元描述&#xff1a;MOW凭借其富有创意的故事情节和广阔的潜力在Solana上脱颖而出。本文深入探讨了其独特的概念和光明的未来。 Mouse in a Cats World (MOW)是一个基于Solana区块链的创新meme项目&#xff0c;它重新构想了一个异想天开且赋予权力的故事。在这个奇幻的宇宙中&am…

JuiceFS、Ceph 和 MinIO 结合使用

1. 流程图 将 JuiceFS、Ceph 和 MinIO 结合使用&#xff0c;可以充分利用 Ceph 的分布式存储能力、JuiceFS 的高性能文件系统特性&#xff0c;以及 MinIO 提供的对象存储接口。以下是一个方案&#xff0c;介绍如何配置和部署 JuiceFS 使用 Ceph 作为其底层存储&#xff0c;并通…

非法闯入智能监测摄像机:安全守护的新利器

在当今社会&#xff0c;安全问题愈发受到重视。随着科技的进步&#xff0c;非法闯入智能监测摄像机应运而生&#xff0c;成为保护家庭和财产安全的重要工具。这种摄像机不仅具备监控功能&#xff0c;还集成了智能识别和报警系统&#xff0c;能够在第一时间内检测到潜在的入侵行…

three.js创建基础模型

场景是一个三维空间&#xff0c;是所有物品的容器。可以将其想象成一个空房间&#xff0c;里面可以放置要呈现的物体、相机、光源等。 通过new THREE.Scene()来创建一个新的场景。 /**1. 创建场景 -- 放置物体对象的环境*/ const scene new THREE.Scene();场景只是一个三维的…

JVM(day2)

经典垃圾收集器 Serial收集 使用一个处理器或一条收集线程去完成垃圾收集工作&#xff0c;更重要的是强调在它进行垃圾收集时&#xff0c;必须暂停其他所有工作线程&#xff0c;直到它收集结束。 ParNew收集器 ParNew 收集器除了支持多线程并行收集之外&#xff0c;其他与 …

HTTP背后的故事:理解现代网络如何工作的关键(二)

一.认识请求方法(method) 1.GET方法 请求体中的首行包括&#xff1a;方法&#xff0c;URL&#xff0c;版本号 方法描述的是这次请求&#xff0c;是具体去做什么 GET方法&#xff1a; 1.GET 是最常用的 HTTP 方法. 常用于获取服务器上的某个资源。 2.在浏览器中直接输入 UR…

【实战:Django-Celery-Flower实现异步和定时爬虫及其监控邮件告警】

1 Django中集成方式一&#xff08;通用方案&#xff09; 1.1 把上面的包-复制到djagno项目中 1.2 在views中编写视图函数 1.3 配置路由 1.4 浏览器访问&#xff0c;提交任务 1.5 启动worker执行任务 1.6 查看任务结果 2 Django中集成方式二&#xff08;官方方案&#xff0…

25_Vision Transformer原理详解

1.1 简介 Vision Transformer (ViT) 是一种将Transformer架构从自然语言处理(NLP)领域扩展到计算机视觉(CV)领域的革命性模型&#xff0c;由Google的研究人员在2020年提出。ViT的核心在于证明了Transformer架构不仅在处理序列数据&#xff08;如文本&#xff09;方面非常有效&…

探索智能合约在金融科技中的前沿应用与挑战

随着区块链技术的发展和普及&#xff0c;智能合约作为其核心应用之一&#xff0c;在金融科技&#xff08;FinTech&#xff09;领域中展现出了巨大的潜力和挑战。本文将深入探讨智能合约的基本概念、前沿应用案例&#xff0c;以及面临的技术挑战和发展趋势&#xff0c;旨在帮助读…

redis笔记2

redis是用c语言写的,放不频繁更新的数据&#xff08;用户数据。课程数据&#xff09; Redis 中&#xff0c;"穿透"通常指的是缓存穿透&#xff08;Cache Penetration&#xff09;问题&#xff0c;这是指一种恶意或非法请求直接绕过缓存层&#xff0c;直接访问数据库或…

HouseCrafter:平面草稿至3D室内场景的革新之旅

在室内设计、房地产展示和影视布景设计等领域,将平面草稿图快速转换为立体的3D场景一直是一个迫切的需求。HouseCrafter,一个创新的AI室内设计方案,正致力于解决这一挑战。本文将探索HouseCrafter如何将这一过程自动化并提升至新的高度。 一、定位:AI室内设计的革新者 Ho…

通过MATLAB控制TI毫米波雷达的工作状态之TLV数据解析及绘制

前言 前一章博主介绍了如何基于设计视图中的这些组件结合MATLAB代码来实现TI毫米波雷达数据的实时采集。这一章将在此基础上实现TI毫米波雷达的TLV数据解析。过程中部分算法会涉及到一些简单的毫米波雷达相关算法,需要各位有一定的毫米波雷达基础。 TLV数据之协议解析 紧着…

爬虫学习 | 01 Web Scraper的使用

目录 背景介绍&#xff1a; 第一部分&#xff1a;Web Scraper简介 1.什么是Web Scraper&#xff1a; Web Scraper&#x1f6d2; 主要用途&#xff1a; 2.为什么选择Web Scraper&#xff1a; 第二部分&#xff1a;安装Web Scraper ​​​​​1.打开google浏览器&#xf…