当前位置: 代码网 > it编程>前端脚本>Python > 使用Python实现处理和清洗CSV格式的数据文件

使用Python实现处理和清洗CSV格式的数据文件

2025年12月05日 Python 我要评论
功能介绍这是一个强大的csv数据清洗和转换工具,专门用于处理和清理csv格式的数据文件。该工具具备以下核心功能:数据清洗功能:自动检测和处理缺失值去除重复记录清理特殊字符和多余空格标准化数据格式(日期

功能介绍

这是一个强大的csv数据清洗和转换工具,专门用于处理和清理csv格式的数据文件。该工具具备以下核心功能:

数据清洗功能

  • 自动检测和处理缺失值
  • 去除重复记录
  • 清理特殊字符和多余空格
  • 标准化数据格式(日期、数字、文本等)
  • 处理异常值和离群点

数据转换功能

  • 列名重命名和标准化
  • 数据类型自动识别和转换
  • 列合并和拆分
  • 数据聚合和分组统计
  • 编码格式转换(utf-8、gbk等)

数据验证功能

  • 数据完整性检查
  • 数据格式验证
  • 自定义规则验证
  • 错误记录标记和报告

批量处理功能

  • 支持多个csv文件批量处理
  • 处理进度显示和日志记录
  • 错误处理和恢复机制
  • 处理结果汇总报告

输出格式支持

  • 支持多种输出格式(csv、excel、json等)
  • 自定义分隔符和编码
  • 数据压缩和分割输出
  • 处理前后对比报告

场景应用

1. 数据分析预处理

  • 清洗原始数据,为数据分析和建模做准备
  • 标准化不同来源的数据格式
  • 处理数据质量问题,提高分析准确性
  • 生成数据质量报告,识别潜在问题

2. 业务数据整合

  • 整合来自不同系统的业务数据
  • 统一数据格式和编码标准
  • 清理历史数据中的错误和不一致
  • 为数据仓库提供干净的数据源

3. 数据迁移项目

  • 清洗和转换旧系统数据
  • 验证数据迁移的完整性和准确性
  • 处理数据格式不兼容问题
  • 生成数据迁移质量报告

4. 日常数据维护

  • 定期清洗业务系统导出的数据
  • 自动化处理重复性的数据清洗任务
  • 监控数据质量变化趋势
  • 建立数据清洗标准流程

报错处理

1. 文件操作异常

try:
    df = pd.read_csv(input_file, encoding=encoding)
except filenotfounderror:
    logger.error(f"输入文件不存在: {input_file}")
    raise dataprocessingerror(f"文件未找到: {input_file}")
except pd.errors.emptydataerror:
    logger.error(f"输入文件为空: {input_file}")
    raise dataprocessingerror(f"空文件: {input_file}")
except unicodedecodeerror:
    logger.error(f"文件编码错误: {input_file}")
    raise dataprocessingerror(f"编码错误,请尝试其他编码格式")

2. 数据处理异常

try:
    # 数据类型转换
    df[column] = pd.to_numeric(df[column], errors='coerce')
except exception as e:
    logger.warning(f"列 {column} 数据类型转换失败: {str(e)}")
    # 记录错误但继续处理其他列

3. 内存溢出异常

try:
    df = pd.read_csv(input_file, chunksize=chunk_size)
    # 分块处理大文件
except memoryerror:
    logger.error("内存不足,无法处理大文件")
    raise dataprocessingerror("内存不足,请减小文件大小或增加系统内存")

4. 配置验证异常

if not os.path.exists(config_file):
    logger.error(f"配置文件不存在: {config_file}")
    raise dataprocessingerror(f"配置文件缺失: {config_file}")

try:
    with open(config_file, 'r', encoding='utf-8') as f:
        config = json.load(f)
except json.jsondecodeerror:
    logger.error(f"配置文件格式错误: {config_file}")
    raise dataprocessingerror(f"配置文件格式无效")

代码实现

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
csv数据清洗和转换工具
功能:清洗、转换和验证csv格式数据
作者:cline
版本:1.0
"""

import pandas as pd
import numpy as np
import argparse
import sys
import json
import logging
import os
from datetime import datetime
import chardet
from typing import dict, list, any, optional
import warnings

# 忽略警告信息
warnings.filterwarnings('ignore')

# 配置日志
logging.basicconfig(
    level=logging.info,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.filehandler('csv_cleaner.log'),
        logging.streamhandler(sys.stdout)
    ]
)
logger = logging.getlogger(__name__)

class dataprocessingerror(exception):
    """数据处理异常类"""
    pass

class csvcleaner:
    def __init__(self, config: dict[str, any]):
        self.input_file = config.get('input_file')
        self.output_file = config.get('output_file', 'cleaned_data.csv')
        self.encoding = config.get('encoding', 'auto')
        self.separator = config.get('separator', ',')
        self.rules_file = config.get('rules_file')
        self.output_format = config.get('output_format', 'csv')
        self.chunk_size = config.get('chunk_size', 10000)
        self.backup_original = config.get('backup_original', true)
        
        # 清洗规则
        self.cleaning_rules = config.get('cleaning_rules', {})
        
        # 处理结果统计
        self.stats = {
            'total_rows': 0,
            'cleaned_rows': 0,
            'removed_rows': 0,
            'modified_cells': 0,
            'errors': []
        }
        
    def detect_encoding(self, file_path: str) -> str:
        """自动检测文件编码"""
        try:
            with open(file_path, 'rb') as f:
                raw_data = f.read(10000)  # 读取前10kb数据
                result = chardet.detect(raw_data)
                encoding = result['encoding']
                confidence = result['confidence']
                
                logger.info(f"检测到文件编码: {encoding} (置信度: {confidence:.2f})")
                return encoding if confidence > 0.7 else 'utf-8'
                
        except exception as e:
            logger.warning(f"编码检测失败,使用默认编码: {str(e)}")
            return 'utf-8'
            
    def load_data(self) -> pd.dataframe:
        """加载csv数据"""
        logger.info(f"开始加载数据文件: {self.input_file}")
        
        try:
            # 自动检测编码
            if self.encoding == 'auto':
                self.encoding = self.detect_encoding(self.input_file)
                
            # 加载数据
            df = pd.read_csv(
                self.input_file,
                sep=self.separator,
                encoding=self.encoding,
                low_memory=false
            )
            
            self.stats['total_rows'] = len(df)
            logger.info(f"成功加载 {len(df)} 行数据,{len(df.columns)} 列")
            return df
            
        except filenotfounderror:
            logger.error(f"输入文件不存在: {self.input_file}")
            raise dataprocessingerror(f"文件未找到: {self.input_file}")
        except pd.errors.emptydataerror:
            logger.error(f"输入文件为空: {self.input_file}")
            raise dataprocessingerror(f"空文件: {self.input_file}")
        except unicodedecodeerror as e:
            logger.error(f"文件编码错误: {str(e)}")
            raise dataprocessingerror(f"编码错误,请尝试指定其他编码格式")
        except exception as e:
            logger.error(f"加载数据时发生错误: {str(e)}")
            raise dataprocessingerror(f"数据加载失败: {str(e)}")
            
    def backup_original(self):
        """备份原始文件"""
        if not self.backup_original:
            return
            
        try:
            backup_name = f"{self.input_file}.backup_{datetime.now().strftime('%y%m%d_%h%m%s')}"
            import shutil
            shutil.copy2(self.input_file, backup_name)
            logger.info(f"原始文件已备份到: {backup_name}")
        except exception as e:
            logger.warning(f"备份原始文件失败: {str(e)}")
            
    def load_cleaning_rules(self) -> dict[str, any]:
        """加载清洗规则"""
        if not self.rules_file or not os.path.exists(self.rules_file):
            logger.info("未指定清洗规则文件,使用默认规则")
            return self.cleaning_rules
            
        try:
            with open(self.rules_file, 'r', encoding='utf-8') as f:
                rules = json.load(f)
            logger.info(f"成功加载清洗规则文件: {self.rules_file}")
            return rules
        except exception as e:
            logger.error(f"加载清洗规则文件失败: {str(e)}")
            return self.cleaning_rules
            
    def clean_missing_values(self, df: pd.dataframe, rules: dict[str, any]) -> pd.dataframe:
        """处理缺失值"""
        missing_rules = rules.get('missing_values', {})
        
        for column, strategy in missing_rules.items():
            if column not in df.columns:
                logger.warning(f"列 {column} 不存在于数据中")
                continue
                
            logger.info(f"处理列 {column} 的缺失值,策略: {strategy}")
            
            if strategy == 'drop_rows':
                # 删除包含缺失值的行
                original_count = len(df)
                df = df.dropna(subset=[column])
                removed_count = original_count - len(df)
                self.stats['removed_rows'] += removed_count
                logger.info(f"删除了 {removed_count} 行包含缺失值的数据")
                
            elif strategy == 'fill_zero':
                # 用0填充
                filled_count = df[column].isna().sum()
                df[column] = df[column].fillna(0)
                self.stats['modified_cells'] += filled_count
                logger.info(f"用0填充了 {filled_count} 个缺失值")
                
            elif strategy == 'fill_mean':
                # 用均值填充(仅适用于数值列)
                if df[column].dtype in ['int64', 'float64']:
                    mean_value = df[column].mean()
                    filled_count = df[column].isna().sum()
                    df[column] = df[column].fillna(mean_value)
                    self.stats['modified_cells'] += filled_count
                    logger.info(f"用均值 {mean_value:.2f} 填充了 {filled_count} 个缺失值")
                else:
                    logger.warning(f"列 {column} 不是数值类型,无法使用均值填充")
                    
            elif strategy == 'fill_forward':
                # 向前填充
                filled_count = df[column].isna().sum()
                df[column] = df[column].fillna(method='ffill')
                self.stats['modified_cells'] += filled_count
                logger.info(f"向前填充了 {filled_count} 个缺失值")
                
            elif isinstance(strategy, str):
                # 用指定值填充
                filled_count = df[column].isna().sum()
                df[column] = df[column].fillna(strategy)
                self.stats['modified_cells'] += filled_count
                logger.info(f"用 '{strategy}' 填充了 {filled_count} 个缺失值")
                
        return df
        
    def remove_duplicates(self, df: pd.dataframe, rules: dict[str, any]) -> pd.dataframe:
        """去除重复记录"""
        duplicate_rules = rules.get('duplicates', {})
        subset = duplicate_rules.get('subset')
        keep = duplicate_rules.get('keep', 'first')
        
        original_count = len(df)
        
        if subset:
            # 基于指定列去重
            if isinstance(subset, str):
                subset = [subset]
            df = df.drop_duplicates(subset=subset, keep=keep)
        else:
            # 基于所有列去重
            df = df.drop_duplicates(keep=keep)
            
        removed_count = original_count - len(df)
        self.stats['removed_rows'] += removed_count
        logger.info(f"去除了 {removed_count} 行重复数据")
        return df
        
    def clean_text_data(self, df: pd.dataframe, rules: dict[str, any]) -> pd.dataframe:
        """清理文本数据"""
        text_rules = rules.get('text_cleaning', {})
        
        for column, cleaning_ops in text_rules.items():
            if column not in df.columns:
                logger.warning(f"列 {column} 不存在于数据中")
                continue
                
            if df[column].dtype != 'object':
                logger.warning(f"列 {column} 不是文本类型,跳过文本清理")
                continue
                
            logger.info(f"清理列 {column} 的文本数据")
            original_modified = self.stats['modified_cells']
            
            # 转换为字符串类型
            df[column] = df[column].astype(str)
            
            for op in cleaning_ops:
                if op == 'strip_whitespace':
                    # 去除首尾空格
                    modified_mask = df[column].str.strip() != df[column]
                    modified_count = modified_mask.sum()
                    df.loc[modified_mask, column] = df.loc[modified_mask, column].str.strip()
                    self.stats['modified_cells'] += modified_count
                    
                elif op == 'remove_extra_spaces':
                    # 去除多余空格
                    modified_mask = df[column].str.replace(r'\s+', ' ', regex=true) != df[column]
                    modified_count = modified_mask.sum()
                    df.loc[modified_mask, column] = df.loc[modified_mask, column].str.replace(r'\s+', ' ', regex=true)
                    self.stats['modified_cells'] += modified_count
                    
                elif op == 'lowercase':
                    # 转换为小写
                    modified_mask = df[column].str.lower() != df[column]
                    modified_count = modified_mask.sum()
                    df.loc[modified_mask, column] = df.loc[modified_mask, column].str.lower()
                    self.stats['modified_cells'] += modified_count
                    
                elif op == 'uppercase':
                    # 转换为大写
                    modified_mask = df[column].str.upper() != df[column]
                    modified_count = modified_mask.sum()
                    df.loc[modified_mask, column] = df.loc[modified_mask, column].str.upper()
                    self.stats['modified_cells'] += modified_count
                    
                elif op == 'remove_special_chars':
                    # 去除特殊字符
                    modified_mask = df[column].str.replace(r'[^\w\s]', '', regex=true) != df[column]
                    modified_count = modified_mask.sum()
                    df.loc[modified_mask, column] = df.loc[modified_mask, column].str.replace(r'[^\w\s]', '', regex=true)
                    self.stats['modified_cells'] += modified_count
                    
            logger.info(f"列 {column} 文本清理完成,修改了 {self.stats['modified_cells'] - original_modified} 个单元格")
            
        return df
        
    def standardize_dates(self, df: pd.dataframe, rules: dict[str, any]) -> pd.dataframe:
        """标准化日期格式"""
        date_rules = rules.get('date_standardization', {})
        
        for column, format_info in date_rules.items():
            if column not in df.columns:
                logger.warning(f"列 {column} 不存在于数据中")
                continue
                
            logger.info(f"标准化列 {column} 的日期格式")
            original_modified = self.stats['modified_cells']
            
            try:
                target_format = format_info.get('target_format', '%y-%m-%d')
                source_formats = format_info.get('source_formats', [])
                
                # 尝试转换日期格式
                if source_formats:
                    for fmt in source_formats:
                        try:
                            df[column] = pd.to_datetime(df[column], format=fmt, errors='coerce')
                            break
                        except:
                            continue
                else:
                    # 自动推断日期格式
                    df[column] = pd.to_datetime(df[column], infer_datetime_format=true, errors='coerce')
                    
                # 格式化为目标格式
                df[column] = df[column].dt.strftime(target_format)
                modified_count = df[column].notna().sum()
                self.stats['modified_cells'] += modified_count
                
                logger.info(f"列 {column} 日期标准化完成,处理了 {modified_count} 个日期值")
                
            except exception as e:
                logger.error(f"标准化列 {column} 日期格式时出错: {str(e)}")
                
        return df
        
    def validate_data(self, df: pd.dataframe, rules: dict[str, any]) -> pd.dataframe:
        """数据验证"""
        validation_rules = rules.get('validation', {})
        
        for column, validations in validation_rules.items():
            if column not in df.columns:
                logger.warning(f"列 {column} 不存在于数据中")
                continue
                
            logger.info(f"验证列 {column} 的数据")
            
            for validation in validations:
                rule_type = validation.get('type')
                rule_value = validation.get('value')
                action = validation.get('action', 'log')
                
                if rule_type == 'range':
                    # 数值范围验证
                    min_val, max_val = rule_value
                    invalid_mask = (df[column] < min_val) | (df[column] > max_val)
                    
                elif rule_type == 'length':
                    # 字符串长度验证
                    if isinstance(rule_value, list):
                        min_len, max_len = rule_value
                        invalid_mask = (df[column].str.len() < min_len) | (df[column].str.len() > max_len)
                    else:
                        invalid_mask = df[column].str.len() != rule_value
                        
                elif rule_type == 'pattern':
                    # 正则表达式验证
                    invalid_mask = ~df[column].str.match(rule_value, na=false)
                    
                elif rule_type == 'not_null':
                    # 非空验证
                    invalid_mask = df[column].isna()
                    
                else:
                    continue
                    
                invalid_count = invalid_mask.sum()
                if invalid_count > 0:
                    logger.warning(f"列 {column} 发现 {invalid_count} 条不符合 {rule_type} 规则的数据")
                    
                    if action == 'remove':
                        # 删除不符合规则的行
                        df = df[~invalid_mask]
                        self.stats['removed_rows'] += invalid_count
                        logger.info(f"删除了 {invalid_count} 行不符合规则的数据")
                    elif action == 'mark':
                        # 标记不符合规则的数据
                        error_msg = f"列 {column} 不符合 {rule_type} 规则"
                        self.stats['errors'].append({
                            'column': column,
                            'count': invalid_count,
                            'message': error_msg
                        })
                        
        return df
        
    def process_chunk(self, chunk: pd.dataframe, rules: dict[str, any]) -> pd.dataframe:
        """处理数据块"""
        try:
            # 1. 处理缺失值
            chunk = self.clean_missing_values(chunk, rules)
            
            # 2. 去除重复记录
            chunk = self.remove_duplicates(chunk, rules)
            
            # 3. 清理文本数据
            chunk = self.clean_text_data(chunk, rules)
            
            # 4. 标准化日期格式
            chunk = self.standardize_dates(chunk, rules)
            
            # 5. 数据验证
            chunk = self.validate_data(chunk, rules)
            
            self.stats['cleaned_rows'] += len(chunk)
            return chunk
            
        except exception as e:
            logger.error(f"处理数据块时出错: {str(e)}")
            raise dataprocessingerror(f"数据块处理失败: {str(e)}")
            
    def process_large_file(self, df: pd.dataframe, rules: dict[str, any]) -> pd.dataframe:
        """处理大文件(分块处理)"""
        logger.info("开始分块处理大文件...")
        
        processed_chunks = []
        
        # 分块处理
        for i, chunk in enumerate(np.array_split(df, max(1, len(df) // self.chunk_size))):
            logger.info(f"处理第 {i+1} 块数据 ({len(chunk)} 行)")
            cleaned_chunk = self.process_chunk(chunk, rules)
            processed_chunks.append(cleaned_chunk)
            
        # 合并所有块
        final_df = pd.concat(processed_chunks, ignore_index=true)
        logger.info(f"大文件处理完成,共处理 {len(final_df)} 行数据")
        return final_df
        
    def run_cleaning(self):
        """运行数据清洗"""
        logger.info("开始csv数据清洗...")
        
        try:
            # 1. 备份原始文件
            self.backup_original()
            
            # 2. 加载数据
            df = self.load_data()
            
            # 3. 加载清洗规则
            rules = self.load_cleaning_rules()
            
            # 4. 处理数据
            if len(df) > self.chunk_size:
                # 大文件分块处理
                cleaned_df = self.process_large_file(df, rules)
            else:
                # 小文件直接处理
                cleaned_df = self.process_chunk(df, rules)
                
            # 5. 保存结果
            self.save_results(cleaned_df)
            
            # 6. 生成报告
            self.generate_report()
            
            logger.info("csv数据清洗完成")
            return cleaned_df
            
        except exception as e:
            logger.error(f"数据清洗过程中发生错误: {str(e)}")
            raise dataprocessingerror(f"清洗失败: {str(e)}")
            
    def save_results(self, df: pd.dataframe):
        """保存处理结果"""
        try:
            # 确保输出目录存在
            output_dir = os.path.dirname(self.output_file) if os.path.dirname(self.output_file) else '.'
            os.makedirs(output_dir, exist_ok=true)
            
            if self.output_format == 'csv':
                df.to_csv(self.output_file, index=false, encoding='utf-8-sig')
            elif self.output_format == 'excel':
                df.to_excel(self.output_file, index=false)
            elif self.output_format == 'json':
                df.to_json(self.output_file, orient='records', force_ascii=false, indent=2)
            else:
                logger.error(f"不支持的输出格式: {self.output_format}")
                raise dataprocessingerror(f"不支持的输出格式: {self.output_format}")
                
            logger.info(f"处理结果已保存到 {self.output_file}")
            
        except exception as e:
            logger.error(f"保存处理结果时出错: {str(e)}")
            raise dataprocessingerror(f"保存失败: {str(e)}")
            
    def generate_report(self):
        """生成处理报告"""
        try:
            report = {
                'timestamp': datetime.now().isoformat(),
                'input_file': self.input_file,
                'output_file': self.output_file,
                'processing_stats': self.stats,
                'cleaning_rules': self.cleaning_rules
            }
            
            report_file = f"{self.output_file}.report.json"
            with open(report_file, 'w', encoding='utf-8') as f:
                json.dump(report, f, indent=2, ensure_ascii=false)
                
            logger.info(f"处理报告已保存到 {report_file}")
            
            # 打印简要报告
            print("\n" + "="*50)
            print("csv数据清洗报告")
            print("="*50)
            print(f"处理时间: {report['timestamp']}")
            print(f"输入文件: {self.input_file}")
            print(f"输出文件: {self.output_file}")
            print("-"*50)
            print(f"总行数: {self.stats['total_rows']}")
            print(f"清洗后行数: {self.stats['cleaned_rows']}")
            print(f"删除行数: {self.stats['removed_rows']}")
            print(f"修改单元格数: {self.stats['modified_cells']}")
            print(f"错误记录数: {len(self.stats['errors'])}")
            print("="*50)
            
        except exception as e:
            logger.error(f"生成处理报告时出错: {str(e)}")

def create_sample_rules():
    """创建示例清洗规则文件"""
    sample_rules = {
        "missing_values": {
            "name": "fill_unknown",
            "age": "fill_mean",
            "email": "drop_rows"
        },
        "duplicates": {
            "subset": ["name", "email"],
            "keep": "first"
        },
        "text_cleaning": {
            "name": ["strip_whitespace", "remove_extra_spaces"],
            "description": ["strip_whitespace", "remove_special_chars"]
        },
        "date_standardization": {
            "created_date": {
                "target_format": "%y-%m-%d",
                "source_formats": ["%y/%m/%d", "%d-%m-%y"]
            }
        },
        "validation": {
            "age": [
                {
                    "type": "range",
                    "value": [0, 120],
                    "action": "remove"
                }
            ],
            "email": [
                {
                    "type": "pattern",
                    "value": r"^[a-za-z0-9._%+-]+@[a-za-z0-9.-]+\.[a-za-z]{2,}$",
                    "action": "mark"
                }
            ]
        }
    }
    
    with open('sample_cleaning_rules.json', 'w', encoding='utf-8') as f:
        json.dump(sample_rules, f, indent=2, ensure_ascii=false)
    logger.info("示例清洗规则文件已创建: sample_cleaning_rules.json")

def main():
    parser = argparse.argumentparser(description='csv数据清洗和转换工具')
    parser.add_argument('input_file', help='输入csv文件路径')
    parser.add_argument('-o', '--output', help='输出文件路径')
    parser.add_argument('-e', '--encoding', default='auto', help='文件编码 (auto/utf-8/gbk等)')
    parser.add_argument('-s', '--separator', default=',', help='字段分隔符')
    parser.add_argument('-r', '--rules', help='清洗规则文件路径')
    parser.add_argument('-f', '--format', choices=['csv', 'excel', 'json'], default='csv', help='输出格式')
    parser.add_argument('--chunk-size', type=int, default=10000, help='分块处理大小')
    parser.add_argument('--no-backup', action='store_true', help='不备份原始文件')
    parser.add_argument('--sample-rules', action='store_true', help='创建示例清洗规则文件')
    
    args = parser.parse_args()
    
    if args.sample_rules:
        create_sample_rules()
        return
        
    # 配置处理参数
    config = {
        'input_file': args.input_file,
        'output_file': args.output or f"cleaned_{os.path.basename(args.input_file)}",
        'encoding': args.encoding,
        'separator': args.separator,
        'rules_file': args.rules,
        'output_format': args.format,
        'chunk_size': args.chunk_size,
        'backup_original': not args.no_backup
    }
    
    # 创建清洗器实例
    cleaner = csvcleaner(config)
    
    try:
        # 执行清洗
        cleaner.run_cleaning()
        
    except keyboardinterrupt:
        logger.info("数据清洗被用户中断")
        sys.exit(1)
    except dataprocessingerror as e:
        logger.error(f"数据处理错误: {str(e)}")
        sys.exit(1)
    except exception as e:
        logger.error(f"数据清洗过程中发生未知错误: {str(e)}")
        sys.exit(1)

if __name__ == '__main__':
    main()

使用说明

1. 基本使用

# 基本数据清洗
python csv_cleaner.py data.csv

# 指定输出文件
python csv_cleaner.py data.csv -o cleaned_data.csv

# 指定文件编码
python csv_cleaner.py data.csv -e gbk

# 指定分隔符
python csv_cleaner.py data.csv -s ';'

2. 使用清洗规则

# 使用自定义清洗规则
python csv_cleaner.py data.csv -r cleaning_rules.json

# 创建示例清洗规则文件
python csv_cleaner.py --sample-rules

3. 输出格式

# 输出为excel格式
python csv_cleaner.py data.csv -f excel -o result.xlsx

# 输出为json格式
python csv_cleaner.py data.csv -f json -o result.json

4. 性能优化

# 调整分块大小处理大文件
python csv_cleaner.py large_data.csv --chunk-size 50000

# 不备份原始文件
python csv_cleaner.py data.csv --no-backup

清洗规则文件示例

创建一个名为 cleaning_rules.json 的文件:

{
  "missing_values": {
    "name": "fill_unknown",
    "age": "fill_mean",
    "salary": "fill_zero",
    "email": "drop_rows"
  },
  "duplicates": {
    "subset": ["name", "email"],
    "keep": "first"
  },
  "text_cleaning": {
    "name": ["strip_whitespace", "remove_extra_spaces", "lowercase"],
    "description": ["strip_whitespace", "remove_special_chars"]
  },
  "date_standardization": {
    "hire_date": {
      "target_format": "%y-%m-%d",
      "source_formats": ["%y/%m/%d", "%d-%m-%y", "%m/%d/%y"]
    }
  },
  "validation": {
    "age": [
      {
        "type": "range",
        "value": [0, 120],
        "action": "remove"
      }
    ],
    "email": [
      {
        "type": "pattern",
        "value": "^[a-za-z0-9._%+-]+@[a-za-z0-9.-]+\\.[a-za-z]{2,}$",
        "action": "mark"
      }
    ],
    "phone": [
      {
        "type": "pattern",
        "value": "^1[3-9]\\d{9}$",
        "action": "mark"
      }
    ]
  }
}

高级特性

1. 批量处理

可以通过脚本实现批量处理多个文件:

import glob
import os

# 处理目录下所有csv文件
csv_files = glob.glob("data/*.csv")

for csv_file in csv_files:
    config = {
        'input_file': csv_file,
        'output_file': f"cleaned_{os.path.basename(csv_file)}",
        'rules_file': 'cleaning_rules.json'
    }
    
    cleaner = csvcleaner(config)
    cleaner.run_cleaning()

2. 自动化调度

可以结合cron实现定期自动清洗:

# 每天凌晨2点清洗数据
0 2 * * * /usr/bin/python3 /path/to/csv_cleaner.py /path/to/daily_data.csv -r /path/to/cleaning_rules.json

3. 数据质量监控

可以定期检查数据质量并生成报告:

# 检查数据质量指标
def check_data_quality(df):
    quality_metrics = {
        'total_rows': len(df),
        'missing_values': df.isnull().sum().to_dict(),
        'duplicate_rows': df.duplicated().sum(),
        'data_types': df.dtypes.astype(str).to_dict()
    }
    return quality_metrics

性能优化

1. 内存管理

  • 对于大文件使用分块处理避免内存溢出
  • 及时释放不需要的数据帧
  • 使用适当的数据类型减少内存占用

2. 处理速度优化

  • 向量化操作替代循环处理
  • 合理设置分块大小平衡内存和速度
  • 使用多进程处理多个文件

3. 错误处理优化

  • 实现优雅的错误恢复机制
  • 记录详细的处理日志便于问题追踪
  • 提供友好的错误提示信息

安全考虑

1. 数据安全

  • 处理前自动备份原始数据
  • 敏感数据脱敏处理
  • 输出文件权限合理设置

2. 文件安全

  • 验证输入文件的合法性
  • 限制输出文件路径避免任意文件写入
  • 检查文件大小避免处理异常大文件

3. 系统安全

  • 限制处理文件的数量和大小
  • 实现处理超时机制
  • 记录所有操作日志便于审计

这个csv数据清洗和转换工具是一个功能强大、安全可靠的数据处理工具,能够帮助用户高效地清洗和转换csv格式的数据,为后续的数据分析和应用提供高质量的数据基础。

到此这篇关于使用python实现处理和清洗csv格式的数据文件的文章就介绍到这了,更多相关python csv数据清洗内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com