当前位置: 代码网 > it编程>编程语言>其他编程 > 4种Pandas高效读取文件数据的完整方法指南

4种Pandas高效读取文件数据的完整方法指南

2025年12月16日 其他编程 我要评论
痛点暴击开头你有没有遇到过这种情况:刚拿到一个1gb的csv文件,你双击打开...excel直接卡死,电脑风扇狂转,你只能强制重启电脑。或者更惨:你写了pd.read_csv('huge_fi

痛点暴击开头

你有没有遇到过这种情况:

刚拿到一个1gb的csv文件,你双击打开...excel直接卡死,电脑风扇狂转,你只能强制重启电脑。

或者更惨:你写了pd.read_csv('huge_file.csv'),然后去接了杯水,刷了会儿朋友圈,回来一看...内存爆了,python进程被强制杀死。

上周我们组来了个实习生,处理一个销售数据的excel文件,他老老实实地一个sheet一个sheet地复制粘贴,结果花了一整天还没弄完。

老大看了直摇头:"你知道pandas可以一键读取所有sheet吗?"

实习生愣住了。我也愣住了。

今天咱们就来彻底搞懂pandas的数据读取,让你从小白秒变数据读取高手!

csv读取:不是所有逗号都是平等的

错误示范(血的教训)

# ❌ 新手这样写(包括三个月前的我)
import pandas as pd

# 简单粗暴地读取
df = pd.read_csv('data.csv')

# 遇到中文乱码?凉拌!
# 遇到分号分隔?凉拌!
# 遇到没有表头?凉拌!

结果呢?要么中文变成乱码,要么列名错位,要么直接报错。就像你女朋友说"随便",然后你真的随便了...

正确姿势:优雅地处理各种奇葩csv

# ✅ 专业人士的打开方式
import pandas as pd

# 基础但完整的读取
df = pd.read_csv('data.csv',
                encoding='utf-8',        # 解决中文乱码
                sep=',',                 # 明确分隔符
                header=0,                # 表头在第几行
                index_col=0,             # 哪一列作为索引
                na_values=['na', 'null', 'n/a'])  # 空值怎么识别

但是,真实世界的csv文件远比你想象的要奇葩!

# 🚀 高手进阶:处理各种奇葩情况

# 1. 分号分隔的欧洲格式
df_euro = pd.read_csv('euro_data.csv', sep=';')

# 2. 没有表头,需要自己命名
df_no_header = pd.read_csv('data_no_header.csv',
                          header=none,
                          names=['id', 'name', 'age', 'score'])

# 3. 表头不在第一行(比如某些导出文件)
df_skip = pd.read_csv('report.csv', header=5)  # 跳过前5行,第6行作为表头

# 4. 自动识别分隔符(遇到奇葩文件时)
df_auto = pd.read_csv('weird_file.csv', sep=none, engine='python')

# 5. 只读取需要的列(内存优化利器)
df_cols = pd.read_csv('big_data.csv',
                     usecols=['name', 'age', 'salary'])  # 只要这3列

# 6. 分块读取超大文件(内存不够时的救星)
chunk_iter = pd.read_csv('huge_file.csv', chunksize=10000)
for chunk in chunk_iter:
    # 每次处理10000行
    process_chunk(chunk)

关键参数解析:

  • encodingutf-8gbklatin-1,解决乱码的神器
  • sep:分隔符,可以是逗号、分号、制表符等
  • header:表头行号,none表示没有表头
  • usecols:只读取指定列,节省内存的利器
  • chunksize:分块读取,处理大文件的法宝

excel读取:告别手动复制粘贴

错误示范:手动操作的地狱

# ❌ 低效到哭的操作
# 1. 手动打开excel
# 2. 一个sheet一个sheet地复制
# 3. 粘贴到python里
# 4. 重复以上步骤100次...

# 更惨的情况:
# - excel文件太大,打开就要5分钟
# - 有密码保护,还要输密码
# - 包含公式,复制过来全是错误值

正确姿势:一行代码读取所有sheet

# ✅ 优雅的excel读取
import pandas as pd

# 读取单个sheet
df = pd.read_excel('sales_data.xlsx', sheet_name='sheet1')

# 🚀 高手操作:一次性读取所有sheet
all_sheets = pd.read_excel('multi_sheet.xlsx', sheet_name=none)
# 结果是一个字典,key是sheet名,value是对应的dataframe

for sheet_name, df in all_sheets.items():
    print(f"sheet: {sheet_name}, 行数: {len(df)}")

# 🚀 指定读取第几个sheet(从0开始)
df_second = pd.read_excel('data.xlsx', sheet_name=1)

# 🚀 指定引擎(解决某些兼容性问题)
df_xls = pd.read_excel('old_file.xls', engine='openpyxl')
df_xlsx = pd.read_excel('new_file.xlsx', engine='xlrd')

处理excel的特殊情况

# 🎯 处理复杂的excel文件

# 1. 指定读取行范围
df_partial = pd.read_excel('data.xlsx',
                          sheet_name='sales',
                          skiprows=5,       # 跳过前5行
                          nrows=100,        # 只读取100行
                          usecols='a:e')    # 只读取a到e列

# 2. 处理日期列
df_dates = pd.read_excel('data.xlsx',
                        parse_dates=['订单日期', '发货日期'])

# 3. 处理多个表头(复杂的报表格式)
df_multi_header = pd.read_excel('complex_report.xlsx',
                               header=[0, 1])  # 前两行都是表头

# 4. 读取指定单元格区域
# 先读取整个sheet,再切片
df_full = pd.read_excel('data.xlsx', sheet_name='data')
df_subset = df_full.iloc[10:50, 2:8]  # 第11-50行,第3-8列

json读取:告别eval()的危险操作

错误示范:玩火的行为

# ❌ 危险操作!千万别这么干!
import json
import pandas as pd

# 有人居然用eval()...
with open('data.json', 'r') as f:
    data_str = f.read()
    data = eval(data_str)  # 😱 危险!可以执行任意代码!

# 或者更骚的操作:手动解析
# 结果:错漏百出,维护困难

正确姿势:安全优雅的json处理

# ✅ 专业人士的json读取
import pandas as pd
from io import stringio

# 1. 读取json文件
df = pd.read_json('data.json')

# 2. 从字符串读取json
json_str = '{"name": ["张三", "李四"], "age": [25, 30]}'
df = pd.read_json(stringio(json_str))

# 🚀 处理不同的json格式

# 3. 嵌套json的扁平化
nested_json = '''
{
    "employees": [
        {"name": "张三", "dept": {"name": "技术部", "id": 1}},
        {"name": "李四", "dept": {"name": "销售部", "id": 2}}
    ]
}
'''
df_nested = pd.read_json(stringio(nested_json))

# 4. 行格式json(每行一个json对象)
# 适用于日志文件、api返回等场景
lines_json = '''
{"name": "张三", "age": 25, "city": "北京"}
{"name": "李四", "age": 30, "city": "上海"}
{"name": "王五", "age": 28, "city": "广州"}
'''
df_lines = pd.read_json(stringio(lines_json), lines=true)

# 5. 指定json的读取方向
df_records = pd.read_json('data.json', orient='records')  # 记录格式
df_index = pd.read_json('data.json', orient='index')     # 索引格式
df_values = pd.read_json('data.json', orient='values')   # 值格式

json数据清洗实战

# 🎯 真实场景:处理api返回的json数据
import requests
import pandas as pd

# 模拟api调用
response = requests.get('https://api.example.com/users')
data = response.json()

# 转换为dataframe
df_users = pd.dataframe(data['users'])

# 嵌套字段展开
df_users['city'] = df_users['address'].apply(lambda x: x['city'])
df_users['street'] = df_users['address'].apply(lambda x: x['street'])

# 删除原始嵌套字段
df_users = df_users.drop('address', axis=1)

# 日期格式转换
df_users['created_at'] = pd.to_datetime(df_users['created_at'])

sql读取:数据库连接的艺术

错误示范:手动导出的痛苦

# ❌ 传统但低效的方式
# 1. 打开数据库管理工具
# 2. 写sql查询
# 3. 导出为csv
# 4. 用pandas读取csv
# 5. 发现数据不对,重复以上步骤...

# 或者更惨:
# - 手动复制粘贴结果,超过1000行就卡死
# - 导出的文件编码不对,中文乱码
# - 每次都要重复导出,烦死人

正确姿势:直接连接数据库

# ✅ 优雅的数据库读取
import pandas as pd
import sqlite3  # sqlite示例
from sqlalchemy import create_engine  # 更通用的方案

# 1. sqlite连接(轻量级,文件数据库)
conn = sqlite3.connect('database.db')

# 简单查询
df = pd.read_sql_query("select * from users where age > 25", conn)

# 🚀 高级操作

# 2. 使用sqlalchemy(支持各种数据库)
# mysql: engine = create_engine('mysql://user:password@host:port/database')
# postgresql: engine = create_engine('postgresql://user:password@host:port/database')
engine = create_engine('sqlite:///database.db')

# 查询并直接返回dataframe
df = pd.read_sql("select * from sales where date >= '2023-01-01'", engine)

# 3. 读取整个表
df_table = pd.read_sql_table('users', engine)

# 4. 使用参数化查询(防sql注入)
query = "select * from products where price > ? and category = ?"
df_params = pd.read_sql(query, engine, params=[100, 'electronics'])

# 5. 分块读取大表
chunk_iter = pd.read_sql("select * from large_table", engine, chunksize=10000)
for chunk in chunk_iter:
    process_chunk(chunk)

sql读取的最佳实践

# 🎯 生产环境的sql读取方案

# 1. 连接池配置(提高性能)
from sqlalchemy import create_engine
from sqlalchemy.pool import queuepool

engine = create_engine(
    'postgresql://user:password@localhost:5432/mydb',
    poolclass=queuepool,
    pool_size=5,
    max_overflow=10
)

# 2. 上下文管理器(确保连接关闭)
with engine.connect() as conn:
    df = pd.read_sql("select * from orders", conn)

# 3. 复杂查询的优化
# 只读取需要的列
df_optimized = pd.read_sql("""
    select id, name, email, created_at
    from users
    where status = 'active'
    order by created_at desc
    limit 1000
""", engine)

# 4. 处理大表的数据类型
# 避免内存溢出,指定数据类型
dtypes = {
    'user_id': 'int32',
    'price': 'float32',
    'status': 'category'
}
df_efficient = pd.read_sql("select * from transactions", engine, dtype=dtypes)

# 5. 时间处理
df_time = pd.read_sql("""
    select *,
           extract(year from created_at) as year,
           extract(month from created_at) as month
    from orders
""", engine, parse_dates=['created_at'])

实战演练:一个完整的数据处理流程

假设你收到了一份销售数据,包含csv、excel、json和数据库四种格式:

import pandas as pd
import sqlite3
from pathlib import path

# 🎯 完整的数据处理示例
def process_sales_data():

    # 1. 读取主数据(csv)
    df_sales = pd.read_csv('sales_main.csv',
                          encoding='utf-8',
                          parse_dates=['date'],
                          dtype={'product_id': 'str', 'customer_id': 'str'})

    # 2. 读取产品信息(excel)
    excel_data = pd.read_excel('products.xlsx', sheet_name=none)
    df_products = excel_data['productlist']
    df_categories = excel_data['categories']

    # 3. 读取客户信息(json)
    df_customers = pd.read_json('customers.json')

    # 4. 读取补充数据(数据库)
    conn = sqlite3.connect('supplementary.db')
    df_regions = pd.read_sql_query("select * from regions", conn)

    # 5. 数据合并和分析
    # 合并销售数据和产品信息
    df_merged = df_sales.merge(df_products, on='product_id', how='left')

    # 合并客户信息
    df_merged = df_merged.merge(df_customers, on='customer_id', how='left')

    # 合并地区信息
    df_merged = df_merged.merge(df_regions, on='region_code', how='left')

    # 6. 数据清洗和转换
    # 处理缺失值
    df_merged['price'] = df_merged['price'].fillna(df_merged['price'].median())

    # 计算销售金额
    df_merged['total_amount'] = df_merged['quantity'] * df_merged['price']

    # 提取月份
    df_merged['month'] = df_merged['date'].dt.month

    return df_merged

# 使用函数
df_result = process_sales_data()
print(f"处理完成!共{len(df_result)}条记录")

性能优化:让你的数据读取飞起来

内存优化技巧

# 🚀 大文件处理优化方案

# 1. 指定数据类型(减少内存占用)
dtypes_optimized = {
    'id': 'int32',
    'name': 'category',  # 重复值多的列用category类型
    'price': 'float32',
    'is_active': 'bool'  # 布尔值
}

df_small = pd.read_csv('large_file.csv', dtype=dtypes_optimized)

# 2. 只读取需要的列
df_selected = pd.read_csv('data.csv',
                         usecols=['name', 'age', 'salary'],
                         dtype={'name': 'str', 'age': 'int16', 'salary': 'float32'})

# 3. 分块处理大文件
chunk_size = 100000
results = []

for chunk in pd.read_csv('huge_file.csv', chunksize=chunk_size):
    # 对每个chunk进行处理
    processed_chunk = chunk[chunk['salary'] > 50000]
    results.append(processed_chunk)

df_final = pd.concat(results, ignore_index=true)

# 4. 使用低内存引擎
df_low_memory = pd.read_csv('data.csv', engine='c', low_memory=true)

并行读取优化

# 🎯 并行处理多个文件
import pandas as pd
from concurrent.futures import threadpoolexecutor
import glob

def read_file(file_path):
    """读取单个文件的函数"""
    try:
        return pd.read_csv(file_path)
    except exception as e:
        print(f"读取文件 {file_path} 失败: {e}")
        return none

# 获取所有csv文件
file_list = glob.glob('data/*.csv')

# 使用线程池并行读取
with threadpoolexecutor(max_workers=4) as executor:
    dfs = list(executor.map(read_file, file_list))

# 过滤掉读取失败的文件
dfs = [df for df in dfs if df is not none]

# 合并所有dataframe
df_combined = pd.concat(dfs, ignore_index=true)
print(f"成功读取并合并了{len(dfs)}个文件")

常见坑爹问题及解决方案

1. 编码问题

# ❌ 编码错误:unicodedecodeerror
# df = pd.read_csv('chinese_data.csv')  # 可能报错

# ✅ 解决方案
try:
    df = pd.read_csv('chinese_data.csv', encoding='utf-8')
except unicodedecodeerror:
    try:
        df = pd.read_csv('chinese_data.csv', encoding='gbk')
    except unicodedecodeerror:
        df = pd.read_csv('chinese_data.csv', encoding='latin-1')

2. 内存不足

# ❌ 内存溢出:memoryerror
# df = pd.read_csv('10gb_file.csv')

# ✅ 解决方案:分块读取
chunk_size = 50000
chunk_iter = pd.read_csv('10gb_file.csv', chunksize=chunk_size)

for i, chunk in enumerate(chunk_iter):
    # 处理每个chunk,比如保存到多个小文件
    chunk.to_csv(f'processed_chunk_{i}.csv', index=false)
    print(f"处理完成第{i+1}个chunk")

3. 日期时间解析

# ❌ 日期被当作字符串读取
# df = pd.read_csv('data.csv')

# ✅ 正确处理日期
df = pd.read_csv('data.csv',
                parse_dates=['date_col'],  # 指定日期列
                date_parser=lambda x: pd.datetime.strptime(x, '%y-%m-%d'))

# 或者读取后再转换
df['date_col'] = pd.to_datetime(df['date_col'])

总结:数据读取的核心原则

记住一句话:数据读取不只是打开文件,而是优雅地处理各种数据格式和异常情况。

核心要点:

  • 永远不要信任数据格式:做好异常处理
  • 内存优化很重要:大数据时要考虑分块和类型优化
  • 选择合适的工具:不同格式用对应的读取方法
  • 性能是王道:合理使用并行处理和缓存

下次再遇到数据读取问题,你就知道该怎么办了。

你在项目里遇到过什么奇葩的数据格式?

以上就是4种pandas高效读取文件数据的完整方法指南的详细内容,更多关于pandas读取文件数据的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com