当前位置: 代码网 > it编程>前端脚本>Python > 使用websocket协议控制工业PLC:实现工业自动化的新思路

使用websocket协议控制工业PLC:实现工业自动化的新思路

2024年08月03日 Python 我要评论
工业自动化控制系统中,PLC(Programmable Logic Controller)是一种常用的控制器设备。它通过接口与工业设备、传感器、执行器等连接,实现对生产线、机器设备等的控制和监测。传统的PLC控制方式主要依靠人机界面或SCADA系统,但是这种方式在控制精度、响应速度等方面存在一定的局限性。而基于WebSocket双通道协议的控制方式,可以提高控制精度和实时性,进一步优化PLC控制。本文将介绍如何使用WebSocket双通道协议,结合Python和Flask框架,实现对PLC的控制。

微信公众号:锦程爱编程
关注可了解更多的内容。问题或建议,请公众号留言;
如果你觉得对你有帮助,欢迎赞赏

进入正文前先查看效果演示

前言

工业自动化控制系统中,plc(programmable logic controller)是一种常用的控制器设备。它通过接口与工业设备、传感器、执行器等连接,实现对生产线、机器设备等的控制和监测。传统的plc控制方式主要依靠人机界面或scada系统,但是这种方式在控制精度、响应速度等方面存在一定的局限性。而基于websocket双通道协议的控制方式,可以提高控制精度和实时性,进一步优化plc控制。

本文将介绍如何使用websocket双通道协议,结合python和flask框架,实现对plc的控制。

1、准备工作

需要准备以下软件和硬件环境:

后端资源
•	python 3.x
•	flask框架
•	websocket库(或socketio库,本文socketio)
前端资源
•	websocket库
•	vue.js库(使用原生js可忽略)
•	jquery库(使用原生js可忽略)
plc资源
•	plc设备(可以使用仿真,本文仿真)
•	博图plc编程软件(其他编程软件也可,本文博图)

2、实现双向通信

我们需要实现websocket双通道协议。该协议支持服务器和客户端之间的双向通信,使服务器可以主动向客户端发送消息,并接收客户端发送的消息。为了实现这一功能,需要以下步骤:

(1)我们需要使用python的websocket库。在本文中,我们选择使用socketio库来实现websocket通信。

在python中,我们可以使用flask_socketio库中的socketio类来创建websocket连接。首先,我们需要引入socketio类和一些必要的方法:

step1:导入socketio,flask类
from flask import flask, render_template, request
from flask_socketio import socketio, emit
step2:创建flask对象
app = flask(__name__)

app.config['secret_key'] = 'secret!'
socketio = socketio(app)
step3:监听匹配路由,提供hml页面
@app.route('/')
def index():
    return render_template('index.html')
step4:监听客户端消息
@socketio.on('message')
def handle_message(data):
    print('客户端消息为:',messages)
    emit('message', messages, broadcast=true)
step5:启动运行socketio,指定服务器地址和端口(不指定默认为127.0.0.1:5000)
socketio.run(app, host='0.0.0.0', port=5000, debug=true)

这样我们后端程序websocket即时通讯就搭建好了,接下来我们搭建前端程序。

(2)在本文中,为了跟后端保持一致,我们前端也选择使用socketio库来实现websocket即时通信。

step1:引入socketio方法
  <script src="https://cdn.socket.io/socket.io-3.0.1.min.js"></script>
step2:创建socket对象
var socket = io('http://192.168.43.176:5000', {
    transports: ['websocket', 'polling'],
    withcredentials: true
    });
step3:向服务器发送消息
 document.getelementbyid('send').addeventlistener('click', function() {
                const message = document.getelementbyid('input').value;
                console.log('发送:', message);
                socket.emit('message', message);
            });
step4:客户端的html代码
<div id="output"></div>
<input type="text" id="input">
<button id="send">send</button>
效果展示

打开html页面,发送输入的消息
在这里插入图片描述

后端接收到消息并将消息转发回去
在这里插入图片描述
在这里插入图片描述
这样我们的websocket即使通讯就搭建好了,并实现了相互通讯,接下来我们引入连接plc的相关内容。

3、python后端与plc的通讯

在python中,我们使用snap7库来控制plc设备,这里面提供了许多plc的方法,例如:until数据解析类,client客户端连接类等等。

接下来,我们需要来实现通过snap7库来python控制plc设备。如果没有plc设备可以安装仿真软件。

创建连接

snap7实现通讯的时候,是将plc作为服务端,pc以客户端的身份主动连接的,所以最开始的时候,我们应该创建通讯需要使用的客户端。

step1:打开仿真软件,设置ip并启动plc

在这里插入图片描述

step2:导入snap7模块
# 导入前需要安装snap7
# pip install python-snap7
import snap7
# 导入解析plc数据结构解析成python数据结构模块
from snap7 import util
step3:连接plc,并测试连接是否正常
# 连接至plc
plcobj.connect('192.168.0.1', 0, 1)
# 打印连接状态
print(f"连接状态:{plcobj.get_connected()}")

# 关闭连接
plcobj.disconnect()

# 打印连接状态
print(f"连接状态:{plcobj.get_connected()}")

在这里插入图片描述

读取数据

以读取db10的以下的五个变量为例:
打开tia protal,创建db块,编号为10,并添加如下图所示的变量并赋初值,下载到仿真的plc后打开打开数值监控:

在这里插入图片描述

首先我们需要计算需要读取的总字节数,也就是最后一个变量的地址(即偏移量)加上它的数据长度。可以看到,上面的六个变量中,最后一个地址是264,wstring为512字节,所以需要读取的总字节数为264+512=776个。

所以第一步,把这776个字节全部读取上来:

data = plc.db_read(db_number=10,start=0,size=776)

其中,self.plc是我们刚刚创建的通讯客户端对象,db_read是它的读取db块的方法,第一个参数是db号,第二个是要读取的字节起始的起始地址,第三个参数是要读取的字节总数,用data这个变量来接收这些数据。
当然也可以写成这种形式:

data = self.plc.read_area(snap7.client.areas.db, 10, 0, 776)

read_area是读取任意区域的方法,通过第一个参数的枚举来区分读取的区域,如input、output、db等,后面三个参数与db_read一致。

当我们需要的数据以字节的形式读取上来以后,我们就可以进行解析了。

注意:读取plc信号时,python不认识plc中的数据结构,所以需要使用snap7中的util将plc数据格式转化成python格式;接下来进行数据解析。

解析数据

首先查看文档,可以看到snap7中有一个util的模块,它提供了多种数据类型的转换方法,可以将从plc读取上来的字节直接解析为python可识别的数据类型。

在这里插入图片描述
根据文档,我们写出以下转换代码:

plc_bool = util.get_bool(data, 0, 0)
# 读取int型数据类型
plc_int = util.get_int(data, 2)
# 读取real型数据类型
plc_real = str(util.get_real(data, 4))
# 读取string型数据类型
plc_string = util.get_string(data, 8)
# 读取wstring型数据类型
plc_wstring = util.get_wstring(data, 264)

可以看到,我们在使用snap7的转换方法的时候,只需要把我们读取到的字节数组以及数据的起始索引传进去即可,比起使用python自身的方法会更加简单。

get_bool方法的第三个参数为该字节的第几个bool量,因为一个bool量只需要一个位来表示,而一个字节是包含八个位的,也就是说这个字节可以表示八个bool量,在这里对应的db10里地址为0.0~0.7的八个bool量,由于我们要读取的是地址0上的第一个bool量,所以第二个参数和第三个参数分别为0,0。

get_string方法的第三个参数为该字符串的最大长度,由于string类型共有256个字节,所以此处填256。
打印可以看到读取出来的读取结果

在这里插入图片描述
监控plc真实数值

在这里插入图片描述

写入plc信号数据

与读取数据一样,写入数据也有两种方式,但是区别也仅仅是生成写入数据的字节方式不同。

写入数据可以调用db_write方法,也可以调用write_area,与读取数据一样,前者只能用以写入db块,后者可以写入任意区域。

self.plc.write_area(snap7.client.areas.db,10,0,data)
self.plc.db_write(10,0,data)

data是需要写入的数据的字节形式。

python-snap7转换

python-snap7提供了不同数据类型的转换方法,如下图所示:

在这里插入图片描述
根据文档,我们写出以下转换代码:

写入bool型变量
write_bool = bytearray(1)
# 将变量的第0个字节的第0个为设置为true
util.set_bool(write_bool, 0, 0, true)
# 将变量写入plc
self.plc.db_write(10, 0, write_bool)
写入int型变量
wint_group = bytearray(2)
# 将变量的第0个字节赋值
util.set_int(wint_group, 0, 10)
# 将变量写入plc
self.plc.db_write(10, 2, wint_group)
写入real型变量
wreal_group = bytearray(4)
# 将变量的第0个字节赋值
util.set_real(wreal_group, 0,6.0)
# 将变量写入plc
self.plc.db_write(10, 4, wreal_group)
写入string型变量
write_string = bytearray(256)
# 将变量的第0个字节赋值
util.set_string(write_string, 0,123)
# 将变量写入plc
self.plc.db_write(10, 8, write_string)
写入wstring型变量
str=456”
data = int.to_bytes(508, 2, 'big') + int.to_bytes(len(str), 2, 'big')+str.encode(encoding='utf-16be')
# 将变量写入plc
self.plc.db_write(10, 264, data)

注意:由于目前并没有set_wstring方法,而且set_string方法也不支持wstring,所以wstring依然使用python的decode方法进行写入。

再打开tia protal,可以看到数据已经成功写入。

在这里插入图片描述

4、部分代码展示

python
from flask import flask, render_template, request
from flask_socketio import socketio, emit
from plc_control import plc

app = flask(__name__)

app.config['secret_key'] = 'secret!'
socketio = socketio(app, cors_allowed_origins="*")
plc = plc("192.168.0.1")


@app.route('/')
def index():
    return render_template('index.html')


@app.route('/plc')
def plc_control():
    return render_template('plc_vue.html')


@socketio.on('connect')
def handle_connect():
    ip = request.remote_addr


@socketio.on('disconnect')
def handle_disconnect():
    ip = request.remote_addr


messages = []


@socketio.on('message')
def handle_message(data):
    messages.append(data)
    print('客户端消息为:',messages)
    emit('message', messages, broadcast=true)


@socketio.on('loadmessage')
def handle_message(data):
    emit('message', messages, broadcast=true)


@socketio.on('connect_plc')
def connect_plc(data):
    print("连接中,请等待......")
    plc_status = plc.connect()
    if plc_status is true:
        print("plc已连接")
        emit('connect_plc', plc_status, broadcast=true)
    else:
        print("plc连接失败")
        emit('connect_plc', plc_status, broadcast=true)


@socketio.on('disconnect_plc')
def disconnect_plc(data):
    plc_status = plc.disconnect()
    if plc_status is false:
        print("plc已断开")
        emit('connect_plc', plc_status, broadcast=true)


@socketio.on('wbool')
def wbool(data):
    plc.write_signal(wbool=data)


@socketio.on('wint')
def wint(data):
    data = int(data)
    plc.write_signal(wint=data)


@socketio.on('wreal')
def wreal(data):
    data = int(data)
    plc.write_signal(wreal=data)


@socketio.on('wstring')
def wstring(data):
    print(data)
    plc.write_signal(wstring=data)


@socketio.on('wwstring')
def wwstring(data):
    plc.write_signal(wwstring=data)


def read_signal():
    while true:
        plc_signal = plc.read_signal()
        plc_status = plc.connect_status()
        socketio.emit('read_signal', plc_signal)  # 将信号通过 websocket 发送到前端
        socketio.emit('connect_plc', plc_status)  # 将plc连接状态通过 websocket 发送到前端
        time.sleep(1)

if __name__ == '__main__':
    socketio.run(app)
    ```
#### javascript
```javascript
var vm = new vue({
    el: '#app',
    // 修改vue变量的读取语法
    delimiters: ['[[', ']]'],
    data: {
        name:'',
        name_error:false,
        iint: '',
        ireal: '',
        istring: '',
        iwstring: '',
        message:'',
        messages:[]
    },
    mounted(){//页面加载完,会自动调用mounted函数
        this.load_message();
    },
    methods: {
        // 手动点击按钮连接服务器
        manual_connect_plc: function(){
            let plc_status = $("#connect_plc").html();
            // let plc_ip = 
            if(plc_status=='连接'){
                socket.emit('connect_plc', "connect_plc");
            }
            else{
                socket.emit('disconnect_plc', "disconnect_plc");
            }
        },
        // 向服务器发送写入bool数据
        write_bool: function(){
            let bool = $('#wbool').html()
            if(bool=='false'){
                socket.emit('wbool', false);
            }
            else{
                socket.emit('wbool', true);
            }
        },
        // 向服务器发送写入int数据
        write_int: function(){
            if(!this.iint)return;
            socket.emit('wint', this.iint);
            this.iint = '';
        },
        // 向服务器发送写入real数据
        write_real: function(){
            if(!this.ireal)return;
            socket.emit('wreal', this.ireal);
            this.ireal = '';
        },
        // 向服务器发送写入string数据
        write_string: function(){
            if(!this.istring)return;
            socket.emit('wstring', this.istring);
            this.istring = '';;
        },
        // 向服务器发送写入wstring数据
        write_wstring: function(){
            if(!this.iwstring)return;
            socket.emit('wwstring', this.iwstring);
            this.iwstring = '';
        },
        // 向服务器发送发送消息
        send_message:function(){
            if(!this.name){
                this.name_error = true;
                settimeout(()=>{
                    this.name_error = false;
                    return;
                },2000)
            }
            if(!this.message)return;
            socket.emit('message', {'name':this.name,'content':this.message});
            this.message = '';
        },
        // 向服务器加载消息
        load_message:function(){
            socket.emit('loadmessage', '');
        },
    }
});
远程代码修改
// 连接服务器

var socket = io('http://修改公网ip或者域名:端口', {

    transports: ['websocket', 'polling'],

    withcredentials: true

    });

注意:由于内容受限,小编提供的以上代码非所有代码,只提供了关键的代码,如需全部代码,请关注微信公众号:锦程爱编程,获取源码

5、效果演示

plc数据读取、写入演示

基于内网控制plc读写演示

plc数据读取、写入远程控制演示

基于外网控制plc读写演示

6、扩展及用途

1、车间供应商内网plc调试

(1)可以让所有的plc调试人员处于内网下进行调试交流,省去由于车间距离较远来回走动的时间、也不用时刻看手机微信的群聊天。

(2)交接工作清楚,调试者对话框可以永久储存对话消息,也可支持查询,工厂停电数据也不会丢失,实现永久化储存,需要后期扩展数据库。

(3)分工明确、可以实时知道每个调试人员的调试状态,每个班次调试结束可以填入对应任务进度,后台收到数据会进行格式化输出并汇总,后期需扩展后台格式化文件程序。

(4)手机连入内网可实现机器人调试人员和plc调试人员的通讯,减少现场人员沟通困难的问题。

(5)如果只想读取plc数据,可以不安装plc编程软件,可以直接实现我们的电脑与plc设备直接通讯。(只支持内网,外网访问还需一台连接外网的电脑与plc连接)

2、车间供应商外网plc调试

(1)实现车间现场人员解决不了的plc问题,可以远程求助大佬帮助,公司大佬可通过公网获取到现场设备数据,再将数据写入到plc仿真,实现远程调试现场设备。(后期需扩展接口)

(2)通过外网可以将每个调试人员的调试进度汇总到公司总部,将人员状态、项目进度、面临难点汇总格式化输出,实现项目归一化管理的目标。(后期需扩展接口)

优点:以上都是优点。
缺点:

1、读取plc的数据量大,对网络质量要求较高

2、后期的接口代码量较大。

3、远程调试存在数据延迟,获取设备的数据存在误差,远程调试不应让设备处于高速运行。

7、总结

在本文中,我们介绍了如何使用websocket双通道协议,结合python和flask框架,以及snap7模块实现对plc设备的控制。通过这种方式,我们可以实现双向通信,进一步提高plc的控制精度和实时性。当然,本文仅提供了一个简单的示例,实际应用中还需要考虑更多的安全性和稳定性问题,以确保系统运行的稳定和安全。

关注公众号

在这里插入图片描述

赞赏作者

如果你觉得到对你有帮助,欢迎点赞、收藏加关注,有你的支持,作者一定会努力做得越来越好!

你的支持就是对作者最大的赞赏!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com