当前位置: 代码网 > it编程>数据库>MsSqlserver > Spark SQL 概述

Spark SQL 概述

2024年07月31日 MsSqlserver 我要评论
架构、特点、运行原理、API 相关概述、依赖、数据集、基本用法

spark sql 概述

一、spark sql 架构

spark sql 的架构主要由以下几个组件组成:

  1. sparksession:spark 应用的统一入口点,用于创建 dataframe、dataset 和执行 sql 查询。
  2. catalyst 优化器:spark sql 的查询优化引擎,负责解析、分析、优化和生成物理执行计划。
  3. dataframe 和 dataset api:提供面向对象的编程接口,支持丰富的数据操作方法。
  4. 数据源接口:支持多种数据源,如 hdfs、s3、hbase、cassandra、hive 等。
  5. 执行引擎:将优化后的查询计划转换为执行任务,并在分布式集群上并行执行这些任务。

二、spark sql 特点

  • 统一数据访问接口:支持多种数据源(如 csv、json、parquet、hive、jdbc、hbase 等)并提供一致的查询接口。
  • dataframe 和 dataset api:提供面向对象的编程接口,支持类型安全的操作,便于数据处理。
  • catalyst 优化器:自动将用户的查询转换为高效的执行计划,提升查询性能。
  • 与 hive 的集成:无缝集成 hive,能够直接访问现存的 hive 数据,并使用 hive 的 udf 和 udaf。
  • 高性能:通过 catalyst 优化器和 tungsten 执行引擎,实现高效的查询性能和内存管理。
  • 多种操作方式:支持 sql 和 api 编程两种操作方式,灵活性高。
  • 外部工具接口:提供 jdbc/odbc 接口供第三方工具借助 spark 进行数据处理。
  • 高级接口:提供了更高层级的接口,方便地处理数据。

三、spark sql 运行原理

在这里插入图片描述

查询解析(query parsing):将 sql 查询解析成抽象语法树(ast)。

逻辑计划生成(logical plan generation):将 ast 转换为未优化的逻辑计划。

逻辑计划优化(logical plan optimization):使用 catalyst 优化器对逻辑计划进行一系列规则优化。

物理计划生成(physical plan generation):将优化后的逻辑计划转换为一个或多个物理计划,并选择最优的物理计划。

执行(execution):将物理计划转换为 rdd,并在集群上并行执行。

四、spark sql api 相关概述

sparkcontext:sparkcontext 是 spark 应用程序的主入口点,负责连接到 spark 集群,管理资源和任务调度。在 spark 2.0 之后,推荐使用 sparksession 取代 sparkcontext。

sqlcontext:sqlcontext 是 spark sql 的编程入口点,允许用户通过 sql 查询或 dataframe api 进行数据处理。它提供了基本的 spark sql 功能。

hivecontext:hivecontext 是 sqlcontext 的子集,增加了对 hive 的集成支持,可以直接访问 hive 中的数据和元数据,使用 hive 的 udf 和 udaf。

sparksession:sparksession 是 spark 2.0 引入的新概念,合并了 sqlcontext 和 hivecontext 的功能,提供了统一的编程接口。sparksession 是 spark sql 的建议入口点,支持使用 dataframe 和 dataset api 进行数据处理。

创建 sparkcontext 和 sparksession 的注意事项:如果同时需要创建 sparkcontext 和 sparksession,必须先创建 sparkcontext,再创建 sparksession。如果先创建 sparksession,再创建 sparkcontext,会导致异常,因为在同一个 jvm 中只能运行一个 sparkcontext。

五、spark sql 依赖

<properties>
    <spark.version>3.1.2</spark.version>
    <spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
    <groupid>org.apache.spark</groupid>
    <artifactid>spark-sql_${spark.scala.version}</artifactid>
    <version>${spark.version}</version>
</dependency>

六、spark sql 数据集

1、dataframe

  • 类似于二维表格:dataframe 类似于传统的关系数据库中的二维表格。
  • schema(数据结构信息):在 rdd 的基础上加入了 schema,描述数据结构的信息。
  • 支持嵌套数据类型:dataframe 的 schema 支持嵌套的数据类型,如 structmaparray
  • 丰富的 sql 操作 api:提供更多类似 sql 操作的 api,便于进行数据查询和操作。

2、dataset

  • 强类型:spark 1.6中引入的一个更通用的数据集合,dataset 是强类型的,提供类型安全的操作。
  • rdd + schema:可以认为 dataset 是 rdd 和 schema 的结合,既有 rdd 的分布式计算能力,又有 schema 描述数据结构的信息。
  • 适用于特定领域对象:可以存储和操作特定领域对象的强类型集合。
  • 并行操作:可以使用函数或者相关操作并行地进行转换和操作。

3、dataframe 和 dataset 的关系

  • dataframe 是特殊的 dataset:dataframe 是 dataset 的一个特例,即 dataframe = dataset[row]
  • 数据抽象和操作方式的统一:dataframe 和 dataset 统一了 spark sql 的数据抽象和操作方式,提供了灵活且强大的数据处理能力。

七、spark sql 基本用法

1、scala 创建 sparksession 对象

import org.apache.spark.sql.sparksession
object sparksqlcontext {

  def main(args: array[string]): unit = {
    // 创建 sparkconf 对象,设置应用程序的配置
    val conf: sparkconf = new sparkconf()
      .setmaster("local[4]")   // 设置本地运行模式,使用 4 个线程
      .setappname("spark sql") // 设置应用程序名称为 "spark sql"

    // 创建 sparksession 对象,用于 spark sql 的编程入口
    val spark: sparksession = sparksession.builder()
      .config(conf) // 将 sparkconf 配置应用于 sparksession
      .getorcreate() // 获取现有的 sparksession,或者新建一个
	
    // 获取 sparkcontext 对象,可以直接从 sparksession 中获取
    val sc: sparkcontext = spark.sparkcontext

    // 导入 sparksession 的隐式转换,可以使用 dataframe api 的方法
    import spark.implicits._

    // 在这里可以编写数据处理代码,例如创建 dataframe 和 dataset,进行数据操作等...

    // 停止 sparksession,释放资源
    spark.stop()
  }
}

2、dataframe 和 dataset 的创建方式

1、从集合创建

case class person(name: string, age: int)				// 下同

val data1 = seq(person("alice", 25), person("bob", 30))	
val ds: dataset[person] = spark.createdataset(data)		// 这里的spark是sparksession对象(如上代码),下同

val data2 = seq(("alice", 25), ("bob", 30))
val df: dataframe = data.todf("name", "age")

1、从文件系统读取

val schema = structtype(seq(
  structfield("name", stringtype, nullable = false),
  structfield("age", integertype, nullable = false)
))

val dsjson: dataset[person] = spark.read.json("/path/to/json/file").as[person]

val dfcsv: dataframe = spark.read
	// 使用.schema方法指定csv文件的模式(schema)其定义了dataframe的列名和类型。
	// 这是一个可选步骤,但如果csv文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。  
  .schema(schema)		  
   // 这里设置"header"为"true",表示csv文件的第一行是列名,不需要spark从文件中自动推断。 
  .option("header", "true")
  .csv("/path/to/csv/file")

3、从关系型数据库读取

val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.properties()
properties.setproperty("user", "username")
properties.setproperty("password", "password")

val dsdb: dataset[person] = spark.read.jdbc(url, "table", properties).as[person]

val dfdb: dataframe = spark.read.jdbc(url, "table", properties)

4、从非结构化数据源读取

val dsparquet: dataset[person] = spark.read.parquet("/path/to/parquet/file").as[person]

val dfparquet: dataframe = spark.read.parquet("/path/to/parquet/file")

5、手动创建 dataset

import org.apache.spark.sql.types._

val schema = structtype(seq(
  structfield("name", stringtype, nullable = false),
  structfield("age", integertype, nullable = false)
))
val data = seq(row("alice", 25), row("bob", 30))

val dsmanual: dataset[person] = spark.createdataframe(spark.sparkcontext.parallelize(data), schema).as[person]

val dfmanual: dataframe = spark.createdataframe(
  spark.sparkcontext.parallelize(data), schema
)

3、dataframe api

语法示例一

模拟数据(1000条):

id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...

需求:哪些城市和性别组合在人口较多(id数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。

// 导入sparksession的隐式转换,这样可以使用dataframe的便捷方法(例如下面的'$'符号)
import spark.implicits._

// 定义了一个dataframe的schema,但在这个例子中,使用了csv的header来自动推断schema
val schema = structtype(seq(
  structfield("id", longtype),
  structfield("name", stringtype),
  structfield("gender", stringtype),
  structfield("age", integertype),
  structfield("city", stringtype),
))

// 定义windowspec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
val windowspec: windowspec = window
  .partitionby($"gender")
  .orderby($"avg_age".desc)

// 从csv文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
// 哪些城市和性别组合在人口较多(id数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
spark.read
   // .schema(schema)	// 应用我们定义的schema 
  .option("header", "true") 							// 使用csv的header作为列名
  .csv("d:\\projects\\sparksql\\people.csv")			// dataframe
  .select($"id", $"name", $"age", $"city", $"gender") 	// 选择需要的列(不写默认就是全选)
  .groupby($"city", $"gender") 							// 按城市和性别分组
  .agg(			// 多重聚合
    count($"id").as("count"),   		// 计算每个组的id数量
    round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数
  )
  .where($"count".gt(50))  		// 过滤出id数量大于(可以使用>)50的组
  .orderby($"avg_age".desc)     // 按平均年龄降序排序

  .select($"city", $"gender", $"avg_age",
    dense_rank().over(window.partitionby($"gender").orderby($"avg_age".desc)).as("gender_avg_age_rank"))
  .show() // 显示结果

结果:

+------+------+-------+-------------------+
|  city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市|    男|  41.05|                  1|
|  东莞|    男|  42.81|                  2|
|上海市|    男|  43.92|                  3|
|成都市|    男|  45.89|                  4|
|  中山|    男|  47.08|                  5|
|广州市|    男|  47.47|                  6|
|  深圳|    男|  48.36|                  7|
|上海市|    女|  46.02|                  1|
|  中山|    女|  49.55|                  2|
+------+------+-------+-------------------+

语法示例二:视图,sql

// 读取csv文件到dataframe,使用header作为列名
val dfpeople: dataframe = spark.read
    .option("header", "true") // 使用csv的header作为列名
    .csv("d:\\projects\\sparksql\\people.csv")

// 将dataframe注册为临时视图
dfpeople.createorreplacetempview("people_view")
// 可以使用spark sql来查询这个视图了
// 例如,查询所有人的姓名和年龄
spark.sql("select name, age from people_view").show()
// 二
spark.sql(
      """
        |select * from people_view
        |where gender = '男'
        |""".stripmargin
    ).show()

语法示例三:join

case class student(name: string, classid: int)
case class class(classid: int, classname: string)

val frmstu = spark.createdataframe(
  seq(
    student("张三", 1),
    student("李四", 1),
    student("王五", 2),
    student("赵六", 2),
    student("李明", 2),
    student("王刚", 4),
    student("王朋", 5),
  )
)

val frmclass = spark.createdataframe(
  seq(
    class(1, "name1"),
    class(2, "name2"),
    class(3, "name3"),
    class(4, "name4")
  )
)

left 左连接,rignt 右连接, full 全外连接,anti左差集,semi左交集

// 别名 + inner 内连接
frmstu.as("s")
	.join(frmclass.as("c"), $"s.classid" === $"c.classid")	// jointype 默认 inner内连接
	.show()

// 使用左外连接将df和frmclass根据classid合并
frmstu
  .join(frmclass, seq("classid"), "left")	
  .show()

// 左差集
frmstu
  .join(frmclass, seq("classid"), "anti")	
  .show()

// 左交集
frmstu
  .join(frmclass, seq("classid"), "semi")	
  .show()

结果

别名 + inner 内连接
+----+-------+-------+---------+
|name|classid|classid|classname|
+----+-------+-------+---------+
|张三|      1|      1|    name1|
|李四|      1|      1|    name1|
|王五|      2|      2|    name2|
|赵六|      2|      2|    name2|
|李明|      2|      2|    name2|
|王刚|      4|      4|    name4|
+----+-------+-------+---------+

使用左外连接将df和frmclass根据classid合并
+-------+----+---------+
|classid|name|classname|
+-------+----+---------+
|      1|张三|    name1|
|      1|李四|    name1|
|      2|王五|    name2|
|      2|赵六|    name2|
|      2|李明|    name2|
|      4|王刚|    name4|
|      5|王朋|     null|
+-------+----+---------+

左差集
+-------+----+
|classid|name|
+-------+----+
|      5|王朋|
+-------+----+

左交集
+-------+----+
|classid|name|
+-------+----+
|      1|张三|
|      1|李四|
|      2|王五|
|      2|赵六|
|      2|李明|
|      4|王刚|
+-------+----+
(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com