当前位置: 代码网 > it编程>前端脚本>Python > Python之PySpark简单应用

Python之PySpark简单应用

2024年08月02日 Python 我要评论
PySpark是Apache Spark的Python API,它允许开发人员使用Python编写并运行分布式大数据处理应用程序。通过PySpark,开发人员可以利用Spark的强大功能和高性能,同时享受Python编程语言的灵活性和易用性。报错中直接指出具体报错行,经过检查发现SparkConf没有写括号。

一、介绍

pyspark是apache spark的python api,它允许开发人员使用python编写并运行分布式大数据处理应用程序。通过pyspark,开发人员可以利用spark的强大功能和高性能,同时享受python编程语言的灵活性和易用性。

1.准备工作

pip install pyspark

2. 创建sparksession对象:

from pyspark.sql import sparksession

spark = sparksession.builder \
    .appname("example-app") \
    .getorcreate()

3. 读取数据:

df = spark.read.csv("test.csv", header=true)

4. 数据处理与分析:

result = df.groupby("column").count().show()

5. 停止sparksession:

spark.stop()

二、示例

1.读取解析csv数据

下面是一个简单的示例,演示了如何使用pyspark进行数据处理和分析:

from pyspark.sql import sparksession

# 创建sparksession对象
spark = sparksession.builder.appname("example").getorcreate()

# 读取csv文件
df = spark.read.csv("c:/users/39824/desktop/test.csv", header=true)

# 对数据进行筛选和聚合操作
result = df.filter(df["age"] > 25).groupby("department").count()
# 显示结果
result.show()
# 停止sparksession
spark.stop()

在这个示例中,我们首先创建了一个sparksession对象,然后使用该对象读取了一个csv文件。接着,我们对数据进行了筛选和聚合操作,并最终显示了结果。最后,我们停止了sparksession以释放资源。
输出:
在这里插入图片描述

2.解析计算序列数据map\flatmap

from pyspark import sparkconf, sparkcontext

conf = sparkconf().setappname("create rdd").setmaster("local[*]")
sc = sparkcontext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])

rdd1 = rdd.map(lambda x: x * 10)
print(rdd1.collect())

rdd_str = sc.parallelize(["java.io.filenotfoundexception", "sc.setloglevel(newlevel)", "hadoop.home.dir"])

rdd_str1 = rdd_str.map(lambda x: x.split("."))
print(f"map解析的结果是:{rdd_str1.collect()}")

rdd_str2 = rdd_str.flatmap(lambda x: x.split("."))
print(f"flatmap解析的结果是:{rdd_str2.collect()}")

输出:

[10, 20, 30, 40, 50, 60, 70, 80, 90]
map解析的结果是:[['java', 'io', 'filenotfoundexception'], ['sc', 'setloglevel(newlevel)'], ['hadoop', 'home', 'dir']]
flatmap解析的结果是:['java', 'io', 'filenotfoundexception', 'sc', 'setloglevel(newlevel)', 'hadoop', 'home', 'dir']

使用 pyspark 创建了一个 rdd,并对其进行了 map 和 flatmap 转换:

  1. 使用 sc.parallelize() 方法创建了一个整数类型的 rdd,其中包含数字1到9。
  2. 对 rdd 进行 map 转换,将每个元素乘以10。
  3. 使用 print 函数输出 map 后的结果。
  4. 使用 sc.parallelize() 方法创建了一个字符串类型的 rdd,其中包含三个字符串。
  5. 对 rdd 进行 map 转换,将每个字符串按照 “.” 分隔成多个子字符串。
  6. 使用 print 函数输出 map 后的结果。
  7. 对 rdd 进行 flatmap 转换,将每个字符串按照 “.” 分隔成多个子字符串,并将所有子字符串扁平化为一维列表。

总结:

  • map 函数将输入 rdd 的每个元素应用于给定的函数,并返回一个新的 rdd,其中包含函数应用后的结果。
  • flatmap 函数与 map 函数类似,但它的输出是一个扁平化的结果。也就是说,对于每个输入元素,函数可以返回一个或多个输出元素,并将所有输出元素进行扁平化。
  • 可以使用 collect() 函数将 rdd 中的所有元素收集到本地计算机上,并将其作为列表返回。需要注意的是,如果 rdd 中的元素非常多,则可能会导致内存不足或性能问题。

pyspark提供了丰富的数据处理和分析功能,同时也具备了python编程语言的灵活性和易用性,使得开发人员能够以简洁的方式编写大规模数据处理应用程序。

三、问题总结

1.代码问题

报错:
在这里插入图片描述

traceback (most recent call last):
  file "d:\demo\pyspark_demo\demo.py", line 3, in <module>
    conf = sparkconf.setappname("create rdd").setmaster("local[*]")
typeerror: sparkconf.setappname() missing 1 required positional argument: 'value'

报错中直接指出具体报错行,经过检查发现sparkconf没有写括号
更正代码:

conf = sparkconf().setappname("create rdd").setmaster("local[*]")

2.配置问题

报错:

java.io.ioexception: cannot run program "python3": createprocess error=3, 系统找不到指定的路径。

在这里插入图片描述
解决方式:
找到本地的python.exe,copy之后改名字python3.exe。重启解决~~~~(真是意想不到!!!!)

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com