目录
一、spark运行环境初始化
任何spark程序都是以sparkcontext对象开始的,因为sparkcontext是spark应用程序的上下文和入口,无论是scala、python、r程序,都是通过sparkcontext对象的实例来创建rdd,spark shell中的sc就是sparkcontext对象的实例。因此在实际spark应用程序的开发中,在main方法中需要创建sparkcontext对象,作为spark应用程序的入口,并在spark程序结束时关闭sparkcontext对象。
初始化sparkcontext需要一个sparkconf对象,sparkconf包含了spark集群配置的各种参数,属性参数是一种键值对的格式,一般可以通过set(属性名,属性设置值)的方法修改属性。其中还包含了设置程序名setappname、设置运行模式setmaster等方法。
例如:
二、rdd的创建
rdd是一个容错的、只读的、可进行并行操作的数据结构,是一个分布在集群各个节点中的存放元素的集合。rdd的创建有3种不同的方法。
- 第一种是将程序中已存在的seq集合(如集合、列表、数组)转换成rdd。
- 第二种是对已有rdd进行转换得到新的rdd,这两种方法都是通过内存中已有的集合创建rdd的。
- 第三种是直接读取外部存储系统的数据创建rdd。
从内存中读取数据
- parallelize()
parallelize()方法有两个输入参数,说明如下。
- 要转化的集合,必须是seq集合。seq表示序列,指的是一类具有一定长度的、可迭代访问的对象,其中每个数据元素均带有一个从0开始的、固定的索引。
- 分区数,若不设分区数,则rdd的分区数默认为该程序分配到的资源的cpu核心数。
2. makerdd()
makerdd()方法有两种使用方式。
- 第一种方式的使用与parallelize()方法一致;
- 第二种方式是通过接收一个是seq[(t,seq[string])]参数类型创建rdd。
生成的rdd中保存的是t的值,seq[string]部分的数据会按照seq[(t,seq[string])]的顺序存放到各个分区中,一个seq[string]对应存放至一个分区,并为数据提供位置信息,通过preferredlocations()方法可以根据位置信息查看每一个分区的值。调用makerdd()时不可以直接指定rdd的分区个数,分区的个数与seq[string]参数的个数是保持一致的。
从外部存储系统中读取数据
- 从外部存储系统中读取数据创建rdd是指直接读取存放在文件系统中的数据文件创建rdd。
- 从内存中读取数据创建rdd的方法常用于测试,从外部存储系统中读取数据创建rdd才是用于实践操作的常用方法。
- 从外部存储系统中读取数据创建rdd可以有很多种数据来源,可通过sparkcontext对象的textfile()方法读取数据集,该方法支持多种类型的数据集,如目录、文本文件、压缩文件和通配符匹配的文件等,并且允许设定分区个数。
- 分别读取hdfs文件和linux本地文件的数据并创建rdd,具体操作如下。
- 通过hdfs文件创建rdd
直接通过textfile()方法读取hdfs文件的位置即可。
- 通过linux本地文件创建rdd
本地文件的读取也是通过sc.textfile("路径")的方法实现的,在路径前面加上“file://”表示从linux本地文件系统读取。在intellij idea开发环境中可以直接读取本地文件;但在spark-shell中,要求在所有节点的相同位置保存该文件才可以读取它
转换操作可以将一个rdd转换为一个新的rdd,但是转换操作是懒操作,不会立刻执行计算;
行动操作是用于触发转换操作的操作,这时才会真正开始进行计算。
三、rdd常用的方法
单个rdd
1.map()方法转换数据
map()方法是一种基础的rdd转换操作,可以对rdd中的每一个数据元素通过某种函数进行转换并返回新的rdd。 map()方法是转换操作,不会立即进行计算。
2. sortby()方法进行排序
sortby()方法用于对标准rdd进行排序,有3个可输入参数,说明如下。
第1个参数是一个函数f:(t) => k,左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。(该参数是必须输入的)
第2个参数是ascending,决定排序后rdd中的元素是升序的还是降序的,默认是true,即升序排序,如果需要降序排序那么需要将参数的值设置为false。
第3个参数是numpartitions,决定排序后的rdd的分区个数,默认排序后的分区个数和排序之前的分区个数相等,即this.partitions.size。
#创建rdd
val data = sc.parallelize(list((1,3),(45,3),(7,6)))
#对元组第二个值进行降序排序,分区个数设置为1
val sort_data = data.sortby(x => x._2,false,1)
3.collect()方法查询数据
collect()方法是一种行动操作,以数组 array 的形式返回数据集的所有元素
collect()里将生成目录方法有以下两种操作方式。
- collect:直接调用collect返回该rdd中的所有元素,返回类型是一个array[t]数组。
- collect[u: classtag](f: partialfunction[t, u]):rdd[u]。这种方式需要提供一个标准的偏函数,将元素保存至一个rdd中。首先定义一个函数one,用于将collect方法得到的数组中数值里将生成目录为1的值替换为“one”,将其他值替换为“other”。
4.flatmap()方法转换数据
使用flatmap()方法时先进行map(映射)再进行flat(扁平化)操作,数据会先经过跟map一样的操作,为每一条输入返回一个迭代器(可迭代的数据类型),然后将所得到的不同级别的迭代器中的元素全部当成同级别的元素,返回一个元素级别全部相同的rdd。
这个转换操作通常用来切分单词。
5.take()方法查询某几个值
take(n)方法用于获取rdd的前n个元素,返回数据为数组。 take()与collect()方法的原理相似,collect()方法用于获取全部数据,take()方法获取指定个数的数据。
eg:获取rdd的前5个元素:
6.filter()方法进行过滤
filter()方法是一种转换操作,用于过滤rdd中的元素。
filter()方法需要一个参数,这个参数是一个用于过滤的函数,该函数的返回值为boolean类型。 filter()方法将返回值为true的元素保留,将返回值为false的元素过滤掉,最后返回一个存储符合过滤条件的所有元素的新rdd。
eg:创建一个rdd,并且过滤掉每个元组第二个值小于等于1的元素。
7.distinct()方法进行去重
distinct()方法是一种转换操作,用于rdd的数据去重,去除两个完全相同的元素,没有参数。
eg:创建一个带有重复数据的rdd,并使用distinct()方法去重。
两个rdd集合操作
1.union()方法 合并
union()方法是一种转换操作,用于将两个rdd合并成一个,不进行去重操作,而且两个rdd中每个元素中的值的个数、数据类型需要保持一致。 使用union()方法合并两个rdd
2.intersection()方法 交集
intersection()方法用于求出两个rdd的共同元素,即找出两个rdd的交集,参数是另一个rdd,先后顺序与结果无关。
eg:创建两个rdd,其中有相同的元素,通过intersection()方法求出两个rdd的交集。
3.subtract()方法 补集
subtract()方法用于将前一个rdd中在后一个rdd出现的元素删除,可以认为是求补集的操作,返回值为前一个rdd去除与后一个rdd相同元素后的剩余值所组成的新的rdd。两个rdd的顺序会影响结果。
eg:创建两个rdd,分别为rdd1和rdd2,包含相同元素和不同元素,通过subtract()方法求rdd1和rdd2彼此的补集。
4.cartesian()方法 笛卡尔积
cartesian()方法可将两个集合的元素两两组合成一组,即求笛卡儿积。
eg:创建两个rdd,分别有4个元素,通过cartesian()方法求两个rdd的笛卡儿积。
四、键值对rdd
spark的大部分rdd操作都支持所有种类的单值rdd,但是有少部分特殊的操作只能作用于键值对类型的rdd。
键值对rdd由一组组的键值对组成,这些rdd被称为pairrdd。pairrdd提供了并行操作各个键或跨节点重新进行数据分组的操作接口。
创建键值对rdd
将一个普通的rdd转化为一个pairrdd时可以使用map函数
键值对rdd的常用方法
1.keys()和values()方法
spark提供了两种方法,分别获取键值对rdd的键和值。
keys()返回一个仅包含键的rdd。
values()返回一个仅包含值的rdd。
2.reducebykey()方法
reducebykey()方法,一种转换操作,用于合并具有相同键的值,作用对象是键值对,并且只对每个键的值进行处理,当rdd中有多个键相同的键值对时,则会对每个键对应的值进行处理。
3.groupbykey()方法
groupbykey()方法用于对具有相同键的值进行分组,可以对同一组的数据进行计数、求和等操作。
多个rdd键的连接方法
与合并不同,连接会对键相同的值进行合并,连接方式多种多样,包含内连接、右外连接、左外连接、全外连接,不同的连接方式需要使用不同的连接方法。
1.join()方法连接两个rdd
join()方法用于根据键对两个rdd进行内连接,将两个rdd中键相同的数据的值存放在一个元组中,最后只返回两个rdd中都存在的键的连接结果。
例如,在两个rdd中分别有键值对(k,v)和(k,w),通过join()方法连接会返回(k,(v,w))。
2.rightouterjoin()方法
rightouterjoin()方法用于根据键对两个rdd进行右外连接,连接结果是右边rdd的所有键的连接结果,不管这些键在左边rdd中是否存在。
在rightouterjoin()方法中,如果在左边rdd中有对应的键,那么连接结果中值显示为some类型值;如果没有,那么显示为none值。
3.leftouterjoin()方法
leftouterjoin()方法用于根据键对两个rdd进行左外连接,与rightouterjoin()方法相反,返回结果保留左边rdd的所有键。
4.fullouterjoin()方法
fullouterjoin()方法用于对两个rdd进行全外连接,保留两个rdd中所有键的连接结果。
5.zip()方法
zip()方法用于将两个rdd组合成键值对rdd,要求两个rdd的分区数量以及元素数量相同,否则会抛出异常。
6.combinebykey()方法
combinebykey()方法用于将键相同的数据聚合,并且允许返回类型与输入数据的类型不同的返回值。
combinebykey()方法接收3个重要的参数,具体说明如下。
- createcombiner:v=>c,v是键值对rdd中的值部分,将该值转换为另一种类型的值c,c会作为每一个键的累加器的初始值。
- mergevalue:(c,v)=>c,该函数将元素v聚合到之前的元素c(createcombiner)上(这个操作在每个分区内进行)。
- mergecombiners:(c,c)=>c,该函数将两个元素c进行合并(这个操作在不同分区间进行)。
7.lookup()方法
lookup(key:k)方法作用于键值对rdd,返回指定键的所有值。
五、数据读取和保存
1.json文件
读取:
存储:
2.csv文件
读取:
存储:
3.sequencefile文件
读取:
存储:
4.文本文件
读取:
通过textfile()方法即可直接读取,一条记录(一行)作为一个元素。
存储:
rdd数据可以直接调用saveastextfile()方法将数据存储为文本文件。
发表评论