当前位置: 代码网 > it编程>游戏开发>ar > 大数据学习之Spark基础

大数据学习之Spark基础

2024年08月04日 ar 我要评论
后一个RDD中的分区数据,除KV函数以外,对应的是前一个RDD中的分区数据所进行逻辑处理后的结果。当重复触发相同的执行的时候,对于同一个DAG有向无环图而言,会直接从shuffle之后的RDD开始执行(省略从前一个RDD写数据到磁盘中的过程),可以直接从磁盘读取数据。1)窄依赖 前一个RDD中的某一个分区数据只会到后一个RDD中的某唯一分区中 一对一(也可能前多个分区到后一个分区中)的关系。RDD中流动的数据,可能会来自不同的datanode中的block块数据。

spark基础

简述

1、spark作业执行的特点:
2、rdd: 弹性分布式数据集
rdd的5大特性:(面试必问!)
1、rdd是由一系列分区构成
2、算子是作用在每一个分区上的(每一个分区都会处理)
3、rdd与rdd之间存在一些依赖关系
4、kv算子只能作用在kv的rdd上
5、spark会提供最优的任务计算方式,只移动计算,不移动数据。
spark实例:wordcount
object wordcount2 {
  def main(args: array[string]): unit = {
    //创建spark配置文件对象
    val conf: sparkconf = new sparkconf()
    //设置运行模式
    //如果是本地local模式运行的话,需要设置setmaster
    //将来如果是集群进行,将这句话注释即可
    conf.setmaster("local")
    //设置spark作业的名字
    conf.setappname("wordcount")

    //创建spark core上下文环境对象
    val sc: sparkcontext = new sparkcontext(conf)
    //===================================================================================

    //读取文件,每次读取一行
    //rdd是spark core中的核心数据结构,将来运行的时候,数据会在rdd之间流动,默认基于内存计算
    val linesrdd: rdd[string] = sc.textfile("spark/data/wcs/*")
    //    println(s"linesrdd的分区数:${linesrdd.getnumpartitions}")

    //一行数据根据分隔符分割
    val wordrdd: rdd[string] = linesrdd.flatmap(_.split("\\|"))
    //    println(s"wordrdd的分区数:${wordrdd.getnumpartitions}")


    //将每一个单词组成(word,1)
    val kvrdd: rdd[(string, int)] = wordrdd.map((_, 1))
        println(s"kvrdd的分区数:${kvrdd.getnumpartitions}")

    //根据键进行分组,并设置分区数为 5
    val kvrdd2: rdd[(string, iterable[(string, int)])] = kvrdd.groupby(_._1,numpartitions = 5)
        println(s"kvrdd2的分区数:${kvrdd2.getnumpartitions}")

    val resrdd: rdd[(string, int)] = kvrdd2.map((e: (string, iterable[(string, int)])) => (e._1, e._2.size))
        println(s"resrdd的分区数:${resrdd.getnumpartitions}")

    //打印
    resrdd2.foreach(println)

    //指定的是所要写入数据的文件夹的路径
    //spark如果是local本地运行的话,会将本地文件系统看作一个hdfs文件系统
    resrdd.saveastextfile("spark/data/outdata1")

  }
}
spark中rdd调用的函数,称之为算子
1、转换算子
1)map
import org.apache.spark.rdd.rdd
import org.apache.spark.{sparkconf, sparkcontext}

object demo1map {
  def main(args: array[string]): unit = {
    val conf = new sparkconf()
    conf.setmaster("local")
    conf.setappname("map算子演示")
    val sc: sparkcontext = new sparkcontext(conf)

    val linerdd: rdd[string] = sc.textfile("spark/data/students.txt")
    //map操作算子:将rdd中的数据依次取出,传递给后面函数逻辑,将计算后的数据返回到新的rdd中
    //将rdd中的数据依次取出,处理完的数据返回下一个rdd直接继续执行后续的逻辑
    val rdd2: rdd[(string,string,string,string,string)] = linerdd.map((line: string) => {
      println("==============处理后的数据========================")
      val array1: array[string] = line.split(",")
      (array1(0),array1(1),array1(2),array1(3),array1(4))
    })

    //foreach是一个行动算子,遇到行动算子,触发作业执行
    /**
     * 转换操作(转换算子中定义了操作逻辑)仅仅是定义了数据应该如何被转换,而不会立即执行。
     * 只有当需要计算一个结果时(即调用行动算子时),才会执行。
     * 打印结果:
     * ==============处理后的数据========================
     * (1500100001,施笑槐,22,女,文科六班)
     * ==============处理后的数据========================
     * (1500100002,吕金鹏,24,男,文科六班)
     *每次调用行动算子(foreach)打印一条数据,都会是整个rdd重新执行一次(所有rdd的执行关系是一个有向无环图)
     */
    rdd2.foreach(println)

  }

}
2)filter
import org.apache.spark.{sparkconf, sparkcontext}
import org.apache.spark.rdd.rdd

object demo2filter {
  def main(args: array[string]): unit = {
    val conf = new sparkconf()
    conf.setmaster("local")
    conf.setappname("map算子演示")
    val sc: sparkcontext = new sparkcontext(conf)

    //===============================================================
    val linerdd: rdd[string] = sc.textfile("spark/data/students.txt")
    //需求:过滤出所有的男生
    //filter转换算子:将rdd中的数据依次取出,传递给后面的函数,跟map一样,也是依次传递一条
    // 若不匹配,则无数据在rdd间流动,在下面执行.foreach(println)时也无数据进行打印,
    // 但是判断中的println()属于scala,并不受影响
    val genderrdd: rdd[string] = linerdd.filter((line: string) => {
      var b: boolean = false
      if ("女".equals(line.split(",")(3))) {
        println("============这是女生==================")
      } else {
        println("============这是男生==================")
        b = "男".equals(line.split(",")(3))
      }
      b
    })

    genderrdd.foreach(println)

  }
}
3)flatmap
import org.apache.spark.{sparkconf, sparkcontext}
import org.apache.spark.rdd.rdd

object demo3flatmap {
  def main(args: array[string]): unit = {
    val conf = new sparkconf()
    conf.setmaster("local")
    conf.setappname("map算子演示")
    val sc: sparkcontext = new sparkcontext(conf)

    //===============================================================
    val linerdd: rdd[string] = sc.textfile("spark/data/wcs/words.txt")

    /**
     * flatmap: 将rdd中的每一条数据传递给后面的函数,最终将返回的数组或者是序列进行扁平化,返回给新的集合
     * 由于flatmap会“扁平化”结果,因此words rdd将包含所有分割后的单词,而不是单词数组(返回一个元素为单个单词的集合)。
     * 打印结果:
     * ===============一条数据====================
     * hello
     * world
     * ===============一条数据====================
     * java
     * hadoop
     * linux
     */
    val rdd1: rdd[string] = linerdd.flatmap((line:string)=>{
      println("===============一条数据====================")
      line.split("\\|")
    })

    rdd1.foreach(println)


  }
}
4)sample
import org.apache.spark.{sparkconf, sparkcontext}
import org.apache.spark.rdd.rdd

object demo4sample {
  def main(args: array[string]): unit = {
    val conf = new sparkconf()
    conf.setmaster("local")
    conf.setappname("map算子演示")
    val sc: sparkcontext = new sparkcontext(conf)

    //===============================================================
    val linerdd: rdd[string] = sc.textfile("spark/data/students.txt")

    /**
     * sample抽样,1000条数据,抽0.1比例,结果的数量在100左右
     * 这个函数主要在机器学习的时候会用到
     * withreplacement :
     * 为true时,抽样结果中可能会包含重复的元素。
     * 为false时,抽样结果中不会包含重复的元素。
     * fraction:这是一个浮点数(double),指定了抽样的比例,取值范围在[0, 1]之间。
     */
    val rdd1: rdd[string] = linerdd.sample(withreplacement = false, fraction = 0.1)

    rdd1.foreach(println)
  }
}
5)groupby
import org.apache.spark.rdd.rdd
import org.apache.spark.{sparkconf, sparkcontext}

object demo5groupby {
  def main(args: array[string]): unit = {
    val conf: sparkconf = new sparkconf()
      .setmaster("local")
      .setappname("groupby")

    val sc: sparkcontext = new sparkcontext(conf)

    //===================================================
    val linesrdd: rdd[string] = sc.textfile("spark/data/students.txt")
    //求每个班级的平均年龄
    val arrayrdd: rdd[array[string]] = linesrdd.map((line: string) => line.split(","))

    //像这种rdd中的元素是(key,value)类型的,我们将这种rdd称之为键值对rdd(kv格式rdd)
    val clazzwithagerdd: rdd[(string, int)] = arrayrdd.map {
      case array(_, _, age: string, _, clazz: string) =>
        (clazz, age.toint)
    }

    /**
     * groupby算子的使用
     *
     * 1、groupby的算子,后面的分组条件是我们自己指定的
     * 2、spark中groupby之后的,所有值会被封装到一个iterable迭代器中存储(与scala中不同)
     * 输出:
     * (理科二班,22.556962025316455)
     * (文科三班,22.680851063829788)
     * (理科四班,22.63736263736264)
     * (理科一班,22.333333333333332)
     * (文科五班,22.30952380952381)
     */
    // val map: map[string, list[score]] = scorelist.groupby((s: score) => s.id)
    val grouprdd: rdd[(string, iterable[(string, int)])] = clazzwithagerdd.groupby(_._1)
//    grouprdd.foreach(println)

    val reskvrdd: rdd[(string, double)] = grouprdd.map((kv: (string, iterable[(string, int)])) => {
      val clazz: string = kv._1
      val avgage: double = kv._2.map(_._2).sum.todouble / kv._2.size

      (clazz, avgage)
    })
    reskvrdd.foreach(println)

//    while (true){
//
//    }
  }
}

在这里插入图片描述

6)groupbykey
import org.apache.spark.{sparkconf, sparkcontext}
import org.apache.spark.rdd.rdd

object demo6groupbykey {
  def main(args: array[string]): unit = {
    val conf: sparkconf = new sparkconf()
      .setmaster("local")
      .setappname("groupbykey")

    val sc: sparkcontext = new sparkcontext(conf)

    //===================================================
    val linesrdd: rdd[string] = sc.textfile("spark/data/students.txt")
    //求每个班级的平均年龄
    val arrayrdd: rdd[array[string]] = linesrdd.map((line: string) => line.split(","))


    //像这种rdd中的元素是(key,value)类型的,我们将这种rdd称之为键值对rdd(kv格式rdd)
    val clazzwithagerdd: rdd[(string, int)] = arrayrdd.map {
      case array(_, _, age: string, _, clazz: string) =>
        (clazz, age.toint)
    }

    /**
     * groupbykey属于kv格式的算子,只能作用在kv格式的rdd上
     * 也就说,只有kv格式的rdd才能调用kv格式的算子
     * 输出:
     * (理科二班,22.556962025316455)
     * (文科三班,22.680851063829788)
     * (理科四班,22.63736263736264)
     * (理科一班,22.333333333333332)
     * (文科五班,22.30952380952381)
     */
    val groupbykeyrdd: rdd[(string, iterable[int])] = clazzwithagerdd.groupbykey()

    val reskvrdd2: rdd[(string, double)] = groupbykeyrdd.map((kv: (string, iterable[int])) => (kv._1, kv._2.sum.todouble / kv._2.size))
    reskvrdd2.foreach(println)

    /**
     * 面试题:spark core中 groupby算子与groupbykey算子的区别?
     * 1、代码格式上:
     * groupby的分组条件可以自己指定,并且绝大部分的rdd都可以调用该算子,返回的是键和元素本身组成的迭代器构成的kv格式rdd
     * groupbykey算子,只能由kv格式的rdd进行调用,分组的条件会自动根据键进行分组,不需要在自己指定,返回的是键和值组成的迭代器构成的kv格式rdd
     *
     * 2、执行shuffle数据量来看
     *  groupby产生的shuffle数据量在一定程度上要大于groupbykey产生的shuffle数据量
     *  所以groupbykey算子的执行效率要比groupby算子的执行效率要高
     */

    while (true) {

    }
  }
}

在这里插入图片描述

7)reducebykey
import org.apache.spark.{sparkconf, sparkcontext}
import org.apache.spark.rdd.rdd

object demo7reducebykey {
  def main(args: array[string]): unit = {

    val conf: sparkconf = new sparkconf()
      .setmaster("local")
      .setappname("reducebykey")

    val sc: sparkcontext = new sparkcontext(conf)

    //===================================================
    val linesrdd: rdd[string] = sc.textfile("spark/data/score.txt")
    //求每个班级的平均年龄
    val arrayrdd: rdd[array[string]] = linesrdd.map((line: string) => line.split(","))
    //分别使用groupbykey和reducebykey计算每个学生的总分
    val idwithscorerdd: rdd[(string, int)] = arrayrdd.map {
      case array(id: string, _, score: string) =>
        (id, score.toint)
    }

    /**
     * groupbykey实现
     */
//        val kvrdd1: rdd[(string, iterable[int])] = idwithscorerdd.groupbykey()
//        val resrdd1: rdd[(string, int)] = kvrdd1.map((kv: (string, iterable[int])) => (kv._1, kv._2.sum))
//        resrdd1.foreach(println)

    /**
     * reducebykey实现
     * 输出:
     * (1500100113,519)
     * (1500100724,440)
     * (1500100369,376)
     * (1500100378,402)
     * (1500100306,505)
     * (1500100578,397)
     */
    val resrdd2: rdd[(string, int)] = idwithscorerdd.reducebykey((v1: int, v2: int) => v1 + v2)
    resrdd2.foreach(println)


    /**
     * 面试题:
     * groupbykey与reducebykey的区别?
     * 相同点:
     * 它们都是kv格式的算子,只有kv格式的rdd才能调用
     * 不同点:
     * 1)groupbykey只是单纯地根据键进行分组,分组后的逻辑可以在后续的处理中调用其他的算子实现
     * 2)reducebykey 相当于mr中的预聚合,所以shuffle产生的数据量要比groupbykey中shuffle产生的数据量少,效率高,速度要快一些
     * 3)groupbykey的灵活度要比reducebykey灵活度要高,reducebykey无法做一些复杂的操作,比如方差。但是groupbykey可以在分组之后的rdd进行方差操作
     */

    while (true){

    }
  }
}

在这里插入图片描述

8)union
import org.apache.spark.rdd.rdd
import org.apache.spark.{sparkconf, sparkcontext}

object demo8union {
  def main(args: array[string]): unit = {
    val conf: sparkconf = new sparkconf()
      .setmaster("local")
      .setappname("reducebykey")

    val sc: sparkcontext = new sparkcontext(conf)

    //===================================================
    //parallelize:将scala的集合变成spark中的rdd
    val rdd1: rdd[(string, string)] = sc.parallelize(list(
      ("1001", "张三"),
      ("1002", "张三2"),
      ("1003", "张三3"),
      ("1004", "张三4"),
      ("1005", "张三5")
    ))
    println(s"rdd1的分区数:${rdd1.getnumpartitions}")

    val rdd2: rdd[(string, string)] = sc.parallelize(list(
      ("1006", "李四6"),
      ("1007", "李四7"),
      ("1003", "张三3"),
      ("1008", "李四8"),
      ("1009", "李四9")
    ))
    println(s"rdd2的分区数:${rdd2.getnumpartitions}")

    val rdd3: rdd[(string, int)] = sc.parallelize(list(
      ("1006", 111),
      ("1007", 22),
      ("1003", 33),
      ("1008", 444),
      ("1009", 55)
    ))

    //两个rdd要想进行union合并,必须保证元素的格式和数据类型是一致的
    //分区数也会进行合并,最终的分区数由两个rdd总共的分区数决定
    //    rdd1.union(rdd3)
    val resrdd1: rdd[(string, string)] = rdd1.union(rdd2)
    resrdd1.foreach(println)
    println(s"resrdd1的分区数:${resrdd1.getnumpartitions}")

    /**
     * 输出:
     * rdd1的分区数:1
     * rdd2的分区数:1
     * (1001,张三)
     * (1002,张三2)
     * (1003,张三3)
     * (1004,张三4)
     * (1005,张三5)
     * (1006,李四6)
     * (1007,李四7)
     * (1003,张三3)
     * (1008,李四8)
     * (1009,李四9)
     * resrdd1的分区数:2
     */
  }
}

9)join

import org.apache.spark.{sparkconf, sparkcontext}
import org.apache.spark.rdd.rdd

/**
 * join算子也要作用在kv格式的rdd上
 */
object demo9join {
  def main(args: array[string]): unit = {
    val conf: sparkconf = new sparkconf()
      .setmaster("local")
      .setappname("reducebykey")

    val sc: sparkcontext = new sparkcontext(conf)

    //===================================================
    //parallelize:将scala的集合变成spark中的rdd
    val rdd1: rdd[(string, string)] = sc.parallelize(list(
      ("1001", "张三"),
      ("1002", "李四"),
      ("1003", "王五"),
      ("1004", "小明"),
      ("1005", "小红")
    ))

    val rdd2: rdd[(string, string)] = sc.parallelize(list(
      ("1001", "看美女"),
      ("1002", "看综艺"),
      ("1003", "看八卦"),
      ("1004", "打游戏"),
      ("1009", "学习")
    ))

    /**
     * join 内连接
     * right join 右连接
     * left join 左连接
     * full join 全连接
     */
    // join 内连接 两个rdd共同拥有的键才会进行关联
    /**
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1003,王五,看八卦)
     */
        val resrdd1: rdd[(string, (string, string))] = rdd1.join(rdd2)
        val resrdd2: rdd[(string, string, string)] = resrdd1.map {
          case (id: string, (name: string, like: string)) =>
            (id, name, like)
        }
        resrdd2.foreach(println)

    //right join 右连接 保证右边rdd键的完整性
    /**
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1009,查无此人,学习)
     * (1003,王五,看八卦)
     */
        val resrdd2: rdd[(string, (option[string], string))] = rdd1.rightouterjoin(rdd2)
        val resrdd3: rdd[(string, string, string)] = resrdd2.map {
          case (id: string, (some(name), like: string)) =>
            (id, name, like)
          case (id: string, (none, like: string)) =>
            (id, "查无此人", like)
        }
        resrdd3.foreach(println)

    //left join: 左连接
    /**
     * (1005,小红,此人无爱好)
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1003,王五,看八卦)
     */
    val resrdd1: rdd[(string, (string, option[string]))] = rdd1.leftouterjoin(rdd2)
    val resrdd2: rdd[(string, string, string)] = resrdd1.map {
      case (id: string, (name: string, some(like: string))) =>
        (id, name, like)
      case (id: string, (name: string, none)) =>
        (id, name, "此人无爱好")
    }
    resrdd2.foreach(println)


    //全连接,保证所有的键、值的完整
    /**
     * (1005,小红,此人无爱好)
     * (1001,张三,看美女)
     * (1002,李四,看综艺)
     * (1004,小明,打游戏)
     * (1009,查无此人,学习)
     * (1003,王五,看八卦)
     */
    val resrdd2: rdd[(string, (option[string], option[string]))] = rdd1.fullouterjoin(rdd2)
    val resrdd3: rdd[(string, string, string)] = resrdd2.map {
      case (id: string, (some(name), some(like))) =>
        (id, name, like)
      case (id: string, (some(name), none)) =>
        (id, name, "此人无爱好")
      case (id: string, (none, some(like))) =>
        (id, "查无此人", like)
    }
    resrdd3.foreach(println)


  }
}
(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com