作为一名网络安全工程师,是常常需要自己编写一些实用的脚本的。下面是一个使用 pyshark 库编写的网络流量分析脚本,专为网络安全工程师设计。这个脚本可以分析 http 和 tcp 流量,并提取关键信息用于安全分析。
#!/usr/bin/env python3 """ 网络流量分析工具 - 使用 pyshark 分析 pcap 文件 适用于网络安全工程师进行流量分析和安全审计 """ import pyshark import argparse from collections import defaultdict import json import sys class trafficanalyzer: def __init__(self, pcap_file): self.pcap_file = pcap_file self.http_flows = defaultdict(list) self.tcp_flows = defaultdict(list) self.suspicious_activities = [] def analyze_http_traffic(self): """分析 http 流量""" print("[+] 分析 http 流量...") try: # 创建显示过滤器,只捕获 http 流量 cap = pyshark.filecapture(self.pcap_file, display_filter='http') for packet in cap: try: if hasattr(packet, 'http'): http_layer = packet.http ip_layer = packet.ip # 提取 http 请求基本信息 flow_key = f"{ip_layer.src}:{packet.tcp.srcport} -> {ip_layer.dst}:{packet.tcp.dstport}" http_info = { 'timestamp': packet.sniff_time, 'method': getattr(http_layer, 'request_method', 'n/a'), 'uri': getattr(http_layer, 'request_uri', 'n/a'), 'host': getattr(http_layer, 'host', 'n/a'), 'user_agent': getattr(http_layer, 'user_agent', 'n/a'), 'status_code': getattr(http_layer, 'response_code', 'n/a'), 'content_type': getattr(http_layer, 'content_type', 'n/a'), 'length': packet.length } self.http_flows[flow_key].append(http_info) # 检测可疑的 http 活动 self._detect_suspicious_http(http_info, flow_key) except attributeerror as e: # 忽略没有 http 层的包 continue except exception as e: print(f"[-] 分析 http 流量时出错: {e}") def analyze_tcp_traffic(self): """分析 tcp 流量""" print("[+] 分析 tcp 流量...") try: # 创建显示过滤器,只捕获 tcp 流量 cap = pyshark.filecapture(self.pcap_file, display_filter='tcp') for packet in cap: try: if hasattr(packet, 'tcp'): tcp_layer = packet.tcp ip_layer = packet.ip # 提取 tcp 流基本信息 flow_key = f"{ip_layer.src}:{tcp_layer.srcport} -> {ip_layer.dst}:{tcp_layer.dstport}" tcp_info = { 'timestamp': packet.sniff_time, 'flags': getattr(tcp_layer, 'flags', 'n/a'), 'seq': getattr(tcp_layer, 'seq', 'n/a'), 'ack': getattr(tcp_layer, 'ack', 'n/a'), 'window_size': getattr(tcp_layer, 'window_size_value', 'n/a'), 'length': packet.length } self.tcp_flows[flow_key].append(tcp_info) # 检测可疑的 tcp 活动 self._detect_suspicious_tcp(tcp_info, flow_key) except attributeerror as e: # 忽略没有 tcp 层的包 continue except exception as e: print(f"[-] 分析 tcp 流量时出错: {e}") def _detect_suspicious_http(self, http_info, flow_key): """检测可疑的 http 活动""" # 检测可能的目录遍历攻击 if '..' in http_info['uri'] or '/etc/passwd' in http_info['uri']: self.suspicious_activities.append({ 'type': 'http - 可能的目录遍历攻击', 'flow': flow_key, 'details': http_info, 'timestamp': http_info['timestamp'] }) # 检测 sql 注入特征 sql_injection_patterns = ['union select', 'select *', 'insert into', 'drop table', '1=1'] if any(pattern in http_info['uri'].lower() for pattern in sql_injection_patterns): self.suspicious_activities.append({ 'type': 'http - 可能的 sql 注入尝试', 'flow': flow_key, 'details': http_info, 'timestamp': http_info['timestamp'] }) # 检测可疑用户代理 suspicious_user_agents = ['sqlmap', 'nmap', 'nessus', 'nikto', 'w3af'] if any(agent in http_info['user_agent'].lower() for agent in suspicious_user_agents): self.suspicious_activities.append({ 'type': 'http - 可疑用户代理', 'flow': flow_key, 'details': http_info, 'timestamp': http_info['timestamp'] }) def _detect_suspicious_tcp(self, tcp_info, flow_key): """检测可疑的 tcp 活动""" # 检测可能的端口扫描 (多个 syn 包而没有完整握手) if tcp_info['flags'] == '0x00000002': # syn 标志 self.suspicious_activities.append({ 'type': 'tcp - 可能的端口扫描 (syn)', 'flow': flow_key, 'details': tcp_info, 'timestamp': tcp_info['timestamp'] }) # 检测可能的网络侦察 (fin 扫描) if tcp_info['flags'] == '0x00000001': # fin 标志 self.suspicious_activities.append({ 'type': 'tcp - 可能的 fin 扫描', 'flow': flow_key, 'details': tcp_info, 'timestamp': tcp_info['timestamp'] }) def generate_report(self, output_file=none): """生成分析报告""" report = { 'http_flows': dict(self.http_flows), 'tcp_flows': dict(self.tcp_flows), 'suspicious_activities': self.suspicious_activities, 'summary': { 'total_http_flows': len(self.http_flows), 'total_tcp_flows': len(self.tcp_flows), 'total_suspicious_activities': len(self.suspicious_activities) } } if output_file: with open(output_file, 'w') as f: json.dump(report, f, indent=4, default=str) print(f"[+] 报告已保存到: {output_file}") else: print(json.dumps(report, indent=4, default=str)) return report def main(): parser = argparse.argumentparser(description='网络流量分析工具') parser.add_argument('-f', '--file', required=true, help='pcap 文件路径') parser.add_argument('-o', '--output', help='输出报告文件路径 (json 格式)') parser.add_argument('--http', action='store_true', help='只分析 http 流量') parser.add_argument('--tcp', action='store_true', help='只分析 tcp 流量') args = parser.parse_args() if not args.http and not args.tcp: # 如果没有指定协议,默认分析所有 args.http = true args.tcp = true analyzer = trafficanalyzer(args.file) if args.http: analyzer.analyze_http_traffic() if args.tcp: analyzer.analyze_tcp_traffic() report = analyzer.generate_report(args.output) # 打印简要摘要 print("\n[+] 分析完成!") print(f" - http 流数量: {report['summary']['total_http_flows']}") print(f" - tcp 流数量: {report['summary']['total_tcp_flows']}") print(f" - 可疑活动数量: {report['summary']['total_suspicious_activities']}") if report['summary']['total_suspicious_activities'] > 0: print("\n[!] 检测到可疑活动:") for activity in report['suspicious_activities']: print(f" - {activity['type']} ({activity['flow']})") if __name__ == '__main__': main()
使用示例
安装依赖:
pip install pyshark
运行脚本:
# 分析所有流量 python traffic_analyzer.py -f capture.pcap # 只分析 http 流量 python traffic_analyzer.py -f capture.pcap --http # 只分析 tcp 流量 python traffic_analyzer.py -f capture.pcap --tcp # 保存报告到文件 python traffic_analyzer.py -f capture.pcap -o report.json
(.venv) (base) liuxiaowei@localhost 内网渗透 % python pyshark分析流量.py -f /users/liuxiaowei/attacker_2025/cap1.pcap --http [+] 分析 http 流量... { "http_flows": { "192.168.1.61:65094 -> 140.207.56.109:80": [ { "timestamp": "2025-08-21 07:10:13.445231", "method": "post", "uri": "/mmtls/004d48a0", "host": "extshort.weixin.qq.com", "user_agent": "micromessenger client", "status_code": "n/a", "content_type": "application/octet-stream", "length": "1003" } ], "140.207.56.109:80 -> 192.168.1.61:65094": [ { "timestamp": "2025-08-21 07:10:13.602393", "method": "n/a", "uri": "n/a", "host": "n/a", "user_agent": "n/a", "status_code": "200", "content_type": "application/octet-stream", "length": "1236" } ], "192.168.1.61:65095 -> 140.207.56.109:80": [ { "timestamp": "2025-08-21 07:10:13.634277", "method": "post", "uri": "/mmtls/004d48a0", "host": "extshort.weixin.qq.com", "user_agent": "micromessenger client", "status_code": "n/a", "content_type": "application/octet-stream", "length": "745" } ], "140.207.56.109:80 -> 192.168.1.61:65095": [ { "timestamp": "2025-08-21 07:10:13.749410", "method": "n/a", "uri": "n/a", "host": "n/a", "user_agent": "n/a", "status_code": "200", "content_type": "application/octet-stream", "length": "1437" } ], "192.168.1.61:65096 -> 140.207.56.109:80": [ { "timestamp": "2025-08-21 07:10:16.744114", "method": "post", "uri": "/mmtls/004e0d95", "host": "extshort.weixin.qq.com", "user_agent": "micromessenger client", "status_code": "n/a", "content_type": "application/octet-stream", "length": "785" } ], "140.207.56.109:80 -> 192.168.1.61:65096": [ { "timestamp": "2025-08-21 07:10:16.805098", "method": "n/a", "uri": "n/a", "host": "n/a", "user_agent": "n/a", "status_code": "200", "content_type": "application/octet-stream", "length": "401" } ] }, "tcp_flows": {}, "suspicious_activities": [], "summary": { "total_http_flows": 6, "total_tcp_flows": 0, "total_suspicious_activities": 0 } } [+] 分析完成! - http 流数量: 6 - tcp 流数量: 0 - 可疑活动数量: 0
功能说明
http 流量分析:
- 提取请求方法、uri、主机、用户代理等信息
- 检测目录遍历攻击
- 识别 sql 注入尝试
- 发现可疑用户代理(如扫描工具)
tcp 流量分析:
- 分析 tcp 标志位、序列号、确认号等
- 检测端口扫描活动(syn 扫描)
- 识别网络侦察活动(fin 扫描)
安全检测:
- 自动识别多种常见攻击模式
- 生成详细的安全报告
- 输出 json 格式的报告便于进一步处理
注意事项
- 确保已安装 wireshark/tshark,因为 pyshark 依赖于这些工具
- 处理大型 pcap 文件可能需要较长时间和大量内存
- 脚本中的检测规则是基础示例,实际环境中可能需要根据具体需求调整和扩展
这个脚本为网络安全工程师提供了一个起点,可以根据实际需求进一步扩展功能,如添加更多协议的解析、实现更复杂的安全检测规则等。
到此这篇关于python使用pyshark分析网络流量的脚本分享的文章就介绍到这了,更多相关python网络流量分析内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论