channel 通信的例子:
using consoleapp2;
using system.collections.concurrent;
using system.threading.channels;
var queue = new blockingcollection<message>(new concurrentqueue<message>());
var opt = new boundedchanneloptions(10)
{
fullmode = boundedchannelfullmode.wait,
singlereader = true,
singlewriter = true,
capacity = 100 //最大容量
};
//有限的
var channeltest = channel.createbounded<message>(opt);
//无限的
var channel = channel.createunbounded<message>();
var sender1 = sendmessagethreadasync(channel.writer, 1);
var sender2 = sendmessagethreadasync(channel.writer, 2);
var receiver1 = receivemessagethreadasync(channel.reader, 3);
var receiver2 = receivemessagethreadasync(channel.reader, 4);
//await sender;
// make sure all messages are received
await task.whenall(sender1, sender2);
channel.writer.complete();
await task.whenall(receiver1, receiver2);
//await receiver;
console.writeline("press any key to exit...");
console.readkey();
async task sendmessagethreadasync(channelwriter<message> writer, int id)
{
for (int i = 0; i < 20; i++)
{
await writer.writeasync(new message(id, i.tostring()));
console.writeline($"thread {id} sent {i}");
await task.delay(100);
}
}
async task receivemessagethreadasync(channelreader<message> reader, int id)
{
//try
//{
// while (!reader.completion.iscompleted)
// {
// var message = await reader.readasync();
// console.writeline($"thread {id} received {message.content}");
// }
//}
//catch (exception ex)
//{
// console.writeline($"thread {id} channel closed:{ex.message}");
//}
await foreach (var message in reader.readallasync())
{
console.writeline($"thread {id} received {message.content}");
}
}
record message(int fromid, string content);
改造为plc的实例
record plcdatamessage
{
public bool isconnected { get; init; }
public dbdata dbdata { get; init; }
// 可以添加其他需要传递的信息
}
// 创建一个无边界的channel来发送和接收消息
var plcdatachannel = channel.createunbounded<plcdatamessage>();
// 启动一个新的任务来模拟plc数据读取
task.factory.startnew(async () =>
{
var cts = new cancellationtokensource(); // 假设您已经有了取消令牌源
while (!cts.iscancellationrequested)
{
try
{
// ... 省略了连接plc的代码,这部分逻辑保持不变 ...
if (myisconnected)
{
dbdata dbdatatemp = await s7plc.readclassasync<dbdata>(42, 0);
// 心跳和其他操作...
// 构造消息并发送到channel
var message = new plcdatamessage
{
isconnected = myisconnected,
dbdata = dbdatatemp
};
await plcdatachannel.writer.writeasync(message, cts.token);
}
// ... 其他逻辑保持不变 ...
}
catch (exception ex)
{
// 处理异常并重新连接plc(如果需要)
// ...
// 可以通过channel发送一个特殊的消息来表示连接已断开或发生了错误
// 这里省略了这部分逻辑
// 休眠一段时间后再重试
await task.delay(2000, cts.token);
}
}
// 完成后通知channel不再发送更多数据
plcdatachannel.writer.complete();
}, cts.token, taskcreationoptions.longrunning, taskscheduler.default);
// 在另一个任务或线程中读取channel中的数据
task.run(async () =>
{
await foreach (var message in plcdatachannel.reader.readallasync(cts.token))
{
if (message.isconnected)
{
lock (lockobj)
{
// 更新dbdata,这里假设dbdata是一个线程安全的对象或结构
dbdata.str_s = message.dbdata.str_s.trim();
// ... 更新其他属性 ...
}
// 处理读取到的数据...
}
else
{
// 处理plc断开连接的情况...
}
}
// 读取完成,channel已关闭
console.writeline("plc数据读取完毕。");
}, cts.token);
// ... 其他代码,如等待所有任务完成、处理取消逻辑等 ...
using system;
using system.threading;
using system.threading.channels;
using system.threading.tasks;
// ... 其他必要的引用和类型定义 ...
// 创建一个无边界的channel来发送和接收消息
var plcdatachannel = channel.createunbounded<plcdatamessage>();
// 取消令牌源
var cts = new cancellationtokensource();
// 启动一个新的任务来模拟plc数据读取
task.run(async () =>
{
plc s7plc = null;
bool myisconnected = false;
int errortimes = 0;
try
{
while (!cts.iscancellationrequested)
{
if (s7plc == null || !myisconnected)
{
// 尝试连接plc(略去具体实现)
// ...
if (myisconnected)
{
// 连接成功,发送连接成功消息(如果需要)
// ...
}
}
else
{
try
{
// 读取plc数据(略去具体实现)
dbdata dbdatatemp = await s7plc.readclassasync<dbdata>(42, 0, cts.token);
// 心跳和其他操作...
// 构造消息并发送到channel
var message = new plcdatamessage { isconnected = myisconnected, dbdata = dbdatatemp };
await plcdatachannel.writer.writeasync(message, cts.token);
errortimes = 0; // 重置错误计数器
}
catch (exception ex)
{
errortimes++;
// 处理异常(例如记录日志)
// ...
// 在达到一定错误次数后,关闭plc连接并重置
if (errortimes > somethreshold)
{
s7plc?.close();
s7plc = null;
myisconnected = false;
// 可以选择发送一个断开连接的消息到channel
}
// 休眠一段时间后再重试
await task.delay(2000, cts.token);
}
}
// 可以添加一些延时来减少循环的频率
await task.delay(somepollinginterval, cts.token);
}
}
catch (operationcanceledexception)
{
// 取消是预期的,不需要额外处理
}
finally
{
// 确保关闭plc连接和channel写入器
s7plc?.close();
plcdatachannel.writer.complete();
}
}, cts.token);
// 在另一个任务或线程中读取channel中的数据
task.run(async () =>
{
await foreach (var message in plcdatachannel.reader.readallasync(cts.token))
{
if (message.isconnected)
{
// 更新dbdata(这里假设dbdata是一个线程安全的对象或结构)
// 根据需要添加适当的同步机制
// ...
// 处理读取到的数据...
}
else
{
// 处理plc断开连接的情况...
}
}
// 读取完成,channel已关闭
console.writeline("plc数据读取完毕。");
}, cts.token);
// ... 其他代码,如等待所有任务完成、处理取消逻辑等 ...
// 在某个适当的时刻取消任务
// cts.cancel();
// 等待所有任务完成(如果需要
拓展:c# channel实现线程间通信
c# channel实现线程间通信
同步方式实现:
using system;
using system.collections.concurrent;
using system.collections.generic;
using system.linq;
using system.text;
using system.threading;
using system.threading.channels;
using system.threading.tasks;
namespace consoleapp1
{
public class channeldemo
{
static channel<message> channel1 = channel.createunbounded<message>();
public static void main2()
{
sender.start(1);
receive1.start(2);
receive2.start(3);
sender.join();
thread.sleep(3000);
receive1.interrupt();
receive2.interrupt();
receive1.join();
receive2.join();
console.readkey();
}
static thread sender = new thread(sendmsg);
static thread receive1 = new thread(receivemsg);
static thread receive2 = new thread(receivemsg);
static void sendmsg(object id)
{
for (int i = 0; i < 20; i++)
{
if (channel1.writer.trywrite(new message((int)id, i.tostring())))
{
console.writeline($"【线程{id}】发送了【{i}】");
}
}
}
static void receivemsg(object id)
{
try
{
while (true)
{
if (channel1.reader.tryread(out message message))
{
console.writeline($"【线程{id}】从【线程{message.id}】接收了【{message.content}】");
}
thread.sleep(1);
}
}
catch (threadinterruptedexception ex)
{
console.writeline($"接收结束");
}
}
}
}
异步方式:
using system;
using system.collections.concurrent;
using system.collections.generic;
using system.linq;
using system.runtime.remoting.channels;
using system.text;
using system.threading;
using system.threading.channels;
using system.threading.tasks;
namespace consoleapp1
{
public class channeldemo2
{
static channel<message> channel1 = channel.createunbounded<message>();
public static async void main2()
{
await task.whenall(sender, sender2);
channel1.writer.complete();
await task.whenall(receive1, receive2);
console.readkey();
}
static task sender = sendmsgasync(channel1.writer, 1);
static task sender2 = sendmsgasync(channel1.writer, 4);
static task receive1 = receivemsgasync(channel1.reader, 2);
static task receive2 = receivemsgasync(channel1.reader, 3);
static async task sendmsgasync(channelwriter<message> writer, int id)
{
for (int i = 0; i < 20; i++)
{
await writer.writeasync(new message((int)id, i.tostring()));
console.writeline($"【线程{id}】发送了【{i}】");
}
}
static async task receivemsgasync(channelreader<message> reader,int id)
{
try
{
while (!reader.completion.iscompleted)
{
message message = await reader.readasync();
console.writeline($"【线程{id}】从【线程{message.id}】接收了【{message.content}】");
}
}
catch (channelclosedexception ex)
{
console.writeline($"channelclosed 接收结束");
}
}
}
}
在对channel进行实例化的时候,也可以传递一个options,这里面可以对消息容量,是否多个发送者和接受者进行定义。
以上就是c#使用channel实现plc异步任务之间的通信的详细内容,更多关于c# channel plc异步通信的资料请关注代码网其它相关文章!
发表评论