当前位置: 代码网 > it编程>数据库>MsSqlserver > 详解SparkSql输出数据的方式

详解SparkSql输出数据的方式

2024年11月09日 MsSqlserver 我要评论
一、普通文件输出方式方式一:给定输出数据源的类型和地址df.write.format("json").save(path)df.write.format("csv").save(path)df.wri

一、普通文件输出方式

 方式一:给定输出数据源的类型和地址

df.write.format("json").save(path)
df.write.format("csv").save(path)
df.write.format("parquet").save(path)

方式二:直接调用对应数据源类型的方法

df.write.json(path)
df.write.csv(path)
df.write.parquet(path)

append: 追加模式,当数据存在时,继续追加
overwrite: 覆写模式,当数据存在时,覆写以前数据,存储当前最新数据;
error/errorifexists: 如果目标存在就报错,默认的模式
ignore: 忽略,数据存在时不做任何操作

代码编写模板: 

df.write.mode(savemode="append").format("csv").save(path)

代码演示普通的文件输出格式: 

import os
from pyspark.sql import sparksession
if __name__ == '__main__':
    # 配置环境
    os.environ['java_home'] = 'c:/program files/java/jdk1.8.0_241'
    # 配置hadoop的路径,就是前面解压的那个路径
    os.environ['hadoop_home'] = 'd:/hadoop-3.3.1'
    # 配置base环境python解析器的路径
    os.environ['pyspark_python'] = 'c:/programdata/miniconda3/python.exe'  # 配置base环境python解析器的路径
    os.environ['pyspark_driver_python'] = 'c:/programdata/miniconda3/python.exe'
    spark = sparksession.builder.master("local[2]").appname("").config(
        "spark.sql.shuffle.partitions", 2).getorcreate()
    df = spark.read.json("../../datas/person.json")
    # 获取年龄最大的人的名字
    df.createorreplacetempview("persons")
    rsdf = spark.sql("""
       select name,age from persons where age = (select max(age) from persons)
    """)
    # 将结果打印到控制台
    #rsdf.write.format("console").save()
    #rsdf.write.json("../../datas/result",mode="overwrite")
    #rsdf.write.mode(savemode='overwrite').format("json").save("../../datas/result")
    #rsdf.write.mode(savemode='overwrite').format("csv").save("../../datas/result1")
    #rsdf.write.mode(savemode='overwrite').format("parquet").save("../../datas/result2")
    #rsdf.write.mode(savemode='append').format("csv").save("../../datas/result1")
    # text 保存路径为hdfs 直接报错,不支持
    #rsdf.write.mode(savemode='overwrite').text("hdfs://bigdata01:9820/result")
    #rsdf.write.orc("hdfs://bigdata01:9820/result",mode="overwrite")
    rsdf.write.parquet("hdfs://bigdata01:9820/result", mode="overwrite")
    spark.stop()

二、保存到数据库中

代码演示:

import os
# 导入pyspark模块
from pyspark import sparkcontext, sparkconf
from pyspark.sql import sparksession
if __name__ == '__main__':
    # 配置环境
    os.environ['java_home'] = 'd:\download\java\jdk'
    # 配置hadoop的路径,就是前面解压的那个路径
    os.environ['hadoop_home'] = 'd:\\bigdata\hadoop-3.3.1\hadoop-3.3.1'
    # 配置base环境python解析器的路径
    os.environ['pyspark_python'] = 'c:/programdata/miniconda3/python.exe'  # 配置base环境python解析器的路径
    os.environ['pyspark_driver_python'] = 'c:/programdata/miniconda3/python.exe'
    spark = sparksession.builder.master('local[*]').appname('').config("spark.sql.shuffle.partitions", 2).getorcreate()
    df5 = spark.read.format("csv").option("sep", "\t").load("../../datas/zuoye/emp.tsv")\
       .todf('eid','ename','salary','sal','dept_id')
    df5.createorreplacetempview('emp')
    rsdf = spark.sql("select * from emp")
    rsdf.write.format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.driver") \
        .option("url", "jdbc:mysql://bigdata01:3306/mysql") \
        .option("user", "root") \
        .option("password", "123456") \
        .option("dbtable", "emp1") \
        .save(mode="overwrite")
    spark.stop()
    # 使用完后,记得关闭

三、保存到hive中 

代码演示: 

import os
# 导入pyspark模块
from pyspark import sparkcontext, sparkconf
from pyspark.sql import sparksession
if __name__ == '__main__':
    # 配置环境
    os.environ['java_home'] = 'd:\download\java\jdk'
    # 配置hadoop的路径,就是前面解压的那个路径
    os.environ['hadoop_home'] = 'd:\\bigdata\hadoop-3.3.1\hadoop-3.3.1'
    # 配置base环境python解析器的路径
    os.environ['pyspark_python'] = 'c:/programdata/miniconda3/python.exe'  # 配置base环境python解析器的路径
    os.environ['pyspark_driver_python'] = 'c:/programdata/miniconda3/python.exe'
    os.environ['hadoop_user_name'] = 'root'
    spark = sparksession \
        .builder \
        .appname("hiveapp") \
        .master("local[2]") \
        .config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \
        .config('hive.metastore.uris', 'thrift://bigdata01:9083') \
        .config("spark.sql.shuffle.partitions", 2) \
        .enablehivesupport() \
        .getorcreate()
    df5 = spark.read.format("csv").option("sep", "\t").load("../../datas/zuoye/emp.tsv") \
        .todf('eid', 'ename', 'salary', 'sal', 'dept_id')
    df5.createorreplacetempview('emp')
    rsdf = spark.sql("select * from emp")
    rsdf.write.saveastable("spark.emp")
    spark.stop()
    # 使用完后,记得关闭

到此这篇关于sparksql输出数据的方式的文章就介绍到这了,更多相关sparksql输出数据内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com