第九章:避免洗牌和减少操作费用
在本章中,我们将学习如何避免洗牌并减少我们作业的操作费用,以及检测过程中的洗牌。然后,我们将测试在 apache spark 中导致洗牌的操作,以找出我们何时应该非常小心以及我们应该避免哪些操作。接下来,我们将学习如何改变具有广泛依赖关系的作业设计。之后,我们将使用keyby()
操作来减少洗牌,在本章的最后一节中,我们将看到如何使用自定义分区来减少数据的洗牌。
在本章中,我们将涵盖以下主题:
-
检测过程中的洗牌
-
在 apache spark 中进行导致洗牌的测试操作
-
改变具有广泛依赖关系的作业设计
-
使用
keyby()
操作来减少洗牌 -
使用自定义分区器来减少洗牌
检测过程中的洗牌
在本节中,我们将学习如何检测过程中的洗牌。
在本节中,我们将涵盖以下主题:
-
加载随机分区的数据
-
使用有意义的分区键发出重新分区
-
通过解释查询来理解洗牌是如何发生的
我们将加载随机分区的数据,以查看数据是如何加载的以及数据加载到了哪里。接下来,我们将使用有意义的分区键发出一个分区。然后,我们将使用确定性和有意义的键将数据重新分区到适当的执行程序。最后,我们将使用explain()
方法解释我们的查询并理解洗牌。在这里,我们有一个非常简单的测试。
我们将创建一个带有一些数据的 dataframe。例如,我们创建了一个带有一些随机 uid 和user_1
的inputrecord
,以及另一个带有user_1
中随机 id 的输入,以及user_2
的最后一条记录。假设这些数据是通过外部数据系统加载的。数据可以从 hdfs 加载,也可以从数据库加载,例如 cassandra 或 nosql:
class detectingshuffle extends funsuite {
val spark: sparksession = sparksession.builder().master("local[2]").getorcreate()
test("should explain plan showing logical and physical with udf and df") {
//given
import spark.sqlcontext.implicits._
val df = spark.sparkcontext.makerdd(list(
inputrecord("1234-3456-1235-1234", "user_1"),
inputrecord("1123-3456-1235-1234", "user_1"),
inputrecord("1123-3456-1235-9999", "user_2")
)).todf()
在加载的数据中,我们的数据没有预定义或有意义的分区,这意味着输入记录编号 1 可能会最先出现在执行程序中,而记录编号 2 可能会最先出现在执行程序中。因此,即使数据来自同一用户,我们也很可能会为特定用户执行操作。
如前一章第八章中所讨论的,不可变设计,我们使用了reducebykey()
方法,该方法获取用户 id 或特定 id 以减少特定键的所有值。这是一个非常常见的操作,但具有一些随机分区。最好使用有意义的键repartition
数据。
在使用userid
时,我们将使用repartition
的方式,使结果记录具有相同用户 id 的数据。因此,例如user_1
最终将出现在第一个执行程序上:
//when
val q = df.repartition(df("userid"))
第一个执行程序将拥有所有userid
的数据。如果inputrecord("1234-3456-1235-1234", "user_1")
在执行程序 1 上,而inputrecord("1123-3456-1235-1234", "user_1")
在执行程序 2 上,在对来自执行程序 2 的数据进行分区后,我们需要将其发送到执行程序 1,因为它是此分区键的父级。这会导致洗牌。洗牌是由于加载数据而导致的,这些数据没有有意义地分区,或者根本没有分区。我们需要处理我们的数据,以便我们可以对特定键执行操作。
我们可以进一步repartition
数据,但应该在链的开头进行。让我们开始测试来解释我们的查询:
q.explain(true)
我们在逻辑计划中对userid
表达式进行了重新分区,但当我们检查物理计划时,显示使用了哈希分区,并且我们将对userid
值进行哈希处理。因此,我们扫描所有 rdd 和所有具有相同哈希的键,并将其发送到相同的执行程序以实现我们的目标:
在下一节中,我们将测试在 apache spark 中导致洗牌的操作。
在 apache spark 中进行导致洗牌的测试操作
在本节中,我们将测试在 apache spark 中导致洗牌的操作。我们将涵盖以下主题:
-
使用 join 连接两个 dataframe
-
使用分区不同的两个 dataframe
-
测试导致洗牌的连接
连接是一种特定的操作,会导致洗牌,我们将使用它来连接我们的两个 dataframe。我们将首先检查它是否会导致洗牌,然后我们将检查如何避免它。为了理解这一点,我们将使用两个分区不同的 dataframe,并检查连接两个未分区或随机分区的数据集或 dataframe 的操作。如果它们位于不同的物理机器上,将会导致洗牌,因为没有办法连接具有相同分区键的两个数据集。
在我们连接数据集之前,我们需要将它们发送到同一台物理机器上。我们将使用以下测试。
我们需要创建userdata
,这是一个我们已经见过的案例类。它有用户 id 和数据。我们有用户 id,即user_1
,user_2
和user_4
:
test("example of operation that is causing shuffle") {
import spark.sqlcontext.implicits._
val userdata =
spark.sparkcontext.makerdd(list(
userdata("user_1", "1"),
userdata("user_2", "2"),
userdata("user_4", "200")
)).tods()
然后我们创建一些类似于用户 id(user_1
,user_2
和user_3
)的交易数据:
val transactiondata =
spark.sparkcontext.makerdd(list(
usertransaction("user_1", 100),
usertransaction("user_2", 300),
usertransaction("user_3", 1300)
)).tods()
我们使用joinwith
在userdata
上的交易,使用userdata
和transactiondata
的userid
列。由于我们发出了inner
连接,结果有两个元素,因为记录和交易之间有连接,即userdata
和usertransaction
。但是,userdata
没有交易,usertransaction
没有用户数据:
//shuffle: userdata can stay on the current executors, but data from
//transactiondata needs to be send to those executors according to joincolumn
//causing shuffle
//when
val res: dataset[(userdata, usertransaction)]
= userdata.joinwith(transactiondata, userdata("userid") === transactiondata("userid"), "inner")
当我们连接数据时,数据没有分区,因为这是 spark 的一些随机数据。它无法知道用户 id 列是分区键,因为它无法猜测。由于它没有预分区,要连接来自两个数据集的数据,需要将数据从用户 id 发送到执行器。因此,由于数据没有分区,将会有大量数据从执行器洗牌。
让我们解释查询,执行断言,并通过启动测试显示结果:
//then
res.show()
assert(res.count() == 2)
}
}
我们可以看到我们的结果如下:
+------------+-------------+
| _1 | _2|
+----------- +-------------+
+ [user_1,1] | [user_1,100]|
| [user_2,2] | [user_2,300]|
+------------+-------------+
我们有[user_1,1]
和[user_1,100]
,即userid
和usertransaction
。看起来连接工作正常,但让我们看看物理参数。我们使用sortmergejoin
对第一个数据集和第二个数据集使用userid
,然后我们使用sort
和hashpartitioning
。
在前一节中,检测过程中的洗牌,我们使用了partition
方法,该方法在底层使用了hashpartitioning
。虽然我们使用了join
,但我们仍然需要使用哈希分区,因为我们的数据没有正确分区。因此,我们需要对第一个数据集进行分区,因为会有大量的洗牌,然后我们需要对第二个 dataframe 做完全相同的事情。再次,洗牌将会进行两次,一旦数据根据连接字段进行分区,连接就可以在执行器本地进行。
在执行物理计划后,将对记录进行断言,指出userid
用户数据一与用户交易userid
一位于同一执行器上。没有hashpartitioning
,就没有保证,因此我们需要进行分区。
在下一节中,我们将学习如何更改具有广泛依赖的作业的设计,因此我们将看到如何在连接两个数据集时避免不必要的洗牌。
更改具有广泛依赖的作业的设计
在本节中,我们将更改在未分区数据上执行join
的作业。我们将更改具有广泛依赖的作业的设计。
在本节中,我们将涵盖以下主题:
-
使用公共分区键对 dataframe 进行重新分区
-
理解使用预分区数据进行连接
-
理解我们如何避免洗牌
我们将在 dataframe 上使用repartition
方法,使用一个公共分区键。我们发现,当进行连接时,重新分区会在底层发生。但通常,在使用 spark 时,我们希望在 dataframe 上执行多个操作。因此,当我们与其他数据集执行连接时,hashpartitioning
将需要再次执行。如果我们在加载数据时进行分区,我们将避免再次分区。
在这里,我们有我们的示例测试用例,其中包含我们之前在 apache spark 的“导致洗牌的测试操作”部分中使用的数据。我们有userdata
,其中包含三条用户 id 的记录 - user_1
,user_2
和user_4
- 以及usertransaction
数据,其中包含用户 id - 即user_1
,user_2
,user_3
:
test("example of operation that is causing shuffle") {
import spark.sqlcontext.implicits._
val userdata =
spark.sparkcontext.makerdd(list(
userdata("user_1", "1"),
userdata("user_2", "2"),
userdata("user_4", "200")
)).tods()
然后,我们需要对数据进行repartition
,这是要做的第一件非常重要的事情。我们使用userid
列来重新分区我们的userdata
:
val repartitioneduserdata = userdata.repartition(userdata("userid"))
然后,我们将使用userid
列重新分区我们的数据,这次是针对transactiondata
:
val repartitionedtransactiondata = transactiondata.repartition(transactiondata("userid"))
一旦我们重新分区了我们的数据,我们就可以确保具有相同分区键的任何数据 - 在本例中是userid
- 将落在同一个执行器上。因此,我们的重新分区数据将不会有洗牌,连接将更快。最终,我们能够进行连接,但这次我们连接的是预分区的数据:
//when
//data is already partitioned using join-column. don't need to shuffle
val res: dataset[(userdata, usertransaction)]
= repartitioneduserdata.joinwith(repartitionedtransactiondata, userdata("userid") === transactiondata("userid"), "inner")
我们可以使用以下代码显示我们的结果:
//then
res.show()
assert(res.count() == 2)
}
}
输出显示在以下截图中:
在上述截图中,我们有用户 id 和交易的物理计划。我们对用户 id 数据和交易数据的用户 id 列执行了哈希分区。在连接数据之后,我们可以看到数据是正确的,并且连接有一个物理计划。
这次,物理计划有点不同。
我们有一个sortmergejoin
操作,并且我们正在对我们的数据进行排序,这些数据在我们执行引擎的上一步已经预分区。这样,我们的 spark 引擎将执行排序合并连接,无需进行哈希连接。它将正确排序数据,连接将更快。
在下一节中,我们将使用keyby()
操作来进一步减少洗牌。
使用 keyby()操作来减少洗牌
在本节中,我们将使用keyby()
操作来减少洗牌。我们将涵盖以下主题:
-
加载随机分区的数据
-
尝试以有意义的方式预分区数据
-
利用
keyby()
函数
我们将加载随机分区的数据,但这次使用 rdd api。我们将以有意义的方式重新分区数据,并提取底层正在进行的信息,类似于 dataframe 和 dataset api。我们将学习如何利用keyby()
函数为我们的数据提供一些结构,并在 rdd api 中引起预分区。
本节中我们将使用以下测试。我们创建两个随机输入记录。第一条记录有一个随机用户 id,user_1
,第二条记录有一个随机用户 id,user_1
,第三条记录有一个随机用户 id,user_2
:
test("should use keyby to distribute traffic properly"){
//given
val rdd = spark.sparkcontext.makerdd(list(
inputrecord("1234-3456-1235-1234", "user_1"),
inputrecord("1123-3456-1235-1234", "user_1"),
inputrecord("1123-3456-1235-9999", "user_2")
))
我们将使用rdd.todebugstring
提取 spark 底层发生的情况:
println(rdd.todebugstring)
此时,我们的数据是随机分布的,用户 id 字段的记录可能在不同的执行器上,因为 spark 执行引擎无法猜测user_1
是否对我们有意义,或者1234-3456-1235-1234
是否有意义。我们知道1234-3456-1235-1234
不是一个有意义的键,而是一个唯一标识符。将该字段用作分区键将给我们一个随机分布和大量的洗牌,因为在使用唯一字段作为分区键时没有数据局部性。
spark 无法知道相同用户 id 的数据将落在同一个执行器上,这就是为什么在分区数据时我们需要使用用户 id 字段,即user_1
、user_1
或user_2
。为了在 rdd api 中实现这一点,我们可以在我们的数据中使用keyby(_.userid)
,但这次它将改变 rdd 类型:
val res = rdd.keyby(_.userid)
如果我们检查 rdd 类型,我们会发现这次,rdd 不是输入记录,而是字符串和输入记录的 rdd。字符串是我们在这里期望的字段类型,即userid
。我们还将通过在结果上使用todebugstring
来提取有关keyby()
函数的信息:
println(res.todebugstring)
一旦我们使用keyby()
,相同用户 id 的所有记录都将落在同一个执行器上。正如我们所讨论的,这可能是危险的,因为如果我们有一个倾斜的键,这意味着我们有一个具有非常高基数的键,我们可能会耗尽内存。此外,结果上的所有操作都将按键进行,因此我们将在预分区数据上进行操作:
res.collect()
让我们开始这个测试。输出将如下所示:
我们可以看到我们的第一个调试字符串非常简单,我们只有 rdd 上的集合,但第二个有点不同。我们有一个keyby()
方法,并在其下面创建了一个 rdd。我们有来自第一部分的子 rdd 和父 rdd,即测试在 apache spark 中引起洗牌的操作,当我们扩展了 rdd 时。这是由keyby()
方法发出的父子链。
在下一节中,我们将使用自定义分区器进一步减少洗牌。
使用自定义分区器来减少洗牌
在本节中,我们将使用自定义分区器来减少洗牌。我们将涵盖以下主题:
-
实现自定义分区器
-
使用
partitionby
方法在 spark 上使用分区器 -
验证我们的数据是否被正确分区
我们将使用自定义逻辑实现自定义分区器,该分区器将对数据进行分区。它将告诉 spark 每条记录应该落在哪个执行器上。我们将使用 spark 上的partitionby
方法。最后,我们将验证我们的数据是否被正确分区。为了测试的目的,我们假设有两个执行器:
import com.tomekl007.usertransaction
import org.apache.spark.sql.sparksession
import org.apache.spark.{partitioner, sparkcontext}
import org.scalatest.funsuite
import org.scalatest.matchers._
class custompartitioner extends funsuite {
val spark: sparkcontext = sparksession.builder().master("local[2]").getorcreate().sparkcontext
test("should use custom partitioner") {
//given
val numberofexecutors = 2
假设我们想将我们的数据均匀地分成2
个执行器,并且具有相同键的数据实例将落在同一个执行器上。因此,我们的输入数据是一个usertransactions
列表:"a"
,"b"
,"a"
,"b"
和"c"
。值并不那么重要,但我们需要记住它们以便稍后测试行为。给定usertransactions
的amount
分别为100
,101
,202
,1
和55
:
val data = spark
.parallelize(list(
usertransaction("a", 100),
usertransaction("b", 101),
usertransaction("a", 202),
usertransaction("b", 1),
usertransaction("c", 55)
当我们使用keyby
时,(_.userid)
被传递给我们的分区器,因此当我们发出partitionby
时,我们需要扩展override
方法:
).keyby(_.userid)
.partitionby(new partitioner {
override def numpartitions: int = numberofexecutors
getpartition
方法接受一个key
,它将是userid
。键将在这里传递,类型将是字符串:
override def getpartition(key: any): int = {
key.hashcode % numberofexecutors
}
})
这些方法的签名是any
,所以我们需要override
它,并且还需要覆盖分区的数量。
然后我们打印我们的两个分区,numpartitions
返回值为2
:
println(data.partitions.length)
getpartition
非常简单,因为它获取hashcode
和numberofexecutors
的模块。它确保相同的键将落在同一个执行器上。
然后,我们将为各自的分区映射每个分区,因为我们得到一个迭代器。在这里,我们正在为测试目的获取amount
:
//when
val res = data.mappartitionslong.map(_.amount)
).collect().tolist
最后,我们断言55
,100
,202
,101
和1
;顺序是随机的,所以不需要关心顺序:
//then
res should contain thesameelementsas list(55, 100, 202, 101, 1)
}
}
如果我们仍然希望,我们应该使用sortby
方法。让我们开始这个测试,看看我们的自定义分区器是否按预期工作。现在,我们可以开始了。我们有2
个分区,所以它按预期工作,如下面的截图所示:
总结
在本章中,我们学习了如何检测过程中的洗牌。我们涵盖了在 apache spark 中导致洗牌的测试操作。我们还学习了如何在 rdd 中使用分区。如果需要分区数据,了解如何使用 api 是很重要的,因为 rdd 仍然被广泛使用,所以我们使用keyby
操作来减少洗牌。我们还学习了如何使用自定义分区器来减少洗牌。
在下一章中,我们将学习如何使用 spark api 以正确的格式保存数据。
第十章:将数据保存在正确的格式中
在之前的章节中,我们专注于处理和加载数据。我们学习了有关转换、操作、连接、洗牌和 spark 的其他方面。
在本章中,我们将学习如何以正确的格式保存数据,还将使用 spark 的标准 api 以纯文本格式保存数据。我们还将利用 json 作为数据格式,并学习如何使用标准 api 保存 json。spark 有 csv 格式,我们也将利用该格式。然后,我们将学习更高级的基于模式的格式,其中需要支持导入第三方依赖项。接下来,我们将使用 avro 与 spark,并学习如何使用和保存列格式的数据,即 parquet。到本章结束时,我们还将学会如何检索数据以验证其是否以正确的方式存储。
在本章中,我们将涵盖以下主题:
-
以纯文本格式保存数据
-
利用 json 作为数据格式
-
表格式 - csv
-
使用 avro 与 spark
-
列格式 - parquet
以纯文本格式保存数据
在本节中,我们将学习如何以纯文本格式保存数据。将涵盖以下主题:
-
以纯文本格式保存数据
-
加载纯文本数据
-
测试
我们将以纯文本格式保存我们的数据,并研究如何将其保存到 spark 目录中。然后我们将加载纯文本数据,然后测试并保存以检查我们是否可以产生相同的结果代码。这是我们的saveplaintext.scala
文件:
package com.tomekl007.chapter_4
import java.io.file
import com.tomekl007.usertransaction
import org.apache.spark.sql.sparksession
import org.apache.spark.{partitioner, sparkcontext}
import org.scalatest.{beforeandaftereach, funsuite}
import org.scalatest.matchers._
import scala.reflect.io.path
class saveplaintext extends funsuite with beforeandaftereach{
val spark: sparkcontext = sparksession.builder().master("local[2]").getorcreate().sparkcontext
private val filename = "transactions.txt"
override def aftereach() {
val path = path (filename)
path.deleterecursively()
}
test("should save and load in plain text") {
//given
val rdd = spark.makerdd(list(usertransaction("a", 100), usertransaction("b", 200)))
//when
rdd.coalesce(1).saveastextfile(filename)
val fromfile = spark.textfile(filename)
fromfile.collect().tolist should contain thesameelementsas list(
"usertransaction(a,100)", "usertransaction(b,200)"
//note - this is string!
)
}
}
我们将需要一个filename
变量,在我们的情况下,它将是一个文件夹名称,然后 spark 将在其下创建一些文件:
import java.io.file
import com.tomekl007.usertransaction
import org.apache.spark.sql.sparksession
import org.apache.spark.{partitioner, sparkcontext}
import org.scalatest.{beforeandaftereach, funsuite}
import org.scalatest.matchers._
import scala.reflect.io.path
class saveplaintext extends funsuite with beforeandaftereach{
val spark: sparkcontext = sparksession.builder().master("local[2]").getorcreate().sparkcontext
private val filename = "transactions.txt"
我们将在我们的测试用例中使用beforeandaftereach
来清理我们的目录,这意味着路径应该被递归删除。测试后整个路径将被删除,因为需要重新运行测试而没有失败。我们需要注释掉以下代码,以便在第一次运行时调查保存的文本文件的结构:
//override def aftereach() {
// val path = path (filename)
// path.deleterecursively()
// }
//test("should save and load in plain text") {
然后我们将创建两个交易的 rdd,usertransaction("a", 100)
和usertransaction("b", 200)
:
val rdd = spark.makerdd(list(usertransaction("a", 100), usertransaction("b", 200)))
然后,我们将我们的数据合并为一个分区。coalesce()
是一个非常重要的方面。如果我们想将数据保存在单个文件中,我们需要将其合并为一个,但这样做有一个重要的含义:
rdd.coalesce(1).saveastextfile(filename)
如果我们将其合并为一个文件,那么只有一个执行程序可以将数据保存到我们的系统中。这意味着保存数据将非常缓慢,并且还存在内存不足的风险,因为所有数据将被发送到一个执行程序。通常,在生产环境中,我们根据可用的执行程序将其保存为多个分区,甚至乘以自己的因子。因此,如果我们有 16 个执行程序,那么我们可以将其保存为64
。但这会导致64
个文件。出于测试目的,我们将保存为一个文件,如前面的代码片段所示:
rdd.coalesce (numpartitions = 1).saveastextfile(filename)
现在,我们将加载数据。我们只需要将文件名传递给textfile
方法,它将返回fromfile
:
val fromfile = spark.textfile(filename)
然后我们断言我们的数据,这将产生thesameelementsas list
,usertransaction(a,100)
和usertransaction(b,200)
:
fromfile.collect().tolist should contain thesameelementsas list(
"usertransaction(a,100)", "usertransaction(b,200)"
//note - this is string!
)
}
}
需要注意的重要事项是,对于字符串列表,spark 不知道我们的数据模式,因为我们将其保存为纯文本。
这是在保存纯文本时需要注意的一点,因为加载数据并不容易,因为我们需要手动将每个字符串映射到usertransaction
。因此,我们将不得不手动解析每条记录,但是,出于测试目的,我们将把我们的交易视为字符串。
现在,让我们开始测试并查看创建的文件夹的结构:
在前面的屏幕截图中,我们可以看到我们的测试通过了,我们得到了transactions.txt
。在文件夹中,我们有四个文件。第一个是._success.crc
,这意味着保存成功。接下来,我们有.part-00000.crc
,用于控制和验证一切是否正常工作,这意味着保存是正确的。然后,我们有_success
和part-00000
,这两个文件都有校验和,但part-00000
也包含了所有的数据。然后,我们还有usertransaction(a,100)
和usertransaction(b,200)
:
在下一节中,我们将学习如果增加分区数量会发生什么。
利用 json 作为数据格式
在本节中,我们将利用 json 作为数据格式,并将我们的数据保存为 json。以下主题将被涵盖:
-
以 json 格式保存数据
-
加载 json 数据
-
测试
这些数据是人类可读的,并且比简单的纯文本给我们更多的含义,因为它携带了一些模式信息,比如字段名。然后,我们将学习如何以 json 格式保存数据并加载我们的 json 数据。
我们将首先创建一个usertransaction("a", 100)
和usertransaction("b", 200)
的 dataframe,并使用.todf()
保存 dataframe api:
val rdd = spark.sparkcontext
.makerdd(list(usertransaction("a", 100), usertransaction("b", 200)))
.todf()
然后我们将发出coalesce()
,这次我们将取值为2
,并且我们将得到两个结果文件。然后我们将发出write.format
方法,并且需要指定一个格式,我们将使用json
格式:
rdd.coalesce(2).write.format("json").save(filename)
如果我们使用不支持的格式,我们将得到一个异常。让我们通过将源输入为not
来测试这一点:
rdd.coalesce(2).write.format("not").save(filename)
我们将得到诸如“此格式不是预期的”、“找不到数据源:not”和“没有这样的数据源”等异常:
在我们原始的 json 代码中,我们将指定格式,并且需要将其保存到filename
。如果我们想要读取,我们需要将其指定为read
模式,并且还需要添加一个文件夹的路径:
val fromfile = spark.read.json(filename)
在这种情况下,让我们注释掉aftereach()
来调查生成的 json:
// override def aftereach() {
// val path = path(filename)
// path.deleterecursively()
// }
让我们开始测试:
fromfile.show()
assert(fromfile.count() == 2)
}
}
输出如下:
+------+------+
|amount|userid|
| 200| b|
| 100| a|
+------+------+
在前面的代码输出中,我们可以看到我们的测试通过了,并且 dataframe 包含了所有有意义的数据。
从输出中,我们可以看到 dataframe 具有所需的所有模式。它有amount
和userid
,这非常有用。
transactions.json
文件夹有两部分——一部分是r-00000
,另一部分是r-00001
,因为我们发出了两个分区。如果我们在生产系统中保存数据有 100 个分区,我们最终会得到 100 个部分文件,而且每个部分文件都会有一个 crc 校验和文件。
这是第一个文件:
{"userid":"a","amount":"100"}
在这里,我们有一个带有模式的 json 文件,因此我们有一个userid
字段和amount
字段。
另一方面,我们有第二个文件,其中包含第二条记录,包括userid
和amount
:
{"userid":"b","amount":"200"}
这样做的好处是 spark 能够从模式中推断出数据,并且以格式化的 dataframe 加载,具有适当的命名和类型。然而,缺点是每条记录都有一些额外的开销。每条记录都需要在其中有一个字符串,并且在每个字符串中,如果我们有一个包含数百万个文件的文件,并且我们没有对其进行压缩,那么将会有相当大的开销,这是不理想的。
json 是人类可读的,但另一方面,它消耗了大量资源,就像 cpu 用于压缩、读取和写入,以及磁盘和内存用于开销一样。除了 json 之外,还有更好的格式,我们将在接下来的部分中介绍。
在下一节中,我们将查看表格格式,我们将介绍一个经常用于导入到 microsoft excel 或 google 电子表格的 csv 文件。这对数据科学家也是非常有用的格式,但仅在使用较小的数据集时。
表格式——csv
在本节中,我们将介绍文本数据,但以表格格式——csv。以下主题将被涵盖:
-
以 csv 格式保存数据
-
加载 csv 数据
-
测试
保存 csv 文件比 json 和纯文本更复杂,因为我们需要指定是否要在 csv 文件中保留数据的头信息。
首先,我们将创建一个 dataframe:
test("should save and load csv with header") {
//given
import spark.sqlcontext.implicits._
val rdd = spark.sparkcontext
.makerdd(list(usertransaction("a", 100), usertransaction("b", 200)))
.todf()
然后,我们将使用write
格式 csv。我们还需要指定我们不想在其中包含header
选项:
//when
rdd.coalesce(1)
.write
.format("csv")
.option("header", "false")
.save(filename)
然后,我们将进行测试以验证条件是true
还是false
:
//when
rdd.coalesce(1)
.write
.format("csv")
.option("header", "true")
.save(filename)
此外,我们无需添加任何额外的依赖来支持 csv,如以前的版本所需。
然后,我们将指定应该与write
模式相似的read
模式,并且我们需要指定是否有header
:
val fromfile = spark.read.option("header", "false").csv(filename)
让我们开始测试并检查输出:
+---+---+
|_c0|_c1|
+---+---+
| a|100|
| b|200|
+---+---+
在前面的代码输出中,我们可以看到数据已加载,但我们丢失了我们的模式。c0
和c1
是由 spark 创建的列 0(c0
)和列 1(c1
)的别名。
因此,如果我们指定header
应保留该信息,让我们在write
和read
时指定header
:
val fromfile = spark.read.option("header", "true).csv(filename)
我们将指定header
应保留我们的信息。在以下输出中,我们可以看到关于模式的信息在读写操作中被感知到:
+------+------+
|userid|amount|
+------+------+
| a| 100|
| b| 200|
+------+------+
让我们看看如果我们在write
时使用header
,而在read
时不使用它会发生什么。我们的测试应该失败,如下面的代码截图所示:
在前面的截图中,我们可以看到我们的测试失败了,因为我们没有模式,因为我们在没有头的情况下进行读取。第一条记录,也就是header
,被视为列值。
让我们尝试一个不同的情况,我们在没有header
的情况下进行写入,并在有header
的情况下进行读取:
//when
rdd.coalesce(1)
.write
.format("csv")
.option("header", "false")
.save(filename)
val fromfile = spark.read.option("header", "false").csv(filename)
我们的测试将再次失败,因为这一次,我们将我们的第一条记录视为头记录。
让我们将读和写操作都设置为header
并在之前添加的注释后测试我们的代码:
override def aftereach() {
val path = path(filename)
path.deleterecursively()
}
csv 和 json 文件将具有模式,但开销较小。因此,它甚至可能比 json 更好。
在下一节中,我们将看到如何将基于模式的格式作为整体与 spark 一起使用。
使用 avro 与 spark
到目前为止,我们已经看过基于文本的文件。我们使用纯文本、json 和 csv。json 和 csv 比纯文本更好,因为它们携带了一些模式信息。
在本节中,我们将研究一个名为 avro 的高级模式。将涵盖以下主题:
-
以 avro 格式保存数据
-
加载 avro 数据
-
测试
avro 具有嵌入其中的模式和数据。这是一种二进制格式,不是人类可读的。我们将学习如何以 avro 格式保存数据,加载数据,然后进行测试。
首先,我们将创建我们的用户交易:
test("should save and load avro") {
//given
import spark.sqlcontext.implicits._
val rdd = spark.sparkcontext
.makerdd(list(usertransaction("a", 100), usertransaction("b", 200)))
.todf()
然后我们将进行coalesce
并写入 avro:
//when
rdd.coalesce(2)
.write
.avro(filename)
在使用 csv 时,我们指定了像 csv 这样的格式,当我们指定 json 时,这也是一个格式。但是在 avro 中,我们有一个方法。这种方法不是标准的 spark 方法;它来自第三方库。为了具有 avro 支持,我们需要访问build.sbt
并从com.databricks
添加spark-avro
支持。
然后我们需要导入适当的方法。我们将导入com.databricks.spark.avro._
以给我们扩展 spark dataframe 的隐式函数:
import com.databricks.spark.avro._
实际上我们正在使用一个 avro 方法,我们可以看到implicit class
接受一个dataframewriter
类,并以 spark 格式写入我们的数据。
在我们之前使用的coalesce
代码中,我们可以使用write
,指定格式,并执行com.databricks.spark.avro
类。avro
是一个快捷方式,不需要将com.databricks.spark.avro
作为整个字符串写入:
//when
rdd.coalesce(2)
.write.format(com.databricks.spark.avro)
.avro(filename)
简而言之,无需指定格式;只需应用隐式avro
方法。
让我们注释掉代码并删除 avro 以检查它是如何保存的:
// override def aftereach() {
// val path = path(filename)
// path.deleterecursively()
// }
如果我们打开transactions.avro
文件夹,我们有两部分——part-r-00000
和part-r-00001
。
第一部分将包含二进制数据。它由许多二进制记录和一些人类可读的数据组成,这就是我们的模式:
我们有两个字段 - user id
,它是一个字符串类型或空值,和name
:amount
,它是一个整数。作为原始类型,jvm 不能有空值。需要注意的重要事情是,在生产系统中,我们必须保存非常大的数据集,将有成千上万条记录。模式始终在每个文件的第一行。如果我们检查第二部分,我们将看到完全相同的模式,然后是二进制数据。
通常,如果有复杂的模式,我们只有一行或更多行,但仍然是非常少量的数据。
我们可以看到在生成的数据集中,我们有userid
和amount
:
+------+------+
|userid|amount|
+------+------+
| a| 100|
| b| 200|
+------+------+
在上面的代码块中,我们可以看到模式被描绘在文件中。虽然它是一个二进制文件,但我们可以提取它。
在下一节中,我们将研究列格式 - parquet。
列格式 - parquet
在本节中,我们将研究第二种基于模式的格式 parquet。将涵盖以下主题:
-
以 parquet 格式保存数据
-
加载 parquet 数据
-
测试
这是一种列格式,因为数据是以列方式存储的,而不是以行方式,就像我们在 json、csv、纯文本和 avro 文件中看到的那样。
这是一个非常有趣和重要的大数据处理格式,可以加快处理过程。在本节中,我们将专注于向 spark 添加 parquet 支持,将数据保存到文件系统中,重新加载数据,然后进行测试。parquet 与 avro 类似,因为它提供了一个parquet
方法,但这次是一个稍微不同的实现。
在build.sbt
文件中,对于 avro 格式,我们需要添加外部依赖,但对于 parquet,我们已经在 spark 中有了该依赖。因此,parquet 是 spark 的首选,因为它包含在标准包中。
让我们来看看saveparquet.scala
文件中用于保存和加载 parquet 文件的逻辑。
首先,我们合并了两个分区,指定了格式,然后指定我们要保存parquet
:
package com.tomekl007.chapter_4
import com.databricks.spark.avro._
import com.tomekl007.usertransaction
import org.apache.spark.sql.sparksession
import org.scalatest.{beforeandaftereach, funsuite}
import scala.reflect.io.path
class saveparquet extends funsuite with beforeandaftereach {
val spark = sparksession.builder().master("local[2]").getorcreate()
private val filename = "transactions.parquet"
override def aftereach() {
val path = path(filename)
path.deleterecursively()
}
test("should save and load parquet") {
//given
import spark.sqlcontext.implicits._
val rdd = spark.sparkcontext
.makerdd(list(usertransaction("a", 100), usertransaction("b", 200)))
.todf()
//when
rdd.coalesce(2)
.write
.parquet(filename)
read
方法也实现了完全相同的方法:
val fromfile = spark.read.parquet(filename)
fromfile.show()
assert(fromfile.count() == 2)
}
}
让我们开始这个测试,但在此之前,我们将在saveparquet.scala
文件中注释掉以下代码,以查看文件的结构:
// override def aftereach() {
// val path = path(filename)
// path.deleterecursively()
// }
创建了一个新的transactions.parquet
文件夹,里面有两个部分 - part-r-00000
和part-r-00001
。但这次,格式完全是二进制的,并且嵌入了一些元数据。
我们嵌入了元数据,还有amount
和userid
字段,它们是string
类型。r-00000
部分完全相同,并且嵌入了模式。因此,parquet 也是一种基于模式的格式。当我们读取数据时,我们可以看到我们有userid
和amount
列可用。
摘要
在本章中,我们学习了如何以纯文本格式保存数据。我们注意到,当我们没有正确加载数据时,模式信息会丢失。然后我们学习了如何利用 json 作为数据格式,并发现 json 保留了模式,但它有很多开销,因为模式是针对每条记录的。然后我们了解了 csv,并发现 spark 对其有嵌入支持。然而,这种方法的缺点是模式不是关于特定类型的记录,并且需要隐式推断制表符。在本章的最后,我们介绍了 avro 和 parquet,它们具有列格式,也嵌入了 spark。
在下一章中,我们将使用 spark 的键/值 api。
第十一章:使用 spark 键/值 api
在本章中,我们将使用 spark 键/值 api。我们将首先查看可用的键/值对转换。然后,我们将学习如何使用aggregatebykey
方法而不是groupby()
方法。稍后,我们将研究键/值对的操作,并查看可用的键/值数据分区器。在本章结束时,我们将实现一个高级分区器,该分区器将能够按范围对我们的数据进行分区。
在本章中,我们将涵盖以下主题:
-
可用的键/值对操作
-
使用
aggregatebykey
而不是groupby()
-
键/值对操作
-
可用的键/值数据分区器
-
实现自定义分区器
可用的键/值对操作
在本节中,我们将涵盖以下主题:
-
可用的键/值对转换
-
使用
countbykey()
-
了解其他方法
因此,这是我们众所周知的测试,我们将在其中使用键/值对的转换。
首先,我们将为用户a
,b
,a
,b
和c
创建一个用户交易数组,以某种金额,如下例所示:
val keyswithvalueslist =
array(
usertransaction("a", 100),
usertransaction("b", 4),
usertransaction("a", 100001),
usertransaction("b", 10),
usertransaction("c", 10)
)
然后,根据以下示例,我们需要按特定字段对数据进行键入:
val keyed = data.keyby(_.userid)
我们将通过调用keyby
方法并使用userid
参数对其进行键入。
现在,我们的数据分配给了keyed
变量,其类型为元组。第一个元素是字符串,即userid
,第二个元素是usertransaction
。
让我们看一下可用的转换。首先,我们将看看countbykey
。
让我们看一下它的实现,如下例所示:
val data = spark.parallelize(keyswithvalueslist)
val keyed = data.keyby(_.userid)
//when
val counted = keyed.countbykey()
// keyed.combinebykey()
// keyed.aggregatebykey()
// keyed.foldbykey()
// keyed.groupbykey()
//then
counted should contain thesameelementsas map("b" -> 2, "a" -> 2, "c" -> 1)
这将返回一个map
,键k
和long
是一种通用类型,因为它可以是任何类型的键。在本例中,键将是一个字符串。每个返回映射的操作都不是完全安全的。如果您看到返回映射的方法的签名,这表明这些数据将被发送到驱动程序,并且需要适合内存。如果有太多的数据无法适应一个驱动程序的内存,那么我们将耗尽内存。因此,在使用此方法时,我们需要谨慎。
然后,我们执行一个包含与地图相同元素的断言计数,如下例所示:
counted should contain thesameelementsas map("b" -> 2, "a" -> 2, "c" -> 1)
b
是2
,因为我们有两个值。另外,a
与c
类似,因为它们只有一个值。countbykey()
不占用内存,因为它只存储键和计数器。但是,如果键是一个复杂且大的对象,例如具有多个字段的交易,超过两个,那么该映射可能会非常大。
但让我们从下面的例子开始这个测试:
从前面的屏幕截图中,我们可以看到我们的测试通过了。
我们还有一个combinebykey()
方法,它将相同键的相同元素组合在一起,并共享负面的aggregatebykey()
,能够聚合不同类型。我们有foldbykey
,它正在获取当前状态和值,但返回与键的值相同的类型。
我们还有groupbykey()
,我们在上一节中了解过。这将根据特定键对所有内容进行分组,并返回键的值迭代器。这也是一个非常占用内存的操作,因此在使用时需要小心。
在下一节中,我们将使用aggregatebykey
而不是groupby
。我们将学习groupby
的工作原理并修复其缺陷。
使用aggregatebykey
而不是groupby()
在本节中,我们将探讨为什么我们使用aggregatebykey
而不是groupby
。
我们将涵盖以下主题:
-
为什么我们应该避免使用
groupbykey
-
aggregatebykey
给我们的是什么 -
使用
aggregatebykey
实现逻辑
首先,我们将创建我们的用户交易数组,如下例所示:
val keyswithvalueslist =
array(
usertransaction("a", 100),
usertransaction("b", 4),
usertransaction("a", 100001),
usertransaction("b", 10),
usertransaction("c", 10)
)
然后,我们将使用parallelize
创建一个 rdd,因为我们希望我们的数据按键排序。这在下面的例子中显示:
val data = spark.parallelize(keyswithvalueslist)
val keyed = data.keyby(_.userid)
在前面的代码中,我们调用了keyby
来对userid
进行操作,以获得付款人、键和用户交易的数据。
让我们假设我们想要聚合,我们想要对相同的键执行一些特定的逻辑,如下面的例子所示:
val aggregatedtransactionsforuserid = keyed
.aggregatebykey(amountforuser)(addamount, mergeamounts)
这样做的原因可能是选择最大元素、最小元素或计算平均值。aggregatebykey
需要接受三个参数,如下面的例子所示:
aggregatebykey(amountforuser)(addamount, mergeamounts)
第一个参数是 t 类型的初始参数,定义amountforuser
是一个类型为arraybuffer
的初始参数。这非常重要,因为 scala 编译器将推断出该类型,并且在这个例子中,参数 1 和 2 需要具有完全相同的类型 t:arraybuffer.empty[long]
。
下一个参数是一个方法,它接受我们正在处理的当前元素。在这个例子中,transaction: usertransaction) =>
是一个当前交易,也需要带上我们初始化函数的状态,因此这里将是一个数组缓冲区。
它需要与以下代码块中显示的相同类型,因此这是我们的类型 t:
mutable.arraybuffer.empty[long]
在这一点上,我们能够获取任何交易并将其添加到特定状态中。这是以分布式方式完成的。对于一个键,执行在一个执行器上完成,对于完全相同的键,执行在不同的执行器上完成。这是并行进行的,因此对于相同的键将添加多个交易。
现在,spark 知道,对于完全相同的键,它有多个 t 类型的状态arraybuffer
,需要合并。因此,我们需要为相同的键mergeamounts
我们的交易。
mergeargument
是一个方法,它接受两个状态,这两个状态都是 t 类型的中间状态,如下面的代码块所示:
val mergeamounts = (p1: mutable.arraybuffer[long], p2: mutable.arraybuffer[long]) => p1 ++= p2
在这个例子中,我们想要将释放缓冲区合并成一个数组缓冲区。因此,我们发出p1 ++= p2
。这将两个数组缓冲区合并成一个。
现在,我们已经准备好所有参数,我们能够执行aggregatebykey
并查看结果是什么样子的。结果是一个字符串和类型 t 的 rdd,arraybuffer[long]
,这是我们的状态。我们将不再在 rdd 中保留usertransaction
,这有助于减少内存使用。usertransaction
是一个重量级对象,因为它可以有多个字段,在这个例子中,我们只对金额字段感兴趣。因此,这样我们可以减少内存的使用。
下面的例子展示了我们的结果应该是什么样子的:
aggregatedtransactionsforuserid.collect().tolist should contain thesameelementsas list(
("a", arraybuffer(100, 100001)),
("b", arraybuffer(4,10)),
("c", arraybuffer(10)))
我们应该有一个键a
,和一个arraybuffer
的100
和10001
,因为这是我们的输入数据。b
应该是4
和10
,最后,c
应该是10
。
让我们开始测试,检查我们是否已经正确实现了aggregatebykey
,如下面的例子所示:
从前面的输出中,我们可以看到它按预期工作。
在下一节中,我们将研究可用于键/值对的操作。
键/值对上的操作
在本节中,我们将研究键/值对上的操作。
我们将涵盖以下主题:
-
检查键/值对上的操作
-
使用
collect()
-
检查键/值 rdd 的输出
在本章的第一部分中,我们介绍了可用于键/值对的转换。我们看到它们与 rdd 相比有些不同。此外,对于操作,结果略有不同,但方法名称并没有变化。
因此,我们将使用collect()
,并且我们将检查我们对这些键/值对的操作的输出。
首先,我们将根据userid
创建我们的交易数组和 rdd,如下面的例子所示:
val keyswithvalueslist =
array(
usertransaction("a", 100),
usertransaction("b", 4),
usertransaction("a", 100001),
usertransaction("b", 10),
usertransaction("c", 10)
)
我们首先想到的操作是collect()
。collect()
会取出每个元素并将其分配给结果,因此我们的结果与keyby
的结果非常不同。
我们的结果是一对键,userid
和一个值,即usertransaction
。我们可以从下面的例子中看到,我们可以有一个重复的键:
res should contain thesameelementsas list(
("a",usertransaction("a",100)),
("b",usertransaction("b",4)),
("a",usertransaction("a",100001)),
("b",usertransaction("b",10)),
("c",usertransaction("c",10))
)//note duplicated key
在前面的代码中,我们可以看到同一个订单有多个出现。对于一个简单的字符串键,重复并不是很昂贵。然而,如果我们有一个更复杂的键,那么就会很昂贵。
因此,让我们开始这个测试,如下例所示:
从前面的输出中,我们可以看到我们的测试已经通过。要查看其他动作,我们将查看不同的方法。
如果一个方法返回 rdd,比如collect[u] (f: partialfunction[(string, usertransaction), u])
,这意味着这不是一个动作。如果某些东西返回 rdd,这意味着它不是一个动作。这适用于键/值对。
collect()
不会返回 rdd,而是返回数组,因此它是一个动作。count
返回long
,因此这也是一个动作。countbykey
返回 map。如果我们想要reduce
我们的元素,那么这是一个动作,但reducebykey
不是一个动作。这就是reduce
和reducebykey
之间的重大区别。
我们可以看到根据 rdd,一切都是正常的,因此动作是相同的,差异只在于转换。
在下一节中,我们将看一下键/值数据上可用的分区器。
键/值数据上可用的分区器
我们知道分区和分区器是 apache spark 的关键组件。它们影响我们的数据如何分区,这意味着它们影响数据实际驻留在哪些执行器上。如果我们有一个良好的分区器,那么我们将有良好的数据局部性,这将减少洗牌。我们知道洗牌对处理来说是不可取的,因此减少洗牌是至关重要的,因此选择适当的分区器对我们的系统也是至关重要的。
在本节中,我们将涵盖以下主题:
-
检查
hashpartitioner
-
检查
rangepartitioner
-
测试
我们将首先检查我们的hashpartitioner
和rangepartitioner
。然后我们将比较它们并使用两个分区器测试代码。
首先,我们将创建一个usertransaction
数组,如下例所示:
val keyswithvalueslist =
array(
usertransaction("a", 100),
usertransaction("b", 4),
usertransaction("a", 100001),
usertransaction("b", 10),
usertransaction("c", 10)
)
然后我们将使用keyby
(如下例所示),因为分区器将自动处理我们数据的键:
val keyed = data.keyby(_.userid)
然后我们将获取键数据的partitioner
,如下例所示:
val partitioner = keyed.partitioner
代码显示partitioner.isempty
,因为我们还没有定义任何partitioner
,因此在这一点上它是空的,如下例所示:
assert(partitioner.isempty)
我们可以使用partitionby
方法指定一个partitioner
,如下例所示:
val hashpartitioner = keyed.partitionby(new hashpartitioner(100))
该方法期望一个partitioner
抽象类的实现。我们将有一些实现,但首先,让我们专注于hashpartitioner
。
hashpartitioner
需要一个分区数,并且有一个分区数。numpartition
返回我们的参数,但getpartition
会更加复杂,如下例所示:
def numpartitions: int = partitions
def getpartition(key: any): int = key match {
case null => 0
case_ => utils.nonnegativemode(key.hashcode, numpartitions)
}
它首先检查我们的key
是否为null
。如果是null
,它将落在分区号0
。如果我们有带有null
键的数据,它们都将落在相同的执行器上,正如我们所知,这不是一个好的情况,因为执行器将有很多内存开销,并且它们可能会因为内存异常而失败。
如果key
不是null
,那么它会从hashcode
和分区数中进行nonnegativemod
。它必须是分区数的模数,这样它才能分配到适当的分区。因此,hashcode
方法对我们的键非常重要。
如果我们提供了一个自定义的键而不是像整数或字符串这样的原始类型,它有一个众所周知的hashcode
,我们需要提供和实现一个适当的hashcode
。但最佳实践是使用 scala 中的case
类,因为它们已经为你实现了hashcode
和 equals。
我们现在已经定义了partitioner
,但partitioner
是可以动态更改的。我们可以将我们的partitioner
更改为rangepartitioner
。rangepartitioner
接受 rdd 中的分区。
rangepartitioner
更复杂,因为它试图将我们的数据划分为范围,这不像hashpartitioner
在获取分区时那样简单。该方法非常复杂,因为它试图均匀地分布我们的数据,并且对将其分布到范围中的逻辑非常复杂。
让我们开始我们的测试,检查我们是否能够正确地分配partitioner
,如下所示的输出:
我们的测试已经通过。这意味着,在最初的时候,partitioner
是空的,然后我们必须在partitionby
处对 rdd 进行洗牌,还有一个branchpartitioner
。但它只显示了我们创建partitioner
接口的实例的数值线。
在下一部分,我们将尝试改进它,或者尝试通过实现自定义分区器来调整和玩弄分区器。
实现自定义分区器
在这一部分,我们将实现一个自定义的分区器,并创建一个接受带有范围的解析列表的分区器。如果我们的键落入特定范围,我们将分配列表的分区号索引。
我们将涵盖以下主题:
-
实现自定义分区器
-
实现一个范围分区器
-
测试我们的分区器
我们将根据我们自己的范围分区逻辑来实现范围分区,并测试我们的分区器。让我们从不查看实现的黑盒测试开始。
代码的第一部分与我们已经使用的类似,但这次我们有keyby
数量的数据,如下例所示:
val keyswithvalueslist =
array(
usertransaction("a", 100),
usertransaction("b", 4),
usertransaction("a", 100001),
usertransaction("b", 10),
usertransaction("c", 10)
)
val data = spark.parallelize(keyswithvalueslist)
val keyed = data.keyby(_.amount)
我们按数量进行分组,我们有以下键:100
,4
,100001
,10
和10
。
然后,我们将创建一个分区器,并将其命名为customrangepartitioner
,它将接受一个元组列表,如下例所示:
val partitioned = keyed.partitionby(new customrangepartitioner(list((0,100), (100, 10000), (10000, 1000000))))
第一个元素是从0
到100
,这意味着如果键在0
到100
的范围内,它应该进入分区0
。因此,有四个键应该落入该分区。下一个分区号的范围是100
和10000
,因此该范围内的每条记录都应该落入分区号1
,包括两端。最后一个范围是10000
到1000000
元素之间,因此,如果记录在该范围内,它应该落入该分区。如果我们有一个超出范围的元素,那么分区器将因非法参数异常而失败。
让我们看一下下面的例子,展示了我们自定义范围分区器的实现:
class customrangepartitioner(ranges: list[(int,int)]) extends partitioner{
override def numpartitions: int = ranges.size
override def getpartition(key: any): int = {
if(!key.isinstanceof[int]){
throw new illegalargumentexception("partitioner works only for int type")
}
val keyint = key.asinstanceof[int]
val index = ranges.lastindexwhere(v => keyint >= v._1 && keyint <= v._2)
println(s"for key: $key return $index")
index
}
}
它将范围作为元组的参数列表,如下例所示:
(ranges: list[(int,int)])
我们的numpartitions
应该等于ranges.size
,因此分区的数量等于范围的数量。
接下来,我们有getpartition
方法。首先,我们的分区器只对整数有效,如下例所示:
if(!key.isinstanceof[int])
我们可以看到这是一个整数,不能用于其他类型。出于同样的原因,我们首先需要检查我们的键是否是整数的实例,如果不是,我们会得到一个illegalargumentexception
,因为该分区器只对 int 类型有效。
我们现在可以通过asinstanceof
来测试我们的keyint
。完成后,我们可以遍历范围,并在索引在谓词之间时取最后一个范围。我们的谓词是一个元组v
,应该如下所示:
val index = ranges.lastindexwhere(v => keyint >= v._1 && keyint <= v._2)
keyint
应该大于或等于v._1
,即元组的第一个元素,但也应该小于第二个元素v._2
。
范围的起始是v._1
,范围的结束是v._2
,因此我们可以检查我们的元素是否在范围内。
最后,我们将打印我们在调试目的中找到的键的索引,并返回索引,这将是我们的分区。如下例所示:
println(s"for key: $key return $index")
让我们开始下面的测试:
我们可以看到对于键100001
,代码返回了预期的分区号2
。对于键100
返回分区一,对于10
,4
,10
返回分区零,这意味着我们的代码按预期工作。
摘要
在本章中,我们首先看到了关于键/值对的转换操作。然后我们学习了如何使用aggregatebykey
而不是groupby
。我们还涵盖了关于键/值对的操作。之后,我们看了一下可用的分区器,比如rangepartitioner
和hashpartition
在键/值数据上。在本章结束时,我们已经实现了我们自定义的分区器,它能够根据范围的起始和结束来分配分区,以便学习目的。
在下一章中,我们将学习如何测试我们的 spark 作业和 apache spark 作业。
第十二章:测试 apache spark 作业
在本章中,我们将测试 apache spark 作业,并学习如何将逻辑与 spark 引擎分离。
我们将首先对我们的代码进行单元测试,然后在 sparksession 中进行集成测试。之后,我们将使用部分函数模拟数据源,然后学习如何利用 scalacheck 进行基于属性的测试以及 scala 中的类型。在本章结束时,我们将在不同版本的 spark 中执行测试。
在本章中,我们将涵盖以下主题:
-
将逻辑与 spark 引擎分离-单元测试
-
使用 sparksession 进行集成测试
-
使用部分函数模拟数据源
-
使用 scalacheck 进行基于属性的测试
-
在不同版本的 spark 中进行测试
将逻辑与 spark 引擎分离-单元测试
让我们从将逻辑与 spark 引擎分离开始。
在本节中,我们将涵盖以下主题:
-
创建具有逻辑的组件
-
该组件的单元测试
-
使用模型类的案例类进行领域逻辑
让我们先看逻辑,然后是简单的测试。
因此,我们有一个bonusverifier
对象,只有一个方法quaifyforbonus
,它接受我们的usertransaction
模型类。根据以下代码中的登录,我们加载用户交易并过滤所有符合奖金资格的用户。首先,我们需要测试它以创建一个 rdd 并对其进行过滤。我们需要创建一个 sparksession,并为模拟 rdd 或 dataframe 创建数据,然后测试整个 spark api。由于这涉及逻辑,我们将对其进行隔离测试。逻辑如下:
package com.tomekl007.chapter_6
import com.tomekl007.usertransaction
object bonusverifier {
private val superusers = list("a", "x", "100-million")
def qualifyforbonus(usertransaction: usertransaction): boolean = {
superusers.contains(usertransaction.userid) && usertransaction.amount > 100
}
}
我们有一个超级用户列表,其中包括a
、x
和100-million
用户 id。如果我们的usertransaction.userid
在superusers
列表中,并且usertransaction.amount
高于100
,那么用户就有资格获得奖金;否则,他们就没有资格。在现实世界中,奖金资格逻辑将更加复杂,因此非常重要的是对逻辑进行隔离测试。
以下代码显示了我们使用usertransaction
模型的测试。我们知道我们的用户交易包括userid
和amount
。以下示例显示了我们的领域模型对象,它在 spark 执行集成测试和我们的单元测试之间共享,与 spark 分开:
package com.tomekl007
import java.util.uuid
case class userdata(userid: string , data: string)
case class usertransaction(userid: string, amount: int)
case class inputrecord(uuid: string = uuid.*randomuuid()*.tostring(), userid: string)
我们需要为用户 id x
和金额101
创建我们的usertransaction
,如下例所示:
package com.tomekl007.chapter_6
import com.tomekl007.usertransaction
import org.scalatest.funsuite
class separatinglogic extends funsuite {
test("test complex logic separately from spark engine") {
//given
val usertransaction = usertransaction("x", 101)
//when
val res = bonusverifier.qualifyforbonus(usertransaction)
//then
assert(res)
}
}
然后我们将usertransaction
传递给qualifyforbonus
,结果应该是true
。这个用户应该有资格获得奖金,如下输出所示:
现在,让我们为负面用例编写一个测试,如下所示:
test(testname = "test complex logic separately from spark engine - non qualify") {
//given
val usertransaction = usertransaction("x", 99)
//when
val res = bonusverifier.*qualifyforbonus*(usertransaction)
//then
assert(!res)
}
在这里,我们有一个用户x
,花费99
,所以我们的结果应该是 false。当我们验证我们的代码时,我们可以看到从以下输出中,我们的测试已经通过了:
我们已经涵盖了两种情况,但在现实世界的场景中,还有更多。例如,如果我们想测试指定userid
不在这个超级用户列表中的情况,我们有一个花了很多钱的some_new_user
,在我们的案例中是100000
,我们得到以下结果:
test(testname = "test complex logic separately from spark engine - non qualify2") {
//given
val usertransaction = usertransaction("some_new_user", 100000)
//when
val res = bonusverifier.*qualifyforbonus*(usertransaction)
//then
assert(!res)
}
假设它不应该符合条件,因此这样的逻辑有点复杂。因此,我们以单元测试的方式进行测试:
我们的测试非常快,因此我们能够检查一切是否按预期工作,而无需引入 spark。在下一节中,我们将使用 sparksession 进行集成测试来更改逻辑。
使用 sparksession 进行集成测试
现在让我们学习如何使用 sparksession 进行集成测试。
在本节中,我们将涵盖以下主题:
-
利用 sparksession 进行集成测试
-
使用经过单元测试的组件
在这里,我们正在创建 spark 引擎。以下行对于集成测试至关重要:
val spark: sparkcontext = sparksession.builder().master("local[2]").getorcreate().sparkcontext
创建一个轻量级对象并不是一件简单的事情。sparksession 是一个非常重的对象,从头开始构建它是一项昂贵的操作,从资源和时间的角度来看。与上一节的单元测试相比,诸如创建 sparksession 的测试将花费更多的时间。
出于同样的原因,我们应该经常使用单元测试来转换所有边缘情况,并且仅在逻辑的较小部分,如资本边缘情况时才使用集成测试。
以下示例显示了我们正在创建的数组:
val keyswithvalueslist =
array(
usertransaction("a", 100),
usertransaction("b", 4),
usertransaction("a", 100001),
usertransaction("b", 10),
usertransaction("c", 10)
)
以下示例显示了我们正在创建的 rdd:
val data = spark.parallelize(keyswithvalueslist)
这是 spark 第一次参与我们的集成测试。创建 rdd 也是一个耗时的操作。与仅创建数组相比,创建 rdd 真的很慢,因为它也是一个重量级对象。
我们现在将使用我们的data.filter
来传递一个qualifyforbonus
函数,如下例所示:
val aggregatedtransactionsforuserid = data.filter(bonusverifier.qualifyforbonus)
这个函数已经经过单元测试,所以我们不需要考虑所有边缘情况,不同的 id,不同的金额等等。我们只是创建了一些 id 和一些金额来测试我们整个逻辑链是否按预期工作。
应用了这个逻辑之后,我们的输出应该类似于以下内容:
usertransaction("a", 100001)
让我们开始这个测试,检查执行单个集成测试需要多长时间,如下输出所示:
执行这个简单测试大约需要646 毫秒
。
如果我们想要覆盖每一个边缘情况,与上一节的单元测试相比,值将乘以数百倍。让我们从三个边缘情况开始这个单元测试,如下输出所示:
我们可以看到我们的测试只花了18 毫秒
,这意味着即使我们覆盖了三个边缘情况,与只有一个情况的集成测试相比,速度快了 20 倍。
在这里,我们覆盖了许多逻辑,包括数百个边缘情况,我们可以得出结论,尽可能低的级别进行单元测试是非常明智的。
在下一节中,我们将使用部分函数来模拟数据源。
使用部分函数模拟数据源
在本节中,我们将涵盖以下主题:
-
创建一个从 hive 读取数据的 spark 组件
-
模拟组件
-
测试模拟组件
假设以下代码是我们的生产线:
ignore("loading data on prod from hive") {
userdatalogic.loadandgetamount(spark, hivedataloader.loadusertransactions)
}
在这里,我们使用userdatalogic.loadandgetamount
函数,它需要加载我们的用户数据交易并获取交易的金额。这个方法需要两个参数。第一个参数是sparksession
,第二个参数是sparksession
的provider
,它接受sparksession
并返回dataframe
,如下例所示:
object userdatalogic {
def loadandgetamount(sparksession: sparksession, provider: sparksession => dataframe): dataframe = {
val df = provider(sparksession)
df.select(df("amount"))
}
}
对于生产,我们将加载用户交易,并查看hivedataloader
组件只有一个方法,sparksession.sql
和("select * from transactions")
,如下代码块所示:
object hivedataloader {
def loadusertransactions(sparksession: sparksession): dataframe = {
sparksession.sql("select * from transactions")
}
}
这意味着该函数去 hive 检索我们的数据并返回一个 dataframe。根据我们的逻辑,它执行了返回 dataframe 的provider
,然后从 dataframe 中选择amount
。
这个逻辑并不简单,因为我们的 sparksession provider
在生产中与外部系统进行交互。因此,我们可以创建一个如下的函数:
userdatalogic.loadandgetamount(spark, hivedataloader.loadusertransactions)
让我们看看如何测试这样一个组件。首先,我们将创建一个用户交易的 dataframe,这是我们的模拟数据,如下例所示:
val df = spark.sparkcontext
.makerdd(list(usertransaction("a", 100), usertransaction("b", 200)))
.todf()
然而,我们需要将数据保存到 hive 中,嵌入它,然后启动 hive。
由于我们使用了部分函数,我们可以将部分函数作为第二个参数传递,如下例所示:
val res = userdatalogic.loadandgetamount(spark, _ => df)
第一个参数是spark
,但这次我们的方法中没有使用它。第二个参数是一个接受 sparksession 并返回 dataframe 的方法。
然而,我们的执行引擎、架构和代码并不考虑这个 sparksession 是否被使用,或者是否进行了外部调用;它只想返回 dataframe。我们可以使用_
作为我们的第一个参数,因为它被忽略,只返回 dataframe 作为返回类型。
因此我们的loadandgetamount
将获得一个模拟 dataframe,这是我们创建的 dataframe。
但是,对于所示的逻辑,它是透明的,不考虑 dataframe 是来自 hive、sql、cassandra 还是其他任何来源,如下例所示:
val df = provider(sparksession)
df.select(df("amount"))
在我们的例子中,df
来自我们为测试目的创建的内存。我们的逻辑继续并选择了数量。
然后,我们展示我们的列,res.show()
,并且该逻辑应该以一个列的数量结束。让我们开始这个测试,如下例所示:
我们可以从上面的例子中看到,我们的结果 dataframe 在100
和200
值中有一个列的数量。这意味着它按预期工作,而无需启动嵌入式 hive。关键在于使用提供程序而不是在逻辑中嵌入我们的选择开始。
在下一节中,我们将使用 scalacheck 进行基于属性的测试。
使用 scalacheck 进行基于属性的测试
在本节中,我们将涵盖以下主题:
-
基于属性的测试
-
创建基于属性的测试
让我们看一个简单的基于属性的测试。在定义属性之前,我们需要导入一个依赖项。我们还需要一个 scalacheck 库的依赖项,这是一个用于基于属性的测试的库。
在上一节中,每个测试都扩展了funsuite
。我们使用了功能测试,但是必须显式提供参数。在这个例子中,我们扩展了来自 scalacheck 库的properties
,并测试了stringtype
,如下所示:
object propertybasedtesting extends properties("stringtype")
我们的 scalacheck 将为我们生成一个随机字符串。如果我们为自定义类型创建基于属性的测试,那么 scalacheck 是不知道的。我们需要提供一个生成器,它将生成该特定类型的实例。
首先,让我们以以下方式定义我们字符串类型的第一个属性:
property("length of strings") = forall { (a: string, b: string) =>
a.length + b.length >= a.length
}
forall
是 scalacheck 属性的一个方法。我们将在这里传递任意数量的参数,但它们需要是我们正在测试的类型。
假设我们想要获得两个随机字符串,并且在这些字符串中,不变性应该被感知。
如果我们将字符串a
的长度加上字符串b
的长度,那么它们的总和应该大于或等于a.length
,因为如果b
是0
,那么它们将相等,如下例所示:
a.length + b.length >= a.length
然而,这是string
的不变性,对于每个输入字符串,它应该是true
。
我们正在定义的第二个属性更复杂,如下代码所示:
property("creating list of strings") = forall { (a: string, b: string, c: string) =>
list(a,b,c).map(_.length).sum == a.length + b.length + c.length
}
在上面的代码中,我们要求 scalacheck 运行时引擎这次共享三个字符串,即a
、b
和c
。当我们创建一个字符串列表时,我们将测试这个。
在这里,我们正在创建一个字符串列表,即a
、b
、c
,如下代码所示:
list(a,b,c)
当我们将每个元素映射到length
时,这些元素的总和应该等于通过长度添加所有元素。在这里,我们有a.length + b.length + c.length
,我们将测试集合 api,以检查映射和其他函数是否按预期工作。
让我们开始这个基于属性的测试,以检查我们的属性是否正确,如下例所示:
我们可以看到string
的stringtype.length
属性通过并执行了100
次测试。100
次测试被执行可能会让人惊讶,但让我们尝试看看通过以下代码传递了什么参数:
println(s"a: $a, b: $b")
我们将打印a
参数和b
参数,并通过测试以下输出来重试我们的属性:
我们可以看到生成了许多奇怪的字符串,因此这是一个我们无法事先创建的边缘情况。基于属性的测试将创建一个非常奇怪的唯一代码,这不是一个合适的字符串。因此,这是一个用于测试我们的逻辑是否按预期针对特定类型工作的好工具。
在下一节中,我们将在不同版本的 spark 中进行测试。
在不同版本的 spark 中进行测试
在本节中,我们将涵盖以下主题:
-
将组件更改为与 spark pre-2.x 一起使用
-
mock 测试 pre-2.x
-
rdd 模拟测试
让我们从本章第三节开始,模拟数据源——使用部分函数模拟数据源。
由于我们正在测试userdatalogic.loadandgetamount
,请注意一切都在 dataframe 上操作,因此我们有一个 sparksession 和 dataframe。
现在,让我们将其与 spark pre-2.x 进行比较。我们可以看到这一次我们无法使用 dataframe。假设以下示例显示了我们在以前的 spark 中的逻辑:
test("mock loading data from hive"){
//given
import spark.sqlcontext.implicits._
val df = spark.sparkcontext
.makerdd(list(usertransaction("a", 100), usertransaction("b", 200)))
.todf()
.rdd
//when
val res = userdatalogicpre2.loadandgetamount(spark, _ => df)
//then
println(res.collect().tolist)
}
}
我们可以看到这一次我们无法使用 dataframe。
在前面的部分中,loadandgetamount
正在接受spark
和 dataframe,但在下面的示例中,dataframe 是一个 rdd,不再是 dataframe,因此我们传递了一个rdd
:
val res = userdatalogicpre2.loadandgetamount(spark, _ => rdd)
然而,我们需要为 spark 创建一个不同的userdatalogicpre2
,它接受 sparksession 并在映射整数的 rdd 之后返回一个 rdd,如下例所示:
object userdatalogicpre2 {
def loadandgetamount(sparksession: sparksession, provider: sparksession => rdd[row]): rdd[int] = {
provider(sparksession).map(_.getasint)
}
}
object hivedataloaderpre2 {
def loadusertransactions(sparksession: sparksession): rdd[row] = {
sparksession.sql("select * from transactions").rdd
}
}
在前面的代码中,我们可以看到provider
正在执行我们的提供程序逻辑,映射每个元素,将其作为int
获取。然后,我们得到了金额。row
是一个可以有可变数量参数的泛型类型。
在 spark pre-2.x 中,我们没有sparksession
,因此需要使用sparkcontext
并相应地更改我们的登录。
总结
在本章中,我们首先学习了如何将逻辑与 spark 引擎分离。然后,我们查看了一个在没有 spark 引擎的情况下经过良好测试的组件,并使用 sparksession 进行了集成测试。为此,我们通过重用已经经过良好测试的组件创建了一个 sparksession 测试。通过这样做,我们不必在集成测试中涵盖所有边缘情况,而且我们的测试速度更快。然后,我们学习了如何利用部分函数在测试阶段提供模拟数据。我们还介绍了 scalacheck 用于基于属性的测试。在本章结束时,我们已经在不同版本的 spark 中测试了我们的代码,并学会了将 dataframe 模拟测试更改为 rdd。
在下一章中,我们将学习如何利用 spark graphx api。
第十三章:利用 spark graphx api
在本章中,我们将学习如何从数据源创建图。然后,我们将使用 edge api 和 vertex api 进行实验。在本章结束时,您将知道如何计算顶点的度和 pagerank。
在本章中,我们将涵盖以下主题:
-
从数据源创建图
-
使用 vertex api
-
使用 edge api
-
计算顶点的度
-
计算 pagerank
从数据源创建图
我们将创建一个加载器组件,用于加载数据,重新审视图格式,并从文件加载 spark 图。
创建加载器组件
graph.g
文件包含顶点到顶点的结构。在下面的graph.g
文件中,如果我们将1
对齐到2
,这意味着顶点 id1
和顶点 id2
之间有一条边。第二行表示从顶点 id1
到顶点 id3
有一条边,然后从2
到3
,最后从3
到5
:
1 2
1 3
2 3
3 5
我们将取graph.g
文件,加载它,并查看它将如何在 spark 中提供结果。首先,我们需要获取我们的graph.g
文件的资源。我们将使用getclass.getresource()
方法来获取它的路径,如下所示:
package com.tomekl007.chapter_7
import org.apache.spark.sparkcontext
import org.apache.spark.sql.sparksession
import org.scalatest.funsuite
class creatinggraph extends funsuite {
val spark: sparkcontext = sparksession.builder().master("local[2]").getorcreate().sparkcontext
test("should load graph from a file") {
//given
val path = getclass.getresource("/graph.g").getpath
重新审视图格式
接下来,我们有graphbuilder
方法,这是我们自己的组件:
//when
val graph = graphbuilder.loadfromfile(spark, path)
以下是我们的graphbuilder.scala
文件,用于我们的graphbuilder
方法:
package com.tomekl007.chapter_7
import org.apache.spark.sparkcontext
import org.apache.spark.graphx.{graph, graphloader}
object graphbuilder {
def loadfromfile(sc: sparkcontext, path: string): graph[int, int] = {
graphloader.edgelistfile(sc, path)
}
}
它使用了org.apache.spark.graphx.{graph, graphloader}
包中的graphloader
类,并且我们指定了格式。
这里指定的格式是edgelistfile
。我们传递了sc
参数,即sparkcontext
和path
参数,其中包含文件的路径。得到的图将是graph [int, int]
,我们将使用它作为我们顶点的标识符。
从文件加载 spark
一旦我们得到了结果图,我们可以将spark
和path
参数传递给我们的graphbuilder.loadfromfile()
方法,此时,我们将得到一个graph [int, int]
的构造图,如下所示:
val graph = graphbuilder.loadfromfile(spark, path)
迭代和验证我们的图是否被正确加载,我们将使用图
中的三元组
,它们是一对顶点到顶点,也是这些顶点之间的边。我们将看到图的结构是否被正确加载:
//then
graph.triplets.foreach(println(_))
最后,我们断言我们得到了4
个三元组(如前面在创建加载器组件部分中所示,我们从graph.g
文件中有四个定义):
assert(graph.triplets.count() == 4)
}
}
我们将开始测试并查看我们是否能够正确加载我们的图。
我们得到了以下输出。这里,我们有(2, 1)
,(3, 1)
,(3,1)
,(5,1)
,(1,1)
,(2,1)
,(1,1)
和(3,1)
:
因此,根据输出的图,我们能够使用 spark 重新加载我们的图。
使用 vertex api
在这一部分,我们将使用边来构建图。我们将学习使用 vertex api,并利用边的转换。
使用顶点构建图
构建图不是一项简单的任务;我们需要提供顶点和它们之间的边。让我们专注于第一部分。第一部分包括我们的users
,users
是一个vertexid
和string
的 rdd,如下所示:
package com.tomekl007.chapter_7
import org.apache.spark.sparkcontext
import org.apache.spark.graphx.{edge, graph, vertexid}
import org.apache.spark.rdd.rdd
import org.apache.spark.sql.sparksession
import org.scalatest.funsuite
class vertexapi extends funsuite {
val spark: sparkcontext = sparksession.builder().master("local[2]").getorcreate().sparkcontext
test("should use vertex api") {
//given
val users: rdd[(vertexid, (string))] =
spark.parallelize(array(
(1l, "a"),
(2l, "b"),
(3l, "c"),
(4l, "d")
))
vertexid
是long
类型;这只是long
的type
别名:
type vertexid = long
但由于我们的图有时包含大量内容,vertexid
应该是唯一的且非常长的数字。我们的顶点 rdd 中的每个顶点都应该有一个唯一的vertexid
。与顶点关联的自定义数据可以是任何类,但我们将选择使用string
类来简化。首先,我们创建一个 id 为1
的顶点和字符串数据a
,下一个 id 为2
的顶点和字符串数据b
,下一个 id 为3
的顶点和字符串数据c
,以及 id 为4
的数据和字符串d
,如下所示:
val users: rdd[(vertexid, (string))] =
spark.parallelize(array(
(1l, "a"),
(2l, "b"),
(3l, "c"),
(4l, "d")
))
仅从顶点创建图是正确的,但并不是非常有用。图是查找数据之间关系的最佳方式,这就是为什么图是社交网络的主要构建块。
创建夫妻关系
在这一部分,我们将创建顶点之间的夫妻关系和边缘。在这里,我们将有一个关系,即edge
。edge
是来自org.apache.spark.graphx
包的一个样例类。它稍微复杂一些,因为我们需要指定源顶点 id 和目标顶点 id。我们想要指定顶点 id1
和2
有一个关系,所以让我们为这个关系创建一个标签。在下面的代码中,我们将指定顶点 id1
和 id2
为friend
,然后我们还将指定顶点 id1
和 id3
也为friend
。最后,顶点 id2
和 id4
将是wife
:
val relationships =
spark.parallelize(array(
edge(1l, 2l, "friend"),
edge(1l, 3l, "friend"),
edge(2l, 4l, "wife")
))
此外,标签可以是任何类型-它不需要是string
类型;我们可以输入我们想要的内容并传递它。一旦我们有了我们的顶点、用户和边缘关系,我们就可以创建一个图。我们使用graph
类的apply
方法来构建我们的 spark graphx 图。我们需要传递users
、vertexid
和relationships
,如下所示:
返回的graph
是一个 rdd,但它是一个特殊的 rdd:
val graph = graph(users, relationships)
当我们转到graph
类时,我们会看到graph
类有一个顶点的 rdd 和一个边缘的 rdd,所以graph
类是两个 rdd 的伴生对象,如下截图所示:
我们可以通过发出一些方法来获取vertices
和edges
的基础 rdd。例如,如果要获取所有顶点,我们可以映射所有顶点,我们将获取属性和vertexid
。在这里,我们只对属性感兴趣,我们将其转换为大写,如下所示:
val res = graph.mapvertices((_, att) => att.touppercase())
以下是属性:
val users: rdd[(vertexid, (string))] =
spark.parallelize(array(
(1l, "a"),
(2l, "b"),
(3l, "c"),
(4l, "d")
))
一旦我们将其转换为大写,我们可以收集所有顶点并执行tolist()
,如下所示:
println(res.vertices.collect().tolist)
}
}
我们可以看到,在对值应用转换后,我们的图具有以下顶点:
使用 edge api
在这一部分,我们将使用 edge api 构建图。我们还将使用顶点,但这次我们将专注于边缘转换。
使用边缘构建图
正如我们在前面的部分中看到的,我们有边缘和顶点,这是一个 rdd。由于这是一个 rdd,我们可以获取一个边缘。我们有许多可用于普通 rdd 的方法。我们可以使用max
方法、min
方法、sum
方法和所有其他操作。我们将应用reduce
方法,因此reduce
方法将获取两个边缘,我们将获取e1
、e2
,并对其执行一些逻辑。
e1
边缘是一个具有属性、目的地和源的边缘,如下截图所示:
由于边缘将两个顶点链接在一起,我们可以在这里执行一些逻辑。例如,如果e1
边缘属性等于friend
,我们希望使用filter
操作提升一个边缘。因此,filter
方法只获取一个边缘,然后如果边缘e1
是friend
,它将被自动感知。我们可以看到最后我们可以collect
它并执行tolist
,以便 spark 上的 api 可供我们使用。以下代码将帮助我们实现我们的逻辑:
import org.apache.spark.sparkcontext
import org.apache.spark.graphx.{edge, graph, vertexid}
import org.apache.spark.rdd.rdd
import org.apache.spark.sql.sparksession
import org.scalatest.funsuite
class edgeapi extends funsuite {
val spark: sparkcontext = sparksession.builder().master("local[2]").getorcreate().sparkcontext
test("should use edge api") {
//given
val users: rdd[(vertexid, (string))] =
spark.parallelize(array(
(1l, "a"),
(2l, "b"),
(3l, "c"),
(4l, "d")
))
val relationships =
spark.parallelize(array(
edge(1l, 2l, "friend"),
edge(1l, 3l, "friend"),
edge(2l, 4l, "wife")
))
val graph = graph(users, relationships)
//when
val resfromfilter = graph.edges.filter((e1) => e1.attr == "friend").collect().tolist
println(resfromfilter)
它还具有标准 rdd 的一些方法。例如,我们可以执行一个 map edge,它将获取一个边缘,我们可以获取一个属性,并将每个标签映射为大写,如下所示:
val res = graph.mapedges(e => e.attr.touppercase)
在图上,我们还可以执行边缘分组。边缘分组类似于group by
,但仅适用于边缘。
输入以下命令以打印线路映射边缘:
println(res.edges.collect().tolist)
让我们开始我们的代码。我们可以在输出中看到,我们的代码已经过滤了wife
边缘-我们只能感知从顶点 id1
到 id2
的friend
边缘,以及从顶点 id1
到 id3
的边缘,并将边缘映射如下截图所示:
计算顶点的度
在本节中,我们将涵盖总度数,然后将其分为两部分——入度和出度——并且我们将了解这在代码中是如何工作的。
对于我们的第一个测试,让我们构建我们已经了解的图:
package com.tomekl007.chapter_7
import org.apache.spark.sparkcontext
import org.apache.spark.graphx.{edge, graph, vertexid}
import org.apache.spark.rdd.rdd
import org.apache.spark.sql.sparksession
import org.scalatest.funsuite
import org.scalatest.matchers._
class calculatedegreetest extends funsuite {
val spark: sparkcontext = sparksession.builder().master("local[2]").getorcreate().sparkcontext
test("should calculate degree of vertices") {
//given
val users: rdd[(vertexid, (string))] =
spark.parallelize(array(
(1l, "a"),
(2l, "b"),
(3l, "c"),
(4l, "d")
))
val relationships =
spark.parallelize(array(
edge(1l, 2l, "friend"),
edge(1l, 3l, "friend"),
edge(2l, 4l, "wife")
))
我们可以使用degrees
方法获得度。degrees
方法返回vertexrdd
,因为degrees
是一个顶点:
val graph = graph(users, relationships)
//when
val degrees = graph.degrees.collect().tolist
结果如下:
//then
degrees should contain thesameelementsas list(
(4l, 1l),
(2l, 2l),
(1l, 2l),
(3l, 1l)
)
}
上面的代码解释了对于vertexid 4l
实例,只有一个关系,因为2l
和4l
之间存在关系。
然后,对于vertexid 2l
实例,有两个,分别是1l, 2l
和2l, 4l
。对于vertexid 1l
实例,有两个,分别是1l, 2l
和1l, 3l
,对于vertexid 3l
,只有一个关系,即1l
和3l
之间。通过这种方式,我们可以检查我们的图是如何耦合的,以及有多少关系。我们可以通过对它们进行排序来找出哪个顶点最知名,因此我们可以看到我们的测试在下面的截图中通过了。
入度
入度告诉我们有多少个顶点进入第二个顶点,但反之则不然。这次,我们可以看到对于vertexid 2l
实例,只有一个入站顶点。我们可以看到2l
与1l
有关系,3l
也与1l
有关系,4l
与1l
有关系。在下面的结果数据集中,将没有vertexid 1l
的数据,因为1l
是输入。所以,1l
只会是一个源,而不是目的地:
test("should calculate in-degree of vertices") {
//given
val users: rdd[(vertexid, (string))] =
spark.parallelize(array(
(1l, "a"),
(2l, "b"),
(3l, "c"),
(4l, "d")
))
val relationships =
spark.parallelize(array(
edge(1l, 2l, "friend"),
edge(1l, 3l, "friend"),
edge(2l, 4l, "wife")
))
val graph = graph(users, relationships)
//when
val degrees = graph.indegrees.collect().tolist
//then
degrees should contain thesameelementsas list(
(2l, 1l),
(3l, 1l),
(4l, 1l)
)
}
入度的前面特征是一个非常有用的属性。当我们无法找出哪些页面非常重要,因为它们通过页面而不是从页面链接时,我们使用入度。
通过运行这个测试,我们可以看到它按预期工作:
出度
出度解释了有多少个顶点出去。这次,我们将计算边缘、关系的源,而不是目的地,就像我们在入度方法中所做的那样。
为了获得出度,我们将使用以下代码:
val degrees = graph.outdegrees.collect().tolist
outdegrees
方法包含rdd
和vertexrdd
,我们使用collect
和tolist
方法将其收集到列表中。
在这里,vertexid 1l
应该有两个出站顶点,因为1l, 2l
和1l, 3l
之间存在关系:
test("should calculate out-degree of vertices") {
//given
val users: rdd[(vertexid, (string))] =
spark.parallelize(array(
(1l, "a"),
(2l, "b"),
(3l, "c"),
(4l, "d")
))
val relationships =
spark.parallelize(array(
edge(1l, 2l, "friend"),
edge(1l, 3l, "friend"),
edge(2l, 4l, "wife")
))
val graph = graph(users, relationships)
//when
val degrees = graph.outdegrees.collect().tolist
//then
degrees should contain thesameelementsas list(
(1l, 2l),
(2l, 1l)
)
}
}
另外,vertexid 2l
应该有一个出站顶点,因为2l
和4l
之间存在关系,而反之则不然,如前面的代码所示。
我们将运行这个测试并得到以下输出:
计算 pagerank
在本节中,我们将加载关于用户的数据并重新加载关于他们关注者的数据。我们将使用图形 api 和我们的数据结构,并计算 pagerank 来计算用户的排名。
首先,我们需要加载edgelistfile
,如下所示:
package com.tomekl007.chapter_7
import org.apache.spark.graphx.graphloader
import org.apache.spark.sql.sparksession
import org.scalatest.funsuite
import org.scalatest.matchers._
class pageranktest extends funsuite {
private val sc = sparksession.builder().master("local[2]").getorcreate().sparkcontext
test("should calculate page rank using graphx api") {
//given
val graph = graphloader.edgelistfile(sc, getclass.getresource("/pagerank/followers.txt").getpath)
我们有一个followers.txt
文件;以下截图显示了文件的格式,与我们在创建加载器组件部分看到的文件类似:
我们可以看到每个顶点 id 之间存在关系。因此,我们从followers.txt
文件加载graph
,然后发出 pagerank。我们将需要vertices
,如下所示:
val ranks = graph.pagerank(0.0001).vertices
pagerank 将计算我们的顶点之间的影响和关系。
加载和重新加载关于用户和关注者的数据
为了找出哪个用户有哪个名字,我们需要加载users.txt
文件。users.txt
文件将vertexid
分配给用户名和自己的名字。我们使用以下代码:
val users = sc.textfile(getclass.getresource("/pagerank/users.txt").getpath).map { line =>
以下是users.txt
文件:
我们在逗号上拆分,第一组是我们的整数,它将是顶点 id,然后fields(1)
是顶点的名称,如下所示:
val fields = line.split(",")
(fields(0).tolong, fields(1))
}
接下来,我们将使用join
将users
与ranks
连接起来。我们将使用用户的vertexid
通过用户的username
和rank
来join
users
。一旦我们有了这些,我们就可以按rank
对所有内容进行排序,所以我们将取元组的第二个元素,并且应该按sortby ((t) =>t.2
进行排序。在文件的开头,我们将拥有影响力最大的用户:
//when
val rankbyusername = users.join(ranks).map {
case (_, (username, rank)) => (username, rank)
}.sortby((t) => t._2, ascending = false)
.collect()
.tolist
我们将打印以下内容并按rankbyusername
进行排序:
println(rankbyusername)
//then
rankbyusername.map(_._1) should contain thesameelementsinorderas list(
"barackobama",
"ladygaga",
"odersky",
"jeresig",
"matei_zaharia",
"justinbieber"
)
}
}
如果我们跳过sortby
方法,spark 不保证元素的任何排序;为了保持排序,我们需要使用sortby
方法。
在运行代码后,我们得到以下输出:
当我们开始运行这个测试时,我们可以看到 graphx pagerank 是否能够计算出我们用户的影响力。我们得到了前面截图中显示的输出,其中barackobama
的影响力最大为1.45
,然后是ladygaga
,影响力为1.39
,odersky
为1.29
,jeresig
为0.99
,matai_zaharia
为0.70
,最后是justinbieber
,影响力为0.15
。
根据前面的信息,我们能够用最少的代码计算复杂的算法。
总结
在本章中,我们深入研究了转换和操作,然后学习了 spark 的不可变设计。我们研究了如何避免洗牌以及如何减少运营成本。然后,我们看了如何以正确的格式保存数据。我们还学习了如何使用 spark 键/值 api 以及如何测试 apache spark 作业。之后,我们学习了如何从数据源创建图形,然后研究并尝试了边缘和顶点 api。我们学习了如何计算顶点的度。最后,我们看了 pagerank 以及如何使用 spark graphicx api 进行计算。
发表评论