当前位置: 代码网 > 科技>电脑产品>内存 > Spark编程基础

Spark编程基础

2024年07月28日 内存 我要评论
Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎特点:运行速度快、容易使用、通用性、运行模式多样。

目录

一,spark设计与运行原理

1,spark简介

2,spark与hadoop对比

二,spark运行架构

1,rdd设计与运行原理

方法1.parallelize()

方法2.makerdd()

方法3.通过hdfs文件创建 rdd

方法4.通过 linux 本地文件创建 rdd

2,rdd方法归纳

1.使用map()方法转换数据​

2.使用 sortby()方法进行排序 ​

 3.使用flatmap()方法转换数据​

4.使用take()方法查询某几个值 ​

 5.使用union()方法合并多个rdd​

6.使用distinct()方法进行去重 ​

3, 使用简单的集合操作​

 (1)intersection()方法​

 (2)subtract()方法​

(3)cartesian()方法 ​

任务实现: 

三,spark快速上手

 1,创建maven项目

(1)增加scala插件​

(2)增加依赖关系 

 (3)wordcount

(4)异常处理

四,spark运行环境 

1,local模式

上传并解压缩文件

启动local环境

命令行工具 

退出本地模式

提交应用

五,spark运行架构

1,运行架构

2,核心概念

(1)executor与core


一,spark设计与运行原理

1,spark简介

spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎

特点:运行速度快、容易使用、通用性、运行模式多样

2,spark与hadoop对比

hadoop存在的缺点:

表达能力有限 ,磁盘io开销大 ,延迟高

spark优点:

编程模型更灵活,迭代运算效率更高,任务调度机制更优

二,spark运行架构

1,rdd设计与运行原理

创建rdd

方法1.parallelize()

parallelizeo方法有两个输人参数,说明如下:
(1)要转化的集合:必须是 seq集合。seq 表示序列,指的是一类具有一定长度的、可迭代访问的对象,其中每个数据元素均带有一个从0开始的、固定的索引。
(2)分区数。若不设分区数,则rdd 的分区数默认为该程序分配到的资源的 cpu核心数。
通过 parallelizeo方法用一个数组的数据创建rdd,并设置分区数为4,创建后查看该 rdd 的分区数

方法2.makerdd()

makerdd0方法有两种使用方式,第一种使用方式与 parallelize0方法一致;第二种方式是通过接收一个 seq[(t,seq[string])]参数类型创建 rdd。第二种方式生成的rdd中保存的是t的值,seq[string]部分的数据会按照 seqf(t,seq[string])的顺序存放到各个分区中,一个 seq[stringl对应存放至一个分区,并为数据提供位置信息,通过preferredlocations0方法可以根据位置信息查看每一个分区的值。调用 makerdd0时不可以直接指定 rdd 的分区个数,分区的个数与 seq[string]参数的个数是保持一致的,使用 makerdd0方法创建 rdd,并根据位置信息查看每一个分区的值

方法3.通过hdfs文件创建 rdd

这种方式较为简单和常用,直接通过 textfile()方法读取 hdfs文件的位置即可。
在hdfs 的/user/toot 目录下有一个文件test.txt,读取该文件创建一个 rdd

方法4.通过 linux 本地文件创建 rdd

本地文件的读取也是通过 sc.textfile("路径")的方法实现的,在路径前面加上“file://”表示从linux 本地文件系统读取。在 intellijidea 开发环境中可以直接读取本地文件;但在 spark-shell 中,要求在所有节点的相同位置保存该文件才可以读取它,例如,在linux的/opt 目录下创建一个文件 test.txt,任意输入4行数据并保存,将 test.txt 文件远程传输至所有节点的/opt 目录下,才可以读取文件 test.txt。读取 test.txt 文件,并且统计文件的数据行数

2,rdd方法归纳

1.使用map()方法转换数据

2.使用 sortby()方法进行排序 

 3.使用flatmap()方法转换数据

4.使用take()方法查询某几个值 

 5.使用union()方法合并多个rdd

6.使用distinct()方法进行去重 

3, 使用简单的集合操作

 (1)intersection()方法

 (2)subtract()方法

(3)cartesian()方法 

任务实现: 

三,spark快速上手

 1,创建maven项目

(1)增加scala插件

(2)增加依赖关系 

    修改maven项目中的pom文件,增加spark框架的依赖关系

<dependencies>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->

        <dependency>

            <groupid>org.apache.spark</groupid>

            <artifactid>spark-core_2.12</artifactid>

            <version>2.4.5</version>

        </dependency>

    </dependencies>

    <build>

        <plugins>

            <!-- 该插件用于将 scala 代码编译成 class 文件 -->

            <plugin>

                <groupid>net.alchim31.maven</groupid>

                <artifactid>scala-maven-plugin</artifactid>

                <version>3.2.2</version>

                <executions>

                    <execution>

                        <!-- 声明绑定到 maven 的 compile 阶段 -->

                        <goals>

                            <goal>testcompile</goal>

                        </goals>

                    </execution>

                </executions>

            </plugin>

            <plugin>

                <groupid>org.apache.maven.plugins</groupid>

                <artifactid>maven-assembly-plugin</artifactid>

                <version>3.1.0</version>

                <configuration>

                    <descriptorrefs>

                        <descriptorref>jar-with-dependencies</descriptorref>

                    </descriptorrefs>

                </configuration>

                <executions>

                    <execution>

                        <id>make-assembly</id>

                        <phase>package</phase>

                        <goals>

                            <goal>single</goal>

                        </goals>

                    </execution>

                </executions>

            </plugin>

        </plugins>

    </build>

 (3)wordcount

   为了能直观地感受spark框架的效果,接下来我们实现一个大数据学科中最常见的教学案例         wordcount

/**
 * spark实现单词计数
 */
 
object wordcountspark {
 
  def main(args: array[string]): unit = {
 
    //创建spark运行配置对象
 
    val spark: sparkconf = new sparkconf()
 
      .setmaster("local[*]")
 
      .setappname("wordcountsparkapps")
 
    //创建spark上下文对象
 
    val sc: sparkcontext = new sparkcontext(spark)
 
    //读文件数据
 
    val wordsrdd: rdd[string] = sc.textfile("data/word.txt")
 
    //讲文件中的数据进行分词
 
    val word: rdd[string] = wordsrdd.flatmap(_.split(","))
 
    //转换数据结构word ---->(word,1)
 
    val word2: rdd[(string, int)] = word.map((_, 1))
 
    //将转换结构后的数据按照相同的单词进行分组聚合
 
    val word2countrdd: rdd[(string, int)] = word2.reducebykey(_ + _)
 
    //将数据聚合结果采集到内存中
 
    val word2count: array[(string, int)] = word2countrdd.collect()
 
    //打印结果
 
    word2count.foreach(println)
 
    //关闭spark连接
 
    sc.stop()
 
  }
 
}

(4)异常处理

如果本机操作系统是windows,在程序中使用了hadoop相关的东西,比如写入文件到hdfs,则会遇到如下异常:

出现这个问题的原因,并不是程序的错误,而是windows系统用到了hadoop相关的服务,解决办法是通过配置关联到windows的系统依赖就可以了 

在idea中配置runconfiguration,添加hadoop_home变量或者在windows上配置环境变量: 

四,spark运行环境 

1,local模式

上传并解压缩文件

(1)上传文件至/usr/local/packages中

(2)解压缩到指定目录 

[root@master local]# tar -zxvf spark-2.4.5-bin-hadoop2.6.tgz -c /usr/local/soft/

(3)重命名 

[root@master soft]# mv spark-2.4.5-bin-hadoop2.6/ spark-local

启动local环境

(1)进入解压缩(spark-local)目录

(2)启动成功后,可以输入网址进行web ui监控页面访问 

命令行工具 

sc.textfile("data/word.txt").flatmap(_.split(",")).map((_,1)).reducebykey(_+_).collect

退出本地模式

scala> :quit 

提交应用

bin/spark-submit \

--class org.apache.spark.examples.sparkpi \

--master local[1] \

./examples/jars/spark-examples_2.11-2.4.5.jar \

10

五,spark运行架构

1,运行架构

spark框架的核心是一个计算引擎,整体来说,它采用了标准master-slave的结构。如下图所示,它展示了一个 spark 执行时的基本结构。图形中的driver表示 master,负责管理整个集群中的作业任务调度。图形中的executor则是 slave,负责实际执行任务。

2,核心概念

(1)executor与core

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com