spark sql 概述
- 一、spark sql 架构
- 二、spark sql 特点
- 三、spark sql 运行原理
- 四、spark sql api 相关概述
- 五、spark sql 依赖
- 六、spark sql 数据集
- 七、spark sql 基本用法
一、spark sql 架构
spark sql 的架构主要由以下几个组件组成:
- sparksession:spark 应用的统一入口点,用于创建 dataframe、dataset 和执行 sql 查询。
- catalyst 优化器:spark sql 的查询优化引擎,负责解析、分析、优化和生成物理执行计划。
- dataframe 和 dataset api:提供面向对象的编程接口,支持丰富的数据操作方法。
- 数据源接口:支持多种数据源,如 hdfs、s3、hbase、cassandra、hive 等。
- 执行引擎:将优化后的查询计划转换为执行任务,并在分布式集群上并行执行这些任务。
二、spark sql 特点
- 统一数据访问接口:支持多种数据源(如 csv、json、parquet、hive、jdbc、hbase 等)并提供一致的查询接口。
- dataframe 和 dataset api:提供面向对象的编程接口,支持类型安全的操作,便于数据处理。
- catalyst 优化器:自动将用户的查询转换为高效的执行计划,提升查询性能。
- 与 hive 的集成:无缝集成 hive,能够直接访问现存的 hive 数据,并使用 hive 的 udf 和 udaf。
- 高性能:通过 catalyst 优化器和 tungsten 执行引擎,实现高效的查询性能和内存管理。
- 多种操作方式:支持 sql 和 api 编程两种操作方式,灵活性高。
- 外部工具接口:提供 jdbc/odbc 接口供第三方工具借助 spark 进行数据处理。
- 高级接口:提供了更高层级的接口,方便地处理数据。
三、spark sql 运行原理
查询解析(query parsing):将 sql 查询解析成抽象语法树(ast)。
逻辑计划生成(logical plan generation):将 ast 转换为未优化的逻辑计划。
逻辑计划优化(logical plan optimization):使用 catalyst 优化器对逻辑计划进行一系列规则优化。
物理计划生成(physical plan generation):将优化后的逻辑计划转换为一个或多个物理计划,并选择最优的物理计划。
执行(execution):将物理计划转换为 rdd,并在集群上并行执行。
四、spark sql api 相关概述
sparkcontext:sparkcontext 是 spark 应用程序的主入口点,负责连接到 spark 集群,管理资源和任务调度。在 spark 2.0 之后,推荐使用 sparksession 取代 sparkcontext。
sqlcontext:sqlcontext 是 spark sql 的编程入口点,允许用户通过 sql 查询或 dataframe api 进行数据处理。它提供了基本的 spark sql 功能。
hivecontext:hivecontext 是 sqlcontext 的子集,增加了对 hive 的集成支持,可以直接访问 hive 中的数据和元数据,使用 hive 的 udf 和 udaf。
sparksession:sparksession 是 spark 2.0 引入的新概念,合并了 sqlcontext 和 hivecontext 的功能,提供了统一的编程接口。sparksession 是 spark sql 的建议入口点,支持使用 dataframe 和 dataset api 进行数据处理。
创建 sparkcontext 和 sparksession 的注意事项:如果同时需要创建 sparkcontext 和 sparksession,必须先创建 sparkcontext,再创建 sparksession。如果先创建 sparksession,再创建 sparkcontext,会导致异常,因为在同一个 jvm 中只能运行一个 sparkcontext。
五、spark sql 依赖
<properties>
<spark.version>3.1.2</spark.version>
<spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
<groupid>org.apache.spark</groupid>
<artifactid>spark-sql_${spark.scala.version}</artifactid>
<version>${spark.version}</version>
</dependency>
六、spark sql 数据集
1、dataframe
- 类似于二维表格:dataframe 类似于传统的关系数据库中的二维表格。
- schema(数据结构信息):在 rdd 的基础上加入了 schema,描述数据结构的信息。
- 支持嵌套数据类型:dataframe 的 schema 支持嵌套的数据类型,如
struct
、map
和array
。 - 丰富的 sql 操作 api:提供更多类似 sql 操作的 api,便于进行数据查询和操作。
2、dataset
- 强类型:spark 1.6中引入的一个更通用的数据集合,dataset 是强类型的,提供类型安全的操作。
- rdd + schema:可以认为 dataset 是 rdd 和 schema 的结合,既有 rdd 的分布式计算能力,又有 schema 描述数据结构的信息。
- 适用于特定领域对象:可以存储和操作特定领域对象的强类型集合。
- 并行操作:可以使用函数或者相关操作并行地进行转换和操作。
3、dataframe 和 dataset 的关系
- dataframe 是特殊的 dataset:dataframe 是 dataset 的一个特例,即
dataframe = dataset[row]
。 - 数据抽象和操作方式的统一:dataframe 和 dataset 统一了 spark sql 的数据抽象和操作方式,提供了灵活且强大的数据处理能力。
七、spark sql 基本用法
1、scala 创建 sparksession 对象
import org.apache.spark.sql.sparksession
object sparksqlcontext {
def main(args: array[string]): unit = {
// 创建 sparkconf 对象,设置应用程序的配置
val conf: sparkconf = new sparkconf()
.setmaster("local[4]") // 设置本地运行模式,使用 4 个线程
.setappname("spark sql") // 设置应用程序名称为 "spark sql"
// 创建 sparksession 对象,用于 spark sql 的编程入口
val spark: sparksession = sparksession.builder()
.config(conf) // 将 sparkconf 配置应用于 sparksession
.getorcreate() // 获取现有的 sparksession,或者新建一个
// 获取 sparkcontext 对象,可以直接从 sparksession 中获取
val sc: sparkcontext = spark.sparkcontext
// 导入 sparksession 的隐式转换,可以使用 dataframe api 的方法
import spark.implicits._
// 在这里可以编写数据处理代码,例如创建 dataframe 和 dataset,进行数据操作等...
// 停止 sparksession,释放资源
spark.stop()
}
}
2、dataframe 和 dataset 的创建方式
1、从集合创建
case class person(name: string, age: int) // 下同
val data1 = seq(person("alice", 25), person("bob", 30))
val ds: dataset[person] = spark.createdataset(data) // 这里的spark是sparksession对象(如上代码),下同
val data2 = seq(("alice", 25), ("bob", 30))
val df: dataframe = data.todf("name", "age")
1、从文件系统读取
val schema = structtype(seq(
structfield("name", stringtype, nullable = false),
structfield("age", integertype, nullable = false)
))
val dsjson: dataset[person] = spark.read.json("/path/to/json/file").as[person]
val dfcsv: dataframe = spark.read
// 使用.schema方法指定csv文件的模式(schema)其定义了dataframe的列名和类型。
// 这是一个可选步骤,但如果csv文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。
.schema(schema)
// 这里设置"header"为"true",表示csv文件的第一行是列名,不需要spark从文件中自动推断。
.option("header", "true")
.csv("/path/to/csv/file")
3、从关系型数据库读取
val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.properties()
properties.setproperty("user", "username")
properties.setproperty("password", "password")
val dsdb: dataset[person] = spark.read.jdbc(url, "table", properties).as[person]
val dfdb: dataframe = spark.read.jdbc(url, "table", properties)
4、从非结构化数据源读取
val dsparquet: dataset[person] = spark.read.parquet("/path/to/parquet/file").as[person]
val dfparquet: dataframe = spark.read.parquet("/path/to/parquet/file")
5、手动创建 dataset
import org.apache.spark.sql.types._
val schema = structtype(seq(
structfield("name", stringtype, nullable = false),
structfield("age", integertype, nullable = false)
))
val data = seq(row("alice", 25), row("bob", 30))
val dsmanual: dataset[person] = spark.createdataframe(spark.sparkcontext.parallelize(data), schema).as[person]
val dfmanual: dataframe = spark.createdataframe(
spark.sparkcontext.parallelize(data), schema
)
3、dataframe api
语法示例一
模拟数据(1000条):
id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...
需求:哪些城市和性别组合在人口较多(id数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
// 导入sparksession的隐式转换,这样可以使用dataframe的便捷方法(例如下面的'$'符号)
import spark.implicits._
// 定义了一个dataframe的schema,但在这个例子中,使用了csv的header来自动推断schema
val schema = structtype(seq(
structfield("id", longtype),
structfield("name", stringtype),
structfield("gender", stringtype),
structfield("age", integertype),
structfield("city", stringtype),
))
// 定义windowspec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
val windowspec: windowspec = window
.partitionby($"gender")
.orderby($"avg_age".desc)
// 从csv文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
// 哪些城市和性别组合在人口较多(id数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
spark.read
// .schema(schema) // 应用我们定义的schema
.option("header", "true") // 使用csv的header作为列名
.csv("d:\\projects\\sparksql\\people.csv") // dataframe
.select($"id", $"name", $"age", $"city", $"gender") // 选择需要的列(不写默认就是全选)
.groupby($"city", $"gender") // 按城市和性别分组
.agg( // 多重聚合
count($"id").as("count"), // 计算每个组的id数量
round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数
)
.where($"count".gt(50)) // 过滤出id数量大于(可以使用>)50的组
.orderby($"avg_age".desc) // 按平均年龄降序排序
.select($"city", $"gender", $"avg_age",
dense_rank().over(window.partitionby($"gender").orderby($"avg_age".desc)).as("gender_avg_age_rank"))
.show() // 显示结果
结果:
+------+------+-------+-------------------+
| city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市| 男| 41.05| 1|
| 东莞| 男| 42.81| 2|
|上海市| 男| 43.92| 3|
|成都市| 男| 45.89| 4|
| 中山| 男| 47.08| 5|
|广州市| 男| 47.47| 6|
| 深圳| 男| 48.36| 7|
|上海市| 女| 46.02| 1|
| 中山| 女| 49.55| 2|
+------+------+-------+-------------------+
语法示例二:视图,sql
// 读取csv文件到dataframe,使用header作为列名
val dfpeople: dataframe = spark.read
.option("header", "true") // 使用csv的header作为列名
.csv("d:\\projects\\sparksql\\people.csv")
// 将dataframe注册为临时视图
dfpeople.createorreplacetempview("people_view")
// 可以使用spark sql来查询这个视图了
// 例如,查询所有人的姓名和年龄
spark.sql("select name, age from people_view").show()
// 二
spark.sql(
"""
|select * from people_view
|where gender = '男'
|""".stripmargin
).show()
语法示例三:join
case class student(name: string, classid: int)
case class class(classid: int, classname: string)
val frmstu = spark.createdataframe(
seq(
student("张三", 1),
student("李四", 1),
student("王五", 2),
student("赵六", 2),
student("李明", 2),
student("王刚", 4),
student("王朋", 5),
)
)
val frmclass = spark.createdataframe(
seq(
class(1, "name1"),
class(2, "name2"),
class(3, "name3"),
class(4, "name4")
)
)
left
左连接,rignt
右连接, full
全外连接,anti
左差集,semi
左交集
// 别名 + inner 内连接
frmstu.as("s")
.join(frmclass.as("c"), $"s.classid" === $"c.classid") // jointype 默认 inner内连接
.show()
// 使用左外连接将df和frmclass根据classid合并
frmstu
.join(frmclass, seq("classid"), "left")
.show()
// 左差集
frmstu
.join(frmclass, seq("classid"), "anti")
.show()
// 左交集
frmstu
.join(frmclass, seq("classid"), "semi")
.show()
结果
别名 + inner 内连接
+----+-------+-------+---------+
|name|classid|classid|classname|
+----+-------+-------+---------+
|张三| 1| 1| name1|
|李四| 1| 1| name1|
|王五| 2| 2| name2|
|赵六| 2| 2| name2|
|李明| 2| 2| name2|
|王刚| 4| 4| name4|
+----+-------+-------+---------+
使用左外连接将df和frmclass根据classid合并
+-------+----+---------+
|classid|name|classname|
+-------+----+---------+
| 1|张三| name1|
| 1|李四| name1|
| 2|王五| name2|
| 2|赵六| name2|
| 2|李明| name2|
| 4|王刚| name4|
| 5|王朋| null|
+-------+----+---------+
左差集
+-------+----+
|classid|name|
+-------+----+
| 5|王朋|
+-------+----+
左交集
+-------+----+
|classid|name|
+-------+----+
| 1|张三|
| 1|李四|
| 2|王五|
| 2|赵六|
| 2|李明|
| 4|王刚|
+-------+----+
发表评论