python操作spark的常用命令指南,涵盖从环境配置到数据分析的核心操作。
一、环境配置与spark初始化
pyspark是apache spark的python api,它结合了python的易用性和spark的分布式计算能力。
1. 安装与基础导入
# 安装pyspark !pip install pyspark # 导入必要的库 from pyspark.sql import sparksession import pyspark.sql.functions as f # 常用函数 import pyspark.sql.types as t # 数据类型
2. 创建sparksession
sparksession是与spark集群交互的统一入口点,绝大多数操作都从这里开始。
spark = sparksession.builder \
.appname("mysparkapp") \ # 设置应用名称
.config("spark.driver.memory", "2g") \ # 配置参数(可选)
.getorcreate()
3. 基础环境检查
# 检查spark版本
print(spark.version)
# 查看当前配置
print(spark.conf.getall())
# 列出数据库和表
spark.sql("show databases").show()
spark.sql("use your_database") # 切换数据库
spark.sql("show tables").show()
二、核心操作:数据读写
1. 创建dataframe
有多种方式创建dataframe,这是spark中的核心数据结构。
# 方式1:从python列表创建
data = [("alice", 29), ("bob", 35)]
columns = ["name", "age"]
df = spark.createdataframe(data, schema=columns)
# 方式2:指定详细模式(schema)
schema = t.structtype([
t.structfield("name", t.stringtype(), true),
t.structfield("age", t.integertype(), true)
])
df = spark.createdataframe(data, schema=schema)2. 从文件读取数据
pyspark支持csv、parquet、json等多种格式。
# 读取csv文件(常用)
df = spark.read.csv(
"path/to/file.csv",
header=true, # 第一行作为列名
inferschema=true # 自动推断列类型
)
# 读取parquet文件(列式存储,高效)
df = spark.read.parquet("path/to/file.parquet")
# 读取json文件
df = spark.read.json("path/to/file.json")3. 数据写入与保存
# 保存为parquet格式(推荐,压缩率高)
df.write.mode("overwrite").parquet("output_path.parquet")
# 保存为csv格式
df.write.mode("overwrite") \
.option("header", true) \
.csv("output_path.csv")
# 保存为spark表(可在集群中持久化)
df.write.saveastable("table_name")三、数据处理与转换
数据处理的核心是对dataframe进行列和行的操作。
1. 列操作
# 选择特定列
df.select("name", "age").show()
# 创建新列(示例:年龄加1)
df = df.withcolumn("age_plus_one", f.col("age") + 1)
# 重命名列
df = df.withcolumnrenamed("old_name", "new_name")
# 更改列类型(示例:整型转字符串)
df = df.withcolumn("age_str", f.col("age").cast(t.stringtype()))
# 删除列
df = df.drop("column_to_remove")2. 行操作(过滤与排序)
# 过滤行(示例:年龄大于30)
df_filtered = df.filter(f.col("age") > 30)
# 等价写法
df_filtered = df.where(df["age"] > 30)
# 排序
df_sorted = df.orderby(f.col("age").desc()) # 按年龄降序3. 处理缺失值
# 删除包含任何空值的行
df_clean = df.dropna()
# 填充缺失值(示例:用0填充特定列)
df_filled = df.fillna({"age": 0, "name": "unknown"})四、数据聚合与高级分析
1. 分组与聚合
这是数据分析中最常用的操作之一。
# 基础分组聚合(示例:按部门计算平均工资)
df.groupby("department").agg(
f.avg("salary").alias("avg_salary"),
f.count("*").alias("employee_count")
).show()
2. 连接(join)操作
用于合并两个dataframe。
# 假设有另一个部门信息表df_dept
df_joined = df.join(
df_dept,
on="department_id", # 连接键
how="inner" # 连接方式:inner, left, right, outer等
)
3. 窗口函数
用于计算排名、移动平均等高级分析。
from pyspark.sql.window import window
# 定义窗口:按部门分区,按工资降序
window_spec = window.partitionby("department").orderby(f.col("salary").desc())
# 计算部门内工资排名
df.withcolumn("salary_rank", f.rank().over(window_spec)).show()4. 用户自定义函数(udf)
当内置函数无法满足需求时使用。
from pyspark.sql.functions import udf
# 定义python函数
def categorize_age(age):
return "young" if age < 30 else "senior"
# 注册为udf(需指定返回类型)
categorize_udf = udf(categorize_age, t.stringtype())
# 应用udf
df.withcolumn("age_group", categorize_udf(f.col("age"))).show()五、在pyspark中运行sql查询
你可以直接在pyspark中执行sql语句,这为熟悉sql的用户提供了便利。
# 将dataframe注册为临时视图
df.createorreplacetempview("people")
# 执行sql查询
result = spark.sql("""
select department, avg(salary) as avg_sal
from people
where age > 25
group by department
order by avg_sal desc
""")
result.show()六、性能优化与最佳实践
- 避免数据混洗:
groupby、join等操作可能导致数据在节点间大量移动,应尽量减少这类操作或提前过滤数据。 - 选择合适的数据格式:生产环境中,parquet通常是比csv更好的选择,因为它支持列式存储和谓词下推,能显著提高查询性能。
- 利用缓存:对需要多次使用的中间结果进行缓存。
df.cache() # 将dataframe缓存到内存 df.unpersist() # 使用后释放缓存
- 及时关闭会话:处理完成后关闭sparksession以释放资源。
spark.stop()
七、一个完整的示例
from pyspark.sql import sparksession
import pyspark.sql.functions as f
# 1. 初始化
spark = sparksession.builder.appname("example").getorcreate()
# 2. 读取数据
df = spark.read.csv("sales.csv", header=true, inferschema=true)
# 3. 数据处理
result = (df
.filter(f.col("amount") > 100) # 筛选大额交易
.groupby("region", "product") # 按地区和产品分组
.agg(f.sum("amount").alias("total_sales")) # 计算总销售额
.orderby(f.col("total_sales").desc()) # 按销售额降序排序
)
# 4. 输出
result.show()
result.write.mode("overwrite").parquet("sales_summary.parquet")
# 5. 清理
spark.stop()到此这篇关于python操作spark常用命令指南的文章就介绍到这了,更多相关python spark命令内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论