python写了一个监控端口服务,自动重启的脚本
为了防止以后找不到代码,特别记录一下
import os import sys import threading import time import socket import subprocess def check_port_and_run_script(port, script_path): # 创建一个socket对象 sock = socket.socket(socket.af_inet, socket.sock_stream) # 设置超时时间 sock.settimeout(1) # 尝试连接到指定的端口 result = sock.connect_ex(('localhost', port)) # 如果连接成功,端口正在被使用 if result == 0: print(f"端口 {port} 正在被使用。") else: print(f"端口 {port} 未被使用,正在执行 {script_path} 脚本...") # 执行a.sh脚本 subprocess.run(['d:/software/git/bin/bash.exe', script_path]) # subprocess.run(['bash', script_path]) # 关闭socket sock.close() def read_cof_file(): current_directory = os.getcwd() # 使用glob查找所有.doc文件 if os.path.exists(os.path.join(current_directory, 'shany.txt')): with open(os.path.join(current_directory, 'shany.txt'), 'r', encoding='utf-8') as file: # 逐行读取文件内容 for line in file: if 'shany-end' in line: print('定时任务已结束') sys.exit(0) elif ':=>' in line: parts = line.split(":=>") check_port_and_run_script(int(parts[0]), parts[1]) def timer_task(): print("定时器任务执行了!") # check_port_and_run_script(8686, 'd:/workspace/shany/simulator/target/start.sh') read_cof_file() set_timer(20) def set_timer(interval): timer = threading.timer(interval, timer_task) timer.start() if __name__ == '__main__': # 首次设置定时器 set_timer(20) # 主线程可以继续执行其他任务,或者简单地等待定时器执行 try: while true: time.sleep(1) # 防止主线程立即退出,可以按需调整或添加其他逻辑 except keyboardinterrupt: print("程序被用户中断")
用来监控的配置文件,shany.txt 内容如下
8686:=>d:/workspace/shany/simulator/target/start.sh
其中8686是端口号, :=> 作为分隔符 ,后面地址对应脚本的所在路径
方法补充
下面小编为大家整理了一些其他python进行监控端口的方法,希望对大家有所帮助
python3使用 socket 库监控端口
import socket def monitor_port(port_number): # 创建一个 socket 对象 sock = socket.socket(socket.af_inet, socket.sock_stream) try: # 尝试连接到指定的端口 sock.connect(('localhost', port_number)) print(f"port {port_number} is active.") except socket.error as e: print(f"error occurred while monitoring port {port_number} - {str(e)}") finally: # 关闭 socket sock.close() # 监控 8001 端口 monitor_port(8001)
python监听网络端口号程序
import tkinter as tk from tkinter import messagebox import socket import time def check_port(host, port): sock = socket.socket(socket.af_inet, socket.sock_stream) sock.settimeout(5) # 设置超时时间 result = sock.connect_ex((host, port)) sock.close() # 在检查后关闭 socket if result == 0: return f"端口 {port} 是开放的。" else: return f"端口 {port} 是关闭的。" def on_check(): global host, ports, time_interval, port_success_counts, port_failure_counts host = entry_host.get() ports = entry_port.get().split(",") # 以逗号分隔多个端口号 try: time_interval = int(entry_timeset.get()) if time_interval <= 0: raise valueerror("时间间隔必须大于0") for port in ports: port = int(port) if not (0 <= port <= 65535): raise valueerror(f"端口号 {port} 必须在 0 到 65535 之间") # 初始化成功和失败计数 port_success_counts = {int(port): 0 for port in ports} port_failure_counts = {int(port): 0 for port in ports} start_monitoring() except valueerror as e: messagebox.showerror("错误", str(e)) def start_monitoring(): global host, ports, time_interval, port_success_counts, port_failure_counts for port in ports: port = int(port) result = check_port(host, port) timestamp = time.strftime("%y-%m-%d %h:%m:%s", time.localtime()) text_result.insert(tk.end, f"{timestamp}: {result}\n") # 更新成功和失败计数 if "开放" in result: port_success_counts[port] += 1 else: port_failure_counts[port] += 1 update_port_counts() root.after(time_interval * 1000, start_monitoring) # 定时监测 def update_port_counts(): text_port_counts.delete(1.0, tk.end) # 清空文本框 for port, success_count in port_success_counts.items(): failure_count = port_failure_counts[port] text_port_counts.insert(tk.end, f"端口 {port} 成功: {success_count}, 失败: {failure_count}\n") # 创建主窗口 root = tk.tk() root.title("端口监测工具") root.geometry("500x400") # 设置窗口大小 # 创建标签 label_host = tk.label(root, text="主机:") label_host.grid(row=0, column=0) label_port = tk.label(root, text="端口:") label_port.grid(row=1, column=0) label_port2 = tk.label(root, text="(0~65535, 用逗号分隔)") label_port2.grid(row=1, column=2) label_timeset = tk.label(root, text="时间 (间隔):") label_timeset.grid(row=2, column=0) label_timeset2 = tk.label(root, text="(单位:秒)") label_timeset2.grid(row=2, column=2) # 创建输入框 entry_host = tk.entry(root) entry_host.grid(row=0, column=1) entry_port = tk.entry(root) entry_port.grid(row=1, column=1) entry_timeset = tk.entry(root) entry_timeset.grid(row=2, column=1) # 创建按钮 button_check = tk.button(root, text="检查端口", command=on_check) button_check.grid(row=3, columnspan=3) # 创建文本框显示端口成功和失败次数 text_port_counts = tk.text(root, height=5, width=50) text_port_counts.grid(row=5, column=0, columnspan=3, sticky=tk.nsew) # 创建滚动条 scrollbar = tk.scrollbar(root) scrollbar.grid(row=4, column=3, sticky=tk.ns) # 创建文本框并关联滚动条 text_result = tk.text(root, height=10, width=50, yscrollcommand=scrollbar.set) text_result.grid(row=4, column=0, columnspan=3, sticky=tk.nsew) # 配置滚动条 scrollbar.config(command=text_result.yview) # 初始化全局计数器 port_success_counts = {} port_failure_counts = {} # 启动主循环 root.mainloop()
到此这篇关于python脚本实现定时监控端口的文章就介绍到这了,更多相关python监控端口内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论