当前位置: 代码网 > it编程>编程语言>C# > C#使用channel实现Plc异步任务之间的通信

C#使用channel实现Plc异步任务之间的通信

2024年06月14日 C# 我要评论
channel 通信的例子:using consoleapp2;using system.collections.concurrent;using system.threading.channels;

channel 通信的例子:

using consoleapp2;
using system.collections.concurrent;
using system.threading.channels;

var queue = new blockingcollection<message>(new concurrentqueue<message>());

var opt = new boundedchanneloptions(10)
{
    fullmode = boundedchannelfullmode.wait,
    singlereader = true,
    singlewriter = true,
    capacity = 100 //最大容量
};

//有限的
var channeltest = channel.createbounded<message>(opt);


//无限的
var channel = channel.createunbounded<message>();

var sender1 = sendmessagethreadasync(channel.writer, 1);
var sender2 = sendmessagethreadasync(channel.writer, 2);
var receiver1 = receivemessagethreadasync(channel.reader, 3);
var receiver2 = receivemessagethreadasync(channel.reader, 4);
//await sender;
// make sure all messages are received

await task.whenall(sender1, sender2);

channel.writer.complete();

await task.whenall(receiver1, receiver2);

//await receiver;

console.writeline("press any key to exit...");
console.readkey();

async task sendmessagethreadasync(channelwriter<message> writer, int id)
{
    for (int i = 0; i < 20; i++)
    {
        await writer.writeasync(new message(id, i.tostring()));
        console.writeline($"thread {id} sent {i}");
        await task.delay(100);
    }
}

async task receivemessagethreadasync(channelreader<message> reader, int id)
{

    //try
    //{
    //    while (!reader.completion.iscompleted)
    //    {
    //        var message = await reader.readasync();
    //        console.writeline($"thread {id} received {message.content}");
    //    }
    //}
    //catch (exception ex)
    //{
    //    console.writeline($"thread {id} channel closed:{ex.message}");
    //}

    await foreach (var message in reader.readallasync())
    {
        console.writeline($"thread {id} received {message.content}");
    }
}

record message(int fromid, string content);






改造为plc的实例

record plcdatamessage  
{  
    public bool isconnected { get; init; }  
    public dbdata dbdata { get; init; }  
    // 可以添加其他需要传递的信息  
}
// 创建一个无边界的channel来发送和接收消息  
var plcdatachannel = channel.createunbounded<plcdatamessage>();  
  
// 启动一个新的任务来模拟plc数据读取  
task.factory.startnew(async () =>  
{  
    var cts = new cancellationtokensource(); // 假设您已经有了取消令牌源  
    while (!cts.iscancellationrequested)  
    {  
        try  
        {  
            // ... 省略了连接plc的代码,这部分逻辑保持不变 ...  
  
            if (myisconnected)  
            {  
                dbdata dbdatatemp = await s7plc.readclassasync<dbdata>(42, 0);  
  
                // 心跳和其他操作...  
  
                // 构造消息并发送到channel  
                var message = new plcdatamessage  
                {  
                    isconnected = myisconnected,  
                    dbdata = dbdatatemp  
                };  
                await plcdatachannel.writer.writeasync(message, cts.token);  
            }  
  
            // ... 其他逻辑保持不变 ...  
        }  
        catch (exception ex)  
        {  
            // 处理异常并重新连接plc(如果需要)  
            // ...  
  
            // 可以通过channel发送一个特殊的消息来表示连接已断开或发生了错误  
            // 这里省略了这部分逻辑  
  
            // 休眠一段时间后再重试  
            await task.delay(2000, cts.token);  
        }  
    }  
  
    // 完成后通知channel不再发送更多数据  
    plcdatachannel.writer.complete();  
}, cts.token, taskcreationoptions.longrunning, taskscheduler.default);  
  
// 在另一个任务或线程中读取channel中的数据  
task.run(async () =>  
{  
    await foreach (var message in plcdatachannel.reader.readallasync(cts.token))  
    {  
        if (message.isconnected)  
        {  
            lock (lockobj)  
            {  
                // 更新dbdata,这里假设dbdata是一个线程安全的对象或结构  
                dbdata.str_s = message.dbdata.str_s.trim();  
                // ... 更新其他属性 ...  
            }  
            // 处理读取到的数据...  
        }  
        else  
        {  
            // 处理plc断开连接的情况...  
        }  
    }  
  
    // 读取完成,channel已关闭  
    console.writeline("plc数据读取完毕。");  
}, cts.token);  
  
// ... 其他代码,如等待所有任务完成、处理取消逻辑等 ...
using system;  
using system.threading;  
using system.threading.channels;  
using system.threading.tasks;  
  
// ... 其他必要的引用和类型定义 ...  
  
// 创建一个无边界的channel来发送和接收消息  
var plcdatachannel = channel.createunbounded<plcdatamessage>();  
  
// 取消令牌源  
var cts = new cancellationtokensource();  
  
// 启动一个新的任务来模拟plc数据读取  
task.run(async () =>  
{  
    plc s7plc = null;  
    bool myisconnected = false;  
    int errortimes = 0;  
  
    try  
    {  
        while (!cts.iscancellationrequested)  
        {  
            if (s7plc == null || !myisconnected)  
            {  
                // 尝试连接plc(略去具体实现)  
                // ...  
  
                if (myisconnected)  
                {  
                    // 连接成功,发送连接成功消息(如果需要)  
                    // ...  
                }  
            }  
            else  
            {  
                try  
                {  
                    // 读取plc数据(略去具体实现)  
                    dbdata dbdatatemp = await s7plc.readclassasync<dbdata>(42, 0, cts.token);  
  
                    // 心跳和其他操作...  
  
                    // 构造消息并发送到channel  
                    var message = new plcdatamessage { isconnected = myisconnected, dbdata = dbdatatemp };  
                    await plcdatachannel.writer.writeasync(message, cts.token);  
  
                    errortimes = 0; // 重置错误计数器  
                }  
                catch (exception ex)  
                {  
                    errortimes++;  
                    // 处理异常(例如记录日志)  
                    // ...  
  
                    // 在达到一定错误次数后,关闭plc连接并重置  
                    if (errortimes > somethreshold)  
                    {  
                        s7plc?.close();  
                        s7plc = null;  
                        myisconnected = false;  
                        // 可以选择发送一个断开连接的消息到channel  
                    }  
  
                    // 休眠一段时间后再重试  
                    await task.delay(2000, cts.token);  
                }  
            }  
  
            // 可以添加一些延时来减少循环的频率  
            await task.delay(somepollinginterval, cts.token);  
        }  
    }  
    catch (operationcanceledexception)  
    {  
        // 取消是预期的,不需要额外处理  
    }  
    finally  
    {  
        // 确保关闭plc连接和channel写入器  
        s7plc?.close();  
        plcdatachannel.writer.complete();  
    }  
}, cts.token);  
  
// 在另一个任务或线程中读取channel中的数据  
task.run(async () =>  
{  
    await foreach (var message in plcdatachannel.reader.readallasync(cts.token))  
    {  
        if (message.isconnected)  
        {  
            // 更新dbdata(这里假设dbdata是一个线程安全的对象或结构)  
            // 根据需要添加适当的同步机制  
            // ...  
  
            // 处理读取到的数据...  
        }  
        else  
        {  
            // 处理plc断开连接的情况...  
        }  
    }  
  
    // 读取完成,channel已关闭  
    console.writeline("plc数据读取完毕。");  
}, cts.token);  
  
// ... 其他代码,如等待所有任务完成、处理取消逻辑等 ...  
  
// 在某个适当的时刻取消任务  
// cts.cancel();  
  
// 等待所有任务完成(如果需要

拓展:c# channel实现线程间通信

c# channel实现线程间通信

同步方式实现:

using system;
using system.collections.concurrent;
using system.collections.generic;
using system.linq;
using system.text;
using system.threading;
using system.threading.channels;
using system.threading.tasks;

namespace consoleapp1
{
    public class channeldemo
    {
        static channel<message> channel1 = channel.createunbounded<message>();
        public static void main2()
        {
            sender.start(1);
            receive1.start(2);
            receive2.start(3);
            sender.join();
            thread.sleep(3000);
            receive1.interrupt();
            receive2.interrupt();

            receive1.join();
            receive2.join();


            console.readkey();
        }
        static thread sender = new thread(sendmsg);

        static thread receive1 = new thread(receivemsg);
        static thread receive2 = new thread(receivemsg);

        static void sendmsg(object id)
        {
            for (int i = 0; i < 20; i++)
            {
                if (channel1.writer.trywrite(new message((int)id, i.tostring())))
                {
                    console.writeline($"【线程{id}】发送了【{i}】");
                }
            }
        }

        static void receivemsg(object id)
        {
            try
            {
                while (true)
                {
                    if (channel1.reader.tryread(out message message))
                    {
                        console.writeline($"【线程{id}】从【线程{message.id}】接收了【{message.content}】");

                    }
                    thread.sleep(1);
                }
            }
            catch (threadinterruptedexception ex)
            {
                console.writeline($"接收结束");
            }
        }
    }
}


异步方式:

using system;
using system.collections.concurrent;
using system.collections.generic;
using system.linq;
using system.runtime.remoting.channels;
using system.text;
using system.threading;
using system.threading.channels;
using system.threading.tasks;

namespace consoleapp1
{
    public class channeldemo2
    {
        static channel<message> channel1 = channel.createunbounded<message>();
        
         public static async void main2()
        {
            await task.whenall(sender, sender2);
            channel1.writer.complete();
            await task.whenall(receive1, receive2);
           
            console.readkey();
        }

        static task sender = sendmsgasync(channel1.writer, 1);
        static task sender2 = sendmsgasync(channel1.writer, 4);
        static task receive1 = receivemsgasync(channel1.reader, 2);
        static task receive2 = receivemsgasync(channel1.reader, 3);

        static async  task sendmsgasync(channelwriter<message> writer, int id)
        {
            for (int i = 0; i < 20; i++)
            {
                await writer.writeasync(new message((int)id, i.tostring()));
                console.writeline($"【线程{id}】发送了【{i}】");
            }
        }

        static async task receivemsgasync(channelreader<message> reader,int id)
        {
            try
            {
                while (!reader.completion.iscompleted)
                {
                    message message = await reader.readasync();           
                    console.writeline($"【线程{id}】从【线程{message.id}】接收了【{message.content}】");
                   
                }
            }
            catch (channelclosedexception ex)
            {
                console.writeline($"channelclosed 接收结束");
            }
        }

    }
}


在对channel进行实例化的时候,也可以传递一个options,这里面可以对消息容量,是否多个发送者和接受者进行定义。

以上就是c#使用channel实现plc异步任务之间的通信的详细内容,更多关于c# channel plc异步通信的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com