channel 通信的例子:
using consoleapp2; using system.collections.concurrent; using system.threading.channels; var queue = new blockingcollection<message>(new concurrentqueue<message>()); var opt = new boundedchanneloptions(10) { fullmode = boundedchannelfullmode.wait, singlereader = true, singlewriter = true, capacity = 100 //最大容量 }; //有限的 var channeltest = channel.createbounded<message>(opt); //无限的 var channel = channel.createunbounded<message>(); var sender1 = sendmessagethreadasync(channel.writer, 1); var sender2 = sendmessagethreadasync(channel.writer, 2); var receiver1 = receivemessagethreadasync(channel.reader, 3); var receiver2 = receivemessagethreadasync(channel.reader, 4); //await sender; // make sure all messages are received await task.whenall(sender1, sender2); channel.writer.complete(); await task.whenall(receiver1, receiver2); //await receiver; console.writeline("press any key to exit..."); console.readkey(); async task sendmessagethreadasync(channelwriter<message> writer, int id) { for (int i = 0; i < 20; i++) { await writer.writeasync(new message(id, i.tostring())); console.writeline($"thread {id} sent {i}"); await task.delay(100); } } async task receivemessagethreadasync(channelreader<message> reader, int id) { //try //{ // while (!reader.completion.iscompleted) // { // var message = await reader.readasync(); // console.writeline($"thread {id} received {message.content}"); // } //} //catch (exception ex) //{ // console.writeline($"thread {id} channel closed:{ex.message}"); //} await foreach (var message in reader.readallasync()) { console.writeline($"thread {id} received {message.content}"); } } record message(int fromid, string content);
改造为plc的实例
record plcdatamessage { public bool isconnected { get; init; } public dbdata dbdata { get; init; } // 可以添加其他需要传递的信息 }
// 创建一个无边界的channel来发送和接收消息 var plcdatachannel = channel.createunbounded<plcdatamessage>(); // 启动一个新的任务来模拟plc数据读取 task.factory.startnew(async () => { var cts = new cancellationtokensource(); // 假设您已经有了取消令牌源 while (!cts.iscancellationrequested) { try { // ... 省略了连接plc的代码,这部分逻辑保持不变 ... if (myisconnected) { dbdata dbdatatemp = await s7plc.readclassasync<dbdata>(42, 0); // 心跳和其他操作... // 构造消息并发送到channel var message = new plcdatamessage { isconnected = myisconnected, dbdata = dbdatatemp }; await plcdatachannel.writer.writeasync(message, cts.token); } // ... 其他逻辑保持不变 ... } catch (exception ex) { // 处理异常并重新连接plc(如果需要) // ... // 可以通过channel发送一个特殊的消息来表示连接已断开或发生了错误 // 这里省略了这部分逻辑 // 休眠一段时间后再重试 await task.delay(2000, cts.token); } } // 完成后通知channel不再发送更多数据 plcdatachannel.writer.complete(); }, cts.token, taskcreationoptions.longrunning, taskscheduler.default); // 在另一个任务或线程中读取channel中的数据 task.run(async () => { await foreach (var message in plcdatachannel.reader.readallasync(cts.token)) { if (message.isconnected) { lock (lockobj) { // 更新dbdata,这里假设dbdata是一个线程安全的对象或结构 dbdata.str_s = message.dbdata.str_s.trim(); // ... 更新其他属性 ... } // 处理读取到的数据... } else { // 处理plc断开连接的情况... } } // 读取完成,channel已关闭 console.writeline("plc数据读取完毕。"); }, cts.token); // ... 其他代码,如等待所有任务完成、处理取消逻辑等 ...
using system; using system.threading; using system.threading.channels; using system.threading.tasks; // ... 其他必要的引用和类型定义 ... // 创建一个无边界的channel来发送和接收消息 var plcdatachannel = channel.createunbounded<plcdatamessage>(); // 取消令牌源 var cts = new cancellationtokensource(); // 启动一个新的任务来模拟plc数据读取 task.run(async () => { plc s7plc = null; bool myisconnected = false; int errortimes = 0; try { while (!cts.iscancellationrequested) { if (s7plc == null || !myisconnected) { // 尝试连接plc(略去具体实现) // ... if (myisconnected) { // 连接成功,发送连接成功消息(如果需要) // ... } } else { try { // 读取plc数据(略去具体实现) dbdata dbdatatemp = await s7plc.readclassasync<dbdata>(42, 0, cts.token); // 心跳和其他操作... // 构造消息并发送到channel var message = new plcdatamessage { isconnected = myisconnected, dbdata = dbdatatemp }; await plcdatachannel.writer.writeasync(message, cts.token); errortimes = 0; // 重置错误计数器 } catch (exception ex) { errortimes++; // 处理异常(例如记录日志) // ... // 在达到一定错误次数后,关闭plc连接并重置 if (errortimes > somethreshold) { s7plc?.close(); s7plc = null; myisconnected = false; // 可以选择发送一个断开连接的消息到channel } // 休眠一段时间后再重试 await task.delay(2000, cts.token); } } // 可以添加一些延时来减少循环的频率 await task.delay(somepollinginterval, cts.token); } } catch (operationcanceledexception) { // 取消是预期的,不需要额外处理 } finally { // 确保关闭plc连接和channel写入器 s7plc?.close(); plcdatachannel.writer.complete(); } }, cts.token); // 在另一个任务或线程中读取channel中的数据 task.run(async () => { await foreach (var message in plcdatachannel.reader.readallasync(cts.token)) { if (message.isconnected) { // 更新dbdata(这里假设dbdata是一个线程安全的对象或结构) // 根据需要添加适当的同步机制 // ... // 处理读取到的数据... } else { // 处理plc断开连接的情况... } } // 读取完成,channel已关闭 console.writeline("plc数据读取完毕。"); }, cts.token); // ... 其他代码,如等待所有任务完成、处理取消逻辑等 ... // 在某个适当的时刻取消任务 // cts.cancel(); // 等待所有任务完成(如果需要
拓展:c# channel实现线程间通信
c# channel实现线程间通信
同步方式实现:
using system; using system.collections.concurrent; using system.collections.generic; using system.linq; using system.text; using system.threading; using system.threading.channels; using system.threading.tasks; namespace consoleapp1 { public class channeldemo { static channel<message> channel1 = channel.createunbounded<message>(); public static void main2() { sender.start(1); receive1.start(2); receive2.start(3); sender.join(); thread.sleep(3000); receive1.interrupt(); receive2.interrupt(); receive1.join(); receive2.join(); console.readkey(); } static thread sender = new thread(sendmsg); static thread receive1 = new thread(receivemsg); static thread receive2 = new thread(receivemsg); static void sendmsg(object id) { for (int i = 0; i < 20; i++) { if (channel1.writer.trywrite(new message((int)id, i.tostring()))) { console.writeline($"【线程{id}】发送了【{i}】"); } } } static void receivemsg(object id) { try { while (true) { if (channel1.reader.tryread(out message message)) { console.writeline($"【线程{id}】从【线程{message.id}】接收了【{message.content}】"); } thread.sleep(1); } } catch (threadinterruptedexception ex) { console.writeline($"接收结束"); } } } }
异步方式:
using system; using system.collections.concurrent; using system.collections.generic; using system.linq; using system.runtime.remoting.channels; using system.text; using system.threading; using system.threading.channels; using system.threading.tasks; namespace consoleapp1 { public class channeldemo2 { static channel<message> channel1 = channel.createunbounded<message>(); public static async void main2() { await task.whenall(sender, sender2); channel1.writer.complete(); await task.whenall(receive1, receive2); console.readkey(); } static task sender = sendmsgasync(channel1.writer, 1); static task sender2 = sendmsgasync(channel1.writer, 4); static task receive1 = receivemsgasync(channel1.reader, 2); static task receive2 = receivemsgasync(channel1.reader, 3); static async task sendmsgasync(channelwriter<message> writer, int id) { for (int i = 0; i < 20; i++) { await writer.writeasync(new message((int)id, i.tostring())); console.writeline($"【线程{id}】发送了【{i}】"); } } static async task receivemsgasync(channelreader<message> reader,int id) { try { while (!reader.completion.iscompleted) { message message = await reader.readasync(); console.writeline($"【线程{id}】从【线程{message.id}】接收了【{message.content}】"); } } catch (channelclosedexception ex) { console.writeline($"channelclosed 接收结束"); } } } }
在对channel进行实例化的时候,也可以传递一个options,这里面可以对消息容量,是否多个发送者和接受者进行定义。
以上就是c#使用channel实现plc异步任务之间的通信的详细内容,更多关于c# channel plc异步通信的资料请关注代码网其它相关文章!
发表评论