C# 实现条件变量

news2025/1/9 19:55:38

C# 进程通信系列

第一章 共享内存
第二章 条件变量(本章)
第三章 消息队列


文章目录

  • C# 进程通信系列
  • 前言
  • 一、关键实现
    • 1、用到的主要对象
    • 2、初始化区分创建和打开
    • 3、变量放到共享内存
    • 4、等待和释放逻辑
  • 二、完整代码
  • 三、使用示例
    • 1、线程同步控制
    • 2、进程同步控制
  • 总结


前言

C#提供的多进程同步对象有互斥锁和信号量,但是并没有条件变量。虽然信号量条件变量一定程度可以等效,但是具体的使用还是会有区别。比如说消息队列用条件变量就比信号量方便,用信号量要么缺乏灵活性,要么辅助代码已经和实现一个条件变量没区别了。本文提供了一种条件变量的实现方法,可以用于进程间的同步控制。


一、关键实现

1、用到的主要对象

下列对象都是跨进程的

//互斥变量,
Mutex _mtx;
//等待发送信号量
Semaphore _waitSem;
//等待完成信号量
Semaphore _waitDone;
//共享内存,用于存储计数变量
MemoryMappedFile _mmf;
//共享内存的读写对象
MemoryMappedViewAccessor _mmva;

2、初始化区分创建和打开

利用Mutex判断是创建还是打开

bool isCreateNew;
_mtx = new Mutex(false, name, out isCreateNew);
 if(isCreateNew){
  //只能在创建时,初始化共享变量
  }

3、变量放到共享内存

条件变量需要的计算对象就两个Waiting、Signals表示等待数和释放数。

//放到共享内存的数据
struct SharedData
{
    public int Waiting;
    public int Signals;
}
SharedData Data
{
    set
    {
        _mmva.Write(0, ref value);
    }
    get
    {
        SharedData ret;
        _mmva.Read(0, out ret);
        return ret;
    }
}

4、等待和释放逻辑

参考了SDL2的条件变量实现,具体略。有兴趣的朋友可以自行查找源码查看。


二、完整代码

using System.IO.MemoryMappedFiles;
using System.Runtime.InteropServices;

namespace AC
{
    /************************************************************************
    * @Project:  	AC::ConditionVariable
    * @Decription:   条件变量
    *                支持跨进程
    * @Verision:  	v1.0.0    
    *              更新日志
    *              v1.0.0:实现基本功能
    *              v1.1.0:优化进程内逻辑
    * @Author:  	Xin
    * @Create:  	2024/07/18 15:25:00
    * @LastUpdate:  2024/07/21 20:53:00
    ************************************************************************
    * Copyright @ 2025. All rights reserved.
    ************************************************************************/
    class ConditionVariable : IDisposable
    {
        /// <summary>
        /// 构造方法
        /// 本进程内使用条件变量,会更轻量一些。
        /// </summary>
        public ConditionVariable()
        {
            bool isCreateNew;
            Initialize(null, out isCreateNew);
        }
        /// <summary>
        /// 构造方法
        /// </summary>
        /// <param name="name">唯一名称,系统级别,不同进程创建相同名称的本对象,就是同一个条件变量。</param>
        public ConditionVariable(string name)
        {
            bool isCreateNew;
            Initialize(name, out isCreateNew);
        }
        /// <summary>
        /// 构造方法
        /// </summary>
        /// <param name="name">唯一名称,系统级别,不同进程创建相同名称的本对象,就是同一个条件变量。</param>
        /// <param name="isCreateNew">表示是否新创建,是则是创建,否则是打开已存在的。</param>
        public ConditionVariable(string? name, out bool isCreateNew)
        {
            Initialize(name, out isCreateNew);
        }

        /// <summary>
        /// 等待
        /// </summary>
        /// <param name="outerMtx">外部锁</param>
        public void WaitOne(Mutex outerMtx)
        {
            WaitOne(Timeout.InfiniteTimeSpan, outerMtx);
        }
        /// <summary>
        /// 等待超时
        /// </summary>
        /// <param name="timeout">超时时间</param>
        /// <param name="outerMtx">外部锁,可以跨进程使用</param>
        /// <returns>是则成功,否则超时</returns>
        public bool WaitOne(TimeSpan timeout, Mutex outerMtx)
        {
            return WaitOne(timeout, (object)outerMtx);
        }

        /// <summary>
        /// 等待
        /// </summary>
        /// <param name="outerMtx">外部锁,为lock关键字的对象,只能本进程使用</param>
        public void WaitOne(object outerMtx)
        {
            WaitOne(Timeout.InfiniteTimeSpan, outerMtx);
        }

        /// <summary>
        /// 等待超时
        /// </summary>
        /// <param name="timeout">超时时间</param>
        /// <param name="outerMtx">外部锁</param>
        /// <returns>是则成功,否则超时</returns>
        public bool WaitOne(TimeSpan timeout, object outerMtx)
        {
            bool isNotTimeout;
            //记录等待数量
            if (_mtx != null) _mtx.WaitOne(); else Monitor.Enter(_waitSem);
            var ws = Data;
            ws.Waiting++;
            Data = ws;
            if (_mtx != null) _mtx.ReleaseMutex(); else Monitor.Exit(_waitSem);
            //解除外部的互斥锁,让其他线程可以进入条件等待。
            if (outerMtx is Mutex) ((Mutex)outerMtx).ReleaseMutex(); else Monitor.Exit(outerMtx);
            //等待信号
            isNotTimeout = _waitSem.WaitOne(timeout);
            if (_mtx != null) _mtx.WaitOne(); else Monitor.Enter(_waitSem);
            ws = Data;
            if (isNotTimeout && ws.Signals > 0)
            {
                //通知发送信号的线程,等待完成。
                _waitDone.Release();
                ws.Signals--;
            }
            ws.Waiting--;
            Data = ws;
            if (_mtx != null) _mtx.ReleaseMutex(); else Monitor.Exit(_waitSem);
            //加上外部互斥锁,还原外部的锁状态。
            if (outerMtx is Mutex) ((Mutex)outerMtx).WaitOne(); else Monitor.Enter(outerMtx);
            return !isNotTimeout;
        }


        /// <summary>
        /// 释放,通知
        /// </summary>
        public void Release()
        {
            if (_mtx != null) _mtx.WaitOne(); else Monitor.Enter(_waitSem);
            var ws = Data;
            if (ws.Waiting > ws.Signals)
            {
                ws.Signals++;
                Data = ws;
                _waitSem.Release();
                if (_mtx != null) _mtx.ReleaseMutex(); else Monitor.Exit(_waitSem);
                _waitDone.WaitOne();
            }
            else
            {
                if (_mtx != null) _mtx.ReleaseMutex(); else Monitor.Exit(_waitSem);
            }
        }
        /// <summary>
        /// 释放全部,广播
        /// </summary>
        public void ReleaseAll()
        {
            if (_mtx != null) _mtx.WaitOne(); else Monitor.Enter(_waitSem);
            var ws = Data;
            if (ws.Waiting > ws.Signals)
            {
                int waiting = ws.Waiting - ws.Signals;
                ws.Signals = ws.Waiting;
                Data = ws;
                _waitSem.Release(waiting);
                if (_mtx != null) _mtx.ReleaseMutex(); else Monitor.Exit(_waitSem);
                _waitDone.WaitOne(waiting);
            }
            else
            {
                if (_mtx != null) _mtx.ReleaseMutex(); else Monitor.Exit(_waitSem); _mtx.ReleaseMutex();
            }
        }
        /// <summary>
        /// 销毁对象,只会销毁当前实例,如果多个打开同个名称,其他对象不受影响
        /// </summary>
        public void Dispose()
        {
            _mtx?.Dispose();
            _waitSem.Dispose();
            _waitDone.Dispose();
            _mmva?.Dispose();
            _mmf?.Dispose();
        }
        void Initialize(string? name, out bool isCreateNew)
        {
            Mutex? mtx = null;
            Semaphore? waitSem = null;
            Semaphore? waitDone = null;
            MemoryMappedFile? mmf = null;
            MemoryMappedViewAccessor? mmva = null;
            try
            {
                if (name != null)
                {
                    mtx = _mtx = new Mutex(false, name, out isCreateNew);
                    _mtx.WaitOne();
                    try
                    {
                        waitSem = _waitSem = new Semaphore(0, int.MaxValue, name + ".cv.ws");
                        waitDone = _waitDone = new Semaphore(0, int.MaxValue, name + ".cv.wd");
                        var _shmPath = Path.Combine(_TempDirectory, name + ".cv");
                        mmf = _mmf = MemoryMappedFile.CreateFromFile(File.Open(_shmPath, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite), null, Marshal.SizeOf<SharedData>(), MemoryMappedFileAccess.ReadWrite, HandleInheritability.Inheritable, false);
                        mmva = _mmva = _mmf.CreateViewAccessor();
                        if (isCreateNew) Data = new SharedData() { Signals = 0, Waiting = 0 };
                    }
                    finally
                    {
                        _mtx.ReleaseMutex();
                    }
                }
                else
                {
                    waitSem = _waitSem = new Semaphore(0, int.MaxValue);
                    waitDone = _waitDone = new Semaphore(0, int.MaxValue);
                    isCreateNew = true;
                    Data = new SharedData() { Signals = 0, Waiting = 0 };
                }
            }
            catch
            {
                mtx?.Dispose();
                waitSem?.Dispose();
                waitDone?.Dispose();
                mmf?.Dispose();
                mmva?.Dispose();
                isCreateNew = false;
                throw;
            }
        }

        Mutex? _mtx;
        Semaphore _waitSem;
        Semaphore _waitDone;
        MemoryMappedFile ?_mmf;
        MemoryMappedViewAccessor ?_mmva;
        SharedData _data;
        struct SharedData
        {
            public int Waiting;
            public int Signals;
        }
        SharedData Data
        {
            set
            {
                if (_mmva != null)
                {
                    _mmva.Write(0, ref value);
                }
                else
                {
                    _data = value;
                }
            }
            get
            {
                if (_mmva != null)
                {
                    SharedData ret;
                    _mmva.Read(0, out ret);
                    return ret;
                }
                return _data;
            }
        }
        static string _TempDirectory = Path.GetTempPath() + "EE3E9111-8F65-4D68-AB2B-A018DD9ECF3C";
    }
}

三、使用示例

1、线程同步控制

using AC;
ConditionVariable cv = new ConditionVariable();
string text = "";
//子线程发送消息
new Thread(() =>
{
    int n = 0;
    while (true)
    {
       lock(cv){
        text = (n++).ToString();
        //通知主线程
        cv.Release();
       }
    }

}).Start();
//主线程接收消息
while (true)
{
    lock(cv){
    //等待子消息
    cv.WaitOne(cv);
    Console.WriteLine(text);
    }
}

在这里插入图片描述

2、进程同步控制

进程A

//不同进程名称相同就是同一个对象
ConditionVariable cv = new ConditionVariable("cv1");
Mutex mutex = new Mutex(false,"mx1");
//进程A发送消息
while (true)
{
    mutex.WaitOne();
    //共享内存写略
    //通知进程B
    cv.Release();
    mutex.ReleaseMutex();
}

进程B

//不同进程名称相同就是同一个对象
ConditionVariable cv = new ConditionVariable("cv1");
Mutex mutex = new Mutex(false,"mx1");
//进程B接收消息
while (true)
{
    mutex.WaitOne();
    //等待进A程消息
    cv.WaitOne(mutex);
    //共享内存读略
    Console.WriteLine("收到进程A消息");
    mutex.ReleaseMutex();
}

在这里插入图片描述


总结

以上就是今天要讲的内容,之所以实现这样一个对象是因为,笔者在写跨进程队列通信,用信号量实现发现有所局限,想要完善与重写一个条件变量差异不大,索性直接实现一个条件变量,提供给队列使用,同时还具体通用性,在其他地方也能使用。总的来说,条件变量还是有用的,虽然需要遇到相应的使用场景才能意识到它的作用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1947329.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

物理机 gogs+jenkins+sonarqube 实现CI/CD

一、部署gogs_0.11.91_linux_amd64.tar.gz gogs官网下载&#xff1a;https://dl.gogs.io/ yum -y install mariadb-serversystemctl start mariadbsystemctl enable mariadbuseradd gittar zxvf gogs_0.11.91_linux_amd64.tar.gzcd gogsmysql -u root -p < scripts/mysql.…

减轻幻觉新SOTA,7B模型自迭代训练效果超越GPT-4,上海AI lab发布

LLMs在回答各种复杂问题时&#xff0c;有时会“胡言乱语”&#xff0c;产生所谓的幻觉。解决这一问题的初始步骤就是创建高质量幻觉数据集训练模型以帮助检测、缓解幻觉。 但现有的幻觉标注数据集&#xff0c;因为领域窄、数量少&#xff0c;加上制作成本高、标注人员水平不一…

大厂面试官问我:两个1亿行的文件怎么求交集?【后端八股文十五:场景题合集】

本文为【场景题合集】初版&#xff0c;后续还会进行优化更新&#xff0c;欢迎大家关注交流~ hello hello~ &#xff0c;这里是绝命Coding——老白~&#x1f496;&#x1f496; &#xff0c;欢迎大家点赞&#x1f973;&#x1f973;关注&#x1f4a5;&#x1f4a5;收藏&#x1f…

第一百七十八节 Java IO教程 - Java符号链接、Java文件

Java IO教程 - Java符号链接 符号链接包含对另一个文件或目录的引用。 符号链接引用的文件称为符号链接的目标文件。 符号链接上的操作对应用程序是透明的。我们可以使用java.nio.file.Files类处理符号链接。 isSymbolicLink(Path p)方法检查指定路径指定的文件是否是符号链…

解决 MDCFilter 引起的 Shiro UnavailableSecurityManagerException 异常:将过滤器交给 Shiro 管理

若将自定义的 MDCFilter 注册到 FilterRegistrationBean 中&#xff0c;而又在 MDCFilter 中使用了和 Shiro 相关的操作&#xff08;如获取当前登录用户&#xff09;&#xff0c;此时会因为 MDCFilter 先于 SecurityManager 实例化导致出现 UnavailableSecurityManagerExceptio…

C语言 ——— 函数指针数组的讲解及其用法

目录 前言 函数指针数组的定义 函数指针数组的使用 前言 数组是存放一组相同类型数据的存储空间 关于指针数组的知识请见&#xff1a;C语言 ——— 指针数组 & 指针数组模拟二维整型数组-CSDN博客 那么要将多个函数的地址存储到数组中&#xff0c;这个数组该如何定义…

太原高校大学智能制造实验室数字孪生可视化系统平台建设项目验收

随着科技的不断进步&#xff0c;智能制造已经成为推动制造业转型升级的重要力量。太原高校大学智能制造实验室紧跟时代步伐&#xff0c;积极推进数字孪生可视化系统平台的建设&#xff0c;并于近日圆满完成了项目的验收工作。这一里程碑式的成果&#xff0c;不仅标志着实验室在…

Angular由一个bug说起之八:实践中遇到的一个数据颗粒度的问题

互联网产品离不开数据处理&#xff0c;数据处理有一些基本的原则包括&#xff1a;准确性、‌完整性、‌一致性、‌保密性、‌及时性。‌ 准确性&#xff1a;是数据处理的首要目标&#xff0c;‌确保数据的真实性和可靠性。‌准确的数据是进行分析和决策的基础&#xff0c;‌因此…

Three.js投射光线实现三维物体交互

<template><div id"webgl"></div> </template><script setup> import * as THREE from three //导入轨道控制器 import { OrbitControls } from three/examples/jsm/controls/OrbitControls // 导入 dat.gui import { GUI } from thre…

谷粒商城实战笔记-43-前端基础-Vue-使用Vue脚手架进行模块化开发

文章目录 一&#xff0c;Vue的模块化开发1&#xff0c;目录结构2&#xff0c;单文件组件 (SFC)3&#xff0c;模块化路由4&#xff0c;Vuex 模块5&#xff0c;动态组件和异步组件6&#xff0c;抽象和复用7&#xff0c;构建和打包8&#xff0c;测试9&#xff0c;文档和注释10&…

基于Neo4j将知识图谱用于检索增强生成:Knowledge Graphs for RAG

Knowledge Graphs for RAG 本文是学习https://www.deeplearning.ai/short-courses/knowledge-graphs-rag/这门课的学习笔记。 What you’ll learn in this course Knowledge graphs are used in development to structure complex data relationships, drive intelligent sea…

Node.js知识点总结

Node.js知识点总结 Node.js其本质和浏览器一样是一个JavaScript运行环境。浏览器的运行环境为V8引擎浏览器内置API&#xff08;BOM、DOM、Canvas);而node.js的运行环境是V8引擎node提供的API(fs、path、http)。它使JavaScript可以编写后端。 基本知识 fs文件系统模块 它提供一…

深度学习复盘与论文复现E

文章目录 一、项目复现的问题及其解决方案1、 Cannot find DGL C graphbolt library2、 “is“ with a literal. Did you mean ““?”3、运行SEG、SPG查看GATNet的网络结构4、关于LI-FPN项目找不到数据粒度不匹配问题5、关于LI-FPN项目num_samples为空6、解决路径问题7、 !ss…

Nginx 怎样处理请求的缓存数据清理策略?

&#x1f345;关注博主&#x1f397;️ 带你畅游技术世界&#xff0c;不错过每一次成长机会&#xff01; 文章目录 Nginx 怎样处理请求的缓存数据清理策略&#xff1f;一、理解 Nginx 缓存的重要性二、Nginx 缓存数据的类型&#xff08;一&#xff09;静态文件缓存&#xff08;…

PHP简单商城单商户小程序系统源码

&#x1f6cd;️轻松开店&#xff0c;触手可及&#xff01;「简单商城小程序」让电商梦想照进现实&#x1f31f; &#x1f389;开店新风尚&#xff0c;「简单商城小程序」引领潮流&#xff01; 还在为繁琐的电商开店流程烦恼吗&#xff1f;想要快速搭建自己的线上商城&#x…

Linux嵌入式学习——数据结构——线性表的链式结构

线性表的链式存储 解决顺序存储的缺点&#xff0c;插入和删除&#xff0c;动态存储问题。 特点&#xff1a; 线性表链式存储结构的特点是一组任意的存储单位存储线性表的数据元素&#xff0c;存储单元可以是连续的&#xff0c;也可以不连续。可以被存储在任意内存未被占…

vue3如何实现分面漏斗图

如下图&#xff1a; 这里我选择采用Vue3g2plot进行实现。 参考网址&#xff1a;https://g2plot.antv.antgroup.com/zh/examples/more-plots/funnel/#facet-transpose 核心代码如下&#xff1a; const data [{ stage: 简历筛选, number: 253, company: A公司 },{ stage: 初…

东京裸机云多IP服务器全面分析

东京裸机云多IP服务器是一种提供多IP地址分配和高性能网络服务的云计算解决方案&#xff0c;广泛应用于需要多IP管理和高稳定性的网络应用。下面将从几个方面具体介绍东京裸机云多IP服务器&#xff0c;rak部落为您整理发布东京裸机云多IP服务器的全面分析。 在数字化时代&#…

Docker 部署常用中间件(redis,rabbitMQ,mysql8,es,kibana,nginx等)亲测成功~~~

Docker 部署常用中间件 在日常开发中必要的环境&#xff0c;大多数都是单点后续持续更新集群模式~~~ docker 安装reids docker pull redis:7.2.5 编辑redis.conf # 绑定地址&#xff0c;默认只允许本机访问 # bind 192.168.1.100 10.0.0.1 # bind 127.0.0.1 ::1 bind 0.0…

学员心得 | 做好这几点,轻松通关云计算HCIE3.0!

烈日当空&#xff0c;骄阳似火&#xff0c;在这炎炎夏日迎来了自己今年一份满意的答卷--华为HCIE云计算“通过”的e妹儿&#xff08;嘿嘿&#xff01;证书也快了&#xff09;。此时此刻&#xff0c;想把自己“辛勤劳作”备考IE的一些经历和建议留给后来的同路人。 “世界上最可…