当前位置: 代码网 > it编程>编程语言>Java > Spark重温笔记(一):一分钟部署PySpark环境,轻松上手Spark配置

Spark重温笔记(一):一分钟部署PySpark环境,轻松上手Spark配置

2024年08月01日 Java 我要评论
Spark 是一种快速、通用、可扩展的大数据分析引擎,2009 年诞生于加州大学伯克利分校。1-速度快:其一、Spark处理数据时,可以将中间处理结果数据存储到内存中其二、spark job调度以DAG方式,每个任务Task以线程Thread方式,而不是mapreduce以进程process方式2-易于使用:Spark 的版本已经更新到(截止日期2021.06.01),支持了包括 Java、Scala、Python 、R和SQL语言在内的多种语言。

spark学习笔记

喜欢我的博客的话,记得点个红心❤️和小关小注哦!您的支持是我创作的动力!"


一、基础环境搭建

1. pyspark+环境

创建一个conda的环境:用于pyspark本地运行

conda create --name pyspark_practice python=3.8.8
conda activate pyspark_practice

记得配置环境变量
pyspark_driver_python=d:\anaconda\envs\pyspark_practice\python.exe
pyspark_python=d:\anaconda\envs\pyspark_practice\python.exe

jupyter notebook --ip 192.168.52.3 --port 8888 --allow-root

这会自动安装与你 python版本对应的最新的tensorflow

pip install -i https://mirrors.aliyun.com/pypi/simple/ --upgrade tensorflow

会安装与你环境python版本对应的 pytorch

conda install pytorch torchvision torchaudio cpuonly -c https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/pytorch/win-64/
source activate 可以进入conda环境
source deactivate 可以退出conda环境

更换下载源

1、conda:

打开terminal,输入conda config --show channels
这里我们使用清华大学的镜像源。在命令行中复制以下命令,并按回车:

conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/

conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/

conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/pytorch/

conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/
完成后,再输入conda config --show channels,可以发现,镜像源已经被改为清华源了。

2、pip

pip是一种通用的python包管理工具,我们在conda环境下也可以使用pip来进行python包的管理。

在命令行中运行:

pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple

会生成一个pip.ini的文件,这就是pip镜像源的配置文件。

我们可以使用pip config list命令,查看pip镜像源地址。

2. 三种运行方式

(1) local模式

数据源 word.txt:

hello you spark flink
hello me hello she spark
_01_firstsparkpro.py

# -*- coding: utf-8 -*-
# program function: 本地测试spark简单wordcount程序
from pyspark import sparkcontext,sparkconf

if __name__ == "__main__":
    # 1. 首先创建sparkcontext的上下文环境
    conf = sparkconf().setappname("firstspark").setmaster("local[*]")
    sc = sparkcontext(conf=conf)
    sc.setloglevel("warn")

    # 2.从外部数据源读取数据
    filerdd = sc.textfile("d:\pythonproject\bigdata_pyspark3.1.2\pyspark-sparkbase_3.1.2\data\input\word.txt")
    print(type(filerdd))
    print(filerdd.collect())

    # 3.执行flatmap进行数据扁平化
    flat_maprdd = filerdd.flatmap(lambda words:words.split(" "))
    print(type(flat_maprdd))
    print(flat_maprdd.collect())

    # 4.执行map转化操作,得到(word,1)
    rdd_maprdd = flat_maprdd.map(lambda word:(word,1))
    print(type(rdd_maprdd))
    print(rdd_maprdd.collect())

    # 5.reducebykey将相同的key的value数据累加操作
    resultrdd = rdd_maprdd.reducebykey(lambda x,y:x+y)
    print(type(resultrdd))

    # 6.输出数据
    res_rdd_col2 = resultrdd.collect()

    # 7.输出到控制台
    for line in res_rdd_col2:
        print(line)


    # 8.输出到文件:不用新建文件夹,输出结果是part文件和success文件
    #resultrdd.saveastextfile("d:\pythonproject\bigdata_pyspark3.1.2\pyspark-sparkbase_3.1.2\data\output")

    # 9.针对values单词进行词频统计
    print("=========================sort===============================")

    #print(resultrdd.sortby(lambda x:x[1], ascending=false).take(3))——获取前3位
    
    #print(resultrdd.sortby(lambda x: x[1], ascending=false).top(3))——获取后三位

    print('停止 pyspark sparksession 对象')

    sc.stop()

(2) standlone模式——远程连接
# 配置上下文环境时
conf = sparkconf().setappname("standlonespark").setmaster("spark://node1:7077")

# 读取文件时:
# 路径不指定协议 默认使用hdfs
#  读取hdfs文件:
filerdd = sc.textfile("hdfs://node1:9820/word.txt")

# 读取linux本地文件:需要三个节点备份:
filerdd = sc.textfile("/export/data/pyspark_workplace/pyspark-sparkbase_3.1.2/data/input/word.txt")
(3)ha模式——远程连接
# 1、首先创建sparkcontext上下文环境
conf = sparkconf().setappname("standlonespark").setmaster("spark://node1:7077,node2:7077")

3. spark简介

(1) 介绍spark

spark 是一种快速、通用、可扩展的大数据分析引擎,2009 年诞生于加州大学伯克利分校。

具有以下五大特点:

  • 1-速度快:
    • 其一、spark处理数据时,可以将中间处理结果数据存储到内存中
    • 其二、spark job调度以dag方式,每个任务task以线程thread方式,而不是mapreduce以进程process方式
  • 2-易于使用:
    • spark 的版本已经更新到 spark 3.1.2(截止日期2021.06.01),支持了包括 java、scala、python 、r和sql语言在内的多种语言。为了兼容spark2.x企业级应用场景,spark仍然持续更新spark2版本。
  • 3-通用性强:
    • sparksql:提供结构化的数据处理方式
    • sparkstreaming:处理流式处理任务
    • mllib:机器学习算法库
    • graphx:提供图形和图形并行化计算
  • 4-运行方式:
    • 对于数据源而言,spark 支持从hdfs、hbase、 kafka 等多种途径获取数据。
(2) spark的yarn与deploy-mode
  • 1-提交yarn,不走deploy-mode
spark_home=/export/server/spark
${spark_home}/bin/spark-submit \
--master yarn \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
${spark_home}/examples/src/main/python/pi.py \
10
  • 2-client模式:学习测试,driver运行在client的sparksubmit进程中,结果在客户端显示
spark_home=/export/server/spark
${spark_home}/bin/spark-submit \
--master spark://node1.itcast.cn:7077,node2.itcast.cn:7077 \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--num-executors 1 \
--total-executor-cores 2 \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
${spark_home}/examples/src/main/python/pi.py \
10
  • 3-cluster模式:生产环境使用,driver程序在yarn集群,运行结果客户端没有显示
spark_home=/export/server/spark
${spark_home}/bin/spark-submit \
--master spark://node1.itcast.cn:7077,node2.itcast.cn:7077 \
--deploy-mode cluster  \
--driver-memory 512m \
--executor-memory 512m \
--num-executors 1 \
--total-executor-cores 2 \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
${spark_home}/examples/src/main/python/pi.py \
10

(3) spark两种提交方式
  • spark-shell:交互式scale命令行,bin/spark-shell --master spark://node1:7077

  • spark-submit:

    • –master local[2]

      ${spark_home}/bin/spark-submit \
      --master local[2] \
      ${spark_home}/examples/src/main/python/pi.py \
      10
      
    • –master spark://node1.itcast.cn:7077 \

      ${spark_home}/bin/spark-submit \
      --master spark://node1.itcast.cn:7077 \
      --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
      --conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
      ${spark_home}/examples/src/main/python/pi.py \
      10
      
    • –master spark://node1.itcast.cn:7077,node2.itcast.cn:7077 \

      ${spark_home}/bin/spark-submit \
      --master spark://node1.itcast.cn:7077,node2.itcast.cn:7077 \
      --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
      --conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
      ${spark_home}/examples/src/main/python/pi.py \
      10
      
(4) driver program 和 executor 参数配置
spark_home=/export/server/spark
${spark_home}/bin/spark-submit \
--master spark://node1.itcast.cn:7077,node2.itcast.cn:7077 \

--deploy-mode cluster  \            
--driver-memory 512m \					# 指定driver program jvm进程内存大小,默认为1g

--executor-memory 512m \				# 指定executor 运行所需内存大小
--num-executors 1 \						# 表示在yarn集群下,executor的个数,默认值为2
--total-executor-cores 2 \				# 所有任务的总 cpu cores

--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
${spark_home}/examples/src/main/python/pi.py \
10

(5) spark服务启动
  • 集群启动
/export/server/spark/sbin/start-all.sh
  • 本地启动:python交互式
/export/servers/spark/bin/pyspark   --master local[4]
  • 本地启动:scale交互式
/epxort/servers/spark/bin/spark-shell --master  spark://node1:7077
/epxort/servers/spark/bin/spark-shell --master  spark://node1:7077,node2:7077
  • 启动历史日志服务器
sbin/mr-jobhistory-daemon.sh start historyserver 
端口:18080
(6) spark的重要端口号
  • 1-spark master web ui:4040
这个端口用于显示 spark master 的 web 用户界面通过这个界面查看和管理 spark 集群的状态。
  • spark worker 和 spark driver 与 master 通信端口:7077
这个端口用于 spark worker 和 spark driver 向 spark master 发送心跳和任务状态信息。
  • spark eventlog 事件日志端口:4041
这个端口用于 spark 事件日志的记录和查看,包括任务的创建、提交、完成等事件。
  • spark history server web ui端口:18080
这个端口用于显示 spark history server 的 web 用户界面,你可以通过这个界面查看已完成的任务的历史信息。
(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com