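Prefixing the master string with k8s:// causes the Spark application to launch on a Kubernetes cluster. If the URL does not specify an HTTP protocol, it defaults to https. For example, setting the master to k8s://example.com:443 is equivalent to setting it to k8s://https://example.com:443, but to connect on a different port without TLS, the master must be set to k8s://http://example.com:8080.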
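The defaulting rule for the master URL can be sketched in plain Python. This is only an illustration of the rule described above, not Spark's own parsing code; the function name normalize_k8s_master is hypothetical.

```python
def normalize_k8s_master(master: str) -> str:
    """Return the master URL with an explicit scheme after the k8s:// prefix."""
    prefix = "k8s://"
    if not master.startswith(prefix):
        raise ValueError("a Kubernetes master must start with k8s://")
    rest = master[len(prefix):]
    # An explicit http:// or https:// is kept exactly as written.
    if rest.startswith(("http://", "https://")):
        return master
    # No protocol given: default to https.
    return prefix + "https://" + rest


print(normalize_k8s_master("k8s://example.com:443"))
print(normalize_k8s_master("k8s://http://example.com:8080"))
```

The first call shows the https default being filled in; the second shows an explicit non-TLS URL passing through unchanged.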
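Client Mode
Spark executors must be able to connect to the Spark driver over a routable hostname and port.
In client mode, the specific network configuration Spark needs depends on your setup. You can use a headless service to make the driver pod routable from the executors through a stable hostname.
When deploying the headless service, make sure its label selector matches only the driver pod and no other pods (assign the driver pod a sufficiently unique label and use that label in the service's label selector). Specify the driver's hostname with spark.driver.host and its port with spark.driver.port.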
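As a rough sketch, a headless service for the driver pod could be described like this. The manifest is built as a Python dict only so it can be printed and inspected; the service name, the label key spark-driver-id, the label value, and port 7078 are all hypothetical choices, not values from this article.

```python
import json


def headless_service(name: str, driver_label: str, driver_port: int) -> dict:
    """Build a Kubernetes Service manifest that selects only the driver pod."""
    return {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": name},
        "spec": {
            # clusterIP "None" is what makes a Service headless.
            "clusterIP": "None",
            # Hypothetical label that must exist on the driver pod and on no other pod.
            "selector": {"spark-driver-id": driver_label},
            "ports": [{"name": "driver-rpc", "port": driver_port, "targetPort": driver_port}],
        },
    }


service = headless_service("spark-driver-svc", "my-unique-driver", 7078)
print(json.dumps(service, indent=2))

# Matching Spark settings: executors reach the driver through the service name.
spark_conf = {
    "spark.driver.host": service["metadata"]["name"],
    "spark.driver.port": str(service["spec"]["ports"][0]["port"]),
}
print(spark_conf)
```

The printed JSON could be saved and applied with kubectl; the two Spark settings then point executors at the stable service hostname rather than at the pod itself.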
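Secret Management
Kubernetes Secrets can be used to provide credentials for a Spark application to access secured services. For example, to mount a secret named spark-secret at /etc/secrets in both the driver and executor containers, add the following options to spark-submit: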
--conf spark.kubernetes.driver.secrets.spark-secret=/etc/secrets
--conf spark.kubernetes.executor.secrets.spark-secret=/etc/secrets
To use a secret through an environment variable, add the following options to spark-submit:
--conf spark.kubernetes.driver.secretKeyRef.ENV_NAME=name:key
--conf spark.kubernetes.executor.secretKeyRef.ENV_NAME=name:key
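When several secrets are involved, the option list can be generated rather than typed by hand. The helper below is a hypothetical convenience (secret_conf_flags is not part of Spark); it only assembles the property names shown above for both the driver and the executors.

```python
def secret_conf_flags(mounts: dict, env_refs: dict) -> list:
    """Build spark-submit --conf flags for secret mounts and secret-backed env vars.

    mounts maps a secret name to a mount path.
    env_refs maps an environment variable name to a (secret name, key) pair.
    """
    flags = []
    for role in ("driver", "executor"):
        for secret_name, path in mounts.items():
            flags += ["--conf", f"spark.kubernetes.{role}.secrets.{secret_name}={path}"]
        for env_name, (secret_name, key) in env_refs.items():
            flags += ["--conf", f"spark.kubernetes.{role}.secretKeyRef.{env_name}={secret_name}:{key}"]
    return flags


flags = secret_conf_flags(
    mounts={"spark-secret": "/etc/secrets"},
    env_refs={"ENV_NAME": ("name", "key")},
)
print(" ".join(flags))
```

The resulting list can be appended to a spark-submit argument list, for example when launching through subprocess.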
Namespaces
Spark on Kubernetes can use namespaces to launch Spark applications. The namespace is selected through the spark.kubernetes.namespace configuration.
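RBAC
In Kubernetes clusters with RBAC enabled, users can configure Kubernetes RBAC roles and the service accounts that the various Spark on Kubernetes components use to access the Kubernetes API server.
The Spark driver pod uses a Kubernetes service account to access the Kubernetes API server in order to create and watch executor pods. At a minimum, the service account must be granted a Role or ClusterRole that allows the driver pod to create pods and services. For example, to run the driver pod under a service account named spark, pass:
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark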
Other Spark on K8s Configuration
See the configuration page for general information on Spark configuration. The following settings are specific to Spark on Kubernetes.
Spark Properties:
Property Name | Default | Meaning |
---|---|---|
spark.kubernetes.namespace | default | The namespace in which the driver and executor pods run. |
spark.kubernetes.container.image | (none) | Container image to use for the Spark application. Required, unless an explicit image is provided for each container type (see the next two rows). |
spark.kubernetes.driver.container.image | (value of spark.kubernetes.container.image) | Container image for the driver. |
spark.kubernetes.executor.container.image | (value of spark.kubernetes.container.image) | Container image for the executors. |
spark.kubernetes.container.image.pullPolicy | IfNotPresent | Container image pull policy used when pulling images within Kubernetes. |
spark.kubernetes.container.image.pullSecrets |  | Kubernetes secrets used to pull images from private image registries. |
spark.kubernetes.allocation.batch.size | 5 | Number of pods to launch at once in each round of executor pod allocation. |
spark.kubernetes.allocation.batch.delay | 1s | Time to wait between each round of executor pod allocation. Specifying values less than 1 second may lead to excessive CPU usage on the Spark driver. |
spark.kubernetes.authenticate.submission.caCertFile | (none) | Path to the CA cert file for connecting to the Kubernetes API server over TLS when starting the driver. This file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.caCertFile instead. |
spark.kubernetes.authenticate.submission.clientKeyFile | (none) | Path to the client key file for authenticating against the Kubernetes API server when starting the driver. This file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.clientKeyFile instead. |
spark.kubernetes.authenticate.submission.clientCertFile | (none) | Path to the client cert file for authenticating against the Kubernetes API server when starting the driver. This file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.clientCertFile instead. |
spark.kubernetes.authenticate.submission.oauthToken | (none) | OAuth token to use when authenticating against the Kubernetes API server when starting the driver. Note that unlike the other authentication options, this is expected to be the exact string value of the token to use for the authentication. In client mode, use spark.kubernetes.authenticate.oauthToken instead. |
spark.kubernetes.authenticate.submission.oauthTokenFile | (none) | Path to the OAuth token file containing the token to use when authenticating against the Kubernetes API server when starting the driver. This file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.oauthTokenFile instead. |
spark.kubernetes.authenticate.driver.caCertFile | (none) | Path to the CA cert file for connecting to the Kubernetes API server over TLS from the driver pod when requesting executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.caCertFile instead. |
spark.kubernetes.authenticate.driver.clientKeyFile | (none) | Path to the client key file for authenticating against the Kubernetes API server from the driver pod when requesting executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod as a Kubernetes secret. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.clientKeyFile instead. |
spark.kubernetes.authenticate.driver.clientCertFile | (none) | Path to the client cert file for authenticating against the Kubernetes API server from the driver pod when requesting executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod as a Kubernetes secret. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.clientCertFile instead. |
spark.kubernetes.authenticate.driver.oauthToken | (none) | OAuth token to use when authenticating against the Kubernetes API server from the driver pod when requesting executors. Note that unlike the other authentication options, this must be the exact string value of the token to use for the authentication. This token value is uploaded to the driver pod as a Kubernetes secret. In client mode, use spark.kubernetes.authenticate.oauthToken instead. |
spark.kubernetes.authenticate.driver.oauthTokenFile | (none) | Path to the OAuth token file containing the token to use when authenticating against the Kubernetes API server from the driver pod when requesting executors. Note that unlike the other authentication options, this file must contain the exact string value of the token to use for the authentication. This token value is uploaded to the driver pod as a secret. In client mode, use spark.kubernetes.authenticate.oauthTokenFile instead. |
spark.kubernetes.authenticate.driver.mounted.caCertFile | (none) | Path to the CA cert file for connecting to the Kubernetes API server over TLS from the driver pod when requesting executors. This path must be accessible from the driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.caCertFile instead. |
spark.kubernetes.authenticate.driver.mounted.clientKeyFile | (none) | Path to the client key file for authenticating against the Kubernetes API server from the driver pod when requesting executors. This path must be accessible from the driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.clientKeyFile instead. |
spark.kubernetes.authenticate.driver.mounted.clientCertFile | (none) | Path to the client cert file for authenticating against the Kubernetes API server from the driver pod when requesting executors. This path must be accessible from the driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.clientCertFile instead. |
spark.kubernetes.authenticate.driver.mounted.oauthTokenFile | (none) | Path to the file containing the OAuth token to use when authenticating against the Kubernetes API server from the driver pod when requesting executors. This path must be accessible from the driver pod. Note that unlike the other authentication options, this file must contain the exact string value of the token to use for the authentication. In client mode, use spark.kubernetes.authenticate.oauthTokenFile instead. |
spark.kubernetes.authenticate.driver.serviceAccountName | default | Service account that is used when running the driver pod. The driver pod uses this service account when requesting executor pods from the API server. Note that this cannot be specified alongside a CA cert file, client key file, client cert file, and/or OAuth token. In client mode, use spark.kubernetes.authenticate.serviceAccountName instead. |
spark.kubernetes.authenticate.caCertFile | (none) | In client mode, path to the CA cert file for connecting to the Kubernetes API server over TLS when requesting executors. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). |
spark.kubernetes.authenticate.clientKeyFile | (none) | In client mode, path to the client key file for authenticating against the Kubernetes API server when requesting executors. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). |
spark.kubernetes.authenticate.clientCertFile | (none) | In client mode, path to the client cert file for authenticating against the Kubernetes API server when requesting executors. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). |
spark.kubernetes.authenticate.oauthToken | (none) | In client mode, the OAuth token to use when authenticating against the Kubernetes API server when requesting executors. Note that unlike the other authentication options, this must be the exact string value of the token to use for the authentication. |
spark.kubernetes.authenticate.oauthTokenFile | (none) | In client mode, path to the file containing the OAuth token to use when authenticating against the Kubernetes API server when requesting executors. |
spark.kubernetes.driver.label.[LabelName] | (none) | Add the label specified by LabelName to the driver pod. For example, spark.kubernetes.driver.label.something=true. Note that Spark also adds its own labels to the driver pod for bookkeeping purposes. |
spark.kubernetes.driver.annotation.[AnnotationName] | (none) | Add the annotation specified by AnnotationName to the driver pod. For example, spark.kubernetes.driver.annotation.something=true. |
spark.kubernetes.executor.label.[LabelName] | (none) | Add the label specified by LabelName to the executor pods. For example, spark.kubernetes.executor.label.something=true. Note that Spark also adds its own labels to the executor pods for bookkeeping purposes. |
spark.kubernetes.executor.annotation.[AnnotationName] | (none) | Add the annotation specified by AnnotationName to the executor pods. For example, spark.kubernetes.executor.annotation.something=true. |
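spark.kubernetes.driver.pod.name | (none) | Name of the driver pod. In cluster mode, if this is not set, the driver pod name is set to "spark.app.name" suffixed by the current timestamp to avoid name conflicts. In client mode, if your application is running inside a pod, it is highly recommended to set this to the name of the pod your driver is running in. Setting this value in client mode allows the driver to become the owner of its executor pods, which in turn allows the executor pods to be garbage collected by the cluster. |
spark.kubernetes.executor.lostCheck.maxAttempts | 10 | Number of times that the driver will try to ascertain the loss reason for a specific executor. The loss reason is used to ascertain whether the executor failure is due to a framework or an application error, which in turn decides whether the executor is removed and replaced, or placed into a failed state for debugging. |
spark.kubernetes.submission.waitAppCompletion | true | In cluster mode, whether to wait for the application to finish before exiting the launcher process. When changed to false, the launcher has a "fire-and-forget" behavior when launching the Spark job. |
spark.kubernetes.report.interval | 1s | Interval between reports of the current Spark job status in cluster mode. |
spark.kubernetes.driver.limit.cores | (none) | CPU limit for the driver pod. |
spark.kubernetes.executor.request.cores | (none) | CPU request for each executor pod. Values conform to the Kubernetes convention; example values include 0.1, 500m, 1.5, and 5, with the definition of CPU units documented in the Kubernetes CPU units documentation. This is distinct from spark.executor.cores: if set, it is used only for the executor pod CPU request, where it takes precedence over spark.executor.cores. Task parallelism, that is, the number of tasks an executor can run concurrently, is not affected by this. |
spark.kubernetes.executor.limit.cores | (none) | CPU limit for each executor pod. |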
spark.kubernetes.node.selector.[labelKey] | (none) | Adds to the node selector of the driver pod and executor pods, with key labelKey and the value as the configuration's value. For example, setting spark.kubernetes.node.selector.identifier to myIdentifier will result in the driver pod and executors having a node selector with key identifier and value myIdentifier. Multiple node selector keys can be added by setting multiple configurations with this prefix. |
spark.kubernetes.driverEnv.[EnvironmentVariableName] | (none) | Add the environment variable specified by EnvironmentVariableName to the driver process. The user can specify multiple of these to set multiple environment variables. |
spark.kubernetes.driver.secrets.[SecretName] | (none) | Add the Kubernetes Secret named SecretName to the driver pod on the path specified in the value. For example, spark.kubernetes.driver.secrets.spark-secret=/etc/secrets. |
spark.kubernetes.executor.secrets.[SecretName] | (none) | Add the Kubernetes Secret named SecretName to the executor pod on the path specified in the value. For example, spark.kubernetes.executor.secrets.spark-secret=/etc/secrets. |
spark.kubernetes.driver.secretKeyRef.[EnvName] | (none) | Add as an environment variable to the driver container with name EnvName (case sensitive), the value referenced by key key in the data of the referenced Kubernetes Secret. For example, spark.kubernetes.driver.secretKeyRef.ENV_VAR=spark-secret:key. |
spark.kubernetes.executor.secretKeyRef.[EnvName] | (none) | Add as an environment variable to the executor container with name EnvName (case sensitive), the value referenced by key key in the data of the referenced Kubernetes Secret. For example, spark.kubernetes.executor.secretKeyRef.ENV_VAR=spark-secret:key. |
spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.path | (none) | Add the Kubernetes Volume named VolumeName of the VolumeType type to the driver pod on the path specified in the value. For example, spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint. |
spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.readOnly | (none) | Specify if the mounted volume is read only or not. For example, spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false. |
spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].options.[OptionName] | (none) | Configure Kubernetes Volume options passed to Kubernetes, with OptionName as key and the specified value; must conform with the Kubernetes option format. For example, spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim. |
spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.path | (none) | Add the Kubernetes Volume named VolumeName of the VolumeType type to the executor pod on the path specified in the value. For example, spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint. |
spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.readOnly | false | Specify if the mounted volume is read only or not. For example, spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false. |
spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].options.[OptionName] | (none) | Configure Kubernetes Volume options passed to Kubernetes, with OptionName as key and the specified value. For example, spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim. |
spark.kubernetes.memoryOverheadFactor | 0.1 | This sets the memory overhead factor that will allocate memory to non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, and various system processes. For JVM-based jobs this value will default to 0.10, and to 0.40 for non-JVM jobs. This is done as non-JVM tasks need more non-JVM heap space and such tasks commonly fail with "Memory Overhead Exceeded" errors. This preempts this error with a higher default. |
spark.kubernetes.pyspark.pythonVersion | "2" | The Python version to use, when the application is written in Python. |
For the remaining Spark settings, see https://spark.apache.org/docs/2.4.8/configuration.html.
Example: Submitting Spark to K8s
A job can be submitted as follows, setting the CPU and memory requests and limits for both the driver and the executors (the driver's memory limit is 110% of its request).
# Comments cannot sit between backslash-continued lines, so the options are summarized here, in order:
# cluster deploy mode; the main class; the K8s API server address; the namespace;
# the K8s service account; resource requests and limits; the driver, executor, and
# init-container images; and finally the jar holding the actual job, with its argument.
# The --kubernetes-namespace flag and the *.docker.image properties appear to belong to the
# older Spark-on-K8s build these images come from (v2.1.0-kubernetes-0.3.1-1); they differ
# from the Spark 2.4 property names listed in the table above.
./spark-submit \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  --master k8s://https://172.20.0.113:6443 \
  --kubernetes-namespace spark-cluster \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.driver.memory=100g \
  --conf spark.executor.memory=10g \
  --conf spark.driver.cores=30 \
  --conf spark.executor.cores=2 \
  --conf spark.driver.maxResultSize=10240m \
  --conf spark.kubernetes.driver.limit.cores=32 \
  --conf spark.kubernetes.executor.limit.cores=3 \
  --conf spark.kubernetes.executor.memoryOverhead=2g \
  --conf spark.executor.instances=5 \
  --conf spark.app.name=spark-pi \
  --conf spark.kubernetes.driver.docker.image=sz-pg-oam-docker-hub-001.tendcloud.com/library/spark-driver:v2.1.0-kubernetes-0.3.1-1 \
  --conf spark.kubernetes.executor.docker.image=sz-pg-oam-docker-hub-001.tendcloud.com/library/spark-executor:v2.1.0-kubernetes-0.3.1-1 \
  --conf spark.kubernetes.initcontainer.docker.image=sz-pg-oam-docker-hub-001.tendcloud.com/library/spark-init:v2.1.0-kubernetes-0.3.1-1 \
  local:///opt/spark/examples/jars/spark-examples_2.11-2.2.0-k8s-0.4.0-SNAPSHOT.jar 10000000
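This launches a Spark job that computes Pi with ten million tasks. While it runs, the driver actually consumes about 3 CPU cores and 40 GB of memory, and each executor consumes a little under 1 core and under 4 GB of memory, so the resource request values can be tuned step by step against the observed consumption.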
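The 110% figure follows from the memory overhead factor of 0.1 listed in the table: the pod is sized as the configured memory plus an overhead. A minimal sketch of that arithmetic follows; the 384 MiB floor is an assumption about Spark's default minimum overhead, and pod_memory_mib is a hypothetical helper, not a Spark API.

```python
MIN_OVERHEAD_MIB = 384  # assumption: Spark's default minimum memory overhead


def pod_memory_mib(memory_mib: int, overhead_percent: int = 10) -> int:
    """Configured memory plus overhead, where overhead is a percentage with a floor."""
    overhead = max(memory_mib * overhead_percent // 100, MIN_OVERHEAD_MIB)
    return memory_mib + overhead


driver_total = pod_memory_mib(100 * 1024)  # spark.driver.memory=100g
small_total = pod_memory_mib(1024)         # a 1g container hits the overhead floor
print(driver_total, small_total)
```

For the 100g driver above this gives 112640 MiB, which is 110 GiB; for small containers the floor dominates, so the ratio is higher than 110%.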
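The PySpark Library
What Is PySpark
Earlier we used the bin/pyspark program, which provides a Python interpreter environment for running Spark jobs. PySpark here refers to the Python runtime library, which is imported in Python code with: import pyspark.
PySpark is a Python library officially provided by Spark. It ships the full Spark API, so Spark applications can be written with it and submitted to a Spark cluster to run.
[Figure: a brief comparison of the PySpark library and the standard Spark framework]
It can be installed with pip, which ships with Python: pip install pyspark -i https://pypi.tuna.tsinghua.edu.cn/simple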
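Application Entry Point: SparkContext
The entry point of a Spark application is SparkContext. Every application first has to build a SparkContext object, in two steps:
- Step 1: create a SparkConf object and set the basic information of the Spark application, such as the application name (appName) and the master it runs on
- Step 2: create the SparkContext object from the SparkConf object
WordCount in Practice
First create a local word.txt file: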
hello spark
hello world
hello hello
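Then write the script that processes it (the full listing appears further down).
If running it fails with the following error: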
24/03/17 19:46:06 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 2)
org.apache.spark.SparkException: Python worker failed to connect back.
at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)
at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.net.SocketTimeoutException: Accept timed out
at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:708)
at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:752)
at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:675)
at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:641)
at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:617)
at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:574)
at java.base/java.net.ServerSocket.accept(ServerSocket.java:532)
at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:179)
... 15 more
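then two environment variables need to be set:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
These two lines set the environment variables that tell Spark which Python interpreter to use for the workers and for the driver program.
os.environ['PYSPARK_PYTHON'] = sys.executable assigns the path of the current Python interpreter to the PYSPARK_PYTHON environment variable, which tells Spark to execute Python code with the same interpreter as the current one.
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable assigns the same path to PYSPARK_DRIVER_PYTHON, which tells the Spark driver to use that interpreter as well.
The complete script: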
from pyspark import SparkContext, SparkConf
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark_config = SparkConf().setAppName("minproject").setMaster("local[*]")
spark_ctx = SparkContext(conf=spark_config)
# Read the local file with Spark
rdd = spark_ctx.textFile("./word.txt")
print(rdd.collect())
# Split each line into words
flatmaprdd = rdd.flatMap(lambda line: line.split(" "))
print(flatmaprdd.collect())
# Turn each word into a (word, 1) pair, meaning the word occurred once
maprdd = flatmaprdd.map(lambda x: (x, 1))
print(maprdd.collect())
# Group by key and sum the counts
resultrdd = maprdd.reduceByKey(lambda a, b: a + b)
# Output the result
print(resultrdd.collect())
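How it works:
Finally, save the result to local disk:
# Output the result
print(resultrdd.collect())
# saveAsTextFile creates a directory named output.txt holding one part file per partition
resultrdd.saveAsTextFile("./output.txt")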
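The stages of the WordCount script can be traced in plain Python on the same three lines of word.txt. This is only a model of what each operator produces, run on a single machine with ordinary lists; it does not use Spark, and the variable names are illustrative.

```python
lines = ["hello spark", "hello world", "hello hello"]

# flatMap: split every line and flatten the pieces into one list of words
words = [word for line in lines for word in line.split(" ")]

# map: pair every word with the count 1
pairs = [(word, 1) for word in words]

# reduceByKey: add up the counts that share the same word
counts = {}
for word, one in pairs:
    counts[word] = counts.get(word, 0) + one

print(words)
print(pairs)
print(sorted(counts.items()))
```

In Spark the same three steps run per partition and the final summation happens after a shuffle, so the order of the collected pairs is not guaranteed; sorting here just makes the printed result stable.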
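How Python on Spark Executes
PySpark's aim is to leave Spark's existing runtime architecture intact and wrap a layer of Python API around it, using Py4J for the interaction between Python and Java, so that Spark applications can be written in Python.
[Figure: PySpark runtime architecture]
Spark Core Programming
To support highly concurrent, high-throughput data processing, the Spark computing framework provides three major data structures for different scenarios:
➢ RDD: resilient distributed dataset
➢ Accumulator: a distributed, shared, write-only variable
➢ Broadcast variable: a distributed, shared, read-only variable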
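RDD
An RDD (Resilient Distributed Dataset) is the most basic data processing model in Spark. In code it is an abstract class that represents a resilient, immutable, partitionable collection whose elements can be computed in parallel.
- Resilient
  - Resilient storage: automatic switching between memory and disk;
  - Resilient fault tolerance: lost data can be recovered automatically;
  - Resilient computation: failed computations are retried;
  - Resilient partitioning: data can be repartitioned as needed.
- Distributed: the data is stored on different nodes of the big-data cluster
- Dataset: an RDD encapsulates computation logic and does not store the data itself
- Data abstraction: RDD is an abstract class that concrete subclasses must implement
- Immutable: an RDD encapsulates computation logic and cannot be changed; to change it, you can only produce a new RDD that encapsulates the new logic
- Partitionable and computed in parallel
- In short, an RDD is Spark's most basic data abstraction: an immutable, partitionable collection whose elements can be computed in parallel.
- All computations and operations are built on top of the RDD data structure.
- An RDD can be thought of as a distributed list or array. It is an abstract data structure: an abstract class with a generic type parameter.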
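The Five Properties of an RDD
An RDD has partitions
An RDD's methods are applied to all of its partitions
RDDs have dependencies on one another
A key-value RDD can have a partitioner
Partition planning places partitions as close as possible to the server that holds the data
WordCount Case Analysis
RDD Programming
Creating an RDD
The entry object for Spark RDD programming is the SparkContext object (in every programming language). Only once a SparkContext has been built can the subsequent API calls and computations run on top of it.
In essence, as far as programming is concerned, the main job of SparkContext is to create the first RDD.
There are two main ways to create an RDD:
- By parallelizing a collection (turning a local object into a distributed RDD)
- By reading an external data source (reading a file)
Creating by Parallelizing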
from pyspark import SparkContext, SparkConf
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark_config = SparkConf().setAppName("create add").setMaster("local[4]")
spark_context = SparkContext(conf=spark_config)
data = [1, 2, 3, 4, 5, 6, 6, 7, 8]
# Distribute the local list across 4 partitions
rdd = spark_context.parallelize(data, 4)
print(rdd.collect())
Creating by Reading a File
from pyspark import SparkContext, SparkConf
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark_config = SparkConf().setAppName("create add").setMaster("local[4]")
spark_context = SparkContext(conf=spark_config)
# Each line of the file becomes one element of the RDD
rdd = spark_context.textFile("./word.txt")
print(rdd.collect())
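RDD Operators
Transformation Operators
map Operator
flatMap Operator
reduceByKey Operator
For a WordCount application, reduceByKey can be used to sum the counts of each word.
groupBy Operator
Put simply, the return value of the function passed in is the grouping key; the function itself expresses the grouping condition.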
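The grouping rule can be modeled in plain Python. This sketch only mirrors the idea that the function's return value becomes the key; it is not PySpark's implementation, and group_by is a hypothetical helper name.

```python
from collections import defaultdict


def group_by(items, key_func):
    """Group items under whatever key_func returns for each of them."""
    groups = defaultdict(list)
    for item in items:
        groups[key_func(item)].append(item)
    return dict(groups)


# The lambda's return value ("even" or "odd") is the grouping key.
result = group_by([1, 2, 3, 4, 5, 6], lambda x: "even" if x % 2 == 0 else "odd")
print(result)
```

With an RDD, the equivalent call would pass the same kind of function to groupBy, and each key would be paired with an iterable of its members.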
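filter Operator
It means the same as Python's filter: if the function returns True the current value is kept, otherwise it is discarded.
distinct Operator
union Operator
join Operator
intersection Operator
glom Operator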
import os
import sys
from pyspark import SparkContext, SparkConf
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark_config = SparkConf().setAppName("create add").setMaster("local[4]")
spark_context = SparkContext(conf=spark_config)
rdd = spark_context.parallelize([1, 2, 3, 4, 5], 2)
# glom wraps the elements of each partition into one list
rdd2 = rdd.glom()
print(rdd.collect())
# [1, 2, 3, 4, 5]
print(rdd2.collect())
# [[1, 2], [3, 4, 5]]
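The relationship between collect and glom can be modeled with nested lists. The even-split rule below is an assumption chosen because it reproduces the output shown above for five elements in two partitions; PySpark may place partition boundaries differently for other inputs.

```python
def split_evenly(data, num_partitions):
    """Cut data into num_partitions contiguous slices of near-equal size."""
    n = len(data)
    return [
        data[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]


partitions = split_evenly([1, 2, 3, 4, 5], 2)

# glom: one list per partition
glommed = [list(part) for part in partitions]

# collect: all partitions flattened back into a single list
collected = [item for part in partitions for item in part]

print(glommed)
print(collected)
```

Seen this way, glom is mainly a debugging aid: it exposes where the partition boundaries fall without changing the data.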
groupByKey Operator
sortBy Operator
import os
import sys
from random import shuffle
from pyspark import SparkContext, SparkConf
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark_config = SparkConf().setAppName("create add").setMaster("local[4]")
spark_context = SparkContext(conf=spark_config)
data = [number for number in range(100)]
# Shuffle in place
shuffle(data)
rdd = spark_context.parallelize(data, 10)
rdd2 = rdd.glom()
print(rdd2.collect())
# Sort the per-partition lists by their last element, into a single partition
rdd3 = rdd2.sortBy(lambda x: x[-1], ascending=True, numPartitions=1)
print(rdd3.collect())
The example from the official docstring:
Examples
--------
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
sortByKey Operator
Example
Action Operators
collect Operator
reduce Operator
fold Operator
first Operator
import os
import sys
from random import shuffle
from pyspark import SparkContext, SparkConf
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark_config = SparkConf().setAppName("create add").setMaster("local[4]")
spark_context = SparkContext(conf=spark_config)
data = [number for number in range(100)]
# Shuffle in place
shuffle(data)
rdd = spark_context.parallelize(data, 10)
print(rdd.getNumPartitions())
# glom is added only to make the partitions visible
print(rdd.glom().collect())
first_number = rdd.first()
print(first_number)
As the output shows, first returns the first element of the first partition.
take Operator
Likewise, take returns elements in order; when the current partition does not hold enough, it moves on to the next partition.
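The partition-by-partition behaviour of first and take can be modeled with nested lists. This is a simplified single-machine sketch of the semantics described above, not how PySpark schedules the work; both function names are local to the example.

```python
def take(partitions, n):
    """Collect up to n elements, scanning the partitions in order."""
    taken = []
    for partition in partitions:
        for item in partition:
            if len(taken) == n:
                return taken
            taken.append(item)
    return taken


def first(partitions):
    """Return the first element found, skipping empty leading partitions."""
    result = take(partitions, 1)
    if not result:
        raise ValueError("the dataset is empty")
    return result[0]


partitions = [[7], [3, 9], [1]]
print(take(partitions, 3))
print(first([[], [4, 5]]))
```

The second call shows that an empty first partition is simply skipped, which matches the idea of continuing into the next partition when the current one runs out.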
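top Operator
count Operator
takeSample Operator
takeOrdered Operator
foreach Operator
saveAsTextFile Operator
Partition Operators
mapPartitions Operator
foreachPartition Operator
partitionBy Operator
repartition Operator
coalesce Operator
mapValues Operator
join Operator
The Difference Between groupByKey and reduceByKey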
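One practical difference is how much data crosses the shuffle: reduceByKey combines values inside each partition before shuffling, while groupByKey ships every record as is. The plain-Python model below counts the shuffled records in both cases; it is a sketch of the idea rather than Spark's implementation, and all names in it are illustrative.

```python
from collections import defaultdict


def model_group_by_key(partitions):
    """Every (key, value) record crosses the shuffle unchanged."""
    shuffled = [record for part in partitions for record in part]
    grouped = defaultdict(list)
    for key, value in shuffled:
        grouped[key].append(value)
    return dict(grouped), len(shuffled)


def model_reduce_by_key(partitions, func):
    """Values are pre-combined per partition, then merged after the shuffle."""
    shuffled = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        shuffled.extend(local.items())
    merged = {}
    for key, value in shuffled:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged, len(shuffled)


partitions = [
    [("hello", 1), ("spark", 1), ("hello", 1)],
    [("hello", 1), ("world", 1), ("hello", 1), ("hello", 1)],
]
grouped, grouped_records = model_group_by_key(partitions)
reduced, reduced_records = model_reduce_by_key(partitions, lambda a, b: a + b)
print(grouped_records, reduced_records)
print(reduced)
```

Both approaches can reach the same counts, but in this toy input the pre-combining version moves 4 records instead of 7, which is why reduceByKey is usually preferred for aggregations.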
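What to Watch Out for with Partition Operations
Avoid increasing the number of partitions where possible, since doing so can break the in-memory iterative computation pipeline.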
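RDD Persistence
RDD Data Is Intermediate (Process) Data
RDD Caching
RDD Checkpointing
- Differences between cache and checkpoint
  - cache saves RDD data in a lightweight way; it can be stored in memory and on disk, the storage is distributed, and by design the data is not safe (the RDD lineage is retained)
  - checkpoint saves RDD data in a heavyweight way; the storage is centralized, it can only be stored on disk (HDFS), and by design the data is safe (the RDD lineage is not retained)
- Performance of cache versus checkpoint
  - cache performs better: storage is distributed, the executors work in parallel, and the data can be kept in memory (at the cost of memory), which is faster
  - checkpoint is slower: storage is centralized and involves network I/O, but keeping the data on HDFS is safer (multiple replicas)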
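Spark Practice Case
Search Engine Log Analysis
The analysis uses the Sogou user query dataset (I have put a copy at https://scripterbro.github.io/files/sogouq.txt).
FAQ
Error: HADOOP_HOME and hadoop.home.dir are unset
PySpark normally runs alongside a Hadoop environment (it depends on it). If no Hadoop runtime is installed on Windows, the error above is reported.
Hadoop releases can be downloaded from https://hadoop.apache.org/releases.html.
After downloading, extracting the archive requires administrator privileges; running WinRAR as administrator is enough.
In the environment variables, set: HADOOP_HOME = the Hadoop installation directory
In the PATH environment variable, add:
- %HADOOP_HOME%\bin
- %HADOOP_HOME%\sbin
Set JAVA_HOME in the hadoop-env.cmd script to the real JDK path.
Then restart the computer.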
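After the restart, a short script can confirm that the variables are visible to Python. This is a hypothetical diagnostic helper written for this FAQ, not part of PySpark or Hadoop; it assumes Windows-style paths separated by semicolons and takes the environment as a plain dict so it can be tried with sample values.

```python
import ntpath
import os


def check_hadoop_env(env):
    """Return a list of problems found in a Windows-style environment mapping."""
    home = env.get("HADOOP_HOME")
    if not home:
        return ["HADOOP_HOME is not set"]
    # Normalize PATH entries: lower case, backslashes, no trailing separator.
    entries = {
        ntpath.normcase(p.strip()).rstrip("\\")
        for p in env.get("PATH", "").split(";")
        if p.strip()
    }
    problems = []
    for sub in ("bin", "sbin"):
        wanted = ntpath.normcase(ntpath.join(home, sub))
        if wanted not in entries:
            problems.append("PATH does not contain " + ntpath.join(home, sub))
    return problems


# Check the real environment of the current process.
print(check_hadoop_env(dict(os.environ)))
```

An empty list means both settings from the steps above are in place; otherwise each message names what is still missing.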