当前位置: 代码网 > it编程>前端脚本>Python > python如何用pymodbus库进行modbus tcp通信

python如何用pymodbus库进行modbus tcp通信

2024年07月05日 Python 我要评论
使用pymodbus库进行modbus tcp通信使用python解决工业通信问题是一个非常好的选择,python具有丰富的生态,可以轻松解决工业通信的各种问题。本篇主要介绍使用pymodbus库进行

使用pymodbus库进行modbus tcp通信

使用python解决工业通信问题是一个非常好的选择,python具有丰富的生态,可以轻松解决工业通信的各种问题。

本篇主要介绍使用pymodbus库进行modbus tcp仿真,实现pc端读取plc或工业设备modbus变量。

安装pymodbus:

pip install -u pymodbus

创建modbus tcp server

这里我们先创建一个虚拟的modbus设备,如果你手里有一个plc或者工业设备,可以直接跳过本节。

  • modbus_server.py
'''
 * @author: liuzhao 
 * @last modified time: 2022-10-05 09:56:13 
'''

from pymodbus.server.sync import (
    starttcpserver,
)
from pymodbus.datastore import (
    modbussequentialdatablock,
    modbusservercontext,
    modbusslavecontext,
)
from pymodbus.version import version

datablock = modbussequentialdatablock.create()
context = modbusslavecontext(
    di=datablock,
    co=datablock,
    hr=datablock,
    ir=datablock,
    )
single = true

# build data storage
store = modbusservercontext(slaves=context, single=single)


if __name__ == '__main__':

	address = ("0.0.0.0", 503)
	starttcpserver(
	    context=store,  # data storage
	    address=address,  # listen address
	  	allow_reuse_address=true,  # allow the reuse of an address
	)

直接运行该脚本,就可以在本机的503端口创建一台modbus设备了,具体实现暂不深追,我们学习的重点是客户端对modbus变量的读写。

读写modbus变量

modbus变量类型以及地址

object typeaccesssizeaddress
coilread-write1 bit00001 – 09999
discrete inputread-only1 bit10001 – 19999
input registerread-only16 bits30001 – 39999
holding registerread-write16 bits40001 – 49999

coil是线圈,discrete input是数字量输入,input register是模拟量输入,holding register是保持寄存器。一般地址范围是0-65535

读取常规变量

读写线圈 | 读取输入变量 | 读写保持寄存器

from pymodbus.client.sync import modbustcpclient
from pymodbus.bit_read_message import readcoilsresponse
from pymodbus.register_read_message import readinputregistersresponse
from pymodbus.exceptions import connectionexception      # 连接失败,用于异常处理

host = '127.0.0.1'
port = 503
client = modbustcpclient(host,port)


# 写入线圈
client.write_coil(1, true)
client.write_coil(2, false)
client.write_coil(3, true)

# 读取线圈    注意对于离散量的读取,第二个参数cout是有坑的,必须为8的倍数个
result:readcoilsresponse = client.read_coils(address=1,cout=8)     # 从地址1开始读,读取8个线圈,一次读8的倍数个线圈,不设置为8的倍数可能会出现问题
print(result.iserror())

# 不建议使用
print(result.getbit(7))            # 这里的参数address不是plc里的地址,而是python列表的address,

print('read_coils ')

# 建议使用
print(result.bits)        # 打印读取结果,一共8位
# 读取其中的位
print(                   
    result.bits[0],
    result.bits[1],
    result.bits[2]
    )         # 相当于result.getbit(0)


# 读取数字输入
result = client.read_discrete_inputs(address=10001,count=8)    # 从10001开始读,读取8位
print(result.bits)


# 读取模拟输入寄存器
input_register_result:readinputregistersresponse = client.read_input_registers(1,count=8)
# print(f'is_error:{input_register_result.iserror()}')
print('read_input_registers ')
print(input_register_result.registers)   
print(input_register_result.getregister(0))   


# 读写保持寄存器
client.write_register(address=40001,value=100)
result:readinputregistersresponse = client.read_holding_registers(address=40001,count=1)
print('read_holding_registers ')
print(result.registers)

# 关闭连接
client.close()

读取复杂变量

字符串、浮点数、负数等

这里需要注意modbus设备的存储结构是低位低字节还是低位高字节,也就是设备内存的字节、字的排列顺序。

根据不同的设备,对照下表调整正确的组合方式。

word orderbyte orderword1word2
bigbig0x12340x5678
biglittle0x34120x7856
littlebig0x56780x1234
littlelittle0x78560x3412
# 复杂数据类型

from collections import ordereddict
import logging

from pymodbus.client.sync import modbustcpclient as modbusclient

from pymodbus.constants import endian
from pymodbus.payload import binarypayloadbuilder, binarypayloaddecoder



order_dict = {"<": "little", ">": "big"}


def run_binary_payload_client(host:str,port:int):
  
    for word_endian, byte_endian in (
        (endian.big, endian.big),
        (endian.big, endian.little),
        (endian.little, endian.big),
        (endian.little, endian.little),
    ):
        print("-" * 60)
        print(f"word order: {order_dict[word_endian]}")
        print(f"byte order: {order_dict[byte_endian]}")
        print()
    
        builder = binarypayloadbuilder(
            wordorder=word_endian,
            byteorder=byte_endian,
        )
		
		# 写入的变量
        my_string = "abcd-efgh123345765432"
        builder.add_string(my_string)
        builder.add_bits([0, 1, 0, 1, 1, 0, 1, 0])
        builder.add_8bit_int(-0x12)
        builder.add_8bit_uint(0x12)
        builder.add_16bit_int(-0x5678)
        builder.add_16bit_uint(0x1234)
        builder.add_32bit_int(-0x1234)
        builder.add_32bit_uint(0x12345678)
        builder.add_16bit_float(12.34)
        builder.add_16bit_float(-12.34)
        builder.add_32bit_float(22.34)
        builder.add_32bit_float(-22.34)
        builder.add_64bit_int(-0xdeadbeef)
        builder.add_64bit_uint(0x12345678deadbeef)
        builder.add_64bit_uint(0x12345678deadbeef)
        builder.add_64bit_float(123.45)
        builder.add_64bit_float(-123.45)
        registers = builder.to_registers()
        print("writing registers:")
        print(registers)
        print("\n")
        payload = builder.build()
        address = 40001          # 从40001开始写入
        # we can write registers
        client.write_registers(address, registers, unit=1)    # 写入
     	
     	# 读取复杂变量
        print("reading registers:")
        address = 40001

        count = len(payload)
        print(f"payload_len {count}")
        result = client.read_holding_registers(address, count, slave=1)
        print(result.registers)
        print("\n")
        decoder = binarypayloaddecoder.fromregisters(
            result.registers, byteorder=byte_endian, wordorder=word_endian
        )
        # make sure word/byte order is consistent between binarypayloadbuilder and binarypayloaddecoder
        assert (
            decoder._byteorder == builder._byteorder  # pylint: disable=protected-access
        )  # nosec
        assert (
            decoder._wordorder == builder._wordorder  # pylint: disable=protected-access
        )  # nosec

        decoded = ordereddict(
            [
                ("string", decoder.decode_string(len(my_string))),
                ("bits", decoder.decode_bits()),
                ("8int", decoder.decode_8bit_int()),
                ("8uint", decoder.decode_8bit_uint()),
                ("16int", decoder.decode_16bit_int()),
                ("16uint", decoder.decode_16bit_uint()),
                ("32int", decoder.decode_32bit_int()),
                ("32uint", decoder.decode_32bit_uint()),
                ("16float", decoder.decode_16bit_float()),
                ("16float2", decoder.decode_16bit_float()),
                ("32float", decoder.decode_32bit_float()),
                ("32float2", decoder.decode_32bit_float()),
                ("64int", decoder.decode_64bit_int()),
                ("64uint", decoder.decode_64bit_uint()),
                ("ignore", decoder.skip_bytes(8)),
                ("64float", decoder.decode_64bit_float()),
                ("64float2", decoder.decode_64bit_float()),
            ]
        )
        print("decoded data")
        for name, value in iter(decoded.items()):
            print(
                "%s\t" % name,  # pylint: disable=consider-using-f-string
                hex(value) if isinstance(value, int) else value,
            )
        print("\n")

	# 关闭连接
    client.close()


if __name__ == "__main__":
    run_binary_payload_client("127.0.0.1", 503)    

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com