前言
最近在学习python,将所学内容整理成文章,以便以后翻阅和复习,同时分享给大家。有问题欢迎随时指正。
一、文件操作基础
1、打开文件
open(file_path, mode, encoding)
file_path
:文件路径,可以是相对路径或绝对路径。mode
:打开模式,常用模式包括:
模式 | 描述 |
---|---|
r | 只读(默认) |
w | 写入(覆盖原文件) |
a | 追加写入 |
x | 创建新文件并写入 |
b | 二进制模式(如 rb , wb ) |
+ | 读写模式(如 r+ , w+ ) |
# 示例:打开文件(推荐使用 with 自动管理资源) with open('example.txt', 'r', encoding='utf-8') as f: content = f.read()
2、关闭文件
close()
# 手动打开并关闭文件(不推荐,容易忘记关闭) f = open('demo.txt', 'w', encoding='utf-8') f.write('hello world') f.close() # 必须显式关闭
对比with
语句(推荐方式)
# 使用 with 自动关闭(最佳实践) with open('demo.txt', 'w') as f: f.write('auto-closed file') # 等效于: f = open('demo.txt', 'w') try: f.write('auto-closed file') finally: f.close()
实际场景中的关闭必要性
# 案例:文件未关闭导致写入失败 def write_data(): f = open('data.txt', 'w') f.write('important data') # 忘记 f.close() → 数据可能仍在缓冲区未写入磁盘 # 正确写法: def safe_write(): with open('data.txt', 'w') as f: f.write('saved data') # with 块结束自动关闭并写入磁盘 # 立即读取刚写入的内容 with open('data.txt', 'w') as f: f.write('test') with open('data.txt', 'r') as f: # 必须重新打开才能读取新内容 print(f.read()) # 输出 test
二、读取文件
1、读取全部内容
read()
with open('example.txt', 'r') as f: data = f.read() # 返回整个文件内容的字符串
2、逐行读取
readline()
with open('example.txt', 'r') as f: line = f.readline() # 读取一行 while line: print(line.strip()) # 去除末尾换行符 line = f.readline()
3、读取所有行到列表
with open('example.txt', 'r') as f: lines = f.readlines() # 返回包含所有行的列表 for line in lines: print(line.strip())
三、写入文件
1、写入字符串
write(text)
with open('output.txt', 'w') as f: f.write('hello, world!\n') # 写入内容(覆盖模式)
2、写入多行
writelines(lines)
lines = ['line 1\n', 'line 2\n'] with open('output.txt', 'w') as f: f.writelines(lines) # 写入列表中的每行内容
2.1、常见使用场景
场景1:从其他数据源生成行
# 将数值列表转为文本行 numbers = [1, 2, 3, 4, 5] lines = [f"数值: {n}\n" for n in numbers] with open('data.log', 'w') as f: f.writelines(lines) # 输出: # 数值: 1 # 数值: 2 # 数值: 3 # 数值: 4 # 数值: 5
场景2:追加模式写入
# 在已有文件后追加新行 new_lines = ["追加行1\n", "追加行2\n"] with open('existing_file.txt', 'a', encoding='utf-8') as f: f.writelines(new_lines)
场景3:动态生成内容
import datetime # 生成带时间戳的日志行 timestamps = [ f"[{datetime.datetime.now().strftime('%y-%m-%d %h:%m:%s')}] 事件 {i}\n" for i in range(3) ] with open('events.log', 'w') as f: f.writelines(timestamps) # 输出示例: # [2023-08-20 09:15:23] 事件 0 # [2023-08-20 09:15:23] 事件 1 # [2023-08-20 09:15:23] 事件 2
2.2、注意事项与常见错误
错误1:忘记换行符
# 错误写法:所有内容挤在一行 lines = ["内容a", "内容b", "内容c"] with open('wrong.txt', 'w') as f: f.writelines(lines) # 输出:内容a内容b内容c # 正确写法: lines = ["内容a\n", "内容b\n", "内容c\n"]
错误2:非字符串类型
# 错误案例:包含非字符串元素 mixed_data = ["文本\n", 123, true] with open('error.txt', 'w') as f: # f.writelines(mixed_data) # 将抛出 typeerror # 解决方案:转换为字符串 f.writelines([str(item)+'\n' for item in mixed_data])
2.3、高级用法
方法1:配合生成器处理大文件
def generate_large_data(num_lines): """生成大文件数据""" for i in range(num_lines): yield f"这是第 {i+1} 行数据\n" # 使用生成器避免内存爆炸 # 写入100万行(内存友好) with open('bigfile.txt', 'w') as f: f.writelines(generate_large_data(1_000_000))
方法2:二进制模式写入
# 写入二进制换行符(windows换行符为\r\n) lines = [ b"binary line 1\r\n", b"binary line 2\r\n" ] with open('binary_file.bin', 'wb') as f: f.writelines(lines)
2.4、性能对比
方法 | 10万行耗时 | 内存占用 | 适用场景 |
---|---|---|---|
write() 循环 | 0.45s | 高 | 需要逐行处理 |
writelines() | 0.12s | 低 | 批量写入 |
生成器 + writelines() | 0.15s | 极低 | 超大文件 |
2.5、最佳实践总结
始终添加换行符:
writelines
不会自动添加换行类型统一:确保列表元素都是字符串类型
大文件优化:使用生成器替代列表存储所有行
模式选择:
'w'
:覆盖写入'a'
:追加写入
搭配使用:常与列表推导式结合快速生成内容
# 典型安全写法示例 data = [f"处理结果: {x}\n" for x in some_dataset] with open('result.txt', 'w') as f: f.writelines(data)
3、追加写入
使用模式 a
with open('output.txt', 'a') as f: f.write('appended line.\n')
四、文件指针操作
获取当前位置:
tell()
移动指针:
seek(offset, whence)
whence
:0(文件开头),1(当前位置),2(文件末尾)
with open('example.txt', 'r') as f: f.seek(5) # 移动指针到第5字节 print(f.read(3)) # 读取接下来3个字符 print(f.tell()) # 输出当前指针位置
五、二进制文件操作
使用 b
模式处理非文本文件(如图片、视频):
# 复制图片 with open('input.jpg', 'rb') as src, open('copy.jpg', 'wb') as dst: data = src.read() dst.write(data)
六、实际工作场景中的常见代码示例
1、日志文件处理(按条件过滤)
import re from pathlib import path def filter_logs(input_file, output_file, keyword, start_time=none): """ 从日志文件中提取包含关键字的行,并可选时间范围 :param input_file: 输入日志文件路径 :param output_file: 输出结果文件路径 :param keyword: 需要过滤的关键字 :param start_time: 起始时间(格式:2023-08-01 10:00:00) """ input_path = path(input_file) if not input_path.exists(): raise filenotfounderror(f"日志文件 {input_file} 不存在") pattern = re.compile(r'\[(.*?)\]') # 假设日志时间格式为 [2023-08-01 10:00:00] with open(input_file, 'r', encoding='utf-8') as f_in, \ open(output_file, 'w', encoding='utf-8') as f_out: for line in f_in: # 时间戳提取 time_match = pattern.search(line) if not time_match: continue log_time = time_match.group(1) # 时间条件判断 if start_time and log_time < start_time: continue # 关键字匹配 if keyword in line: f_out.write(line) # 使用示例 filter_logs('app.log', 'error_logs.txt', 'error', '2023-08-01 12:00:00')
2、csv数据清洗(处理缺失值)
import csv def clean_csv(input_file, output_file, default_value='n/a'): """ 清洗csv文件:处理空值并保存新文件 """ with open(input_file, 'r', newline='', encoding='utf-8') as f_in, \ open(output_file, 'w', newline='', encoding='utf-8') as f_out: reader = csv.dictreader(f_in) writer = csv.dictwriter(f_out, fieldnames=reader.fieldnames) writer.writeheader() for row in reader: cleaned_row = { key: value if value.strip() != '' else default_value for key, value in row.items() } writer.writerow(cleaned_row) # 使用示例 clean_csv('dirty_data.csv', 'cleaned_data.csv', default_value='0')
3、大文件分块读取(内存优化)
def process_large_file(file_path, chunk_size=1024*1024): # 默认1mb块 """ 分块读取大文件,避免内存溢出 """ with open(file_path, 'r', encoding='utf-8') as f: while true: chunk = f.read(chunk_size) if not chunk: break # 在此处处理数据块(例如:计数、分析等) yield chunk # 使用示例(统计文件行数) line_count = 0 for chunk in process_large_file('huge_file.log'): line_count += chunk.count('\n') print(f"总行数: {line_count}")
4、配置文件解析(json/yaml)
import json import yaml # 需要安装pyyaml: pip install pyyaml def load_config(config_path): """ 自动识别json/yaml配置文件并加载 """ config_path = path(config_path) if not config_path.exists(): raise filenotfounderror("配置文件不存在") with open(config_path, 'r', encoding='utf-8') as f: if config_path.suffix == '.json': return json.load(f) elif config_path.suffix in ('.yaml', '.yml'): return yaml.safe_load(f) else: raise valueerror("不支持的配置文件格式") # 使用示例 config = load_config('app_config.yaml') print(config['database']['host'])
5、文件监控(实时处理新内容)
import time def tail_file(file_path, interval=1): """ 模拟linux tail -f 功能,实时监控文件新增内容 """ with open(file_path, 'r', encoding='utf-8') as f: # 移动到文件末尾 f.seek(0, 2) while true: line = f.readline() if not line: time.sleep(interval) continue yield line # 使用示例(监控日志并报警) for new_line in tail_file('app.log'): if 'critical' in new_line: send_alert(f"发现关键错误: {new_line}")
6、多文件合并(归并处理)
from pathlib import path def merge_csv_files(input_dir, output_file): """ 合并目录下所有csv文件(假设结构相同) """ input_dir = path(input_dir) csv_files = list(input_dir.glob('*.csv')) with open(output_file, 'w', newline='', encoding='utf-8') as f_out: header_written = false for csv_file in csv_files: with open(csv_file, 'r', newline='', encoding='utf-8') as f_in: reader = csv.reader(f_in) header = next(reader) if not header_written: csv.writer(f_out).writerow(header) header_written = true for row in reader: csv.writer(f_out).writerow(row) # 使用示例 merge_csv_files('daily_reports', 'combined_report.csv')
总结
到此这篇关于python学习教程之文件操作详解的文章就介绍到这了,更多相关python文件操作详解内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论