网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
一个人可以走的很快,但一群人才能走的更远!不论你是正从事it行业的老鸟或是对it行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
spark.sql.repl.eagereval.enabled
pyspark dataframe转换为pandas dataframe
前言
要想了解pyspark能够干什么可以去看看我之前写的文章,里面很详细介绍了spark的生态:
pyspark只是通过jvm转换使得python代码能够在spark集群上识别运行。故spark的绝大多数功能都可以被python程序使用。
上篇文章:一文速学-pyspark数据分析基础:pyspark原理详解
已经把pyspark运行原理讲的很清楚了,现在我们需要了解pyspark语法基础来逐渐编写pyspark程序实现分布式数据计算。
已搭建环境:
spark:3.3.0
hadoop:3.3.3
scala:2.11.12
jdk:1.8.0_201
pyspark:3.1.2
一、pyspark基础功能
pyspark是python中apache spark的接口。它不仅可以使用python api编写spark应用程序,还提供了pyspark shell,用于在分布式环境中交互分析数据。pyspark支持spark的大多数功能,如spark sql、dataframe、streaming、mllib(机器学习)和spark core。
1.spark sql 和dataframe
spark sql是用于结构化数据处理的spark模块。它提供了一种称为dataframe的编程抽象,是由schemardd发展而来。不同于schemardd直接继承rdd,dataframe自己实现了rdd的绝大多数功能。可以把spark sql dataframe理解为一个的row对象的数据集合。
spark sql已经集成在spark-shell中,因此只要启动spark-shell就可以使用spark sql的shell交互接口。如果在spark-shell中执行sql语句,需要使用sqlcontext对象来调用sql()方法。spark sql对数据的查询分成了两个分支:sqlcontext和hivecontext,其中hivecontext继承了sqlcontext,因此hivecontext除了拥有sqlcontext的特性之外还拥有自身的特性。
spark sql允许开发人员直接处理rdd,同时也可查询例如在 apache hive上存在的外部数据。spark sql的一个重要特点是其能够统一处理关系表和rdd,使得开发人员可以轻松地使用sql命令进行外部查询,同时进行更复杂的数据分析。
2.pandas api on spark
spark上的pandas api可以扩展使用 python pandas库。
- 轻松切换到pandas api和pyspark api上下文,无需任何开销。
- 有一个既适用于pandas(测试,较小的数据集)又适用于spark(分布式数据集)的代码库。
- 熟练使用pandas的话很快上手
3.streaming
apache spark中的streaming功能运行在spark之上,支持跨streaming和历史数据的强大交互和分析应用程序,同时继承了spark的易用性和容错特性。spark streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是spark core,也就是把spark streaming的输入数据按照batch size(如1秒)分成一段一段的数据(discretized stream),每一段数据都转换成spark中的rdd(resilient distributed dataset),然后将spark streaming中对dstream的transformation操作变为针对spark中对rdd的transformation操作,将rdd经过操作变成中间结果保存在内存中。
4.mlbase/mllib
mllib构建在spark之上,是一个可扩展的机器学习库,它提供了一组统一的高级api,帮助用户创建和调整实用的机器学习管道。mlbase分为四部分:mllib、mli、ml optimizer和mlruntime。
- ml optimizer会选择它认为最适合的已经在内部实现好了的机器学习算法和相关参数,来处理用户输入的数据,并返回模型或别的帮助分析的结果;
- mli 是一个进行特征抽取和高级ml编程抽象的算法实现的api或平台;
- mllib是spark实现一些常见的机器学习算法和实用程序,包括分类、回归、聚类、协同过滤、降维以及底层优化,该算法可以进行可扩充; mlruntime 基于spark计算框架,将spark的分布式计算应用到机器学习领域。
5.spark core
spark core是spark平台的底层通用执行引擎,所有其他功能都构建在其之上。它提供了rdd(弹性分布式数据集)和内存计算能力。
二、pyspark依赖
dependencies
package | 最低版本限制 | note |
pandas | 1.0.5 | 支撑spark sql |
numpy | 1.7 | 满足支撑mllib基础api |
pyarrow | 1.0.0 | 支撑spark sql |
py4j | 0.10.9.5 | 要求 |
pandas | 1.0.5 | pandas api on spark需要 |
pyarrow | 1.0.0 | pandas api on spark需要 |
numpy | 1.14 | pandas api on spark需要 |
请注意,pyspark需要java 8或更高版本,并正确设置java_home。如果使用jdk 11,请设置dio.netty.tryreflectionsetaccessible=true
以获取与箭头相关的功能。
aarch64(arm64)用户注意:pyarrow是pyspark sql所必需的,但pyarrow 4.0.0中引入了对aarch64的pyarrow支持。如果由于pyarrow安装错误导致pyarrow安装在aarch64上失败,可以按如下方式安装pyarrow>=4.0.0:
pip install "pyarrow>=4.0.0" --prefer-binary
三、dataframe
pyspark应用程序从初始化sparksession开始,sparksession是pyspark的入口点,如下所示。如果通过pyspark可执行文件在pyspark shell中运行它,shell会自动在变量spark中为用户创建会话。
from pyspark.sql import sparksession
spark = sparksession.builder.getorcreate()
1.创建
pyspark dataframe能够通过pyspark.sql.sparksession.createdataframe创建,通常通过传递列表(list)、元组(tuples)和字典(dictionaries)的列表和pyspark.sql.rows,pandas dataframe,由此类列表组成的rdd转换。pyspark.sql.sparksession.createdataframe接收schema参数指定dataframe的架构(优化可加速)。省略时,pyspark通过从数据中提取样本来推断相应的模式。
创建不输入schema格式的dataframe
from datetime import datetime, date
import pandas as pd
from pyspark.sql import row
df = spark.createdataframe([
row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df
dataframe[a: bigint, b: double, c: string, d: date, e: timestamp]
创建带有schema的dataframe
df = spark.createdataframe([
(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df
dataframe[a: bigint, b: double, c: string, d: date, e: timestamp]
从pandas dataframe创建
pandas_df = pd.dataframe({
'a': [1, 2, 3],
'b': [2., 3., 4.],
'c': ['string1', 'string2', 'string3'],
'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createdataframe(pandas_df)
df
dataframe[a: bigint, b: double, c: string, d: date, e: timestamp]
通过由元组列表组成的rdd创建
rdd = spark.sparkcontext.parallelize([
(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df = spark.createdataframe(rdd, schema=['a', 'b', 'c', 'd', 'e'])
df
dataframe[a: bigint, b: double, c: string, d: date, e: timestamp]
以上的dataframe格式创建的都是一样的。
df.printschema()
root
|-- a: long (nullable = true)
|-- b: double (nullable = true)
|-- c: string (nullable = true)
|-- d: date (nullable = true)
|-- e: timestamp (nullable = true)
2.查看
dataframe.show()
使用格式:
df.show(<int>)
df.show(1)
+---+---+-------+----------+-------------------+
| a| b| c| d| e|
+---+---+-------+----------+-------------------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 1 row
spark.sql.repl.eagereval.enabled
spark.sql.repl.eagereval.enabled用于在notebooks(如jupyter)中快速生成pyspark dataframe的配置。控制行数可以使用spark.sql.repl.eagereval.maxnumrows。
spark.conf.set('spark.sql.repl.eagereval.enabled', true)
df
spark.conf.set('spark.sql.repl.eagereval.maxnumrows',1)
df
纵向显示
行也可以垂直显示。当行太长而无法水平显示时,纵向显示就很明显。
df.show(1, vertical=true)
-record 0------------------
a | 1
b | 2.0
c | string1
d | 2000-01-01
e | 2000-01-01 12:00:00
only showing top 1 row
查看dataframe格式和列名
df.columns
['a', 'b', 'c', 'd', 'e']
df.printschema()
root
|-- a: long (nullable = true)
|-- b: double (nullable = true)
|-- c: string (nullable = true)
|-- d: date (nullable = true)
|-- e: timestamp (nullable = true)
查看统计描述信息
df.select("a", "b", "c").describe().show()
+-------+---+---+-------+
|summary| a| b| c|
+-------+---+---+-------+
| count| 3| 3| 3|
| mean|2.0|3.0| null|
| stddev|1.0|1.0| null|
| min| 1|2.0|string1|
| max| 3|4.0|string3|
+-------+---+---+-------+
dataframe.collect()将分布式数据收集到驱动程序端,作为python中的本地数据。请注意,当数据集太大而无法容纳在驱动端时,这可能会引发内存不足错误,因为它将所有数据从执行器收集到驱动端。
df.collect()
[row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]
为了避免引发内存不足异常可以使用dataframe.take()或者是dataframe.tail():
df.take(1)
[row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]
df.tail(1)
[row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]
pyspark dataframe转换为pandas dataframe
pyspark dataframe还提供了到pandas dataframe的转换,以利用pandas api。注意,topandas还将所有数据收集到driver端,当数据太大而无法放入driver端时,很容易导致内存不足错误。
df.topandas()
3.查询
pyspark dataframe是惰性计算的,仅选择一列不会触发计算,但它会返回一个列实例:
df.a
column<'a'>
大多数按列操作都返回列:
from pyspark.sql import column
from pyspark.sql.functions import upper
type(df.c) == type(upper(df.c)) == type(df.c.isnull())
true
上述生成的column可用于从dataframe中选择列。例如,dataframe.select()获取返回另一个dataframe的列实例:
df.select(df.c).show()
+-------+
| c|
+-------+
|string1|
|string2|
|string3|


**网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。**
**[需要这份系统化资料的朋友,可以戳这里获取](https://bbs.csdn.net/topics/618545628)**
**一个人可以走的很快,但一群人才能走的更远!不论你是正从事it行业的老鸟或是对it行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**
t upper
type(df.c) == type(upper(df.c)) == type(df.c.isnull())
true
上述生成的column可用于从dataframe中选择列。例如,dataframe.select()获取返回另一个dataframe的列实例:
df.select(df.c).show()
+-------+
| c|
+-------+
|string1|
|string2|
|string3|
[外链图片转存中...(img-vfalhqqz-1715681349992)]
[外链图片转存中...(img-b2x14m94-1715681349992)]
**网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。**
**[需要这份系统化资料的朋友,可以戳这里获取](https://bbs.csdn.net/topics/618545628)**
**一个人可以走的很快,但一群人才能走的更远!不论你是正从事it行业的老鸟或是对it行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**
发表评论