一、前言介绍
二、基础准备
三、数据输入
四、数据计算
五、数据输出
六、分布式集群运行
一、前言介绍
spark概述
apache spark 是一个开源的大数据处理框架,提供了高效、通用、分布式的大规模数据处理能力。spark 的主要特点包括:
-
速度快:
spark 提供了内存计算功能,相较于传统的批处理框架(如hadoop mapreduce),spark 能够更高效地执行数据处理任务。spark 将中间数据存储在内存中,减少了磁盘 i/o,从而加速了计算过程。 -
通用性:
spark 提供了用于批处理、交互式查询、流处理和机器学习等多种计算模式的 api。这种通用性使得 spark 在不同的数据处理场景中都能发挥作用。 -
易用性:
spark 提供了易于使用的高级 api,其中最为知名的是 spark sql 和 dataframe api。这些 api 可以让用户用 sql 查询语言或类似于 pandas 的操作方式对数据进行处理,降低了使用门槛。 -
弹性计算:
spark 可以在集群中分布式执行计算任务,充分利用集群中的计算资源。它具有自动容错和任务重启的机制,保障了计算的稳定性。 -
丰富的生态系统:
spark 生态系统包括 spark sql、spark streaming、mllib(机器学习库)、graphx(图计算库)等模块,提供了全面的大数据处理解决方案。
spark 的核心概念包括:
-
rdd(resilient distributed dataset): rdd 是 spark 中的基本数据抽象,代表分布式的不可变的数据集。spark 的所有计算都是基于 rdd 进行的。
-
dataframe: dataframe 是 spark 2.0 引入的一种抽象数据结构,提供了类似于关系型数据库表的操作接口。dataframe 可以通过 spark sql 进行查询和操作。
-
spark sql: spark sql 提供了用于在 spark 上进行结构化数据处理的 api。它支持 sql 查询、dataframe 操作和集成 hive 查询等。
-
spark streaming: spark streaming 允许以流式的方式处理实时数据,提供了类似于批处理的 api。
-
mllib: mllib 是 spark 的机器学习库,提供了一系列常见的机器学习算法和工具,方便用户进行大规模机器学习任务。
-
graphx: graphx 是 spark 的图计算库,用于处理大规模图数据。
总体而言,spark 是一个灵活、强大且易于使用的大数据处理框架,适用于各种规模的数据处理和分析任务。
pyspark概述
pyspark
是 apache spark 的 python api,用于在 python 中进行大规模数据处理和分析。spark 是一个用于快速、通用、分布式计算的开源集群计算系统,而 pyspark 则是 spark 的 python 版本。
以下是使用 pyspark 进行基本操作的简要步骤:
-
安装 pyspark:
使用以下命令安装 pyspark:pip install pyspark
-
创建 sparksession:
在 pyspark 中,sparksession
是与 spark 进行交互的入口。可以使用以下代码创建一个sparksession
:from pyspark.sql import sparksession # 创建 sparksession spark = sparksession.builder.appname("example").getorcreate()
-
读取数据:
pyspark 提供了用于读取不同数据源的 api。以下是从文本文件读取数据的示例:# 从文本文件读取数据 data = spark.read.text("path/to/textfile")
-
数据转换和处理:
使用 pyspark 的 dataframe api 进行数据转换和处理。dataframe 是一个类似于表的数据结构,可以进行 sql 风格的查询和操作。# 展示 dataframe 的前几行数据 data.show() # 进行数据筛选 filtered_data = data.filter(data["column"] > 10)
-
执行 sql 查询:
使用 pyspark 提供的 sql 接口,可以在 dataframe 上执行 sql 查询。# 创建临时视图 data.createorreplacetempview("my_table") # 执行 sql 查询 result = spark.sql("select * from my_table where column > 10")
-
保存结果:
将处理后的结果保存到文件或其他数据源。# 保存到文本文件 result.write.text("path/to/output")
-
关闭 sparksession:
在完成所有操作后,关闭 sparksession。# 关闭 sparksession spark.stop()
以上是一个简单的 pyspark 示例。实际应用中,可以根据具体需求使用更多功能,例如连接不同数据源、使用机器学习库(mllib)进行机器学习任务等。 pyspark 提供了强大的工具和库,适用于大规模数据处理和分析的场景。
二、基础准备
1、pyspark库的安装
同其它的python第三方库一样,pyspark同样可以使用pip程序进行安装。
在”cmd”命令提示符程序内,输入:
pip install pyspark
或使用国内代理镜像网站(清华大学源)
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark
2、pyspark执行环境入口对象的构建
想要使用pyspark库完成数据处理,首先需要构建一个执行环境入口对象。
pyspark的执行环境入口对象是:类 sparkcontext 的类对象
"""
演示获取pyspark的执行环境入库对象:sparkcontext
并通过sparkcontext对象获取当前pyspark的版本
"""
# 导包
from pyspark import sparkconf, sparkcontext
# 创建sparkconf类对象
conf = sparkconf().setmaster("local[*]").setappname("test_spark_app")
# 基于sparkconf类对象创建sparkcontext对象
sc = sparkcontext(conf=conf)
# 打印pyspark的运行版本
print(sc.version)
# 停止sparkcontext对象的运行(停止pyspark程序)
sc.stop()
3、pyspark的编程模型
总结
- 如何安装pyspark库
pip install pyspark - 为什么要构建sparkcontext对象作为执行入口
pyspark的功能都是从sparkcontext对象作为开始 - pyspark的编程模型是?
- 数据输入:通过sparkcontext完成数据读取
- 数据计算:读取到的数据转换为rdd对象,调用rdd的成员方法完成计算
- 数据输出:调用rdd的数据输出相关成员方法,将结果输出到list、元组、字典、文本文件、数据库等
三、数据输入
rdd对象
如图可见,pyspark支持多种数据的输入,在输入完成后,都会得到一个:rdd类的对象
rdd全称为:弹性分布式数据集(resilient distributed datasets)
pyspark针对数据的处理,都是以rdd对象作为载体,即:
- 数据存储在rdd内
- 各类数据的计算方法,也都是rdd的成员方法
- rdd的数据计算方法,返回值依旧是rdd对象
pyspark的编程模型(上图)可以归纳为: - 准备数据到rdd -> rdd迭代计算 -> rdd导出为list、文本文件等
- 即:源数据 -> rdd -> 结果数据
pyspark数据输入的2种方法
python数据容器转rdd对象
读取文件转rdd对象
总结
- rdd对象是什么?为什么要使用它?
-
rdd对象称之为分布式弹性数据集,是pyspark中数据计算的载体,它可以:
- 提供数据存储
- 提供数据计算的各类方法
- 数据计算的方法,返回值依旧是rdd(rdd迭代计算)
-
后续对数据进行各类计算,都是基于rdd对象进行
- 如何输入数据到spark(即得到rdd对象)
- 通过sparkcontext的parallelize成员方法,将python数据容器转换为rdd对象
- 通过sparkcontext的textfile成员方法,读取文本文件得到rdd对象
四、数据计算
1、map方法
pyspark的数据计算,都是基于rdd对象来进行的,那么如何进行呢?
自然是依赖,rdd对象内置丰富的:成员方法(算子)
"""
演示rdd的map成员方法的使用
"""
from pyspark import sparkconf, sparkcontext
import os
os.environ['pyspark_python'] = "d:/dev/python/python310/python.exe"
conf = sparkconf().setmaster("local[*]").setappname("test_spark")
sc = sparkcontext(conf=conf)
# 准备一个rdd
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 通过map方法将全部数据都乘以10
# def func(data):
# return data * 10
rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)
print(rdd2.collect())
# (t) -> u
# (t) -> t
# 链式调用
总结
- map算子(成员方法)
接受一个处理函数,可用lambda表达式快速编写
对rdd内的元素逐个处理,并返回一个新的rdd - 链式调用
对于返回值是新rdd的算子,可以通过链式调用的方式多次调用算子。
2、flatmap方法
"""
演示rdd的flatmap成员方法的使用
"""
from pyspark import sparkconf, sparkcontext
import os
os.environ['pyspark_python'] = "d:/dev/python/python310/python.exe"
conf = sparkconf().setmaster("local[*]").setappname("test_spark")
sc = sparkcontext(conf=conf)
# 准备一个rdd
rdd = sc.parallelize(["itheima itcast 666", "itheima itheima itcast", "python itheima"])
# 需求,将rdd数据里面的一个个单词提取出来
rdd2 = rdd.flatmap(lambda x: x.split(" "))
print(rdd2.collect())
总结
flatmap算子
- 计算逻辑和map一样
- 可以比map多出,解除一层嵌套的功能
3、reducebykey方法
"""
演示rdd的reducebykey成员方法的使用
"""
from pyspark import sparkconf, sparkcontext
import os
os.environ['pyspark_python'] = "d:/dev/python/python310/python.exe"
conf = sparkconf().setmaster("local[*]").setappname("test_spark")
sc = sparkcontext(conf=conf)
# 准备一个rdd
rdd = sc.parallelize([('男', 99), ('男', 88), ('女', 99), ('女', 66)])
# 求男生和女生两个组的成绩之和
rdd2 = rdd.reducebykey(lambda a, b: a + b)
print(rdd2.collect())
总结
reducebykey算子
接受一个处理函数,对数据进行两两计算
wordcount案例
"""
完成练习案例:单词计数统计
"""
# 1. 构建执行环境入口对象
from pyspark import sparkcontext, sparkconf
import os
os.environ['pyspark_python'] = "d:/dev/python/python310/python.exe"
conf = sparkconf().setmaster("local[*]").setappname("test_spark")
sc = sparkcontext(conf=conf)
# 2. 读取数据文件
rdd = sc.textfile("d:/hello.txt")
# 3. 取出全部单词
word_rdd = rdd.flatmap(lambda x: x.split(" "))
# 4. 将所有单词都转换成二元元组,单词为key,value设置为1
word_with_one_rdd = word_rdd.map(lambda word: (word, 1))
# 5. 分组并求和
result_rdd = word_with_one_rdd.reducebykey(lambda a, b: a + b)
# 6. 打印输出结果
print(result_rdd.collect())
4、filter方法
"""
演示rdd的filter成员方法的使用
"""
from pyspark import sparkconf, sparkcontext
import os
os.environ['pyspark_python'] = "d:/dev/python/python310/python.exe"
conf = sparkconf().setmaster("local[*]").setappname("test_spark")
sc = sparkcontext(conf=conf)
# 准备一个rdd
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 对rdd的数据进行过滤
rdd2 = rdd.filter(lambda num: num % 2 == 0)
print(rdd2.collect())
总结
filter算子
- 接受一个处理函数,可用lambda快速编写
- 函数对rdd数据逐个处理,得到true的保留至返回值的rdd中
5、distinct方法
"""
演示rdd的distinct成员方法的使用
"""
from pyspark import sparkconf, sparkcontext
import os
os.environ['pyspark_python'] = "d:/dev/python/python310/python.exe"
conf = sparkconf().setmaster("local[*]").setappname("test_spark")
sc = sparkcontext(conf=conf)
# 准备一个rdd
rdd = sc.parallelize([1, 1, 3, 3, 5, 5, 7, 8, 8, 9, 10])
# 对rdd的数据进行去重
rdd2 = rdd.distinct()
print(rdd2.collect())
总结
distinct算子
完成对rdd内数据的去重操作
6、sortby方法
"""
演示rdd的sortby成员方法的使用
"""
from pyspark import sparkconf, sparkcontext
import os
os.environ['pyspark_python'] = "d:/dev/python/python310/python.exe"
conf = sparkconf().setmaster("local[*]").setappname("test_spark")
sc = sparkcontext(conf=conf)
# 1. 读取数据文件
rdd = sc.textfile("d:/hello.txt")
# 2. 取出全部单词
word_rdd = rdd.flatmap(lambda x: x.split(" "))
# 3. 将所有单词都转换成二元元组,单词为key,value设置为1
word_with_one_rdd = word_rdd.map(lambda word: (word, 1))
# 4. 分组并求和
result_rdd = word_with_one_rdd.reducebykey(lambda a, b: a + b)
# 5. 对结果进行排序
final_rdd = result_rdd.sortby(lambda x: x[1], ascending=true, numpartitions=1)
print(final_rdd.collect())
总结
sortby算子
- 接收一个处理函数,可用lambda快速编写
- 函数表示用来决定排序的依据
- 可以控制升序或降序
- 全局排序需要设置分区数为1
五、数据输出
1、输出为python对象
将rdd的结果输出为python对象的各类方法
collect方法
reduce方法
take方法
count方法
总结
- spark的编程流程就是:
- 将数据加载为rdd(数据输入)
- 对rdd进行计算(数据计算)
- 将rdd转换为python对象(数据输出)
- 数据输出的方法
- collect:将rdd内容转换为list
- reduce:对rdd内容进行自定义聚合
- take:取出rdd的前n个元素组成list
- count:统计rdd元素个数
数据输出可用的方法是很多的。
2、输出到文件中
将rdd的内容输出到文件中
saveastextfile方法
注意事项
调用保存文件的算子,需要配置hadoop依赖
- 下载
hadoop
安装包
http://archive.apache.org/dist/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz - 解压到电脑任意位置
- 在python代码中使用os模块配置:
os.environ[‘hadoop_home’] = ‘hadoop
解压文件夹路径’ - 下载
winutils.exe
,并放入hadoop
解压文件夹的bin
目录内
https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe - 下载
hadoop.dll
,并放入:c:/windows/system32
文件夹内
https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dll
更改rdd的分区数为1
总结
- rdd输出到文件的方法
- rdd.saveastextfile(路径)
- 输出的结果是一个文件夹
- 有几个分区就输出多少个结果文件
- 如何修改rdd分区
- sparkconf对象设置conf.set(“spark.default.parallelism”, “1”)
- 创建rdd的时候,sc.parallelize方法传入numslices参数为1
六、分布式集群运行
在 spark 中,分布式集群运行是其强大性能的体现。下面是使用 spark 进行分布式集群运行的基本步骤:
-
准备 spark 安装:
在集群中的每台机器上安装 spark。确保每台机器都能访问相同的 spark 安装路径。 -
配置 spark:
在 spark 安装路径下,编辑conf/spark-env.sh
文件,设置一些必要的环境变量,例如 java 路径、spark 主节点地址等。确保所有节点的配置文件保持一致。 -
启动 spark 主节点(master):
在集群中选择一台机器作为 spark 主节点,执行以下命令启动主节点:sbin/start-master.sh
默认情况下,主节点的 web ui 地址是
http://localhost:8080
。 -
启动 spark 工作节点(worker):
在其余机器上执行以下命令启动工作节点,将它们连接到主节点:sbin/start-worker.sh spark://<master-node-ip>:<port>
<master-node-ip>
是主节点的 ip 地址,<port>
是主节点的端口号(默认为 7077)。 -
提交 spark 应用程序:
编写 spark 应用程序,并使用以下命令提交到 spark 集群:bin/spark-submit --class com.example.myapp --master spark://<master-node-ip>:<port> myapp.jar
com.example.myapp
是你的应用程序主类,myapp.jar
是打包好的应用程序 jar 文件。 -
监控和调优:
可以通过 spark 的 web ui(默认地址为http://localhost:4040
)监控集群运行状态,查看任务的执行情况、资源使用情况等。根据实际情况进行性能调优。 -
停止 spark 集群:
当任务执行完成后,可以停止 spark 集群。首先停止工作节点:sbin/stop-worker.sh
然后停止主节点:
sbin/stop-master.sh
这些步骤涵盖了在分布式集群上运行 spark 应用程序的基本流程。确保配置正确、节点正常连接,以及应用程序能够充分利用集群中的计算资源。 spark 提供了灵活的配置选项,可以根据具体的集群规模和需求进行调整。
将案例提交到yarn集群中运行
提交命令:
bin/spark-submit --master yarn --num-executors 3 --queue root.teach --executor-cores 4 --executor-memory 4g /home/hadoop/demo.py
上面的 spark 提交命令已经包括了提交到 yarn 集群的必要参数。
以下是命令的解释:
bin/spark-submit
--master yarn # 指定 spark 的主节点为 yarn
--num-executors 3 # 指定执行器的数量
--queue root.teach # 指定 yarn 队列
--executor-cores 4 # 指定每个执行器的核心数
--executor-memory 4g # 指定每个执行器的内存大小
/home/hadoop/demo.py # 提交的 spark 应用程序的路径
解释一下每个参数的作用:
-
--master yarn
: 指定 spark 的主节点为 yarn。这告诉 spark 将任务提交到 yarn 集群管理器。 -
--num-executors 3
: 指定执行器的数量。这是 yarn 上的计算资源,即分配给 spark 应用程序的节点数量。 -
--queue root.teach
: 指定 yarn 队列。这是一个可选的参数,用于将 spark 应用程序提交到指定的 yarn 队列。 -
--executor-cores 4
: 指定每个执行器的核心数。这告诉 yarn 每个执行器可以使用的 cpu 核心数量。 -
--executor-memory 4g
: 指定每个执行器的内存大小。这告诉 yarn 每个执行器可以使用的内存量。 -
/home/hadoop/demo.py
: 提交的 spark 应用程序的路径。这应该是您的 spark 应用程序的入口点。
请确保在提交之前,spark 相关的配置正确,并且 yarn 集群正常运行。如果有额外的依赖项,确保它们在集群中的每个节点上都可用。
代码
"""
演示pyspark综合案例
"""
from pyspark import sparkconf, sparkcontext
import os
os.environ['pyspark_python'] = '/export/server/anaconda3/bin/python'
os.environ['hadoop_home'] = "/export/server/hadoop-3.3.1"
conf = sparkconf().setappname("spark_cluster")
conf.set("spark.default.parallelism", "24")
sc = sparkcontext(conf=conf)
# 读取文件转换成rdd
file_rdd = sc.textfile("hdfs://m1:8020/data/search_log.txt")
# todo 需求1: 热门搜索时间段top3(小时精度)
# 1.1 取出全部的时间并转换为小时
# 1.2 转换为(小时, 1) 的二元元组
# 1.3 key分组聚合value
# 1.4 排序(降序)
# 1.5 取前3
result1 = file_rdd.map(lambda x: (x.split("\t")[0][:2], 1)).\
reducebykey(lambda a, b: a + b).\
sortby(lambda x: x[1], ascending=false, numpartitions=1).\
take(3)
print("需求1的结果:", result1)
# todo 需求2: 热门搜索词top3
# 2.1 取出全部的搜索词
# 2.2 (词, 1) 二元元组
# 2.3 分组聚合
# 2.4 排序
# 2.5 top3
result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)).\
reducebykey(lambda a, b: a + b).\
sortby(lambda x: x[1], ascending=false, numpartitions=1).\
take(3)
print("需求2的结果:", result2)
# todo 需求3: 统计黑马程序员关键字在什么时段被搜索的最多
# 3.1 过滤内容,只保留黑马程序员关键词
# 3.2 转换为(小时, 1) 的二元元组
# 3.3 key分组聚合value
# 3.4 排序(降序)
# 3.5 取前1
result3 = file_rdd.map(lambda x: x.split("\t")).\
filter(lambda x: x[2] == '黑马程序员').\
map(lambda x: (x[0][:2], 1)).\
reducebykey(lambda a, b: a + b).\
sortby(lambda x: x[1], ascending=false, numpartitions=1).\
take(1)
print("需求3的结果:", result3)
# todo 需求4: 将数据转换为json格式,写出到文件中
# 4.1 转换为json格式的rdd
# 4.2 写出为文件
file_rdd.map(lambda x: x.split("\t")).\
map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}).\
saveastextfile("hdfs://m1:8020/output/output_json")
发表评论