
Python PySpark: A Hands-On Introductory Guide with Core Examples

April 3, 2026 · Python

Goals of This Article

Using real cases plus in-depth breakdowns, this article takes you from zero to a working grasp of PySpark's core mechanics, covering:

  • RDD basics: what an RDD is and how to create one
  • Data type transformations: T → T and T → U
  • Core operators in detail (map, flatMap, filter, distinct, sortBy, reduceByKey)
  • How to change the number of RDD partitions
  • How to fix the missing msvcr120.dll error (Windows only)
  • How to resolve Spark timeout issues
  • Two hands-on cases: city sales ranking & word count

1. PySpark Environment Setup: Essential Configuration (Important!)

A special note for Windows users: set the following environment variables first, or almost every operation is likely to fail!

```python
import os

# 1. Point PySpark at the Python interpreter (required!)
os.environ["PYSPARK_PYTHON"] = "d:\\app\\anaconda\\envs\\spark_env\\python.exe"
# 2. Windows timeout workaround (prevents tasks from crashing on timeout)
os.environ['PYSPARK_TIMEOUT'] = '600'
os.environ['PYSPARK_DRIVER_TIMEOUT'] = '600'
# 3. Hadoop native DLL loading (the source of the msvcr120.dll error)
os.environ['PATH'] += os.pathsep + 'e:\\app\\hadoop-3.4.2\\bin'
# 4. Also force the driver Python (optional but recommended)
os.environ["PYSPARK_DRIVER_PYTHON"] = "d:\\app\\anaconda\\envs\\spark_env\\python.exe"
```

Why does this step matter so much?

  • msvcr120.dll is part of the Visual C++ 2013 runtime; the error typically surfaces when Hadoop's hadoop.dll fails to load.
  • Even if you never call saveAsTextFile(), having hadoop/bin on PATH means Spark can still trigger native IO!
  • Bottom line: keep that PATH entry only if you truly need native IO; otherwise remove it!

How to confirm it really is an msvcr120.dll problem — here is the error I hit:

```python
from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "d:\\app\\anaconda\\envs\\spark_env\\python.exe"
os.environ['PATH'] += os.pathsep + 'e:\\app\\hadoop-3.4.2\\bin'

conf = SparkConf().setMaster("local").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.saveAsTextFile("d:/output1")
sc.stop()
```

Running it:

d:\app\anaconda\envs\spark_env\python.exe d:\pythonprojects\python\day11_pyspark\数据输出\输出为文本文档.py
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/04/01 22:22:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/04/01 22:22:58 ERROR SparkHadoopWriter: Aborting job job_202604012222567528972640969878075_0003.
java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:100)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1606)
	...
Traceback (most recent call last):
  File "d:\pythonprojects\python\day11_pyspark\数据输出\输出为文本文档.py", line 9, in <module>
    rdd.saveAsTextFile("d:/output1")
  File "d:\app\anaconda\envs\spark_env\lib\site-packages\pyspark\rdd.py", line 3425, in saveAsTextFile
    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
  File "d:\app\anaconda\envs\spark_env\lib\site-packages\py4j\java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "d:\app\anaconda\envs\spark_env\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o33.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted.
	...
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	... 51 more

Process finished with exit code 1

"The code cannot proceed because msvcr120.dll was not found. Reinstalling the program may fix this problem." This is the classic symptom of a missing C++ runtime library on Windows.

🚨 Root cause:
msvcr120.dll is part of the Microsoft Visual C++ 2013 Redistributable.

Hadoop's native DLLs (such as hadoop.dll) depend on it. When you call saveAsTextFile(), Spark tries to load Hadoop's native libraries, which in turn require the VC++ runtime.

Even if you are not calling saveAsTextFile() right now, having added e:\app\hadoop-3.4.2\bin to PATH means you are still using Hadoop binaries, so the problem persists.

At first I suspected a missing winutils configuration (see my previous article for winutils setup) or wrong permissions on winutils.exe.
winutils.exe must sit in Hadoop's bin directory: e:\app\hadoop-3.4.2\bin\winutils.exe.
Then check the permissions: open cmd as Administrator (right-click → Run as administrator) and run:

```bash
winutils.exe chmod 777 c:\tmp\hive
```

You can also repair the missing DLL directly with Lenovo's system diagnostics and repair tool.
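To check quickly whether the runtime DLL is even visible to the loader, here is a small sketch using only the standard library. The helper name `has_msvcr120` is my own; the check is only meaningful on Windows, and on other platforms it simply reports the DLL as missing:

```python
import ctypes.util

def has_msvcr120():
    """Return True if the system loader can locate msvcr120.dll.

    Only meaningful on Windows; on other platforms it returns False.
    """
    return ctypes.util.find_library("msvcr120") is not None

print(has_msvcr120())
```

If this prints False on a Windows machine, installing the Microsoft Visual C++ 2013 Redistributable should supply the DLL.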

📌 2. RDD Basics: Concepts and Creation 💡

What is an RDD?

RDD (Resilient Distributed Dataset) is Spark's core abstraction: an immutable, parallelizable, fault-tolerant collection of data.

Six ways to create an RDD:

| Type | Example | Notes |
| --- | --- | --- |
| Python list | sc.parallelize([1,2,3]) | most common |
| dict | sc.parallelize({"k":1}) | only the keys become elements: ["k"] |
| tuple | sc.parallelize(("a","b")) | elements are "a" and "b" |
| list with duplicates | sc.parallelize([1,1,2,3]) | duplicates are kept |
| string | sc.parallelize("abc") | each character becomes an element |
| file | sc.textFile("path.txt") | reads the text file line by line |

For local data, parallelize() plus textFile() covers most needs.
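The dict, tuple, and string rows follow directly from how Python iterates each collection, since parallelize distributes whatever iteration yields. A quick pure-Python check, no Spark needed:

```python
# parallelize(c) distributes the items produced by iterating c,
# so plain Python iteration predicts the RDD's elements.
assert list({"k": 1}) == ["k"]             # dict -> keys only
assert list(("a", "b")) == ["a", "b"]      # tuple -> its elements
assert list("abc") == ["a", "b", "c"]      # string -> characters
assert list([1, 1, 2, 3]) == [1, 1, 2, 3]  # list keeps duplicates
print("iteration semantics confirmed")
```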

3. Core Operators: Type Conversions and Behavior

1. map (T → U): input and return types differ

Purpose: apply a function to every element, producing a new value per element.

```python
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd.map(lambda x: x * 2)
print(rdd2.collect())  # output: [2, 4, 6, 8, 10]
```

| Characteristic | Notes |
| --- | --- |
| T → U | here input and output are both int, but semantically it is a transformation |
| element-wise | each element is converted independently |
| no aggregation | the total number of elements is unchanged |

Typical uses: data cleaning, type conversion, arithmetic.

2. map (T → T): input and return types are the same

Purpose: return the same type, with the content modified.

```python
rdd = sc.parallelize(["apple", "banana"])
rdd2 = rdd.map(lambda x: x.upper())
print(rdd2.collect())  # ['APPLE', 'BANANA']
```

The return type is still str, but the content changed → this is still T → T.

Typical uses: case conversion, trimming whitespace, text normalization.

3. flatMap (T → U): flatten nested results

Purpose: expand each result into a flat list of elements.

```python
rdd = sc.parallelize(["hello world", "py spark"])
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(rdd2.collect())  # ['hello', 'world', 'py', 'spark']
```

| Operator | Result |
| --- | --- |
| map | returns [[x], [y]] — a nested list |
| flatMap | returns [x, y] — one flat list |

Typical uses: tokenizing text, splitting multi-line records, merging lists.
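The map-vs-flatMap distinction mirrors plain Python list comprehensions, which makes it easy to verify without a cluster:

```python
lines = ["hello world", "py spark"]

# map: one output element per input element -> nested lists
mapped = [line.split(" ") for line in lines]

# flatMap: each returned list is spliced into the output stream
flattened = [word for line in lines for word in line.split(" ")]

print(mapped)     # [['hello', 'world'], ['py', 'spark']]
print(flattened)  # ['hello', 'world', 'py', 'spark']
```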

4. filter (T → T): keep elements by condition

Purpose: keep elements for which the function returns True, drop the rest.

```python
rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
rdd2 = rdd.filter(lambda x: x % 2 == 0)
print(rdd2.collect())  # [2, 4, 6]
```

Typical uses: selecting cities, filtering product categories, removing empty values.

5. distinct(): deduplicate

Purpose: return the collection with duplicates removed (global deduplication).

```python
rdd = sc.parallelize([1, 2, 2, 3, 4, 4, 5])
rdd2 = rdd.distinct()
print(rdd2.collect())  # [1, 2, 3, 4, 5]
```

Note: distinct() is expensive — it shuffles all the data. Use with care on large datasets!

6. sortBy(keyfunc, ascending): sort

Purpose: sort by a given rule.

```python
rdd = sc.parallelize([1, 5, 3, 2, 4])
rdd2 = rdd.sortBy(lambda x: x, ascending=False)
print(rdd2.collect())  # [5, 4, 3, 2, 1]
```

Typical uses: sales rankings, score ordering, top-N recommendations.
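Semantically, sortBy(keyfunc, ascending=False) behaves like Python's built-in sorted with a key and a reverse flag, just applied across the whole distributed dataset:

```python
data = [1, 5, 3, 2, 4]

# Local analogue of rdd.sortBy(lambda x: x, ascending=False)
ranked = sorted(data, key=lambda x: x, reverse=True)
print(ranked)  # [5, 4, 3, 2, 1]
```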

7. reduceByKey(func): grouped aggregation (key-value pairs only)

Prerequisite: the RDD's elements must be (k, v) tuples.

```python
rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4)])
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd2.collect())  # [('a', 3), ('b', 7)]
```

| Requirement | Notes |
| --- | --- |
| elements must be (k, v) | otherwise it errors |
| func(v, v) → v | folds two values into one |
| map-side pre-aggregation | reduces network traffic; very efficient |

Typical uses: product sales totals, per-city revenue, word counts.
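The operator's behavior can be modeled in a few lines of plain Python — group values by key, then fold each group with the function (the helper name `reduce_by_key` is my own):

```python
from collections import defaultdict
from functools import reduce

def reduce_by_key(pairs, func):
    """Pure-Python model of reduceByKey: group by key, then fold each group."""
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return [(k, reduce(func, vs)) for k, vs in groups.items()]

result = reduce_by_key([("a", 1), ("a", 2), ("b", 3), ("b", 4)],
                       lambda a, b: a + b)
print(result)  # [('a', 3), ('b', 7)]
```

The real operator additionally pre-aggregates within each partition before shuffling, which is why it scales better than grouping first and reducing afterwards.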

4. Three Ways to Change RDD Partitioning

| Method | Description | Example |
| --- | --- | --- |
| set("spark.default.parallelism", "1") | sets the global parallelism | conf.set("spark.default.parallelism", "1") |
| numSlices=1 | sets the partition count at parallelize time | sc.parallelize([1,2,3], 1) |
| repartition(n) | reshuffles into n partitions (common for tuning) | rdd.repartition(2) |

Tip: for small datasets, use numSlices=1 to avoid spawning too many tasks.
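For intuition, parallelize cuts a list into numSlices contiguous, near-equal chunks. A rough pure-Python model of that slicing (the idea, not necessarily Spark's exact boundaries):

```python
def split_slices(data, num_slices):
    """Rough model of how a list is cut into numSlices partitions."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

print(split_slices([1, 2, 3, 4, 5], 2))  # [[1, 2], [3, 4, 5]]
print(split_slices([1, 2, 3, 4, 5], 1))  # [[1, 2, 3, 4, 5]]
```

In a real session you can confirm the actual partition count with rdd.getNumPartitions().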

5. Tackling the Hard Part: Fixing the Missing msvcr120.dll 💥

The error message:

"The code cannot proceed because msvcr120.dll was not found."

🔍 Root cause:

  • Spark loading hadoop.dll depends on the VC++ 2013 runtime.
  • PATH contains e:\app\hadoop-3.4.2\bin → native IO starts → the DLL loads → the function is missing → error.

The fix:

To repair msvcr120.dll, you can use Lenovo's system diagnostics and repair tool directly.

6. Hands-On Cases: Putting It All Together

Case 1: city sales ranking (p09_案例2.py)

Requirements:

  1. Read the sales.txt file (one JSON object per line)
  2. Rank each city's total sales from high to low
  3. List all product categories across all cities (deduplicated)
  4. Find which product categories are sold in 北京 (filter + distinct)

Step-by-step:

```python
import json
import os
from pyspark import SparkConf, SparkContext

os.environ["PYSPARK_PYTHON"] = "d:\\app\\anaconda\\envs\\spark_env\\python.exe"
os.environ['PYSPARK_TIMEOUT'] = '600'
os.environ['PYSPARK_DRIVER_TIMEOUT'] = '600'

conf = SparkConf().setMaster("local[*]").setAppName("sales_analysis")
sc = SparkContext(conf=conf)

# 1. Read the file and parse each line into a dict
file_rdd = sc.textFile("d:\\python_testdata\\sales.txt")
dict_rdd = file_rdd.map(lambda x: json.loads(x.rstrip().rstrip(',')))

# 2. Rank cities by total sales
city_money = dict_rdd.map(lambda x: (x['areaname'], int(x['money'])))
city_total = city_money.reduceByKey(lambda a, b: a + b)
city_rank = city_total.sortBy(lambda x: x[1], ascending=False)
print("City sales ranking:")
print(city_rank.collect())

# 3. All product categories across cities (deduplicated)
categories = dict_rdd.map(lambda x: x['category'])
unique_categories = categories.distinct()
print("All product categories:")
print(unique_categories.collect())

# 4. Product categories sold in 北京
beijing_rdd = dict_rdd.filter(lambda x: x['areaname'] == '北京')
beijing_cats = beijing_rdd.map(lambda x: x['category']).distinct()
print("Product categories sold in 北京:")
print(beijing_cats.collect())

sc.stop()
```

Key takeaways:

  • json.loads(): parse a JSON string
  • map: extract fields
  • reduceByKey: aggregate sales
  • sortBy: rank
  • filter + distinct: filter, then deduplicate
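The same pipeline can be dry-run in plain Python on a few sample records. The sample lines below are hypothetical — I am assuming records shaped like the case expects, with areaname/money/category fields and a possible trailing comma per line:

```python
import json

# Hypothetical sample lines in the shape the case assumes
lines = [
    '{"areaname": "北京", "money": "100", "category": "食品"},',
    '{"areaname": "北京", "money": "200", "category": "服装"},',
    '{"areaname": "上海", "money": "150", "category": "食品"},',
]

# Same cleanup as the Spark job: strip the trailing comma, then parse
records = [json.loads(line.rstrip().rstrip(',')) for line in lines]

# reduceByKey equivalent: sum money per city
totals = {}
for r in records:
    totals[r['areaname']] = totals.get(r['areaname'], 0) + int(r['money'])

# sortBy equivalent: rank descending by total
ranking = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
print(ranking)  # [('北京', 300), ('上海', 150)]
```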

Case 2: word count (p05_pyspark案例.py)

Requirement: count how many times each word appears in words.txt.

```python
import os
from pyspark import SparkConf, SparkContext

os.environ["PYSPARK_PYTHON"] = "d:\\app\\anaconda\\envs\\spark_env\\python.exe"
os.environ['PYSPARK_TIMEOUT'] = '600'

conf = SparkConf().setMaster("local[*]").setAppName("word_count")
sc = SparkContext(conf=conf)

rdd = sc.textFile("d:\\python_testdata\\words.txt")
words = rdd.flatMap(lambda x: x.split(" "))              # split lines into words
word_pairs = words.map(lambda x: (x, 1))                 # word -> (word, 1)
word_count = word_pairs.reduceByKey(lambda a, b: a + b)  # sum the 1s per word
print("Word count result:")
print(word_count.collect())

sc.stop()
```

Key takeaways:

  • flatMap tokenizes the lines
  • map emits (word, 1)
  • reduceByKey aggregates the counts
  • the classic MapReduce pattern
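The flatMap → map → reduceByKey chain does locally what collections.Counter does in one step, which is a handy way to sanity-check the Spark result on sample input (the sample lines here are my own):

```python
from collections import Counter

lines = ["spark hello spark", "hello world"]

# flatMap equivalent: split every line into words
words = [w for line in lines for w in line.split(" ")]

# map + reduceByKey equivalent: count occurrences per word
counts = Counter(words)
print(dict(counts))  # {'spark': 2, 'hello': 2, 'world': 1}
```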

7. Summary and Recommendations

| Topic | Recommended practice |
| --- | --- |
| Data output | ✅ write files with collect() + open(); avoid saveAsTextFile() |
| Environment | ✅ set PYSPARK_TIMEOUT and spark.hadoop.io.native.io.enabled=false |
| DLL errors | ✅ remove hadoop/bin from PATH so native IO is never loaded |
| Timeouts | ✅ set PYSPARK_TIMEOUT=600 to keep tasks from being killed |
| Small datasets | ✅ control partitions with numSlices=1 |
| Large datasets | ✅ tune with repartition() |

That concludes this hands-on introductory guide to PySpark. For more on PySpark basics, search 代码网 for earlier articles or browse the related posts below — and please keep supporting 代码网!
