前言
c#提供的多进程同步对象有互斥锁和信号量,但是并没有条件变量。虽然信号量条件变量一定程度可以等效,但是具体的使用还是会有区别。比如说消息队列用条件变量就比信号量方便,用信号量要么缺乏灵活性,要么辅助代码已经和实现一个条件变量没区别了。本文提供了一种条件变量的实现方法,可以用于进程间的同步控制。
一、关键实现
1、用到的主要对象
下列对象都是跨进程的
//互斥变量, mutex _mtx; //等待发送信号量 semaphore _waitsem; //等待完成信号量 semaphore _waitdone; //共享内存,用于存储计数变量 memorymappedfile _mmf; //共享内存的读写对象 memorymappedviewaccessor _mmva;
2、初始化区分创建和打开
利用mutex判断是创建还是打开
bool iscreatenew; _mtx = new mutex(false, name, out iscreatenew); if(iscreatenew){ //只能在创建时,初始化共享变量 }
3、变量放到共享内存
条件变量需要的计算对象就两个waiting、signals表示等待数和释放数。
//放到共享内存的数据 struct shareddata { public int waiting; public int signals; } shareddata data { set { _mmva.write(0, ref value); } get { shareddata ret; _mmva.read(0, out ret); return ret; } }
4、等待和释放逻辑
参考了sdl2的条件变量实现,具体略。有兴趣的朋友可以自行查找源码查看。
二、完整代码
using system.io.memorymappedfiles; using system.runtime.interopservices; namespace ac { /************************************************************************ * @project: ac::conditionvariable * @decription: 条件变量 * 支持跨进程 * @verision: v1.0.0 * 更新日志 * v1.0.0:实现基本功能 * @author: xin * @create: 2024/07/18 15:25:00 * @lastupdate: 2024/07/21 20:53:00 ************************************************************************ * copyright @ 2025. all rights reserved. ************************************************************************/ class conditionvariable : idisposable { /// <summary> /// 构造方法 /// </summary> public conditionvariable() { bool iscreatenew; initialize(null, out iscreatenew); } /// <summary> /// 构造方法 /// </summary> /// <param name="name">唯一名称,系统级别,不同进程创建相同名称的本对象,就是同一个条件变量。</param> public conditionvariable(string? name) { bool iscreatenew; initialize(name, out iscreatenew); } /// <summary> /// 构造方法 /// </summary> /// <param name="name">唯一名称,系统级别,不同进程创建相同名称的本对象,就是同一个条件变量。</param> /// <param name="iscreatenew">表示是否新创建,是则是创建,否则是打开已存在的。</param> public conditionvariable(string? name, out bool iscreatenew) { initialize(name, out iscreatenew); } /// <summary> /// 等待 /// </summary> /// <param name="outermtx">外部锁</param> public void waitone(mutex outermtx) { waitone(timeout.infinitetimespan, outermtx); } /// <summary> /// 等待超时 /// </summary> /// <param name="timeout">超时时间</param> /// <param name="outermtx">外部锁</param> /// <returns>是则成功,否则超时</returns> public bool waitone(timespan timeout, mutex outermtx) { bool isnottimeout; //记录等待数量 _mtx.waitone(); var ws = data; ws.waiting++; data = ws; _mtx.releasemutex(); //解除外部的互斥锁,让其他线程可以进入条件等待。 outermtx.releasemutex(); //等待信号 isnottimeout = _waitsem.waitone(timeout); _mtx.waitone(); ws = data; if (isnottimeout && ws.signals > 0) { //通知发送信号的线程,等待完成。 _waitdone.release(); ws.signals--; } ws.waiting--; data = ws; _mtx.releasemutex(); //加上外部互斥锁,还原外部的锁状态。 outermtx.waitone(); return !isnottimeout; } /// <summary> /// 释放,通知 /// </summary> public void release() { _mtx.waitone(); var ws = data; if (ws.waiting > ws.signals) { ws.signals++; data = ws; _waitsem.release(); _mtx.releasemutex(); _waitdone.waitone(); } else { _mtx.releasemutex(); } } /// <summary> /// 释放全部,广播 /// </summary> public void releaseall() { _mtx.waitone(); var ws = data; if (ws.waiting > ws.signals) { int waiting = ws.waiting - ws.signals; ws.signals = ws.waiting; data = ws; _waitsem.release(waiting); _mtx.releasemutex(); _waitdone.waitone(waiting); } else { _mtx.releasemutex(); } } /// <summary> /// 销毁对象,只会销毁当前实例,如果多个打开同个名称,其他对象不受影响 /// </summary> public void dispose() { _mtx.dispose(); _waitsem.dispose(); _waitdone.dispose(); _mmva.dispose(); _mmf.dispose(); } void initialize(string? name, out bool iscreatenew) { mutex? mtx = null; semaphore? waitsem = null; semaphore? waitdone = null; memorymappedfile? mmf = null; memorymappedviewaccessor? mmva = null; try { mtx = _mtx = new mutex(false, name, out iscreatenew); _mtx.waitone(); try { waitsem = _waitsem = new semaphore(0, int.maxvalue, name + ".cv.ws"); waitdone = _waitdone = new semaphore(0, int.maxvalue, name + ".cv.wd"); var _shmpath = path.combine(_tempdirectory, name + ".cv"); mmf = _mmf = memorymappedfile.createfromfile(file.open(_shmpath, filemode.openorcreate, fileaccess.readwrite, fileshare.readwrite), null, marshal.sizeof<shareddata>(), memorymappedfileaccess.readwrite, handleinheritability.inheritable, false); mmva = _mmva = _mmf.createviewaccessor(); if (iscreatenew) data = new shareddata() { signals = 0, waiting = 0 }; } finally { _mtx.releasemutex(); } } catch { mtx?.dispose(); waitsem?.dispose(); waitdone?.dispose(); mmf?.dispose(); mmva?.dispose(); iscreatenew = false; throw; } } mutex _mtx; semaphore _waitsem; semaphore _waitdone; memorymappedfile _mmf; memorymappedviewaccessor _mmva; struct shareddata { public int waiting; public int signals; } shareddata data { set { _mmva.write(0, ref value); } get { shareddata ret; _mmva.read(0, out ret); return ret; } } static string _tempdirectory = path.gettemppath() + "ee3e9111-8f65-4d68-ab2b-a018dd9ecf3c"; } }
三、使用示例
1、同步控制
using ac; conditionvariable cv = new conditionvariable(); mutex mutex = new mutex(); string text = ""; //子线程发送消息 new thread(() => { int n = 0; while (true) { mutex.waitone(); text = (n++).tostring(); //通知主线程 cv.release(); mutex.releasemutex(); } }).start(); //主线程接收消息 while (true) { mutex.waitone(); //等待子消息 cv.waitone(mutex); console.writeline(text); mutex.releasemutex(); }
2、跨进程控制
进程a
//不同进程名称相同就是同一个对象 conditionvariable cv = new conditionvariable("cv1"); mutex mutex = new mutex(false,"mx1"); //进程a发送消息 while (true) { mutex.waitone(); //共享进程读写略 //通知进程b cv.release(); mutex.releasemutex(); }
进程b
//不同进程名称相同就是同一个对象 conditionvariable cv = new conditionvariable("cv1"); mutex mutex = new mutex(false,"mx1"); //进程b接收消息 while (true) { mutex.waitone(); //等待进a程消息 cv.waitone(mutex); //共享进程读写略 console.writeline("收到进程a消息"); mutex.releasemutex(); }
总结
以上就是今天要讲的内容,之所以实现这样一个对象是因为,笔者在写跨进程队列通信,用信号量实现发现有所局限,想要完善与重写一个条件变量差异不大,索性直接实现一个条件变量,提供给队列使用,同时还具体通用性,在其他地方也能使用。总的来说,条件变量还是有用的,虽然需要遇到相应的使用场景才能意识到它的作用。
到此这篇关于c#实现跨进程条件变量的示例代码的文章就介绍到这了,更多相关c#跨进程条件变量内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论