1、常见dsl代码整理
| 分类 | 格式 | 含义 | 示例 |
|---|---|---|---|
| api/方法 | select | 查询字段 | select('id1', 'id2') |
| where | 对数据过滤 | where('avg_score>3') | |
| groupby | 对数据分组 | groupby('userid') | |
| orderby | 对数据排序 | orderby('cnt', ascending=false) | |
| limit | 取前几条数据 | orderby('cnt', ascending=false).limit(1) | |
| agg | 聚合操作,里面可以写多个聚合表达式 | agg(f.round(f.avg('score'), 2).alias('avg_score')) | |
| show | 打印数据 | init_df.show() | |
| printschema | 打印数据的schema信息,也就是元数据信息 | init_df.printschema() | |
| alias | 对字段取别名 | f.count('movieid').alias('cnt') | |
| join | 关联2个dataframe | etl_df.join(avg_score_dsl_df, 'movieid') | |
| withcolumn | 基于目前的数据产生一个新列 | init_df.withcolumn('word',f.explode(f.split('value', ' '))) | |
| dropduplicates | 删除重复数据 | init_df.dropduplicates(subset=["id","name"]) | |
| dropna | 删除缺失值 | init_df.dropna(thresh=2,subset=["name","age","address"]) | |
| fillna | 替换缺失值 | init_df.fillna(value={"id":111,"name":"未知姓名","age":100,"address":"北京"}) | |
| first | 取dataframe中的第一行数据 | ||
| over | 创建一个窗口列 | ||
| 窗口 | partitionby | 对数据分区 | |
| orderby | 对数据排序 | orderby(f.desc('pv')) | |
| 函数 | row_number | 行号。从1开始编号 | |
| desc | 降序排序 | ||
| avg | 计算均值 | ||
| count | 计数 | ||
| round | 保留小数位 | ||
| col | 将字段包装成column对象,一般用于对新列的包装 | ||
1- 什么使用使用select(),什么时候使用groupby()+agg()/select()实现聚合?:如果不需要对数据分组,那么可以直接使用select()实现聚合;如果有分组操作,需要使用groupby()+agg()/select(),推荐使用agg() 2- first(): 如果某个dataframe中只有一行数据,并且不使用join来对比数据,那么一般需要使用first()明确指定和第一行进行比较 3- f.col(): 对于在计算过程中临时产生的字段,需要使用f.col()封装成column对象,然后去使用
-
api/方法:是由dataframe来调用
-
函数:需要先通过import pyspark.sql.functions as f导入,使用f调用。spark sql内置提供的函数spark sql, built-in functions
-
窗口:需要先通过from pyspark.sql import window导入
2、电影分析案例
需求说明:
数据集的介绍:

数据说明 : userid,movieid,score,datestr
字段的分隔符号为: \t
需求分析:
-
需求一: 查询用户平均分
-
需求二: 查询每部电影的平均分
-
需求三: 查询大于平均分的电影的数量
-
需求四: 查询高分电影中(电影平均分大于3)打分次数最多的用户, 并且求出此人所有的打分记录中, 打的平均分是多少
-
需求五: 查询每个用户的平均打分, 最低打分, 最高打分(课后作业)
-
需求六: 查询被评分超过100次的电影的平均分 排名 top10(课后作业)
一三四需求实现代码:
# 导包
import os
from pyspark.sql import sparksession,functions as f
# 绑定指定的python解释器
os.environ['spark_home'] = '/export/server/spark'
os.environ['pyspark_python'] = '/root/anaconda3/bin/python3'
os.environ['pyspark_driver_python'] = '/root/anaconda3/bin/python3'
def demo1_get_user_avg_score():
# 方式1: sql方式
spark.sql(
'select userid,round(avg(score),3) as user_avg_score from movie group by userid'
).show()
# 方式2: dsl方式
etldf.groupby('userid').agg(
f.round(f.avg('score'), 3).alias('user_avg_score')
).show()
def demo2_get_lag_avg_movie_cnt():
# 方式1: sql
spark.sql(
"""
select count(1) as cnt from (
select movieid,avg(score) as movie_avg_score from movie group by movieid
having movie_avg_score > (select avg(score) as all_avg_score from movie)
) t
"""
).show()
# 方式2: dsl
# col():把临时结果作为新列使用 first():取第一个值
etldf.groupby('movieid').agg(
f.avg('score').alias('movie_avg_score')
).where(
f.col('movie_avg_score') > etldf.select(f.avg('score').alias('all_avg_score')).first()['all_avg_score']
).agg(
f.count('movieid').alias('cnt')
).show()
def demo3_get_top1_user_avg_sql():
# 方式1: sql
# ①先查询高分电影:
spark.sql(
"select movieid,avg(score) as movie_avg_score from movie group by movieid having movie_avg_score > 3"
).createtempview('hight_score_tb')
# ②再求打分次数最多的用户(先不考虑并列,只取最大1个)
spark.sql(
"select userid,count(1) as cnt from hight_score_tb h join movie m on h.movieid = m.movieid group by userid order by cnt desc limit 1"
).createtempview('top1_user_tb')
# ③最后求此人所有打分的平均分
spark.sql(
"select avg(score) as top1_user_avg from movie where userid = (select userid from top1_user_tb)"
).show()
def demo3_get_top1_user_avg_dsl():
# ①先查询高分电影:
hight_score_df = etldf.groupby('movieid').agg(
f.avg('score').alias('movie_avg_score')
).where('movie_avg_score>3')
# ②再求打分次数最多的用户(先不考虑并列,只取最大1个)
top1_user_df = hight_score_df.join(etldf, on=hight_score_df['movieid'] == etldf['movieid']) \
.groupby('userid').agg(f.count('userid').alias('cnt')) \
.orderby('cnt', ascending=false).limit(1)
# ③最后求此人所有打分的平均分
etldf.where(
etldf['userid'] == top1_user_df.first()['userid']
).agg(
f.avg('score').alias('top1_user_avg')
).show()
# 创建main函数
if __name__ == '__main__':
# 1.创建sparkcontext对象
spark = sparksession.builder.appname('pyspark_demo').master('local[*]').getorcreate()
# 2.数据输入
df = spark.read.csv(
schema='userid string,movieid string,score int,datestr string',
sep='\t',
path='file:///export/data/spark_project/spark_sql/data/u.data'
)
print(df.count())
# 3.数据处理(切分,转换,分组聚合)
etldf = df.dropduplicates().dropna()
print(etldf.count())
# 4.数据分析
# 方便后续所有sql方式使用,提前创建临时视图作为表
etldf.createtempview('movie')
# 需求1: 查询用户的平均分
# demo1_get_user_avg_score()
# 需求3: 查询大于平均分的电影的数量
# demo2_get_lag_avg_movie_cnt()
# 需求4: 查询高分电影(平均分>3)中,打分次数最多的用户,并求出此人所有打分的平均分
# 方式1: sql
demo3_get_top1_user_avg_sql()
# 方式2: dsl
demo3_get_top1_user_avg_dsl()
# 5.数据输出
# 6.关闭资源
spark.stop()
附录: 问题
可能出现的错误一:

原因: 是使用withcolumn产生新列,但是表达式中有聚合的操作。缺少groupby调用
可能出现的错误二:

错误原因:dataframe结果是单行的情况,列值获取错误

解决办法:
将df_total_avg_score['total_avg_score']改成df_total_avg_score.first()['total_avg_score']
可能遇到的错误三:

原因:对于在计算过程中临时产生的字段,需要使用f.col封装成column对象
解决办法:f.col('avg_score')
发表评论