当前位置: 代码网 > it编程>前端脚本>Python > Python实现快速扫描目标主机的开放端口和服务

Python实现快速扫描目标主机的开放端口和服务

2025年12月04日 Python 我要评论
功能介绍这是一个功能强大的端口扫描器脚本,能够快速扫描目标主机的开放端口和服务。该脚本具备以下核心功能:多种扫描模式:支持tcp连接扫描、syn扫描、udp扫描等多种扫描方式端口范围自定义:支持扫描单

功能介绍

这是一个功能强大的端口扫描器脚本,能够快速扫描目标主机的开放端口和服务。该脚本具备以下核心功能:

  • 多种扫描模式:支持tcp连接扫描、syn扫描、udp扫描等多种扫描方式
  • 端口范围自定义:支持扫描单个端口、端口范围或常见端口列表
  • 服务识别:识别常见端口对应的服务类型和版本信息
  • 并发扫描:使用多线程技术提高扫描效率
  • 详细报告生成:生成详细的扫描报告,包括开放端口、服务信息、响应时间等
  • 结果导出:支持将扫描结果导出为json、csv、xml等多种格式
  • 灵活配置:支持自定义超时时间、并发线程数、扫描延迟等参数
  • 安全防护:内置扫描速率限制,避免对目标系统造成过大压力

场景应用

1. 网络安全审计

  • 识别网络设备和服务器的开放端口
  • 发现潜在的安全漏洞和未授权服务
  • 验证防火墙规则的有效性
  • 为渗透测试提供基础信息

2. 系统管理维护

  • 检查服务器端口配置是否符合安全策略
  • 确认服务是否正常运行在指定端口
  • 排查网络连接问题
  • 监控端口变化情况

3. 网络故障排查

  • 诊断网络连通性问题
  • 确定服务是否可达
  • 分析端口阻塞原因
  • 验证网络设备状态

4. 合规性检查

  • 满足等保测评中的端口检查要求
  • 符合行业安全规范
  • 提供合规性审计证据
  • 支持安全管理制度落实

报错处理

1. 网络连接异常

try:
    sock = socket.socket(socket.af_inet, socket.sock_stream)
    sock.settimeout(timeout)
    result = sock.connect_ex((host, port))
    sock.close()
except socket.gaierror:
    logger.error(f"无法解析主机名: {host}")
except socket.timeout:
    logger.warning(f"连接 {host}:{port} 超时")
except connectionrefusederror:
    logger.info(f"端口 {port} 被拒绝连接")
except permissionerror:
    logger.error(f"无权限扫描端口 {port},可能需要管理员权限")

2. 参数验证错误

if not is_valid_ip(target):
    raise valueerror(f"无效的ip地址: {target}")

if port < 1 or port > 65535:
    raise valueerror(f"端口号超出有效范围: {port}")

3. 文件操作异常

try:
    with open(output_file, 'w') as f:
        json.dump(scan_results, f, indent=2)
except permissionerror:
    logger.error(f"无权限写入文件: {output_file}")
except ioerror as e:
    logger.error(f"文件写入错误: {str(e)}")

4. 内存不足异常

try:
    # 大规模扫描时的内存管理
    if len(ports) > 10000:
        logger.warning("扫描端口数量过多,可能消耗大量内存")
except memoryerror:
    logger.error("内存不足,无法完成扫描")
    sys.exit(1)

代码实现

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
端口扫描器
功能:扫描目标主机的开放端口和服务
作者:cline
版本:1.0
"""

import socket
import argparse
import sys
import json
import csv
import xml.etree.elementtree as et
from datetime import datetime
import threading
from concurrent.futures import threadpoolexecutor, as_completed
import time
import logging
import ipaddress

# 配置日志
logging.basicconfig(
    level=logging.info,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.filehandler('port_scanner.log'),
        logging.streamhandler(sys.stdout)
    ]
)
logger = logging.getlogger(__name__)

# 常见端口和服务映射
common_ports = {
    21: 'ftp',
    22: 'ssh',
    23: 'telnet',
    25: 'smtp',
    53: 'dns',
    80: 'http',
    110: 'pop3',
    143: 'imap',
    443: 'https',
    993: 'imaps',
    995: 'pop3s',
    1433: 'mssql',
    1521: 'oracle',
    3306: 'mysql',
    3389: 'rdp',
    5432: 'postgresql',
    6379: 'redis',
    8080: 'http-alt',
    8443: 'https-alt',
    27017: 'mongodb'
}

class portscanner:
    def __init__(self, config):
        self.target = config['target']
        self.ports = config['ports']
        self.scan_type = config.get('scan_type', 'tcp')  # tcp, syn, udp
        self.timeout = config.get('timeout', 1.0)
        self.threads = config.get('threads', 100)
        self.delay = config.get('delay', 0)  # 扫描延迟(毫秒)
        self.output_format = config.get('output_format', 'json')
        self.output_file = config.get('output_file', 'scan_results.json')
        self.verbose = config.get('verbose', false)
        
        # 扫描结果
        self.results = {
            'target': self.target,
            'scan_time': datetime.now().isoformat(),
            'scan_type': self.scan_type,
            'open_ports': [],
            'closed_ports': [],
            'filtered_ports': []
        }
        
    def is_valid_ip(self, ip):
        """验证ip地址格式"""
        try:
            ipaddress.ip_address(ip)
            return true
        except valueerror:
            return false
            
    def resolve_hostname(self, hostname):
        """解析主机名到ip地址"""
        try:
            ip = socket.gethostbyname(hostname)
            return ip
        except socket.gaierror:
            logger.error(f"无法解析主机名: {hostname}")
            return none
            
    def scan_port(self, port):
        """扫描单个端口"""
        host = self.target
        if not self.is_valid_ip(host):
            host = self.resolve_hostname(host)
            if not host:
                return none
                
        # 添加扫描延迟
        if self.delay > 0:
            time.sleep(self.delay / 1000.0)
            
        try:
            if self.scan_type == 'tcp':
                return self.tcp_connect_scan(host, port)
            elif self.scan_type == 'syn':
                return self.syn_scan(host, port)
            elif self.scan_type == 'udp':
                return self.udp_scan(host, port)
            else:
                logger.error(f"不支持的扫描类型: {self.scan_type}")
                return none
                
        except exception as e:
            logger.error(f"扫描端口 {port} 时发生错误: {str(e)}")
            return none
            
    def tcp_connect_scan(self, host, port):
        """tcp连接扫描"""
        try:
            sock = socket.socket(socket.af_inet, socket.sock_stream)
            sock.settimeout(self.timeout)
            
            start_time = time.time()
            result = sock.connect_ex((host, port))
            end_time = time.time()
            
            response_time = round((end_time - start_time) * 1000, 2)  # 毫秒
            sock.close()
            
            service = common_ports.get(port, 'unknown')
            
            if result == 0:
                # 端口开放,尝试获取服务banner
                banner = self.get_service_banner(host, port)
                return {
                    'port': port,
                    'state': 'open',
                    'service': service,
                    'response_time': response_time,
                    'banner': banner
                }
            else:
                return {
                    'port': port,
                    'state': 'closed',
                    'service': service,
                    'response_time': response_time
                }
                
        except socket.timeout:
            return {
                'port': port,
                'state': 'filtered',
                'service': common_ports.get(port, 'unknown'),
                'response_time': self.timeout * 1000
            }
        except exception as e:
            logger.debug(f"tcp扫描端口 {port} 时发生错误: {str(e)}")
            return {
                'port': port,
                'state': 'unknown',
                'service': common_ports.get(port, 'unknown')
            }
            
    def syn_scan(self, host, port):
        """syn扫描(需要root权限)"""
        try:
            # 这里简化实现,实际syn扫描需要使用原始套接字
            logger.warning("syn扫描需要root权限,降级为tcp连接扫描")
            return self.tcp_connect_scan(host, port)
        except exception as e:
            logger.error(f"syn扫描端口 {port} 时发生错误: {str(e)}")
            return none
            
    def udp_scan(self, host, port):
        """udp扫描"""
        try:
            sock = socket.socket(socket.af_inet, socket.sock_dgram)
            sock.settimeout(self.timeout)
            
            # 发送空数据包
            sock.sendto(b'', (host, port))
            
            try:
                # 尝试接收响应
                data, addr = sock.recvfrom(1024)
                sock.close()
                
                service = common_ports.get(port, 'unknown')
                banner = data.decode('utf-8', errors='ignore')[:100]
                
                return {
                    'port': port,
                    'state': 'open',
                    'service': service,
                    'banner': banner
                }
            except socket.timeout:
                # udp端口可能开放但无响应
                sock.close()
                return {
                    'port': port,
                    'state': 'open|filtered',
                    'service': common_ports.get(port, 'unknown')
                }
                
        except exception as e:
            logger.debug(f"udp扫描端口 {port} 时发生错误: {str(e)}")
            return {
                'port': port,
                'state': 'closed',
                'service': common_ports.get(port, 'unknown')
            }
            
    def get_service_banner(self, host, port):
        """获取服务banner信息"""
        try:
            sock = socket.socket(socket.af_inet, socket.sock_stream)
            sock.settimeout(2.0)
            sock.connect((host, port))
            
            # 发送简单的探测数据
            if port in [21, 22, 23, 25, 80, 110, 143]:
                sock.send(b'\r\n')
                
            # 接收响应
            banner = sock.recv(1024).decode('utf-8', errors='ignore')
            sock.close()
            
            # 清理banner信息
            banner = banner.strip().replace('\r\n', ' ').replace('\n', ' ')
            return banner[:200]  # 限制长度
            
        except exception as e:
            logger.debug(f"获取端口 {port} 的banner时发生错误: {str(e)}")
            return ""
            
    def scan_ports(self):
        """扫描所有端口"""
        logger.info(f"开始扫描 {self.target} 的端口...")
        logger.info(f"扫描类型: {self.scan_type}, 超时时间: {self.timeout}s, 线程数: {self.threads}")
        
        start_time = time.time()
        
        with threadpoolexecutor(max_workers=self.threads) as executor:
            # 提交所有扫描任务
            future_to_port = {
                executor.submit(self.scan_port, port): port 
                for port in self.ports
            }
            
            # 收集结果
            for future in as_completed(future_to_port):
                port = future_to_port[future]
                try:
                    result = future.result()
                    if result:
                        if result['state'] == 'open':
                            self.results['open_ports'].append(result)
                            if self.verbose:
                                logger.info(f"端口 {result['port']} ({result['service']}) 开放")
                        elif result['state'] == 'closed':
                            self.results['closed_ports'].append(result)
                        elif result['state'] in ['filtered', 'open|filtered']:
                            self.results['filtered_ports'].append(result)
                            
                except exception as e:
                    logger.error(f"处理端口 {port} 的结果时出错: {str(e)}")
                    
        end_time = time.time()
        self.results['duration'] = round(end_time - start_time, 2)
        
        # 按端口号排序结果
        self.results['open_ports'].sort(key=lambda x: x['port'])
        self.results['closed_ports'].sort(key=lambda x: x['port'])
        self.results['filtered_ports'].sort(key=lambda x: x['port'])
        
        logger.info(f"扫描完成,耗时 {self.results['duration']} 秒")
        logger.info(f"开放端口: {len(self.results['open_ports'])}")
        logger.info(f"关闭端口: {len(self.results['closed_ports'])}")
        logger.info(f"过滤端口: {len(self.results['filtered_ports'])}")
        
        return self.results
        
    def print_results(self):
        """打印扫描结果"""
        print("\n" + "="*60)
        print(f"端口扫描报告 - 目标: {self.target}")
        print("="*60)
        print(f"扫描时间: {self.results['scan_time']}")
        print(f"扫描类型: {self.results['scan_type']}")
        print(f"扫描耗时: {self.results['duration']} 秒")
        print(f"开放端口: {len(self.results['open_ports'])}")
        print(f"关闭端口: {len(self.results['closed_ports'])}")
        print(f"过滤端口: {len(self.results['filtered_ports'])}")
        
        if self.results['open_ports']:
            print("\n开放端口详情:")
            print("-" * 80)
            print(f"{'端口':<8} {'服务':<15} {'响应时间(ms)':<15} {'banner信息'}")
            print("-" * 80)
            for port_info in self.results['open_ports']:
                banner = port_info.get('banner', '')[:50] + ('...' if len(port_info.get('banner', '')) > 50 else '')
                print(f"{port_info['port']:<8} {port_info['service']:<15} {port_info['response_time']:<15} {banner}")
                
        if not self.results['open_ports']:
            print("\n未发现开放端口")
            
    def save_results(self):
        """保存扫描结果"""
        try:
            # 确保输出目录存在
            import os
            output_dir = os.path.dirname(self.output_file) if os.path.dirname(self.output_file) else '.'
            os.makedirs(output_dir, exist_ok=true)
            
            if self.output_format == 'json':
                self._save_json()
            elif self.output_format == 'csv':
                self._save_csv()
            elif self.output_format == 'xml':
                self._save_xml()
            else:
                logger.error(f"不支持的输出格式: {self.output_format}")
                
        except exception as e:
            logger.error(f"保存扫描结果时出错: {str(e)}")
            
    def _save_json(self):
        """保存为json格式"""
        with open(self.output_file, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, indent=2, ensure_ascii=false)
        logger.info(f"扫描结果已保存到 {self.output_file}")
        
    def _save_csv(self):
        """保存为csv格式"""
        all_ports = self.results['open_ports'] + self.results['closed_ports'] + self.results['filtered_ports']
        
        with open(self.output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['port', 'state', 'service', 'response time (ms)', 'banner'])
            
            for port_info in all_ports:
                writer.writerow([
                    port_info['port'],
                    port_info['state'],
                    port_info['service'],
                    port_info.get('response_time', ''),
                    port_info.get('banner', '')
                ])
        logger.info(f"扫描结果已保存到 {self.output_file}")
        
    def _save_xml(self):
        """保存为xml格式"""
        root = et.element("port_scan")
        root.set("target", self.target)
        root.set("scan_time", self.results['scan_time'])
        root.set("duration", str(self.results['duration']))
        
        # 添加开放端口
        open_ports_elem = et.subelement(root, "open_ports")
        for port_info in self.results['open_ports']:
            port_elem = et.subelement(open_ports_elem, "port")
            port_elem.set("number", str(port_info['port']))
            port_elem.set("service", port_info['service'])
            port_elem.set("response_time", str(port_info.get('response_time', '')))
            if port_info.get('banner'):
                banner_elem = et.subelement(port_elem, "banner")
                banner_elem.text = port_info['banner']
                
        tree = et.elementtree(root)
        tree.write(self.output_file, encoding='utf-8', xml_declaration=true)
        logger.info(f"扫描结果已保存到 {self.output_file}")

def parse_ports(port_str):
    """解析端口参数"""
    ports = set()
    
    # 处理逗号分隔的端口列表
    for part in port_str.split(','):
        part = part.strip()
        if '-' in part:
            # 处理端口范围
            try:
                start, end = map(int, part.split('-'))
                if 1 <= start <= 65535 and 1 <= end <= 65535 and start <= end:
                    ports.update(range(start, end + 1))
                else:
                    raise valueerror(f"无效的端口范围: {part}")
            except valueerror as e:
                logger.error(f"解析端口范围时出错: {str(e)}")
                sys.exit(1)
        else:
            # 处理单个端口
            try:
                port = int(part)
                if 1 <= port <= 65535:
                    ports.add(port)
                else:
                    raise valueerror(f"端口号超出有效范围: {port}")
            except valueerror as e:
                logger.error(f"解析端口时出错: {str(e)}")
                sys.exit(1)
                
    return sorted(list(ports))

def main():
    parser = argparse.argumentparser(description='端口扫描器')
    parser.add_argument('target', help='目标主机ip地址或域名')
    parser.add_argument('-p', '--ports', default='1-1000', help='要扫描的端口,支持范围(如1-1000)和列表(如22,80,443)')
    parser.add_argument('-t', '--type', choices=['tcp', 'syn', 'udp'], default='tcp', help='扫描类型')
    parser.add_argument('--timeout', type=float, default=1.0, help='连接超时时间(秒)')
    parser.add_argument('--threads', type=int, default=100, help='并发线程数')
    parser.add_argument('--delay', type=int, default=0, help='扫描延迟(毫秒)')
    parser.add_argument('-o', '--output', help='输出文件路径')
    parser.add_argument('-f', '--format', choices=['json', 'csv', 'xml'], default='json', help='输出格式')
    parser.add_argument('-v', '--verbose', action='store_true', help='详细输出')
    parser.add_argument('--top-ports', type=int, help='扫描最常见的n个端口')
    
    args = parser.parse_args()
    
    # 解析端口
    if args.top_ports:
        # 使用最常见的端口
        common_ports = sorted(common_ports.keys())[:args.top_ports]
        ports = common_ports
    else:
        ports = parse_ports(args.ports)
        
    # 配置输出文件
    output_file = args.output
    if not output_file:
        timestamp = datetime.now().strftime("%y%m%d_%h%m%s")
        output_file = f"scan_results_{timestamp}.{args.format}"
        
    # 配置扫描参数
    config = {
        'target': args.target,
        'ports': ports,
        'scan_type': args.type,
        'timeout': args.timeout,
        'threads': args.threads,
        'delay': args.delay,
        'output_format': args.format,
        'output_file': output_file,
        'verbose': args.verbose
    }
    
    # 创建扫描器实例
    scanner = portscanner(config)
    
    # 执行扫描
    try:
        scanner.scan_ports()
        scanner.print_results()
        scanner.save_results()
    except keyboardinterrupt:
        logger.info("扫描被用户中断")
        sys.exit(1)
    except exception as e:
        logger.error(f"扫描过程中发生错误: {str(e)}")
        sys.exit(1)

if __name__ == '__main__':
    main()

使用说明

1. 基本使用

# 扫描单个主机的默认端口范围(1-1000)
python port_scanner.py 192.168.1.1

# 扫描特定端口
python port_scanner.py 192.168.1.1 -p 22,80,443

# 扫描端口范围
python port_scanner.py 192.168.1.1 -p 1-10000

# 扫描最常见的100个端口
python port_scanner.py 192.168.1.1 --top-ports 100

2. 扫描类型

# tcp连接扫描(默认)
python port_scanner.py 192.168.1.1 -t tcp

# udp扫描
python port_scanner.py 192.168.1.1 -t udp

# syn扫描(需要root权限)
sudo python port_scanner.py 192.168.1.1 -t syn

3. 性能调优

# 调整超时时间
python port_scanner.py 192.168.1.1 --timeout 2.0

# 调整并发线程数
python port_scanner.py 192.168.1.1 --threads 200

# 添加扫描延迟(毫秒)
python port_scanner.py 192.168.1.1 --delay 100

4. 输出格式

# json格式输出(默认)
python port_scanner.py 192.168.1.1 -o results.json

# csv格式输出
python port_scanner.py 192.168.1.1 -f csv -o results.csv

# xml格式输出
python port_scanner.py 192.168.1.1 -f xml -o results.xml

5. 详细输出

# 显示详细扫描过程
python port_scanner.py 192.168.1.1 -v

高级特性

1. 批量扫描

可以通过脚本扩展实现批量扫描多个主机:

targets = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
for target in targets:
    config = {'target': target, 'ports': [22, 80, 443]}
    scanner = portscanner(config)
    scanner.scan_ports()
    scanner.save_results()

2. 定期扫描

可以结合cron定时任务实现定期扫描:

# 每小时扫描一次关键服务器
0 * * * * /usr/bin/python3 /path/to/port_scanner.py 192.168.1.1 -p 22,80,443 -o /var/log/scan_$(date +\%y\%m\%d_\%h\%m\%s).json

3. 结果分析

扫描结果可以用于进一步分析:

import json

# 加载扫描结果
with open('scan_results.json', 'r') as f:
    results = json.load(f)

# 分析开放端口
open_ports = results['open_ports']
for port in open_ports:
    print(f"发现开放端口: {port['port']} ({port['service']})")

安全考虑

1. 权限控制

  • 普通端口扫描不需要特殊权限
  • syn扫描需要root权限
  • 应谨慎使用高权限运行脚本

2. 扫描频率

  • 避免过于频繁的扫描
  • 添加适当延迟保护目标系统
  • 遵守网络安全政策

3. 日志记录

  • 记录所有扫描活动
  • 保护敏感信息不被泄露
  • 定期审查扫描日志

性能优化

1. 并发控制

  • 合理设置线程数避免系统过载
  • 根据网络状况调整超时时间
  • 使用连接池复用socket连接

2. 内存管理

  • 及时释放不需要的资源
  • 控制扫描规模避免内存溢出
  • 使用生成器处理大数据集

3. 网络优化

  • 本地dns缓存提高解析速度
  • 复用连接减少握手开销
  • 批量处理减少网络往返

这个端口扫描器是一个功能完整、安全可靠的网络探测工具,能够帮助网络管理员和安全专业人员快速识别网络资产和服务状态。

以上就是python实现快速扫描目标主机的开放端口和服务的详细内容,更多关于python扫描端口的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com