前言
进程通信一般情况下比较少用,但是也有一些使用场景,有些做视频传输的似乎会用多进程来实现,还有在子进程中调用特定的库来避免内存泄漏,笔者最近也遇到了需要使用多进程的场景。多进程的使用最主要的就是进程间的通信,本文参考了go语言的ipc库,实现了一个基于共享内存的跨进程队列。
一、实现原理
1、用到的主要对象
//共享内存管理对象 memorymappedfile _mmf; //跨进程的互斥变量 mutex _mtx; //入队信号量 semaphore _semaeq; //出队信号量 semaphore _semadq;
2、创建共享内存
创建共享内存需要使用memorymappedfile.createfromfile实现跨平台。createnew只能创建无法打开第二个,openexisting只支持windows。
string name="共享内存标识名称"; _shmpath="共享内存文件路径"+name; //通过文件路径创建共享内存 _mmf = memorymappedfile.createfromfile(file.open(_shmpath, filemode.openorcreate, fileaccess.readwrite, fileshare.readwrite), null, (_queuetheadersize + (elementbodymaxsize + _elementheadersize) * capacity), memorymappedfileaccess.readwrite, handleinheritability.inheritable, false); //创建互斥变量 _mtx = new mutex(false, name + ".mx"); //创建入队信号量,capacity为队列元素个数容量 _semaeq = new semaphore(0, (int)capacity, name+ ".eq"); //创建出队信号量,capacity为队列元素个数容量 _semadq = new semaphore((int)capacity, (int)capacity, name + ".dq");
获取读写对象
_mmva = _mmf.createviewaccessor();
值类型数组方式写入
t[] obj; _mmva.writearray<t>(position , obj, 0, obj.length);
3、头部信息
采用循环队列方式实现,判断队空队满通过count、capacity的方式(参考了c#的queue源码),避免占用多一个空间。
struct queueheader
{
//元素大小
public nint elementsize;
//队列容量
public nint capacity;
//当前元素个数
public nint count;
//队列头
public nint front;
//队列尾
public nint rear;
}
队列头信息需要存储在共享内存中。
queueheader header
{
get
{
queueheader header;
_mmva.read(0, out header);
return header;
}
set
{
_mmva.write(0, ref value);
}
}
4、入队
示例如下
bool enqueuee<t>(t[] obj) where t : struct
{
//共享内存中读取header
var header = header;
//队列满返回
if (header.count == header.capacity) return false;
//计算写入的位置,头部长度+队尾*元素大小
nint position = _queuetheadersize + header.rear * header.elementsize;
//写入共享内存
_mmva.writearray<t>(position, obj, 0, obj.length);
//更新队尾
header.rear = (header.rear + 1) % header.capacity;
//更新长度
header.count++;
//更新头部信息到共享内存
header=header;
return true;
}
同步
//等待出队信号量(如果队列满则会等待)
if (!_semadq.waitone(timeout)) return false;
//进入互斥锁
if (!_mtx.waitone(timeout)) return false;
try
{
//入队
enqueue(obj);
}
finally
{
//通知入队信号量
_semaeq.release();
//释放互斥锁
_mtx.releasemutex();
}
return true;
5、出队
object dequeue()
{
//共享内存中读取header
var header = header;
//队列空则返回
if (header.count == 0) return null;
//计算读取的位置,头部长度+队头*元素大小
long position = _queuetheadersize + header.front * header.elementsize;
//创建数据用于装载数据
array arr = array.createinstance(readtype, msg.header.arraylength);
//将泛型转type调用。
var readarray = _readarraygeneric.makegenericmethod(readtype);
//读取共享内存的数据
readarray.invoke(_mmva, [position , arr, 0, arr.length]);
//更新队头
header.front = (header.front + 1) % header.capacity;
//更新长度
header.count--;
//更新头部信息到共享内存
header = header;
return msg;
}
同步
//等待入队信号量(如果队列空则会等待)
if (!_semaeq.waitone(timeout)) return null;
//进入互斥锁
if (!_mtx.waitone(timeout!)) return null;
try
{
//出队
return dequeue();
}
finally
{
//通知入队信号量
_semadq.release();
//释放互斥锁
_mtx.releasemutex();
}
6、释放资源
/// <summary>
/// 销毁队列,只会销毁当前实例,如果多个队列打开同个名称,其他队列不受影响
/// </summary>
public void dispose()
{
_mmf.dispose();
_mmva.dispose();
_mtx.dispose();
_semaeq.dispose();
_semadq.dispose();
}
二、完整代码
类的定义
/// <summary>
/// 共享队列
/// 基于共享内存实现
/// </summary>
class sharedqueue : idisposable
{
/// <summary>
/// 名称
/// </summary>
public string name { get; private set; }
/// <summary>
/// 元素最大大小
/// </summary>
public long elementmaxsize { get; private set; }
/// <summary>
/// 队列容量
/// </summary>
public long capacity { get; private set; }
/// <summary>
/// 表示是否新创建,是则是创建,否则是打开已存在的。
/// </summary>
public bool isnewcreate { get; private set; }
/// <summary>
/// 构造方法
/// </summary>
/// <param name="name">唯一名称,系统级别,不同进程创建相同名称的本对象,就是同一个队列,可以进行数据传输。</param>
/// <param name="capacity">队列容量,元素个数总量</param>
/// <param name="elementbodymaxsize">队列元素最大大小,此大小需要考虑传输数据type.fullname长度</param>
public sharedqueue(string name, nint capacity = 1, nint elementbodymaxsize = 3145728);
/// <summary>
/// 发送数据
/// </summary>
/// <param name="obj">发送的对象,支持值类型(元类型、结构体)、值类型数组、可json序列化的任意对象(实体类、数组、list、字典等等),无法序列化会产生异常。
/// 会根据类型自动判断传输方式,值类型以及值类型数组会直接内存拷贝,引用类型会进行序列化。
/// 此方法队列满了会阻塞,直到发送成功才返回。
/// </param>
/// <param name="isforceserialize">是否强制序列化,结构体不含引用的情况下会直接复制数据性能较高,但是如果结构体成员变量有引用类型则会引发异常,此时可以强制序列化。</param>
public void send(object obj, bool isforceserialize = false);
/// <summary>
/// 接收数据
/// 此方法队列空会阻塞,直到有数据才返回。
/// </summary>
/// <returns>接收的数据,与send的数据类型对应。可以通过type或is判断,或者提前知道类型直接转换</returns>
public object receive();
/// <summary>
/// 发送数据超时
/// </summary>
/// <param name="obj">发送的对象,支持值类型(元类型、结构体)、值类型数组、可json序列化的任意对象(实体类、数组、list、字典等等),无法序列化会产生异常。
/// 会根据类型自动判断传输方式,值类型以及值类型数组会直接内存拷贝,引用类型会进行序列化。</param>
/// <param name="timeout">超时时长</param>
/// <param name="isforceserialize">是否强制序列化,结构体不含引用的情况下会直接复制数据性能较高,但是如果结构体成员变量有引用类型则会引发异常,此时可以强制序列化。</param>
/// <returns>true发送成功,false超时</returns>
public bool sendtimeout(object obj, timespan timeout, bool isforceserialize = false);
/// <summary>
/// 接收超时
/// </summary>
/// <param name="timeout">超时时长</param>
/// <returns>接收的数据,与send的数据类型对应。可以通过type或is判断,或者提前知道类型直接转换。
/// 超时返回null。
/// </returns>
public object? receivetimeout(timespan timeout);
/// <summary>
/// 销毁队列,只会销毁当前实例,如果多个队列打开同个名称,其他队列不受影响
/// </summary>
public void dispose();
}
三、使用示例
1、传输byte[]数据
进程a
sharedqueue shq= new sharedqueue("shq1", 10);
byte[] a = new byte[5] { 1, 2, 3, 4, 5 };
//发送数据
shq.send(a);
进程b
sharedqueue shq= new sharedqueue("shq1", 10);
//接收数据
var a=shq.receive() as byte[];
console.write("receive: ");
foreach (var i in a)
{
console.write(i);
}

2、传输字符串
进程a
sharedqueue shq= new sharedqueue("shq1", 10,64);
shq.send("12345");
进程b
sharedqueue shq= new sharedqueue("shq1", 10,64);
var a=shq.receive() as string;
console.writeline("receive: " + a);

3、传输对象
class a
{
public string name;
public int number;
}
进程a
sharedqueue shq= new sharedqueue("shq1", 10,64);
sq.send(new a() { name = "tommy", number = 102185784 });
进程b
sharedqueue shq= new sharedqueue("shq1", 10,64);
var a=shq.receive() as a;
console.writeline("receive: " + a.name + " " + a.number);

总结
以上就是今天要讲的内容,实现这样的一个对象,虽然代码量不多,但还是有一点难度的,很多细节需要处理,比如泛型转type以统一接口,信号量实现队列和条件变量是有差异的,用createfromfile才能实现跨平台。总的来说,有了这样的一个队列,跨线程通信就变的比较方便且高效了。
到此这篇关于c#基于共享内存实现跨进程队列的文章就介绍到这了,更多相关c#跨进程队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论