当前位置: 代码网 > it编程>前端脚本>Python > Python中串口操作的实现示例

Python中串口操作的实现示例

2025年01月08日 Python 我要评论
介绍基于python的串口通信功能主要通过使用pyserial库来实现。pyserial是一个跨平台的python库,它提供了对串行端口(如rs232、usb转串口等)进行读写操作的支持。下面将详细介

介绍

基于python的串口通信功能主要通过使用pyserial库来实现。pyserial是一个跨平台的python库,它提供了对串行端口(如rs232、usb转串口等)进行读写操作的支持。下面将详细介绍如何使用pyserial来进行串口通信。

安装pyserial

首先需要安装pyserial库,可以通过pip工具轻松完成:

pip install pyserial

基本概念

  • 波特率(baud rate):表示每秒传输的数据位数,是串行通信中的一个重要参数,必须确保发送端和接收端设置相同的波特率。

  • 数据位(data bits):通常为7或8位,指每次传输的数据量大小。

  • 停止位(stop bits):用于标识一个字节结束的数量,默认情况下为1个停止位。

  • 校验位(parity bit):可选参数,用来检测传输错误,有无校验、奇校验和偶校验三种选择。

  • 流控制(flow control):硬件流控(rts/cts, dtr/dsr)或软件流控(xon/xoff),用于管理数据流动。

使用pyserial的基本步骤

  • 导入模块

    • import serial
    • from serial.tools import list_ports
  • 查找可用的串口

    • 可以使用list_ports.comports()函数列出系统中所有可用的串口设备。
  • 打开串口连接

    • 使用serial.serial()创建一个串口对象,并指定相应的参数(如端口号、波特率等)。
  • 配置串口参数

    • 在创建串口对象时可以传递额外的关键字参数来设置波特率、数据位、停止位、校验位等。
  • 读取与写入数据

    • 通过调用串口对象的read()write()方法来进行数据的读取和发送。
  • 关闭串口连接

    • 当不再需要使用串口时,应该调用close()方法来释放资源。

串行通信服务实现

这段代码展示了如何使用 pyserial 库来管理和操作串行端口,包括列出可用的串行端口、打开和关闭端口、发送和接收数据。以下是详细的功能解析:

环境准备

确保安装了 pyserial 库,以便能够访问和管理串行端口。

pip install pyserial

列出所有串行端口

list_ports 函数用于扫描系统中的所有串行端口,并返回一个包含这些端口名称的列表。它还记录每个端口的描述信息和硬件id,方便调试或用户选择正确的端口。

def list_ports():
    port_names = []
    ports = serial.tools.list_ports.comports()
    for port, desc, hwid in ports:
        log_message(f"port: {port}, description: {desc}, hwid: {hwid}")
        port_names.append(port)
    return port_names

串行通信服务类 (commservice)

定义了一个用于串行通信的服务类commservice,以及一个列出所有可用串行端口的函数list_ports。它使用了python的serial库来处理串行通信,并且引入了一些辅助模块来增强功能

导入模块

深色版本

import serial
import binascii
import serial.tools.list_ports
from utils.const_util import appconst
  • serial:提供了对串行端口(如rs232、usb转串等)的访问。
  • binascii:包含将二进制数据转换为ascii字符串的函数。
  • serial.tools.list_ports:提供工具函数来枚举系统上的串行端口。
  • utils.const_util.appconst:自定义模块中定义的应用常量。

列出所有可用串行端口

深色版本

def list_ports():
    port_names = []
    ports = serial.tools.list_ports.comports()
    for port, desc, hwid in ports:
        print(f"port: {port}, description: {desc}, hwid: {hwid}")
        port_names.append(port)

    return port_names
  • list_ports函数:遍历所有可用的串行端口,并打印每个端口的名称、描述和硬件id。
  • 返回一个包含所有端口名称的列表,可用于选择具体的通信端口。

串行通信服务类

深色版本

class commservice:
    def __init__(self, comm_port):
        self.comm_port = comm_port
        self.ser_comm = none
  • 构造方法__init__:初始化时设置串行端口名称,并将实际的串行连接对象初始化为none

设置串行端口

    def set_comm_port(self, comm_port):
        self.comm_port = comm_port
  • set_comm_port方法:允许动态更改串行端口名称。

打开串行连接

    def open_comm(self):
        try:
            self.ser_comm = serial.serial(self.comm_port, 9600, bytesize=serial.eightbits, parity=serial.parity_none,
                                          stopbits=serial.stopbits_one, timeout=1, xonxoff=false, rtscts=false,
                                          dsrdtr=false)
        except:
            return none
        return self.ser_comm
  • open_comm方法:尝试打开指定的串行端口,并配置波特率、字节大小、校验位、停止位等参数。
  • 如果打开失败(例如端口不存在或已被占用),捕获异常并返回none;否则返回打开的串行连接对象。

关闭串行连接

    def close_comm(self):
        if self.is_opened:
            self.ser_comm.close()
            self.ser_comm = none
  • close_comm方法:检查串行连接是否打开,如果是,则关闭连接并重置连接对象为none

发送数据

    def send_comm(self, data):
        if self.is_opened:
            try:
                self.ser_comm.write(bytearray.fromhex(data.replace(' ', '')))
            except serial.serialutil.serialtimeoutexception:
                msg = appconst.comm_send_over
                return false, msg
            return true, ''
        else:
            msg = appconst.comm_send_failed
            return false, msg
  • send_comm方法:如果串行连接是打开状态,尝试发送十六进制格式的数据。
  • 使用bytearray.fromhex()将十六进制字符串转换为字节数组后发送。
  • 捕获可能发生的超时异常,并返回相应的错误信息。
  • 成功发送后返回true及空消息,发送失败则返回false及错误消息。

读取数据

    def read_comm(self):
        if self.is_opened:
            if self.ser_comm.in_waiting > 0:
                try:
                    data = self.ser_comm.readline()
                except:
                    return none
                return binascii.hexlify(data).decode('ascii')
            return none
        else:
            return none
  • read_comm方法:如果串行连接是打开状态并且有数据待读取,尝试读取一行数据。
  • 使用binascii.hexlify()将读取到的数据转换为十六进制表示形式,并解码为ascii字符串。
  • 如果发生异常或者没有数据可读,则返回none

检查连接状态

    def is_opened(self):
        return self.ser_comm is not none and self.ser_comm.is_open
  • is_opened方法:检查当前是否有有效的串行连接对象,并且该连接是否处于打开状态。

总结

commservice类封装了与串行设备通信所需的所有基本操作,包括打开/关闭连接、发送/接收数据以及检查连接状态。它还提供了一个静态方法list_ports来列出系统上所有的串行端口,这在需要用户选择具体端口进行通信时非常有用。此外,通过使用try-except语句,代码实现了简单的错误处理逻辑,确保即使在出现问题的情况下也能维持程序的基本运行。最后,利用appconst中的常量,使得错误消息可以集中管理,便于维护和国际化支持。

完整代码

import serial
import binascii
import serial.tools.list_ports
from utils.const_util import appconst


def list_ports():
    port_names = []
    ports = serial.tools.list_ports.comports()
    for port, desc, hwid in ports:
        print(f"port: {port}, description: {desc}, hwid: {hwid}")
        port_names.append(port)

    return port_names


class commservice:
    def __init__(self, comm_port):
        self.comm_port = comm_port
        self.ser_comm = none

    def set_comm_port(self, comm_port):
        self.comm_port = comm_port

    def open_comm(self):
        try:
            self.ser_comm = serial.serial(self.comm_port, 9600, bytesize=serial.eightbits, parity=serial.parity_none,
                                          stopbits=serial.stopbits_one, timeout=1, xonxoff=false, rtscts=false,
                                          dsrdtr=false)
        except:
            return none
        return self.ser_comm

    def close_comm(self):
        if self.is_opened:
            self.ser_comm.close()
            self.ser_comm = none

    def send_comm(self, data):
        if self.is_opened:
            try:
                self.ser_comm.write(bytearray.fromhex(data.replace(' ', '')))
            except serial.serialutil.serialtimeoutexception:
                msg = appconst.comm_send_over
                return false, msg
            return true, ''
        else:
            msg = appconst.comm_send_failed
            return false, msg

    def read_comm(self):
        if self.is_opened:
            if self.ser_comm.in_waiting > 0:
                try:
                    data = self.ser_comm.readline()
                except:
                    return none
                return binascii.hexlify(data).decode('ascii')

            return none
        else:
            return none

    def is_opened(self):
        return self.ser_comm is not none and self.ser_comm.is_open

使用示例

下面是一个简单的使用示例,假设已经有一个实例化好的 commservice 对象 comm_service

# 列出所有可用的串行端口
available_ports = list_ports()

# 假设选择了第一个可用端口进行通信
if available_ports:
    comm_service.set_comm_port(available_ports[0])
    ser = comm_service.open_comm()
    if ser:
        # 发送一些数据到串行端口
        success, msg = comm_service.send_comm("55 aa 01 02 03")
        if success:
            print("data sent successfully.")
        else:
            print(f"failed to send data: {msg}")

        # 尝试读取来自串行端口的数据
        received_data = comm_service.read_comm()
        if received_data:
            print(f"received data: {received_data}")

        # 关闭串行端口
        comm_service.close_comm()
else:
    print("no available serial ports found.")

通过这种方式,您可以轻松地与连接到计算机的任何串行设备进行交互,无论是读取传感器数据还是控制外部硬件。

到此这篇关于python中串口操作的实现示例的文章就介绍到这了,更多相关python 串口操作内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com