当前位置: 代码网 > it编程>前端脚本>Python > 使用Python高效读取ZIP压缩文件中的JSON数据

使用Python高效读取ZIP压缩文件中的JSON数据

2025年10月14日 Python 我要评论
本文将详细介绍如何使用python快速高效地读取zip压缩文件中的utf-8编码json文件,并将其转换为pandas dataframe和pyspark dataframe。我们将探讨多种方法,包括

本文将详细介绍如何使用python快速高效地读取zip压缩文件中的utf-8编码json文件,并将其转换为pandas dataframe和pyspark dataframe。我们将探讨多种方法,包括标准库方法、优化技巧以及处理大文件的策略。

准备工作与环境设置

在开始之前,确保已安装必要的python库:

pip install pandas pyspark pyarrow

使用标准库方法读取zip中的json

python的标准库zipfile提供了处理zip文件的基本功能。以下是基础读取方法:

import zipfile
import json
import pandas as pd
from pyspark.sql import sparksession

# 初始化spark会话
spark = sparksession.builder \
    .appname("zip_json_reader") \
    .getorcreate()

高效读取方法

方法1:使用zipfile和io模块

import zipfile
import json
import io

def read_json_from_zip_basic(zip_path, json_filename):
    """基础方法:读取zip中的单个json文件"""
    with zipfile.zipfile(zip_path, 'r') as zip_ref:
        with zip_ref.open(json_filename, 'r') as json_file:
            # 读取并解析json内容
            json_data = json.loads(json_file.read().decode('utf-8'))
            return json_data

方法2:批量处理zip中的多个json文件

def read_multiple_json_from_zip(zip_path, file_extension='.json'):
    """读取zip中所有json文件"""
    all_data = []
    
    with zipfile.zipfile(zip_path, 'r') as zip_ref:
        # 获取所有json文件
        json_files = [f for f in zip_ref.namelist() 
                     if f.endswith(file_extension)]
        
        for json_file in json_files:
            with zip_ref.open(json_file, 'r') as file:
                try:
                    json_data = json.loads(file.read().decode('utf-8'))
                    all_data.append(json_data)
                except json.jsondecodeerror as e:
                    print(f"error reading {json_file}: {e}")
    
    return all_data

转换为pandas dataframe

方法1:直接转换

def zip_json_to_pandas_simple(zip_path, json_filename):
    """将zip中的json文件转换为pandas dataframe(简单版)"""
    json_data = read_json_from_zip_basic(zip_path, json_filename)
    
    # 如果json是数组格式,直接转换为dataframe
    if isinstance(json_data, list):
        return pd.dataframe(json_data)
    # 如果json是对象格式,可能需要特殊处理
    else:
        return pd.dataframe([json_data])

方法2:使用pandas直接读取(推荐)

def zip_json_to_pandas_efficient(zip_path, json_filename):
    """高效方法:使用pandas直接读取zip中的json文件"""
    with zipfile.zipfile(zip_path, 'r') as zip_ref:
        with zip_ref.open(json_filename, 'r') as json_file:
            # 使用pandas直接读取json流
            df = pd.read_json(json_file, encoding='utf-8')
            return df

方法3:处理大型json文件

import ijson

def read_large_json_from_zip(zip_path, json_filename):
    """使用流式处理读取大型json文件"""
    items = []
    
    with zipfile.zipfile(zip_path, 'r') as zip_ref:
        with zip_ref.open(json_filename, 'r') as json_file:
            # 使用ijson进行流式解析
            parser = ijson.parse(json_file)
            
            for prefix, event, value in parser:
                # 根据json结构进行相应处理
                if event == 'start_array' or event == 'end_array':
                    continue
                # 这里需要根据实际json结构调整解析逻辑
                
    return pd.dataframe(items)

转换为pyspark dataframe

方法1:通过pandas中转

def zip_json_to_pyspark_via_pandas(zip_path, json_filename):
    """通过pandas将zip中的json转换为pyspark dataframe"""
    # 先读取为pandas dataframe
    pandas_df = zip_json_to_pandas_efficient(zip_path, json_filename)
    
    # 转换为pyspark dataframe
    spark_df = spark.createdataframe(pandas_df)
    return spark_df

方法2:直接使用pyspark读取(需解压)

import tempfile
import os

def zip_json_to_pyspark_direct(zip_path, json_filename):
    """将zip文件解压后使用pyspark直接读取"""
    with tempfile.temporarydirectory() as temp_dir:
        # 解压zip文件
        with zipfile.zipfile(zip_path, 'r') as zip_ref:
            zip_ref.extract(json_filename, temp_dir)
        
        # 使用pyspark读取解压后的json文件
        json_path = os.path.join(temp_dir, json_filename)
        spark_df = spark.read \
            .option("encoding", "utf-8") \
            .json(json_path)
        
        return spark_df

方法3:处理zip中的多个json文件

def multiple_zip_json_to_pyspark(zip_path):
    """读取zip中所有json文件到pyspark dataframe"""
    all_dfs = []
    
    with tempfile.temporarydirectory() as temp_dir:
        with zipfile.zipfile(zip_path, 'r') as zip_ref:
            # 解压所有json文件
            json_files = [f for f in zip_ref.namelist() if f.endswith('.json')]
            zip_ref.extractall(temp_dir, json_files)
        
        # 读取所有json文件
        for json_file in json_files:
            json_path = os.path.join(temp_dir, json_file)
            df = spark.read.option("encoding", "utf-8").json(json_path)
            all_dfs.append(df)
    
    # 合并所有dataframe
    if all_dfs:
        result_df = all_dfs[0]
        for df in all_dfs[1:]:
            result_df = result_df.union(df)
        return result_df
    else:
        return spark.createdataframe([], schema=none)

处理大型zip文件的策略

方法1:分块读取

def read_large_zip_json_chunked(zip_path, json_filename, chunk_size=1000):
    """分块读取大型zip中的json文件"""
    chunks = []
    
    with zipfile.zipfile(zip_path, 'r') as zip_ref:
        with zip_ref.open(json_filename, 'r') as json_file:
            # 使用pandas的分块读取功能
            for chunk in pd.read_json(json_file, encoding='utf-8', 
                                    lines=true, chunksize=chunk_size):
                chunks.append(chunk)
    
    # 合并所有块
    if chunks:
        return pd.concat(chunks, ignore_index=true)
    else:
        return pd.dataframe()

方法2:使用内存映射

def read_zip_json_with_mmap(zip_path, json_filename):
    """使用内存映射处理大型zip文件"""
    import mmap
    
    with zipfile.zipfile(zip_path, 'r') as zip_ref:
        # 获取文件信息
        file_info = zip_ref.getinfo(json_filename)
        
        with zip_ref.open(json_filename, 'r') as json_file:
            # 创建内存映射
            with mmap.mmap(json_file.fileno(), 0, access=mmap.access_read) as mmapped_file:
                df = pd.read_json(mmapped_file, encoding='utf-8')
                return df

性能优化建议

1. 使用适当的数据类型

def optimize_pandas_dataframe(df):
    """优化pandas dataframe的内存使用"""
    # 转换数据类型以减少内存使用
    for col in df.columns:
        if df[col].dtype == 'object':
            # 尝试转换为分类类型
            if df[col].nunique() / len(df) < 0.5:
                df[col] = df[col].astype('category')
        
        # 转换数值类型
        elif df[col].dtype in ['int64', 'float64']:
            df[col] = pd.to_numeric(df[col], downcast='integer')
    
    return df

2. 并行处理

from concurrent.futures import threadpoolexecutor

def parallel_zip_processing(zip_paths, processing_function, max_workers=4):
    """并行处理多个zip文件"""
    with threadpoolexecutor(max_workers=max_workers) as executor:
        results = list(executor.map(processing_function, zip_paths))
    return results

完整示例代码

import zipfile
import json
import pandas as pd
from pyspark.sql import sparksession
import tempfile
import os

class zipjsonreader:
    """zip文件中的json读取器"""
    
    def __init__(self):
        self.spark = sparksession.builder \
            .appname("zip_json_reader") \
            .getorcreate()
    
    def read_to_pandas(self, zip_path, json_filename=none, optimize=true):
        """读取zip中的json文件到pandas dataframe"""
        
        # 如果未指定文件名,自动查找第一个json文件
        if json_filename is none:
            with zipfile.zipfile(zip_path, 'r') as zip_ref:
                json_files = [f for f in zip_ref.namelist() 
                            if f.endswith('.json')]
                if not json_files:
                    raise valueerror("no json files found in zip")
                json_filename = json_files[0]
        
        # 读取json文件
        with zipfile.zipfile(zip_path, 'r') as zip_ref:
            with zip_ref.open(json_filename, 'r') as json_file:
                df = pd.read_json(json_file, encoding='utf-8')
        
        # 优化内存使用
        if optimize:
            df = self._optimize_dataframe(df)
        
        return df
    
    def read_to_pyspark(self, zip_path, json_filename=none):
        """读取zip中的json文件到pyspark dataframe"""
        
        # 使用临时目录解压文件
        with tempfile.temporarydirectory() as temp_dir:
            with zipfile.zipfile(zip_path, 'r') as zip_ref:
                if json_filename:
                    # 解压指定文件
                    zip_ref.extract(json_filename, temp_dir)
                    json_path = os.path.join(temp_dir, json_filename)
                else:
                    # 解压所有json文件
                    json_files = [f for f in zip_ref.namelist() 
                                if f.endswith('.json')]
                    if not json_files:
                        raise valueerror("no json files found in zip")
                    zip_ref.extractall(temp_dir, json_files)
                    json_path = temp_dir
            
            # 使用pyspark读取
            df = self.spark.read \
                .option("encoding", "utf-8") \
                .json(json_path)
            
            return df
    
    def _optimize_dataframe(self, df):
        """优化dataframe内存使用"""
        for col in df.columns:
            col_type = df[col].dtype
            
            if col_type == 'object':
                # 转换为分类类型
                num_unique_values = len(df[col].unique())
                num_total_values = len(df[col])
                if num_unique_values / num_total_values < 0.5:
                    df[col] = df[col].astype('category')
            elif col_type in ['int64']:
                # 下转换整数类型
                df[col] = pd.to_numeric(df[col], downcast='integer')
            elif col_type in ['float64']:
                # 下转换浮点类型
                df[col] = pd.to_numeric(df[col], downcast='float')
        
        return df
    
    def close(self):
        """关闭spark会话"""
        self.spark.stop()

# 使用示例
if __name__ == "__main__":
    reader = zipjsonreader()
    
    try:
        # 读取到pandas
        pandas_df = reader.read_to_pandas('data.zip', 'example.json')
        print("pandas dataframe:")
        print(pandas_df.head())
        print(f"pandas dataframe shape: {pandas_df.shape}")
        
        # 读取到pyspark
        spark_df = reader.read_to_pyspark('data.zip', 'example.json')
        print("\npyspark dataframe:")
        spark_df.show(5)
        print(f"pyspark dataframe count: {spark_df.count()}")
        
    finally:
        reader.close()

总结

本文介绍了多种高效读取zip压缩文件中utf-8编码json数据的方法:

对于pandas dataframe

  • 使用zipfilepandas.read_json直接读取
  • 处理大型文件时使用分块读取
  • 优化数据类型以减少内存使用

对于pyspark dataframe

  • 通过pandas中转(适合中小型数据)
  • 解压后直接读取(适合大型数据)
  • 支持处理zip中的多个json文件

性能优化

  • 使用适当的数据类型
  • 并行处理多个文件
  • 流式处理大型文件

根据数据大小和处理需求选择合适的方法,可以在保证性能的同时高效地处理zip压缩文件中的json数据。

以上就是使用python高效读取zip压缩文件中的json数据的详细内容,更多关于python读取zip文件的json数据的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com