当前位置: 代码网 > it编程>前端脚本>Python > Python使用PyShark分析网络流量的脚本分享

Python使用PyShark分析网络流量的脚本分享

2025年08月21日 Python 我要评论
作为一名网络安全工程师,是常常需要自己编写一些实用的脚本的。下面是一个使用 pyshark 库编写的网络流量分析脚本,专为网络安全工程师设计。这个脚本可以分析 http 和 tcp 流量,并提取关键信

作为一名网络安全工程师,是常常需要自己编写一些实用的脚本的。下面是一个使用 pyshark 库编写的网络流量分析脚本,专为网络安全工程师设计。这个脚本可以分析 http 和 tcp 流量,并提取关键信息用于安全分析。

#!/usr/bin/env python3
"""
网络流量分析工具 - 使用 pyshark 分析 pcap 文件
适用于网络安全工程师进行流量分析和安全审计
"""

import pyshark
import argparse
from collections import defaultdict
import json
import sys

class trafficanalyzer:
    def __init__(self, pcap_file):
        self.pcap_file = pcap_file
        self.http_flows = defaultdict(list)
        self.tcp_flows = defaultdict(list)
        self.suspicious_activities = []
    
    def analyze_http_traffic(self):
        """分析 http 流量"""
        print("[+] 分析 http 流量...")
        try:
            # 创建显示过滤器,只捕获 http 流量
            cap = pyshark.filecapture(self.pcap_file, display_filter='http')
            
            for packet in cap:
                try:
                    if hasattr(packet, 'http'):
                        http_layer = packet.http
                        ip_layer = packet.ip
                        
                        # 提取 http 请求基本信息
                        flow_key = f"{ip_layer.src}:{packet.tcp.srcport} -> {ip_layer.dst}:{packet.tcp.dstport}"
                        
                        http_info = {
                            'timestamp': packet.sniff_time,
                            'method': getattr(http_layer, 'request_method', 'n/a'),
                            'uri': getattr(http_layer, 'request_uri', 'n/a'),
                            'host': getattr(http_layer, 'host', 'n/a'),
                            'user_agent': getattr(http_layer, 'user_agent', 'n/a'),
                            'status_code': getattr(http_layer, 'response_code', 'n/a'),
                            'content_type': getattr(http_layer, 'content_type', 'n/a'),
                            'length': packet.length
                        }
                        
                        self.http_flows[flow_key].append(http_info)
                        
                        # 检测可疑的 http 活动
                        self._detect_suspicious_http(http_info, flow_key)
                        
                except attributeerror as e:
                    # 忽略没有 http 层的包
                    continue
                    
        except exception as e:
            print(f"[-] 分析 http 流量时出错: {e}")
    
    def analyze_tcp_traffic(self):
        """分析 tcp 流量"""
        print("[+] 分析 tcp 流量...")
        try:
            # 创建显示过滤器,只捕获 tcp 流量
            cap = pyshark.filecapture(self.pcap_file, display_filter='tcp')
            
            for packet in cap:
                try:
                    if hasattr(packet, 'tcp'):
                        tcp_layer = packet.tcp
                        ip_layer = packet.ip
                        
                        # 提取 tcp 流基本信息
                        flow_key = f"{ip_layer.src}:{tcp_layer.srcport} -> {ip_layer.dst}:{tcp_layer.dstport}"
                        
                        tcp_info = {
                            'timestamp': packet.sniff_time,
                            'flags': getattr(tcp_layer, 'flags', 'n/a'),
                            'seq': getattr(tcp_layer, 'seq', 'n/a'),
                            'ack': getattr(tcp_layer, 'ack', 'n/a'),
                            'window_size': getattr(tcp_layer, 'window_size_value', 'n/a'),
                            'length': packet.length
                        }
                        
                        self.tcp_flows[flow_key].append(tcp_info)
                        
                        # 检测可疑的 tcp 活动
                        self._detect_suspicious_tcp(tcp_info, flow_key)
                        
                except attributeerror as e:
                    # 忽略没有 tcp 层的包
                    continue
                    
        except exception as e:
            print(f"[-] 分析 tcp 流量时出错: {e}")
    
    def _detect_suspicious_http(self, http_info, flow_key):
        """检测可疑的 http 活动"""
        # 检测可能的目录遍历攻击
        if '..' in http_info['uri'] or '/etc/passwd' in http_info['uri']:
            self.suspicious_activities.append({
                'type': 'http - 可能的目录遍历攻击',
                'flow': flow_key,
                'details': http_info,
                'timestamp': http_info['timestamp']
            })
        
        # 检测 sql 注入特征
        sql_injection_patterns = ['union select', 'select *', 'insert into', 'drop table', '1=1']
        if any(pattern in http_info['uri'].lower() for pattern in sql_injection_patterns):
            self.suspicious_activities.append({
                'type': 'http - 可能的 sql 注入尝试',
                'flow': flow_key,
                'details': http_info,
                'timestamp': http_info['timestamp']
            })
        
        # 检测可疑用户代理
        suspicious_user_agents = ['sqlmap', 'nmap', 'nessus', 'nikto', 'w3af']
        if any(agent in http_info['user_agent'].lower() for agent in suspicious_user_agents):
            self.suspicious_activities.append({
                'type': 'http - 可疑用户代理',
                'flow': flow_key,
                'details': http_info,
                'timestamp': http_info['timestamp']
            })
    
    def _detect_suspicious_tcp(self, tcp_info, flow_key):
        """检测可疑的 tcp 活动"""
        # 检测可能的端口扫描 (多个 syn 包而没有完整握手)
        if tcp_info['flags'] == '0x00000002':  # syn 标志
            self.suspicious_activities.append({
                'type': 'tcp - 可能的端口扫描 (syn)',
                'flow': flow_key,
                'details': tcp_info,
                'timestamp': tcp_info['timestamp']
            })
        
        # 检测可能的网络侦察 (fin 扫描)
        if tcp_info['flags'] == '0x00000001':  # fin 标志
            self.suspicious_activities.append({
                'type': 'tcp - 可能的 fin 扫描',
                'flow': flow_key,
                'details': tcp_info,
                'timestamp': tcp_info['timestamp']
            })
    
    def generate_report(self, output_file=none):
        """生成分析报告"""
        report = {
            'http_flows': dict(self.http_flows),
            'tcp_flows': dict(self.tcp_flows),
            'suspicious_activities': self.suspicious_activities,
            'summary': {
                'total_http_flows': len(self.http_flows),
                'total_tcp_flows': len(self.tcp_flows),
                'total_suspicious_activities': len(self.suspicious_activities)
            }
        }
        
        if output_file:
            with open(output_file, 'w') as f:
                json.dump(report, f, indent=4, default=str)
            print(f"[+] 报告已保存到: {output_file}")
        else:
            print(json.dumps(report, indent=4, default=str))
        
        return report

def main():
    parser = argparse.argumentparser(description='网络流量分析工具')
    parser.add_argument('-f', '--file', required=true, help='pcap 文件路径')
    parser.add_argument('-o', '--output', help='输出报告文件路径 (json 格式)')
    parser.add_argument('--http', action='store_true', help='只分析 http 流量')
    parser.add_argument('--tcp', action='store_true', help='只分析 tcp 流量')
    
    args = parser.parse_args()
    
    if not args.http and not args.tcp:
        # 如果没有指定协议,默认分析所有
        args.http = true
        args.tcp = true
    
    analyzer = trafficanalyzer(args.file)
    
    if args.http:
        analyzer.analyze_http_traffic()
    
    if args.tcp:
        analyzer.analyze_tcp_traffic()
    
    report = analyzer.generate_report(args.output)
    
    # 打印简要摘要
    print("\n[+] 分析完成!")
    print(f"    - http 流数量: {report['summary']['total_http_flows']}")
    print(f"    - tcp 流数量: {report['summary']['total_tcp_flows']}")
    print(f"    - 可疑活动数量: {report['summary']['total_suspicious_activities']}")
    
    if report['summary']['total_suspicious_activities'] > 0:
        print("\n[!] 检测到可疑活动:")
        for activity in report['suspicious_activities']:
            print(f"    - {activity['type']} ({activity['flow']})")

if __name__ == '__main__':
    main()

使用示例

安装依赖:

pip install pyshark

运行脚本:

# 分析所有流量
python traffic_analyzer.py -f capture.pcap

# 只分析 http 流量
python traffic_analyzer.py -f capture.pcap --http

# 只分析 tcp 流量
python traffic_analyzer.py -f capture.pcap --tcp

# 保存报告到文件
python traffic_analyzer.py -f capture.pcap -o report.json
(.venv) (base) liuxiaowei@localhost 内网渗透 % python pyshark分析流量.py -f /users/liuxiaowei/attacker_2025/cap1.pcap --http
[+] 分析 http 流量...
{
    "http_flows": {
        "192.168.1.61:65094 -> 140.207.56.109:80": [
            {
                "timestamp": "2025-08-21 07:10:13.445231",
                "method": "post",
                "uri": "/mmtls/004d48a0",
                "host": "extshort.weixin.qq.com",
                "user_agent": "micromessenger client",
                "status_code": "n/a",
                "content_type": "application/octet-stream",
                "length": "1003"
            }
        ],
        "140.207.56.109:80 -> 192.168.1.61:65094": [
            {
                "timestamp": "2025-08-21 07:10:13.602393",
                "method": "n/a",
                "uri": "n/a",
                "host": "n/a",
                "user_agent": "n/a",
                "status_code": "200",
                "content_type": "application/octet-stream",
                "length": "1236"
            }
        ],
        "192.168.1.61:65095 -> 140.207.56.109:80": [
            {
                "timestamp": "2025-08-21 07:10:13.634277",
                "method": "post",
                "uri": "/mmtls/004d48a0",
                "host": "extshort.weixin.qq.com",
                "user_agent": "micromessenger client",
                "status_code": "n/a",
                "content_type": "application/octet-stream",
                "length": "745"
            }
        ],
        "140.207.56.109:80 -> 192.168.1.61:65095": [
            {
                "timestamp": "2025-08-21 07:10:13.749410",
                "method": "n/a",
                "uri": "n/a",
                "host": "n/a",
                "user_agent": "n/a",
                "status_code": "200",
                "content_type": "application/octet-stream",
                "length": "1437"
            }
        ],
        "192.168.1.61:65096 -> 140.207.56.109:80": [
            {
                "timestamp": "2025-08-21 07:10:16.744114",
                "method": "post",
                "uri": "/mmtls/004e0d95",
                "host": "extshort.weixin.qq.com",
                "user_agent": "micromessenger client",
                "status_code": "n/a",
                "content_type": "application/octet-stream",
                "length": "785"
            }
        ],
        "140.207.56.109:80 -> 192.168.1.61:65096": [
            {
                "timestamp": "2025-08-21 07:10:16.805098",
                "method": "n/a",
                "uri": "n/a",
                "host": "n/a",
                "user_agent": "n/a",
                "status_code": "200",
                "content_type": "application/octet-stream",
                "length": "401"
            }
        ]
    },
    "tcp_flows": {},
    "suspicious_activities": [],
    "summary": {
        "total_http_flows": 6,
        "total_tcp_flows": 0,
        "total_suspicious_activities": 0
    }
}

[+] 分析完成!
    - http 流数量: 6
    - tcp 流数量: 0
    - 可疑活动数量: 0

功能说明

http 流量分析:

  • 提取请求方法、uri、主机、用户代理等信息
  • 检测目录遍历攻击
  • 识别 sql 注入尝试
  • 发现可疑用户代理(如扫描工具)

tcp 流量分析:

  • 分析 tcp 标志位、序列号、确认号等
  • 检测端口扫描活动(syn 扫描)
  • 识别网络侦察活动(fin 扫描)

安全检测:

  • 自动识别多种常见攻击模式
  • 生成详细的安全报告
  • 输出 json 格式的报告便于进一步处理

注意事项

  • 确保已安装 wireshark/tshark,因为 pyshark 依赖于这些工具
  • 处理大型 pcap 文件可能需要较长时间和大量内存
  • 脚本中的检测规则是基础示例,实际环境中可能需要根据具体需求调整和扩展

这个脚本为网络安全工程师提供了一个起点,可以根据实际需求进一步扩展功能,如添加更多协议的解析、实现更复杂的安全检测规则等。

到此这篇关于python使用pyshark分析网络流量的脚本分享的文章就介绍到这了,更多相关python网络流量分析内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com