概述:为什么需要专业的串口调试工具
在嵌入式开发、物联网设备调试过程中,串口通信是最基础的调试手段。但系统自带的串口工具功能简陋,商业软件又价格昂贵。本文将带你用python+ttkbootstrap打造一款高颜值、多功能的串口调试助手,具备以下亮点功能:
核心功能亮点:
- 现代化ui界面 - 基于ttkbootstrap的多主题切换
- 实时数据统计 - 发送/接收字节计数
- 自动发送功能 - 可配置间隔时间
- 发送历史记录 - 支持上下箭头导航
- 数据持久化 - 接收内容保存为文件
- 自动端口检测 - 实时监控串口热插拔
项目架构设计
1.1 技术栈选型
import serial # 串口通信核心库 import serial.tools.list_ports # 串口设备枚举 import threading # 多线程处理 import queue # 线程安全队列 import ttkbootstrap as ttk # 现代化ui框架 from tkinter import filedialog # 文件对话框
1.2 关键类说明
serialtool:主控制类,采用mvc设计模式
数据层:serial_port管理物理连接
视图层:create_widgets()构建界面
控制层:事件处理方法群
1.3 线程模型
定时轮询异步推送定时触发主线程接收队列接收子线程自动发送任务
环境配置指南
2.1 基础环境
# 必需库安装 pip install pyserial ttkbootstrap
2.2 硬件准备
任意usb转串口设备(如ch340、cp2102等)
开发板或目标设备
2.3 兼容性说明
支持windows/macos/linux
测试python版本:3.8+
核心功能实现
3.1 串口通信核心
def open_serial(self): # 参数映射转换 parity_map = { 'none': serial.parity_none, 'even': serial.parity_even, # ...其他校验位映射 } self.serial_port = serial.serial( port=self.port_cb.get(), baudrate=int(self.baudrate_cb.get()), parity=parity_map[self.parity_cb.get()], timeout=0.1 # 非阻塞读取 )
3.2 多线程数据处理
def receive_worker(self): while not self.receive_thread_event.is_set(): try: # 非阻塞读取 if self.serial_port.in_waiting > 0: data = self.serial_port.read(self.serial_port.in_waiting) self.receive_queue.put(data) except serial.serialexception: break
3.3 自动发送机制
def auto_send_task(self): if self.auto_send_flag: try: interval = int(self.interval_entry.get()) self.send_data() # 执行发送 self.master.after(interval, self.auto_send_task) # 定时循环 except valueerror: self.auto_var.set(false)
ui界面详解
4.1 三栏式布局
main_frame = ttk.frame(self.master) left_frame = ttk.labelframe(main_frame, text="串口配置") # 左侧配置区 send_frame = ttk.labelframe(right_frame, text="数据发送") # 右上发送区 recv_frame = ttk.labelframe(right_frame, text="数据接收") # 右下接收区
4.2 主题切换实现
def change_theme(self): selected_theme = self.theme_cb.get() self.style.theme_use(selected_theme) # 动态切换主题
4.3 控件亮点功能
历史记录导航:通过<up>/<down>键遍历
智能滚动文本框:自动滚动到最新内容
状态栏提示:实时显示连接状态
运行效果展示
5.1 主题切换演示
5.2 数据收发演示
[2023-08-20 14:25:36] [send] at+gmr
[2023-08-20 14:25:36] at version:2.1.0.0-dev
5.3 统计功能展示
发送: 2456 字节 | 接收: 18923 字节
源码下载
import serial import serial.tools.list_ports import threading import queue import os import time import ttkbootstrap as ttk from ttkbootstrap.constants import * from ttkbootstrap.dialogs import messagebox from ttkbootstrap.scrolled import scrolledtext from tkinter import booleanvar, stringvar, intvar import platform from tkinter import filedialog import json class serialtool: def __init__(self, master): self.master = master self.master.title("串口调试助手") self.master.geometry("900x520") self.master.resizable(false, false) # 禁止调整窗口大小 self.master.update() # 强制应用尺寸限制 # 初始化样式 self.style = ttk.style(theme='cosmo') # 配置边框线为纯黑色的样式 self.style.configure('blackborder.tlabelframe', bordercolor='#d3d3d3', relief='solid', borderwidth=1) # 串口参数 self.serial_port = none self.receive_queue = queue.queue() self.auto_send_flag = false self.send_count = 0 self.receive_count = 0 self.receive_thread = none self.receive_thread_event = threading.event() # 用于控制接收线程的事件 # 发送历史记录 self.send_history = [] self.history_index = -1 # 自动检测串口变化 self.last_port_count = 0 # 创建界面 self.create_widgets() self.refresh_ports() self.master.after(100, self.process_queue) self.check_ports_change() # 开始检测串口变化 def create_widgets(self): """创建三栏式布局""" main_frame = ttk.frame(self.master) main_frame.pack(fill=both, expand=true, padx=10, pady=10) # 主题切换控件 theme_frame = ttk.frame(self.master) theme_frame.pack(fill=x, padx=10, pady=(0, 5)) ttk.label(theme_frame, text="主题:").pack(side=left, padx=5) self.theme_cb = ttk.combobox( theme_frame, values=sorted(ttk.style().theme_names()), state='readonly' ) self.theme_cb.pack(side=left, padx=5) self.theme_cb.set('cosmo') self.theme_cb.bind('<<comboboxselected>>', self.change_theme) # 左侧串口配置区 left_frame = ttk.labelframe(main_frame, text="串口配置", padding=15, style='blackborder.tlabelframe') left_frame.grid(row=0, column=0, sticky=nsew, padx=5, pady=5) # 右侧上下分区 right_frame = ttk.frame(main_frame) right_frame.grid(row=0, column=1, sticky=nsew, padx=5, pady=5) # 发送区(右上) send_frame = ttk.labelframe(right_frame, text="数据发送", padding=15, style='blackborder.tlabelframe') send_frame.pack(fill=both, expand=true, side=top) # 接收区(右下) recv_frame = ttk.labelframe(right_frame, text="数据接收", padding=15, style='blackborder.tlabelframe') recv_frame.pack(fill=both, expand=true, side=top) # 配置网格权重 main_frame.columnconfigure(1, weight=1) main_frame.rowconfigure(0, weight=1) right_frame.rowconfigure(1, weight=1) # 创建各区域组件 self.create_serial_controls(left_frame) self.create_send_controls(send_frame) self.create_recv_controls(recv_frame) # 状态栏 self.status_var = stringvar(value="就绪") ttk.label(self.master, textvariable=self.status_var, bootstyle=(secondary, inverse)).pack(fill=x, side=bottom) def change_theme(self, event=none): """切换主题""" selected_theme = self.theme_cb.get() self.style.theme_use(selected_theme) def create_serial_controls(self, parent): """串口参数控件""" param_frame = ttk.frame(parent) param_frame.pack(fill=x) # 串口号 ttk.label(param_frame, text="com端口:").grid(row=0, column=0, padx=5, pady=5, sticky=w) self.port_cb = ttk.combobox(param_frame, width=15) self.port_cb.grid(row=0, column=1, padx=5, pady=5) # 波特率 ttk.label(param_frame, text="波特率:").grid(row=1, column=0, padx=5, pady=5, sticky=w) self.baudrate_cb = ttk.combobox(param_frame, values=[ '9600', '115200', '57600', '38400', '19200', '14400', '4800', '2400', '1200' ], width=15) self.baudrate_cb.set('9600') self.baudrate_cb.grid(row=1, column=1, padx=5, pady=5) # 校验位 ttk.label(param_frame, text="校验位:").grid(row=2, column=0, padx=5, pady=5, sticky=w) self.parity_cb = ttk.combobox(param_frame, values=[ 'none', 'even', 'odd', 'mark', 'space' ], width=15) self.parity_cb.set('none') self.parity_cb.grid(row=2, column=1, padx=5, pady=5) # 数据位 ttk.label(param_frame, text="数据位:").grid(row=3, column=0, padx=5, pady=5, sticky=w) self.databits_cb = ttk.combobox(param_frame, values=['8', '7', '6', '5'], width=15) self.databits_cb.set('8') self.databits_cb.grid(row=3, column=1, padx=5, pady=5) # 停止位 ttk.label(param_frame, text="停止位:").grid(row=4, column=0, padx=5, pady=5, sticky=w) self.stopbits_cb = ttk.combobox(param_frame, values=['1', '1.5', '2'], width=15) self.stopbits_cb.set('1') self.stopbits_cb.grid(row=4, column=1, padx=5, pady=5) # 操作按钮 # 按钮容器 btn_frame = ttk.frame(parent) btn_frame.pack(pady=10, fill=x) # 配置网格列权重实现自动伸缩 btn_frame.columnconfigure((0, 1, 2), weight=1, uniform='btns') # uniform 确保列宽一致 # 刷新按钮 ttk.button( btn_frame, text="刷新端口", command=self.refresh_ports, bootstyle=outline ).grid(row=0, column=0, padx=5, sticky="ew") # 连接按钮 self.conn_btn = ttk.button( btn_frame, text="打开串口", command=self.toggle_connection, bootstyle=outline + success ) self.conn_btn.grid(row=0, column=1, padx=5, sticky="ew") # 手动发送按钮(移动到此处) ttk.button( btn_frame, text="手动发送", command=self.send_data, bootstyle=outline + primary ).grid(row=0, column=2, padx=5, sticky="ew") def create_send_controls(self, parent): """发送区控件""" # 自动发送设置 auto_frame = ttk.frame(parent) auto_frame.pack(fill=x, pady=5) self.auto_var = booleanvar() ttk.checkbutton(auto_frame, text="自动发送", variable=self.auto_var, command=self.toggle_auto_send).pack(side=left) ttk.label(auto_frame, text="间隔(ms):").pack(side=left, padx=5) self.interval_entry = ttk.entry(auto_frame, width=8) self.interval_entry.insert(0, "1000") self.interval_entry.pack(side=left) # 发送内容 self.send_text = scrolledtext(parent, height=4, autohide=true) self.send_text.pack(fill=both, expand=true) # 绑定上下箭头键用于历史记录导航 self.send_text.bind("<up>", self.prev_history) self.send_text.bind("<down>", self.next_history) def create_recv_controls(self, parent): """接收区控件""" # 接收显示 self.recv_text = scrolledtext(parent, height=5, autohide=true) self.recv_text.pack(fill=both, expand=true) # 统计栏 stat_frame = ttk.frame(parent) stat_frame.pack(fill=x, pady=5) ttk.label(stat_frame, text="发送:").pack(side=left, padx=5) self.send_label = ttk.label(stat_frame, text="0") self.send_label.pack(side=left) ttk.label(stat_frame, text="接收:").pack(side=left, padx=10) self.recv_label = ttk.label(stat_frame, text="0") self.recv_label.pack(side=left) # 添加保存接收按钮 ttk.button(stat_frame, text="保存接收", command=self.save_received, bootstyle=outline + info).pack(side=right, padx=5) ttk.button(stat_frame, text="清空", command=self.clear_received, bootstyle=outline + warning).pack(side=right) def refresh_ports(self): """刷新端口列表""" try: ports = [p.device for p in serial.tools.list_ports.comports()] self.port_cb['values'] = ports self.status_var.set(f"自动检测到主板有{len(ports)} 个串口可用,请注意选择正确的。") self.last_port_count = len(ports) except exception as e: print(f"error refreshing ports: {e}") self.status_var.set(f"刷新端口时出错: {e}") def check_ports_change(self): """检查串口变化""" current_count = len(list(serial.tools.list_ports.comports())) if current_count != self.last_port_count: self.refresh_ports() self.master.after(1000, self.check_ports_change) # 每秒检查一次 def toggle_connection(self): """切换连接状态""" if self.serial_port and self.serial_port.is_open: self.close_serial() else: self.open_serial() def open_serial(self): """打开串口""" try: port = self.port_cb.get() if not port: raise valueerror("请选择串口") parity_map = { 'none': serial.parity_none, 'even': serial.parity_even, 'odd': serial.parity_odd, 'mark': serial.parity_mark, 'space': serial.parity_space } self.serial_port = serial.serial( port=port, baudrate=int(self.baudrate_cb.get()), parity=parity_map[self.parity_cb.get()], bytesize=int(self.databits_cb.get()), stopbits=float(self.stopbits_cb.get()), timeout=0.1 ) self.conn_btn.configure(text="关闭串口", bootstyle=outline + success) self.status_var.set(f"已连接 {port}") self.receive_thread_event.clear() # 清除事件标志 self.receive_thread = threading.thread(target=self.receive_worker, daemon=true) self.receive_thread.start() except exception as e: messagebox.show_error(f"主板上没有这个串口或你选的被测端口跟主板端口不对应,请在设备管理器中确认正确的端口: {str(e)}", "错误") self.status_var.set("连接失败") def close_serial(self): """关闭串口""" self.receive_thread_event.set() # 设置事件标志,通知接收线程停止 if self.receive_thread and self.receive_thread.is_alive(): self.receive_thread.join() # 等待接收线程结束 if self.serial_port: try: self.serial_port.close() except exception as e: print(f"关闭串口时出错: {e}") self.conn_btn.configure(text="打开串口", bootstyle=danger) self.status_var.set("已断开连接") def receive_worker(self): """接收线程工作函数""" while not self.receive_thread_event.is_set() and self.serial_port and self.serial_port.is_open: try: if self.serial_port.in_waiting > 0: data = self.serial_port.read(self.serial_port.in_waiting) self.receive_queue.put(data) except exception as e: print(f"接收错误: {e}") break def process_queue(self): """处理接收队列""" while not self.receive_queue.empty(): data = self.receive_queue.get() self.display_received(data) self.receive_count += len(data) self.recv_label.configure(text=str(self.receive_count)) self.master.after(100, self.process_queue) def display_received(self, data): """显示接收数据(带时间戳)""" timestamp = time.strftime("[%y-%m-%d %h:%m:%s] ", time.localtime()) try: text = data.decode('utf-8') self.recv_text.insert(end, timestamp + text + '\n') self.recv_text.see(end) except unicodedecodeerror: self.recv_text.insert(end, timestamp + data.hex(' ') + '\n') self.recv_text.see(end) def toggle_auto_send(self): """切换自动发送""" self.auto_send_flag = self.auto_var.get() if self.auto_send_flag: self.auto_send_task() def auto_send_task(self): """自动发送任务""" if self.auto_send_flag and self.serial_port and self.serial_port.is_open: try: interval = int(self.interval_entry.get()) self.send_data() self.master.after(interval, self.auto_send_task) except valueerror: self.auto_var.set(false) messagebox.show_error("无效的间隔时间", "错误") def send_data(self): """发送数据""" if not self.serial_port or not self.serial_port.is_open: messagebox.show_warning("请先打开串口", "警告") return data = self.send_text.get(1.0, end).strip() if not data: return try: # 添加到历史记录 if data and (not self.send_history or data != self.send_history[0]): self.send_history.insert(0, data) if len(self.send_history) > 20: # 限制历史记录数量 self.send_history.pop() self.history_index = -1 # 重置历史索引 self.serial_port.write(data.encode('utf-8')) self.send_count += len(data) self.send_label.configure(text=str(self.send_count)) # 显示发送的数据(带时间戳) timestamp = time.strftime("[%y-%m-%d %h:%m:%s] ", time.localtime()) self.recv_text.insert(end, f"{timestamp}[send] {data}\n") self.recv_text.see(end) except exception as e: messagebox.show_error(f"发送失败: {str(e)}", "错误") def prev_history(self, event): """上一条历史记录""" if self.send_history: if self.history_index < len(self.send_history) - 1: self.history_index += 1 self.send_text.delete(1.0, end) self.send_text.insert(end, self.send_history[self.history_index]) return "break" def next_history(self, event): """下一条历史记录""" if self.history_index > 0: self.history_index -= 1 self.send_text.delete(1.0, end) self.send_text.insert(end, self.send_history[self.history_index]) elif self.history_index == 0: self.history_index = -1 self.send_text.delete(1.0, end) return "break" def save_received(self): """保存接收内容到文件""" filename = filedialog.asksaveasfilename( defaultextension=".txt", filetypes=[("text files", "*.txt"), ("all files", "*.*")] ) if filename: try: with open(filename, 'w', encoding='utf-8') as f: f.write(self.recv_text.get(1.0, end)) self.status_var.set(f"接收内容已保存到 {filename}") except exception as e: messagebox.show_error(f"保存文件失败: {str(e)}", "错误") def clear_received(self): """清空接收区""" self.recv_text.delete(1.0, end) self.receive_count = 0 self.recv_label.configure(text="0") self.send_text.delete(1.0, end) self.send_count = 0 self.send_label.configure(text="0") def on_closing(self): """安全关闭程序""" # 停止自动发送循环 self.auto_send_flag = false # 关闭串口连接 self.close_serial() # 确保完全退出 self.master.quit() # 终止mainloop self.master.destroy() # 销毁所有tkinter对象 self.master.after(500, self.force_exit) # 500ms后强制退出 def force_exit(self): """最终退出保障""" import os os._exit(0) # 强制终止进程 if __name__ == "__main__": root = ttk.window() app = serialtool(root) root.protocol("wm_delete_window", app.on_closing) root.mainloop()
总结与扩展
7.1 项目总结
- 采用生产者-消费者模式处理串口数据
- 通过队列实现线程间安全通信
- 现代化ui提升使用体验
7.2 扩展方向
- 增加协议解析功能(modbus/at指令等)
- 实现数据图表可视化
- 添加插件系统支持
以上就是利用python调试串口的示例代码的详细内容,更多关于python调试串口的资料请关注代码网其它相关文章!
发表评论