pyserial 是一个 python 串口通信库,可以用于与各种串口设备(如 arduino、传感器、嵌入式设备等)进行通信。
1. pyserial 简介
pyserial 提供了跨平台的串口通信功能,支持:
- windows、linux、macos
- 多种串口设置(波特率、数据位、停止位、校验位等)
- 同步和异步通信
- 二进制和文本数据
2. 安装和基础使用
2.1 安装
pip install pyserial
2.2 基础示例
import serial
import time
# 基础串口通信示例
try:
# 打开串口
ser = serial.serial('com3', 9600, timeout=1)
print(f"串口已打开: {ser.name}")
# 等待设备初始化
time.sleep(2)
# 发送数据
ser.write(b'hello arduino!\n')
print("数据已发送")
# 读取数据
response = ser.readline().decode('utf-8').strip()
print(f"收到响应: {response}")
# 关闭串口
ser.close()
print("串口已关闭")
except serial.serialexception as e:
print(f"串口错误: {e}")
except exception as e:
print(f"其他错误: {e}")
3. 串口配置和打开
3.1 查找可用串口
import serial
import serial.tools.list_ports
def list_serial_ports():
"""列出所有可用的串口"""
ports = serial.tools.list_ports.comports()
if not ports:
print("没有找到可用的串口")
return []
print("可用串口列表:")
for i, port in enumerate(ports, 1):
print(f"{i}. {port.device} - {port.description}")
print(f" 硬件id: {port.hwid}")
print(f" 制造商: {port.manufacturer}")
print(f" 产品: {port.product}")
print()
return [port.device for port in ports]
# 使用示例
available_ports = list_serial_ports()
3.2 串口配置参数
import serial
# 完整的串口配置
serial_config = {
'port': 'com3', # 串口号 (windows: com3, linux: /dev/ttyusb0, macos: /dev/tty.usbserial)
'baudrate': 9600, # 波特率 (常用: 9600, 115200, 57600, 38400)
'bytesize': serial.eightbits, # 数据位 (可选: fivebits, sixbits, sevenbits, eightbits)
'parity': serial.parity_none, # 校验位 (可选: parity_none, parity_even, parity_odd, parity_mark, parity_space)
'stopbits': serial.stopbits_one, # 停止位 (可选: stopbits_one, stopbits_one_point_five, stopbits_two)
'timeout': 1, # 读取超时时间(秒),none表示阻塞读取,0表示非阻塞
'xonxoff': false, # 软件流控制
'rtscts': false, # 硬件(rts/cts)流控制
'dsrdtr': false, # 硬件(dsr/dtr)流控制
'write_timeout': 2, # 写入超时时间
}
# 打开串口
try:
ser = serial.serial(**serial_config)
print(f"串口配置: {ser}")
# 获取当前配置
print(f"当前波特率: {ser.baudrate}")
print(f"当前数据位: {ser.bytesize}")
print(f"当前校验位: {ser.parity}")
print(f"当前停止位: {ser.stopbits}")
ser.close()
except serial.serialexception as e:
print(f"无法打开串口: {e}")
3.3 自动检测和连接串口
import serial
import serial.tools.list_ports
import time
def auto_connect_serial(baudrate=9600, timeout=1):
"""
自动检测并连接串口
返回: serial.serial 对象 或 none
"""
ports = serial.tools.list_ports.comports()
if not ports:
print("未找到可用串口")
return none
for port in ports:
try:
print(f"尝试连接: {port.device} - {port.description}")
# 尝试连接
ser = serial.serial(
port=port.device,
baudrate=baudrate,
timeout=timeout
)
# 测试连接
time.sleep(2) # 等待设备初始化
# 清空缓冲区
ser.reset_input_buffer()
ser.reset_output_buffer()
# 发送测试命令
ser.write(b'at\r\n')
time.sleep(0.1)
# 检查响应
if ser.in_waiting > 0:
response = ser.read(ser.in_waiting)
print(f"设备响应: {response}")
print(f"成功连接到: {port.device}")
return ser
except serial.serialexception:
print(f"连接 {port.device} 失败,尝试下一个...")
continue
print("所有串口连接尝试失败")
return none
# 使用示例
ser = auto_connect_serial()
if ser:
# 进行通信操作
ser.write(b'hello!\n')
ser.close()
4. 数据读写操作
4.1 基本读写操作
import serial
import time
def basic_read_write_example():
"""基本读写操作示例"""
try:
# 打开串口
ser = serial.serial('com3', 9600, timeout=1)
print("串口已打开")
# 等待设备就绪
time.sleep(2)
# 清空缓冲区
ser.reset_input_buffer()
ser.reset_output_buffer()
# 方法1: 写入字符串 (需要编码为字节)
command = "at+ver?\r\n"
ser.write(command.encode('utf-8'))
print(f"发送命令: {command}")
# 方法2: 直接写入字节
ser.write(b'at+id?\r\n')
# 读取数据
time.sleep(0.5) # 等待响应
# 方法1: 读取指定字节数
if ser.in_waiting > 0:
data = ser.read(ser.in_waiting)
print(f"原始数据: {data}")
print(f"解码后: {data.decode('utf-8', errors='ignore')}")
# 方法2: 读取一行
ser.write(b'at+name?\r\n')
line = ser.readline().decode('utf-8', errors='ignore').strip()
print(f"读取一行: {line}")
# 方法3: 读取所有可用数据
ser.write(b'at+help\r\n')
time.sleep(1)
while ser.in_waiting > 0:
data = ser.read(ser.in_waiting)
print(f"收到数据: {data.decode('utf-8', errors='ignore')}")
ser.close()
except exception as e:
print(f"错误: {e}")
# 运行示例
basic_read_write_example()
4.2 文本模式通信
import serial
def text_communication_example():
"""文本模式通信示例"""
try:
ser = serial.serial('com3', 9600, timeout=1)
# 发送文本命令
commands = [
"led on\n",
"get temp\n",
"set speed 100\n",
"status\n"
]
for cmd in commands:
print(f"发送: {cmd.strip()}")
ser.write(cmd.encode('utf-8'))
# 读取响应
response = ser.readline().decode('utf-8', errors='ignore').strip()
print(f"响应: {response}")
# 处理特定响应
if "temp" in response:
temp_value = response.split(':')[1].strip()
print(f"温度值: {temp_value}°c")
ser.close()
except exception as e:
print(f"错误: {e}")
# 运行示例
text_communication_example()
4.3 二进制数据通信
import serial
import struct
import time
def binary_communication_example():
"""二进制数据通信示例"""
try:
ser = serial.serial('com3', 115200, timeout=1)
# 示例1: 发送二进制命令
# 假设协议: 起始字节(0xaa) + 命令字节 + 数据长度 + 数据 + 校验和
# 构建数据包
start_byte = 0xaa
command = 0x01 # 读取数据命令
data = [0x10, 0x20, 0x30]
data_length = len(data)
# 计算校验和
checksum = (start_byte + command + data_length + sum(data)) & 0xff
# 打包数据
packet = struct.pack('bbb', start_byte, command, data_length)
packet += bytes(data)
packet += struct.pack('b', checksum)
print(f"发送数据包: {packet.hex()}")
ser.write(packet)
# 等待响应
time.sleep(0.1)
# 读取响应
if ser.in_waiting >= 10: # 假设响应包长度至少10字节
response = ser.read(ser.in_waiting)
print(f"收到响应: {response.hex()}")
# 解析响应
if len(response) >= 3:
# 解析前3个字节
start, cmd, length = struct.unpack('bbb', response[:3])
print(f"响应: 起始={hex(start)}, 命令={hex(cmd)}, 长度={length}")
# 示例2: 发送浮点数
float_value = 3.14159
float_bytes = struct.pack('f', float_value)
ser.write(b'float')
ser.write(float_bytes)
print(f"发送浮点数: {float_value} -> {float_bytes.hex()}")
ser.close()
except exception as e:
print(f"错误: {e}")
# 运行示例
binary_communication_example()
4.4 持续数据读取
import serial
import threading
import time
class serialmonitor:
"""串口监视器,持续读取数据"""
def __init__(self, port, baudrate=9600):
self.ser = serial.serial(port, baudrate, timeout=1)
self.running = false
self.thread = none
def start_monitoring(self):
"""开始监视串口数据"""
if not self.running:
self.running = true
self.thread = threading.thread(target=self._monitor_loop)
self.thread.start()
print("串口监视器已启动")
def _monitor_loop(self):
"""监视循环"""
while self.running:
try:
# 检查是否有数据可读
if self.ser.in_waiting > 0:
# 读取一行数据
line = self.ser.readline()
try:
decoded = line.decode('utf-8').strip()
print(f"[串口数据] {decoded}")
# 这里可以添加数据处理逻辑
self._process_data(decoded)
except unicodedecodeerror:
# 如果是二进制数据,以十六进制显示
print(f"[二进制数据] {line.hex()}")
# 短暂休眠,避免占用过多cpu
time.sleep(0.01)
except exception as e:
print(f"读取数据时出错: {e}")
time.sleep(1)
def _process_data(self, data):
"""处理接收到的数据"""
# 示例:处理特定格式的数据
if data.startswith("temp:"):
temp = data.split(":")[1]
print(f"检测到温度数据: {temp}°c")
elif data.startswith("humi:"):
humi = data.split(":")[1]
print(f"检测到湿度数据: {humi}%")
def send_command(self, command):
"""发送命令"""
try:
if isinstance(command, str):
command = command.encode('utf-8')
self.ser.write(command)
print(f"发送命令: {command}")
except exception as e:
print(f"发送命令失败: {e}")
def stop_monitoring(self):
"""停止监视"""
self.running = false
if self.thread:
self.thread.join()
self.ser.close()
print("串口监视器已停止")
# 使用示例
def monitor_example():
monitor = serialmonitor('com3', 9600)
monitor.start_monitoring()
try:
# 主线程可以继续执行其他操作
for i in range(5):
monitor.send_command(f"get data {i}\n")
time.sleep(2)
# 等待用户输入退出
input("按回车键停止监视...")
finally:
monitor.stop_monitoring()
# 运行示例
monitor_example()
5. 高级功能
5.1 串口扫描和自动重连
import serial
import serial.tools.list_ports
import time
import threading
class smartserialconnection:
"""智能串口连接管理器"""
def __init__(self, port=none, baudrate=9600):
self.port = port
self.baudrate = baudrate
self.ser = none
self.connected = false
self.reconnect_thread = none
self.reconnect_flag = false
def connect(self, port=none):
"""连接到串口"""
if port:
self.port = port
if not self.port:
print("未指定串口号,尝试自动检测...")
self.port = self._auto_detect_port()
if not self.port:
print("无法找到可用串口")
return false
try:
self.ser = serial.serial(
port=self.port,
baudrate=self.baudrate,
timeout=1,
write_timeout=1
)
# 测试连接
time.sleep(2) # 等待设备初始化
self._test_connection()
self.connected = true
print(f"成功连接到 {self.port}")
return true
except serial.serialexception as e:
print(f"连接失败: {e}")
self.connected = false
return false
def _auto_detect_port(self):
"""自动检测串口"""
ports = serial.tools.list_ports.comports()
for port in ports:
print(f"尝试端口: {port.device}")
try:
# 尝试打开端口
test_ser = serial.serial(port.device, self.baudrate, timeout=0.5)
time.sleep(1)
# 发送测试命令
test_ser.write(b'at\r\n')
time.sleep(0.1)
# 检查响应
if test_ser.in_waiting > 0:
response = test_ser.read(test_ser.in_waiting)
print(f"端口 {port.device} 响应: {response}")
test_ser.close()
return port.device
test_ser.close()
except:
continue
return none
def _test_connection(self):
"""测试连接是否正常"""
if not self.ser:
return false
try:
# 清空缓冲区
self.ser.reset_input_buffer()
self.ser.reset_output_buffer()
# 发送测试命令
self.ser.write(b'at\r\n')
time.sleep(0.1)
# 尝试读取响应
if self.ser.in_waiting > 0:
return true
else:
return false
except:
return false
def start_auto_reconnect(self, interval=5):
"""启动自动重连"""
if self.reconnect_thread and self.reconnect_thread.is_alive():
print("自动重连已在运行")
return
self.reconnect_flag = true
self.reconnect_thread = threading.thread(
target=self._reconnect_loop,
args=(interval,),
daemon=true
)
self.reconnect_thread.start()
print("自动重连已启动")
def _reconnect_loop(self, interval):
"""重连循环"""
while self.reconnect_flag:
if not self.connected:
print("尝试重新连接...")
self.connect()
# 定期检查连接状态
if self.connected:
if not self._test_connection():
print("连接断开")
self.connected = false
if self.ser:
self.ser.close()
time.sleep(interval)
def stop_auto_reconnect(self):
"""停止自动重连"""
self.reconnect_flag = false
if self.reconnect_thread:
self.reconnect_thread.join(timeout=2)
print("自动重连已停止")
def send(self, data):
"""发送数据"""
if not self.connected or not self.ser:
print("未连接,无法发送数据")
return false
try:
if isinstance(data, str):
data = data.encode('utf-8')
self.ser.write(data)
return true
except exception as e:
print(f"发送失败: {e}")
self.connected = false
return false
def receive(self, timeout=1):
"""接收数据"""
if not self.connected or not self.ser:
print("未连接,无法接收数据")
return none
try:
# 设置临时超时
original_timeout = self.ser.timeout
self.ser.timeout = timeout
data = self.ser.readline()
# 恢复原始超时
self.ser.timeout = original_timeout
if data:
try:
return data.decode('utf-8').strip()
except:
return data.hex()
else:
return none
except exception as e:
print(f"接收失败: {e}")
return none
def close(self):
"""关闭连接"""
self.stop_auto_reconnect()
self.connected = false
if self.ser:
self.ser.close()
print("连接已关闭")
# 使用示例
def smart_connection_example():
serial_conn = smartserialconnection(baudrate=9600)
# 尝试连接
if serial_conn.connect('com3'):
print("连接成功")
# 启动自动重连
serial_conn.start_auto_reconnect(interval=10)
# 发送和接收数据
for i in range(10):
serial_conn.send(f"test {i}\n")
response = serial_conn.receive()
if response:
print(f"收到: {response}")
time.sleep(1)
# 停止并关闭
serial_conn.close()
else:
print("连接失败")
# 运行示例
smart_connection_example()
5.2 数据协议解析器
import serial
import struct
import crcmod
import time
class serialprotocol:
"""串口协议处理器"""
def __init__(self, ser):
self.ser = ser
self.buffer = bytearray()
# 初始化crc计算
self.crc16 = crcmod.mkcrcfun(0x18005, rev=true, initcrc=0xffff, xorout=0x0000)
def send_packet(self, command, data=none):
"""发送数据包"""
if data is none:
data = bytearray()
# 构建数据包
packet = bytearray()
# 起始字节
packet.append(0xaa)
packet.append(0x55)
# 命令字节
packet.append(command)
# 数据长度
packet.append(len(data))
# 数据
packet.extend(data)
# 计算crc
crc = self.crc16(bytes(packet))
packet.extend(struct.pack('<h', crc))
# 发送数据包
self.ser.write(packet)
print(f"发送数据包: {packet.hex()}")
return packet
def receive_packet(self, timeout=1):
"""接收数据包"""
start_time = time.time()
while time.time() - start_time < timeout:
# 读取可用数据
if self.ser.in_waiting > 0:
self.buffer.extend(self.ser.read(self.ser.in_waiting))
# 尝试解析数据包
packet = self._parse_buffer()
if packet:
return packet
return none
def _parse_buffer(self):
"""解析缓冲区中的数据包"""
# 查找起始字节
start_index = -1
for i in range(len(self.buffer) - 1):
if self.buffer[i] == 0xaa and self.buffer[i+1] == 0x55:
start_index = i
break
if start_index == -1:
return none
# 检查数据包长度
if len(self.buffer[start_index:]) < 6: # 最小包长度
return none
# 获取数据长度
data_length = self.buffer[start_index + 3]
total_length = 4 + data_length + 2 # 头部4字节 + 数据 + crc2字节
# 检查是否接收到完整数据包
if len(self.buffer[start_index:]) < total_length:
return none
# 提取完整数据包
packet = self.buffer[start_index:start_index + total_length]
# 验证crc
crc_received = struct.unpack('<h', packet[-2:])[0]
crc_calculated = self.crc16(packet[:-2])
if crc_received != crc_calculated:
print("crc校验失败")
# 移除错误的数据包
self.buffer = self.buffer[start_index + 1:]
return none
# 解析数据包
parsed = {
'command': packet[2],
'data_length': packet[3],
'data': packet[4:4+data_length],
'crc': crc_received,
'raw': packet
}
# 从缓冲区中移除已处理的数据
self.buffer = self.buffer[start_index + total_length:]
return parsed
def send_command_with_response(self, command, data=none, timeout=1, retries=3):
"""发送命令并等待响应"""
for attempt in range(retries):
print(f"尝试 {attempt + 1}/{retries}")
# 发送命令
self.send_packet(command, data)
# 等待响应
response = self.receive_packet(timeout)
if response:
print(f"收到响应: 命令={response['command']}, 数据={response['data'].hex()}")
return response
print("未收到响应,重试...")
time.sleep(0.5)
print("所有重试均失败")
return none
# 使用示例
def protocol_example():
try:
ser = serial.serial('com3', 115200, timeout=1)
protocol = serialprotocol(ser)
# 示例1: 发送读取传感器命令
response = protocol.send_command_with_response(
command=0x01, # 读取传感器命令
data=bytearray([0x00, 0x01]), # 传感器地址
timeout=2,
retries=3
)
if response:
# 解析传感器数据
if len(response['data']) >= 4:
temperature, humidity = struct.unpack('<hh', response['data'][:4])
temperature = temperature / 10.0
humidity = humidity / 10.0
print(f"温度: {temperature}°c, 湿度: {humidity}%")
# 示例2: 发送控制命令
control_data = bytearray([0x01, 0x01]) # 打开设备
protocol.send_command_with_response(
command=0x02, # 控制命令
data=control_data
)
ser.close()
except exception as e:
print(f"错误: {e}")
# 运行示例
protocol_example()
6. 错误处理和调试
import serial
import time
import logging
# 配置日志
logging.basicconfig(
level=logging.debug,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getlogger(__name__)
class robustserial:
"""健壮的串口通信类,包含错误处理和调试功能"""
def __init__(self, port, baudrate=9600, **kwargs):
self.port = port
self.baudrate = baudrate
self.kwargs = kwargs
self.ser = none
self.error_count = 0
self.max_errors = 10
self.last_error_time = 0
self.error_cooldown = 5 # 错误冷却时间(秒)
def open(self):
"""打开串口"""
try:
logger.info(f"尝试打开串口 {self.port}")
self.ser = serial.serial(
port=self.port,
baudrate=self.baudrate,
**self.kwargs
)
# 检查串口是否真正打开
if not self.ser.is_open:
raise serial.serialexception("串口未成功打开")
logger.info(f"串口 {self.port} 已成功打开")
# 清空缓冲区
self._flush_buffers()
return true
except serial.serialexception as e:
logger.error(f"打开串口失败: {e}")
self._handle_error("open", e)
return false
except exception as e:
logger.error(f"未知错误: {e}")
self._handle_error("unknown", e)
return false
def _flush_buffers(self):
"""清空输入输出缓冲区"""
try:
if self.ser:
self.ser.reset_input_buffer()
self.ser.reset_output_buffer()
logger.debug("缓冲区已清空")
except exception as e:
logger.warning(f"清空缓冲区失败: {e}")
def write(self, data, retries=3):
"""写入数据,带重试机制"""
for attempt in range(retries):
try:
if not self.ser or not self.ser.is_open:
logger.warning("串口未打开,尝试重新打开")
if not self.open():
continue
if isinstance(data, str):
data = data.encode('utf-8')
bytes_written = self.ser.write(data)
logger.debug(f"写入 {bytes_written} 字节: {data[:50]}...")
# 确保数据发送完成
self.ser.flush()
return bytes_written
except serial.serialtimeoutexception:
logger.warning(f"写入超时,尝试 {attempt + 1}/{retries}")
time.sleep(0.1)
except serial.serialexception as e:
logger.error(f"写入失败: {e}")
self._handle_error("write", e)
# 尝试重新打开串口
time.sleep(0.5)
self.open()
except exception as e:
logger.error(f"未知写入错误: {e}")
self._handle_error("write_unknown", e)
break
logger.error(f"写入失败,已重试 {retries} 次")
return 0
def read(self, size=none, timeout=none):
"""读取数据"""
try:
if not self.ser or not self.ser.is_open:
logger.warning("串口未打开")
return none
# 设置临时超时
original_timeout = self.ser.timeout
if timeout is not none:
self.ser.timeout = timeout
if size:
data = self.ser.read(size)
else:
# 读取所有可用数据
data = self.ser.read(self.ser.in_waiting)
# 恢复原始超时
if timeout is not none:
self.ser.timeout = original_timeout
if data:
logger.debug(f"读取 {len(data)} 字节")
return data
else:
return none
except serial.serialexception as e:
logger.error(f"读取失败: {e}")
self._handle_error("read", e)
return none
except exception as e:
logger.error(f"未知读取错误: {e}")
self._handle_error("read_unknown", e)
return none
def readline(self, timeout=none):
"""读取一行"""
try:
if not self.ser or not self.ser.is_open:
logger.warning("串口未打开")
return none
# 设置临时超时
original_timeout = self.ser.timeout
if timeout is not none:
self.ser.timeout = timeout
line = self.ser.readline()
# 恢复原始超时
if timeout is not none:
self.ser.timeout = original_timeout
if line:
logger.debug(f"读取行: {line[:50]}...")
return line
else:
return none
except serial.serialexception as e:
logger.error(f"读取行失败: {e}")
self._handle_error("readline", e)
return none
def _handle_error(self, error_type, error):
"""处理错误"""
current_time = time.time()
# 检查是否在冷却期内
if current_time - self.last_error_time < self.error_cooldown:
return
self.error_count += 1
self.last_error_time = current_time
logger.error(f"错误类型: {error_type}, 错误: {error}")
logger.error(f"错误计数: {self.error_count}/{self.max_errors}")
# 如果错误过多,可能需要采取特殊措施
if self.error_count >= self.max_errors:
logger.critical("错误过多,建议检查硬件连接")
self.error_count = 0 # 重置计数器
def close(self):
"""关闭串口"""
try:
if self.ser and self.ser.is_open:
self.ser.close()
logger.info("串口已关闭")
except exception as e:
logger.error(f"关闭串口失败: {e}")
def get_info(self):
"""获取串口信息"""
if not self.ser:
return "串口未打开"
info = {
"port": self.ser.port,
"baudrate": self.ser.baudrate,
"bytesize": self.ser.bytesize,
"parity": self.ser.parity,
"stopbits": self.ser.stopbits,
"timeout": self.ser.timeout,
"is_open": self.ser.is_open,
"in_waiting": self.ser.in_waiting if self.ser.is_open else 0,
"error_count": self.error_count
}
return info
# 调试工具函数
def debug_serial_port(port, baudrate=9600):
"""调试串口"""
print(f"=== 串口调试: {port} ===")
# 1. 检查端口是否存在
import serial.tools.list_ports
ports = [p.device for p in serial.tools.list_ports.comports()]
if port not in ports:
print(f"警告: 端口 {port} 不在可用端口列表中")
print(f"可用端口: {ports}")
# 2. 尝试打开端口
try:
ser = serial.serial(port, baudrate, timeout=1)
print(f"✓ 端口可以打开")
# 3. 获取端口信息
print(f"端口信息:")
print(f" 名称: {ser.name}")
print(f" 波特率: {ser.baudrate}")
print(f" 数据位: {ser.bytesize}")
print(f" 校验位: {ser.parity}")
print(f" 停止位: {ser.stopbits}")
# 4. 测试读写
print("\n测试读写功能:")
# 清空缓冲区
ser.reset_input_buffer()
ser.reset_output_buffer()
# 发送测试数据
test_data = b'at\r\n'
ser.write(test_data)
print(f" 发送: {test_data}")
# 等待响应
time.sleep(0.1)
# 读取响应
if ser.in_waiting > 0:
response = ser.read(ser.in_waiting)
print(f" 接收: {response}")
print(f" 解码: {response.decode('utf-8', errors='ignore')}")
else:
print(" 警告: 没有收到响应")
# 5. 测试不同波特率
print("\n测试不同波特率:")
baudrates = [9600, 19200, 38400, 57600, 115200]
for baud in baudrates:
try:
ser.baudrate = baud
ser.write(b'at\r\n')
time.sleep(0.1)
if ser.in_waiting > 0:
response = ser.read(ser.in_waiting)
print(f" {baud} bps: 收到响应 ({len(response)} 字节)")
else:
print(f" {baud} bps: 无响应")
except exception as e:
print(f" {baud} bps: 错误 - {e}")
ser.close()
print("\n✓ 调试完成")
except serial.serialexception as e:
print(f"✗ 无法打开端口: {e}")
print("\n可能的解决方案:")
print("1. 检查端口名称是否正确")
print("2. 检查设备是否连接")
print("3. 检查是否有其他程序占用该端口")
print("4. 检查驱动程序是否安装")
print("5. 尝试以管理员权限运行")
except exception as e:
print(f"✗ 未知错误: {e}")
# 使用示例
def debug_example():
# 调试特定端口
debug_serial_port('com3', 9600)
# 使用健壮的串口类
logger.info("开始测试健壮串口类")
robust_ser = robustserial(
port='com3',
baudrate=9600,
timeout=1,
write_timeout=1
)
if robust_ser.open():
# 发送数据
robust_ser.write("hello device!\n")
# 读取响应
response = robust_ser.readline(timeout=2)
if response:
print(f"收到响应: {response.decode('utf-8', errors='ignore')}")
# 获取信息
info = robust_ser.get_info()
print(f"串口信息: {info}")
robust_ser.close()
# 运行示例
debug_example()
7. 实际应用示例
7.1 arduino 通信示例
import serial
import time
import json
from datetime import datetime
class arduinocommunicator:
"""arduino 串口通信类"""
def __init__(self, port, baudrate=9600):
self.port = port
self.baudrate = baudrate
self.ser = none
self.connected = false
def connect(self):
"""连接到 arduino"""
try:
self.ser = serial.serial(self.port, self.baudrate, timeout=1)
time.sleep(2) # 等待 arduino 重启
self.connected = true
print(f"已连接到 arduino ({self.port})")
return true
except exception as e:
print(f"连接失败: {e}")
return false
def send_command(self, command, value=none):
"""发送命令到 arduino"""
if not self.connected:
print("未连接")
return none
try:
# 构建命令字符串
if value is not none:
cmd_str = f"{command}:{value}\n"
else:
cmd_str = f"{command}\n"
# 发送命令
self.ser.write(cmd_str.encode('utf-8'))
print(f"发送命令: {cmd_str.strip()}")
# 读取响应
response = self.ser.readline().decode('utf-8', errors='ignore').strip()
if response:
print(f"arduino 响应: {response}")
return response
except exception as e:
print(f"通信错误: {e}")
return none
def read_sensors(self):
"""读取所有传感器数据"""
# 发送读取命令
self.ser.write(b'read_all\n')
# 等待响应
time.sleep(0.5)
# 读取所有可用数据
data_lines = []
while self.ser.in_waiting > 0:
line = self.ser.readline().decode('utf-8', errors='ignore').strip()
if line:
data_lines.append(line)
# 解析传感器数据
sensors = {}
for line in data_lines:
if ':' in line:
key, value = line.split(':', 1)
try:
# 尝试转换为数值
sensors[key.strip()] = float(value.strip())
except:
sensors[key.strip()] = value.strip()
return sensors
def control_led(self, led_id, state):
"""控制 led"""
state_str = "on" if state else "off"
return self.send_command(f"led{led_id}", state_str)
def read_analog(self, pin):
"""读取模拟引脚值"""
return self.send_command(f"analog{pin}")
def set_pwm(self, pin, value):
"""设置 pwm 值 (0-255)"""
return self.send_command(f"pwm{pin}", str(value))
def close(self):
"""关闭连接"""
if self.ser:
self.ser.close()
self.connected = false
print("连接已关闭")
# arduino 数据采集应用
def arduino_data_logger():
"""arduino 数据记录器"""
arduino = arduinocommunicator('com3', 9600)
if not arduino.connect():
print("无法连接到 arduino")
return
try:
# 创建数据文件
timestamp = datetime.now().strftime("%y%m%d_%h%m%s")
filename = f"arduino_data_{timestamp}.csv"
with open(filename, 'w') as f:
# 写入 csv 头部
f.write("timestamp,temperature,humidity,light,analog0,analog1\n")
print(f"数据将保存到: {filename}")
print("开始数据采集 (按 ctrl+c 停止)...")
# 数据采集循环
while true:
try:
# 读取传感器数据
sensors = arduino.read_sensors()
if sensors:
# 获取时间戳
current_time = datetime.now().strftime("%y-%m-%d %h:%m:%s")
# 准备数据行
data_row = {
'timestamp': current_time,
'temperature': sensors.get('temp', 0),
'humidity': sensors.get('humi', 0),
'light': sensors.get('light', 0),
'analog0': sensors.get('a0', 0),
'analog1': sensors.get('a1', 0)
}
# 显示数据
print(f"[{current_time}] "
f"温度: {data_row['temperature']:.1f}°c, "
f"湿度: {data_row['humidity']:.1f}%, "
f"光照: {data_row['light']}")
# 保存到文件
with open(filename, 'a') as f:
csv_line = f"{data_row['timestamp']},"
csv_line += f"{data_row['temperature']},"
csv_line += f"{data_row['humidity']},"
csv_line += f"{data_row['light']},"
csv_line += f"{data_row['analog0']},"
csv_line += f"{data_row['analog1']}\n"
f.write(csv_line)
# 间隔时间
time.sleep(2)
except keyboardinterrupt:
print("\n数据采集已停止")
break
except exception as e:
print(f"数据采集错误: {e}")
time.sleep(1)
finally:
arduino.close()
print(f"数据已保存到: {filename}")
# 运行示例
arduino_data_logger()
7.2 gps 数据解析示例
import serial
import time
import pynmea2
class gpsreader:
"""gps 数据读取器"""
def __init__(self, port, baudrate=9600):
self.port = port
self.baudrate = baudrate
self.ser = none
self.running = false
def start(self):
"""开始读取 gps 数据"""
try:
self.ser = serial.serial(self.port, self.baudrate, timeout=1)
self.running = true
print("gps 接收器已启动")
# 读取数据
self._read_loop()
except exception as e:
print(f"启动失败: {e}")
def _read_loop(self):
"""读取循环"""
buffer = ""
while self.running:
try:
# 读取数据
if self.ser.in_waiting > 0:
data = self.ser.read(self.ser.in_waiting).decode('utf-8', errors='ignore')
buffer += data
# 处理完整的 nmea 语句
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = line.strip()
if line.startswith('$'):
self._parse_nmea(line)
time.sleep(0.1)
except keyboardinterrupt:
print("\n正在停止 gps 接收器...")
self.stop()
break
except exception as e:
print(f"读取错误: {e}")
time.sleep(1)
def _parse_nmea(self, sentence):
"""解析 nmea 语句"""
try:
msg = pynmea2.parse(sentence)
# 处理不同类型的 nmea 语句
if isinstance(msg, pynmea2.types.talker.rmc):
# 推荐最小定位信息
if msg.status == 'a': # 数据有效
print(f"位置: {msg.latitude:.6f}, {msg.longitude:.6f}")
print(f"速度: {msg.spd_over_grnd} 节, 航向: {msg.true_course}°")
print(f"时间: {msg.datetime}")
elif isinstance(msg, pynmea2.types.talker.gga):
# gps 定位信息
print(f"质量: {msg.gps_qual}, 卫星数: {msg.num_sats}")
print(f"海拔: {msg.altitude} {msg.altitude_units}")
print(f"大地水准面高度: {msg.geo_sep} {msg.geo_sep_units}")
elif isinstance(msg, pynmea2.types.talker.gsa):
# 当前卫星信息
print(f"模式: {msg.mode}, 定位类型: {msg.mode_fix_type}")
print(f"pdop: {msg.pdop}, hdop: {msg.hdop}, vdop: {msg.vdop}")
except pynmea2.parseerror as e:
# 忽略解析错误
pass
except exception as e:
print(f"解析错误: {e}")
def stop(self):
"""停止读取"""
self.running = false
if self.ser:
self.ser.close()
print("gps 接收器已停止")
# 使用示例
def gps_example():
"""gps 数据读取示例"""
gps = gpsreader('com4', 4800) # 典型 gps 波特率为 4800
try:
# 开始读取 gps 数据
gps.start()
except exception as e:
print(f"gps 示例错误: {e}")
# 运行示例
gps_example()
7.3 串口聊天程序
import serial
import threading
import time
import sys
class serialchat:
"""简单的串口聊天程序"""
def __init__(self, port, baudrate=9600):
self.port = port
self.baudrate = baudrate
self.ser = none
self.running = false
self.receive_thread = none
def start(self):
"""开始聊天"""
try:
# 打开串口
self.ser = serial.serial(
port=self.port,
baudrate=self.baudrate,
timeout=0.1
)
self.running = true
# 启动接收线程
self.receive_thread = threading.thread(target=self._receive_messages)
self.receive_thread.daemon = true
self.receive_thread.start()
print(f"=== 串口聊天 ({self.port}) ===")
print("输入消息并按回车发送")
print("输入 'quit' 或 'exit' 退出")
print("=" * 40)
# 发送循环
self._send_loop()
except exception as e:
print(f"启动失败: {e}")
def _receive_messages(self):
"""接收消息线程"""
buffer = ""
while self.running:
try:
if self.ser and self.ser.in_waiting > 0:
data = self.ser.read(self.ser.in_waiting).decode('utf-8', errors='ignore')
buffer += data
# 处理完整行
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = line.strip()
if line:
print(f"\n[接收] {line}")
print("[发送] ", end="", flush=true)
time.sleep(0.01)
except exception as e:
if self.running: # 只有在运行中才报告错误
print(f"接收错误: {e}")
time.sleep(0.1)
def _send_loop(self):
"""发送消息循环"""
try:
while self.running:
# 获取用户输入
message = input("[发送] ")
# 检查退出命令
if message.lower() in ['quit', 'exit', 'q']:
print("正在退出...")
self.stop()
break
# 发送消息
if message and self.ser:
self.ser.write((message + '\n').encode('utf-8'))
except keyboardinterrupt:
print("\n正在退出...")
self.stop()
except exception as e:
print(f"发送错误: {e}")
self.stop()
def stop(self):
"""停止聊天"""
self.running = false
if self.receive_thread:
self.receive_thread.join(timeout=1)
if self.ser:
self.ser.close()
print("聊天已结束")
# 使用示例
def chat_example():
"""串口聊天示例"""
if len(sys.argv) > 1:
port = sys.argv[1]
else:
port = input("请输入串口号 (如 com3): ")
if len(sys.argv) > 2:
baudrate = int(sys.argv[2])
else:
baudrate = int(input("请输入波特率 (默认 9600): ") or "9600")
chat = serialchat(port, baudrate)
chat.start()
if __name__ == "__main__":
chat_example()
总结
本教程涵盖了 pyserial 的主要功能:
- 基础使用:安装、打开串口、基本读写
- 串口配置:参数设置、自动检测端口
- 数据操作:文本和二进制数据通信
- 高级功能:协议处理、错误处理、自动重连
- 实际应用:arduino 通信、gps 解析、聊天程序
最佳实践建议:
- 错误处理:始终使用 try-except 包装串口操作
- 超时设置:合理设置 timeout 和 write_timeout
- 缓冲区管理:定期清空输入输出缓冲区
- 资源清理:确保在程序结束时关闭串口
- 线程安全:在多线程环境中使用适当的锁机制
常见问题解决:
- 无法打开串口:检查端口名称、权限、是否被其他程序占用
- 数据乱码:检查波特率设置、数据编码
- 通信不稳定:检查硬件连接、线缆质量、接地
- 速度慢:调整波特率,优化读写策略
到此这篇关于pyserial 串口通信的多种实现方法的文章就介绍到这了,更多相关pyserial 串口通信内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论