当前位置: 代码网 > it编程>游戏开发>ar > 【Spark On Hive】—— 基于电商数据分析的项目实战

【Spark On Hive】—— 基于电商数据分析的项目实战

2024年07月28日 ar 我要评论
【Spark On Hive】—— 基于电商数据分析的项目实战

在这里插入图片描述
在这里插入图片描述

spark on hive 详解

本文基于spark重构基于hive的电商数据分析的项目需求,在重构的同时对spark on hive的全流程进行详细的讲解。
所谓的spark on x指的是从x数据源中获取数据并在spark进行计算之后,将计算结果导入该数据库或者数仓。获取数据和导入数据的地方可以是不同的

一、项目配置
1. 创建工程

首先,创建一个空的maven工程,在创建之后,我们需要检查一系列配置,以保证jdk版本的一致性。同时,我们需要创建出scala的编码环境。具体可参考以下文章:
maven工程配置与常见问题解决指南

scala01 —— scala基础

2. 配置文件

2.1 在spark on hive的项目中,我们需要有两个核心配置文件。

  • pom.xml
<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
         xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelversion>4.0.0</modelversion>

    <groupid>com.ybg</groupid>
    <artifactid>warehouse_ebs_2</artifactid>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceencoding>utf-8</project.build.sourceencoding>
        <spark.version>3.1.2</spark.version>
        <spark.scala.version>2.12</spark.scala.version>
        <hadoop.version>3.1.3</hadoop.version>
        <mysql.version>8.0.33</mysql.version>
        <hive.version>3.1.2</hive.version>
        <hbase.version>2.3.5</hbase.version>
        <jackson.version>2.10.0</jackson.version>
    </properties>

    <dependencies>
        <!-- spark-core -->
        <dependency>
            <groupid>org.apache.spark</groupid>
            <artifactid>spark-core_${spark.scala.version}</artifactid>
            <version>${spark.version}</version>
        </dependency>

        <!-- spark-sql -->
        <dependency>
            <groupid>org.apache.spark</groupid>
            <artifactid>spark-sql_${spark.scala.version}</artifactid>
            <version>${spark.version}</version>
        </dependency>

        <!-- spark-hive -->
        <dependency>
            <groupid>org.apache.spark</groupid>
            <artifactid>spark-hive_${spark.scala.version}</artifactid>
            <version>${spark.version}</version>
        </dependency>

        <!-- hadoop-common -->
        <dependency>
            <groupid>org.apache.hadoop</groupid>
            <artifactid>hadoop-common</artifactid>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- mysql -->
        <dependency>
            <groupid>com.mysql</groupid>
            <artifactid>mysql-connector-j</artifactid>
            <version>${mysql.version}</version>
        </dependency>

        <!-- hive-exec -->
        <dependency>
            <groupid>org.apache.hive</groupid>
            <artifactid>hive-exec</artifactid>
            <version>${hive.version}</version>
            <exclusions>
                <exclusion>
                    <groupid>org.apache.logging.log4j</groupid>
                    <artifactid>log4j-slf4j-impl</artifactid>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- hbase 驱动 -->
        <dependency>
            <groupid>org.apache.hbase</groupid>
            <artifactid>hbase-client</artifactid>
            <version>${hbase.version}</version>
        </dependency>

        <!-- jackson-databind -->
        <dependency>
            <groupid>com.fasterxml.jackson.core</groupid>
            <artifactid>jackson-core</artifactid>
            <version>${jackson.version}</version>
        </dependency>

        <!-- jackson-databind -->
        <dependency>
            <groupid>com.fasterxml.jackson.core</groupid>
            <artifactid>jackson-databind</artifactid>
            <version>${jackson.version}</version>
        </dependency>
    </dependencies>

</project>
  • log4j.properties
    log4j.properties 文件的主要作用是配置日志系统的行为,包括控制日志信息的输出和实现滚动事件日志
log4j.rootlogger=error, stdout, logfile
log4j.appender.stdout=org.apache.log4j.consoleappender
log4j.appender.stdout.layout=org.apache.log4j.patternlayout
----------------------- 滚动事件日志代码 -----------------------
log4j.appender.stdout.layout.conversionpattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.dailyrollingfileappender
log4j.appender.logfile.datepattern='.'yyyy-mm-dd
log4j.appender.logfile.append=true
---------------------------------------------------------------
log4j.appender.logfile.file=log/spark_first.log
log4j.appender.logfile.layout=org.apache.log4j.patternlayout
log4j.appender.logfile.layout.conversionpattern=%d %p [%c] - %m%n

2.2 组件核心配置文件
在这里插入图片描述
在工程的resources目录下,需要存放在虚拟机中大数据服务的核心组件的配置文件,以便于spark on hive中调用大数据组件服务能够正常进行。

3. 工程目录
二、代码实现
  • spark on hive

    • 创建数据校验方法 check:
      用于确保配置项的值有效。
      检查值是否为 null。
      对字符串类型的值进行非空和正则表达式匹配校验。
    • 创建配置设置方法 set:
      先校验配置项名称和值的有效性。
      使用 sparkconf.set 方法设置有效的配置项和值。
    • trait builder:
      有一系列配置方法,最后end()返回sparksession。
    • sparkfactory类中有一个build()方法,build()方法中新建了builder对象实现了各类配置方法。
  • spark on mysql

    • 分为getter和setter两处
    • setter有四处配置,分别是driver,url,user,password
    • getter有两处配置,分别是url和conf

sparkfactory配置表如下:
配置表

2.1 validator
  • 因为check()的校验方法并不只针对于spark,因此可以创建一个object作为通用性的校验方法。
package core


object validator {
  /**
   * 数据校验
   *
   * @param title 校验主题
   * @param value 待校验的值
   * @param regex 若待校验值为字符串,且有特定的规则,那么提供正则表达式进一步验证格式
   */
   def check(title: string, value: any, regex: string = null) = {
    if (null == value) {
      throw new runtimeexception(s"value for $title null pointer exception")
    }
    if (value.isinstanceof[string]) {
      if (value.tostring.isempty) {
        throw new runtimeexception(s"value for $title empty string exception")
      }
      if (null != regex && !value.tostring.matches(regex)) {
        throw new runtimeexception(s"value for $title not match $regex exception")
      }
    }
  }
}

2.2 class sparkfactory
  • 作用:sparkfactory类的作用是能够工厂化地创建和配置sparksession实例,通过一系列的setcheck方法来确保配置项的有效性和正确性,并最终生成一个配置好的sparksession实例。
  • 代码
package core

import core.sparkfactory.builder
import core.validator.check
import org.apache.spark.sparkconf
import org.apache.spark.sql.sparksession
class sparkfactory {
  def build():builder={
    new builder {
      val conf = new sparkconf()
      /**
       * 先检查配置项名称是否正确
       * 再检查配置项的值是否正确
       * @param item 配置项名称
       * @param value 配置项值
       * @param regexvalue 配置项正则规则
       */
      private def set(item:string,value:string,regexvalue:string=null)={
        check("name_of_config_item",item,"^spark\\..*")
        check(item,value,regexvalue)
        conf.set(item,value)
      }
      // base
      private def setbaseappname(appname:string)={
        set("spark.app.name",appname,"^\\w+$")
      }
      private def setbasemaster(master:string)={
        set("spark.master",master,
          "local(\\[(\\*|[1-9][0-9]*)])?|spark://([a-z]\\w+|\\d{1,3}(\\.\\d{1,3}){3}):\\d{4,5}|yarn")
      }
      private def setbasedeploymode(deploymode:string)={
        set("spark.submit.deploymode",deploymode,"client|cluster")
      }
      private def setbaseeventlogenabled(eventlogenabled:boolean)={
        set("spark.eventlog.enabled",s"$eventlogenabled")
      }
      override def baseconfig(appname: string, master: string = "local[*]", deploymode: string = "client", eventlogenabled: boolean = false): builder = {
        setbaseappname(appname)
        setbasemaster(master)
        setbasedeploymode(deploymode)
        setbaseeventlogenabled(eventlogenabled)
        this
      }
      // driver
      private def setdrivermemory(memorygb:int)={
        set("spark.driver.memory",s"${memorygb}g","[1-9]\\d*g")
      }
      private def setdrivercorenum(corenum: int) = {
        set("spark.driver.cores", s"${corenum}", "[1-9]\\d*")
      }
      private def setdrivermaxresultgb(maxrstgb:int)={
        set("spark.driver.maxresultsize",s"${maxrstgb}g","[1-9]\\d*g")
      }
      private def setdriverhost(driverhost:string)={
        set("spark.driver.host",driverhost,"localhost|[a-z]\\w+")
      }
      override def optimizedriver(memorygb: int = 2, corenum: int = 1, maxrstgb: int = 1, driverhost: string = "localhost"): builder = {
        setdrivermemory(memorygb)
        setdrivercorenum(corenum)

        /**
         * 每一个spark行动算子触发的所有分区序列化结果大小上限
         */
        setdrivermaxresultgb(maxrstgb)

        /**
         * standalone 模式需要设置 driverhost,便于 executor 与 master 通信
         */
        if (conf.get("spark.master").startswith("spark://")) {
          setdriverhost(driverhost)
        }
        this
      }
      // executor
      private def setexecutormemory(memorygb: int) = {
        set("spark.executor.memory", s"${memorygb}g", "[1-9]\\d*g")
      }
      private def setexecutorcorenum(corenum: int) = {
        set("spark.executor.cores", s"${corenum}", "[1-9]\\d*")
      }
      override def optimizeexecutor(memorygb:int=1,corenum:int=1):builder={
        setexecutormemory(memorygb)
        /**
         * yarn模式下只能由1个核
         * 其他模式下,核数为所有可用的核
         */
        if(!conf.get("spark.master").equals("yarn")){
          setexecutorcorenum(corenum)
        }
        this
      }
      // limit
      private def setlimitmaxcores(maxcores:int)={
        set("spark.cores.max",s"${maxcores}","[1-9]\\d*")
      }
      private def setlimitmaxtaskfailure(maxtaskfailure:int)={
        set("spark.task.maxfailures",s"${maxtaskfailure}","[1-9]\\d*")
      }
      private def setlimitmaxlocalwaits(maxlocalwaits:int)={
        set("spark.locality.wait",s"${maxlocalwaits}s","[1-9]\\d*s")
      }
      override def optimizelimit(maxcores:int=4,
                                 maxtaskfailure:int=3,
                                 maxlocalwaits:int=30):builder={
        if (conf.get("spark.master").startswith("spark://")) {
          setlimitmaxcores(maxcores)
        }

        /**
         * 单个任务允许失败最大次数,超出会杀死本次任务
         */
        setlimitmaxtaskfailure(maxtaskfailure)

        /**
         * 数据本地化读取加载的最大等待时间
         * 大任务:建议适当增加此值
         */
        setlimitmaxlocalwaits(maxlocalwaits)
        this
      }
      // serializer
      override def optimizeserializer(serde:string="org.apache.spark.serializer.javaserializer"
                                      ,clas:array[class[_]]=null):builder={
        /**
         * 设置将需要通过网络发送或快速缓存的对象序列化工具类
         * 默认为javaserializer
         * 为了提速,推荐设置为kryoserializer
         * 若采用 kryoserializer,需要将所有自定义的实体类(样例类)注册到配置中心
         */
        set("spark.serializer",serde,"([a-z]+\\.)+[a-z]\\w*")
        if(serde.equals("org.apache.spark.serializer.kryoserializer")){
          conf.registerkryoclasses(clas)
        }
        this
      }
      // net
      private def setnettimeout(nettimeouts:int)={
        set("spark.network.timeout",s"${nettimeouts}s","[1-9]\\d*s")
      }
      private def setnetschedulermode(schedulermode:string)={
        set("spark.scheduler.mode",schedulermode,"fair|fifo")
      }
      override def optimizenetabout(nettimeouss:int=120,schedulermode:string="fair"):builder={
        /**
         * 所有和网络交互相关的超时阈值
         */
        setnettimeout(nettimeouss)

        /**
         * 多人工作模式下,建议设置为fair
         */
        setnetschedulermode(schedulermode)
        this
      }
      // dynamic
      private def setdynamicenabled(dynamicenabled:boolean)={
        set("spark.dynamicallocation.enabled",s"${dynamicenabled}")
      }
      private def setdynamicinitialexecutors(initialexecutors:int)={
        set("spark.dynamicallocation.initialexecutors",s"${initialexecutors}","[1-9]\\d*")
      }
      private def setdynamicminexecutors(minexecutors:int)={
        set("spark.dynamicallocation.minexecutors",s"${minexecutors}","[1-9]\\d*")
      }
      private def setdynamicmaxexecutors(maxexecutors:int)={
        set("spark.dynamicallocation.maxexecutors",s"${maxexecutors}","[1-9]\\d*")
      }
      override def optimizedynamicallocation(dynamicenabled:boolean=false,initialexecutors:int=3,minexecutors:int=0,maxexecutors:int=6):builder={
        /**
         * 根据应用的工作需求,动态分配executor
         */
        setdynamicenabled(dynamicenabled)
        if(dynamicenabled){
          setdynamicinitialexecutors(initialexecutors)
          setdynamicminexecutors(minexecutors)
          setdynamicmaxexecutors(maxexecutors)
        }
        this
      }
      // shuffle
      def setshuffleparallelism(parallelism:int=3)={
        set("spark.default.parallelism",s"${parallelism}","[1-9]\\d*")
      }
      def setshufflecompressenabled(shufflecompressenabled:boolean=false)={
        set("spark.shuffle.compress",s"${shufflecompressenabled}")
      }
      def setshufflemaxsizeperreducer(maxsizemb:int=128)={
        set("spark.shuffle.maxsizeinflight",s"${maxsizemb}m","[1-9]\\d*m")
      }
      def setshuffleserviceenabled(shuffleserviceenabled:boolean=true)={
        set("spark.shuffle.service.enabled",s"${shuffleserviceenabled}")
      }
      override def optimizeshuffle(parallelism:int=3,shufflecompressenabled:boolean=false,
                                   maxsizemb:int=48,shuffleserviceenabled:boolean=false):builder={
        /**
         * 如果用户没有指定分区数,则采用该值作为默认的分区数
         */
        setshuffleparallelism(3)

        /**
         * shuffle 过程中 map 端的输出数据是否压缩,建议生成过程中,数据规模较大时开启
         */
        setshufflecompressenabled(shufflecompressenabled)

        /**
         * 设置reducer端的缓冲区大小,生产环境中,服务器内存较大时,可以适当调大
         */
        setshufflemaxsizeperreducer(maxsizemb)

        /**
         * 开启一个独立的外部服务,专门存储executor产生的中间数据
         */
        setshuffleserviceenabled(shuffleserviceenabled)
        this
      }

      // speculation
      def setspeculationenabled(speculationenabled:boolean)={
        set("spark.speculation",s"${speculationenabled}")
      }
      def setspeculationinterval(interval:int)={
        set("spark.speculation.interval",s"${interval}s","[1-9]\\d*s")
      }
      def setspeculationquantile(quantile:float)={
        set("spark.speculation.quantile",s"${quantile}","0?\\.\\d+")
      }
      override def optimizespeculation(speculationenabled:boolean=false,interval:int=5,quantile:float=0.75f):builder={
        /**
         * 是否开启推测执行服务,将各阶段(stage)中执行慢的任务(task)重启
         */
        setspeculationenabled(true)

        /**
         * 设置推测执行频次
         */
        setspeculationinterval(interval)

        /**
         * 设置推测执行阈值
         */
        setspeculationquantile(quantile)
        this

      }
      // warehouse
      override def warehousedir(hdfs:string):builder={
        set("spark.sql.warehouse.dir",hdfs,"hdfs://([a-z]\\w+|\\d{1,3}(\\.\\d{1,3}){3}):\\d{4,5}(/\\w+)+")
        this
      }
      override def end():sparksession={
        sparksession
          .builder()
          .config(conf)
          .enablehivesupport()
          .getorcreate()
      }
    }
  }
}
object sparkfactory {
  def apply(): sparkfactory = new sparkfactory()
  trait builder{
    // 默认值能给就给
    /**
     * 基本配置
     * @param appname
     * @param master 默认是本地方式
     * @param deploymode 默认是集群模式
     * @param eventlogenabled 生产环境打开,测试环境关闭
     * @return
     */
    def baseconfig(appname:string,master:string="local[*]",deploymode:string="client",eventlogenabled:boolean=false):builder
    /**
     * 驱动端优化配置
     * @param memorygb 驱动程序的内存大小
     * @param corenum 驱动程序的核数
     * @param maxrstgb 驱动程序的最大结果大小
     * @param driverhost 驱动程序的主机地址:驱动程序会在主机地址上运行,并且集群中的其他节点会通过这个地址与驱动程序通信
     * @return
     */
    def optimizedriver(memorygb:int=2,corenum:int=1,maxrstgb:int=1,driverhost:string="localhost"):builder
    def optimizeexecutor(memorygb:int=1,corenum:int=1):builder
    /**
     * 整体限制配置
     * @param maxcores 整体可用的最大核数
     * @param maxtaskfailure 单个任务失败的最大次数
     * @param maxlocalwaits 容错机制:数据读取阶段允许等待的最长时间,超过时间切换到其他副本。
     * @return
     */
    def optimizelimit(maxcores:int=4,maxtaskfailure:int=3,maxlocalwaits:int=30):builder
    /**
     * 默认使用:java序列化
     * 推荐使用:kryo序列化 提速或对速度又要i去
     * 所有的自定义类型都要注册到spark中,才能完成序列化。
     * @param serde 全包路径
     * @param classes 自定义类型,默认认为不需要指定,class[_]表示类型未知。
     * @return builder
     */
    def optimizeserializer(serde:string="org.apache.spark.serializer.javaserializer",clas:array[class[_]]=null):builder
    /**
     * 在spark的官方配置中,nettimeouts可能被很多超时的数据调用。
     * @param nettimeouss 判定网络超时的时间
     * @param schedulermode 可能很多任务一起跑,因此公平调度
     * @return
     */
    def optimizenetabout(nettimeouss:int=180,schedulermode:string="fair"):builder
    /**
     * 动态分配->按需分配
     * 类似于配置线程池中的最大闲置线程数,根据需要去做动态分配
     * @param dynamicenabled 是否开启动态分配
     * @param initialexecutors 初始启用的executors的数量
     * @param minexecutors 最小启用的executors的数量
     * @param maxexecutors 最大启用的executors的数量
     * @return
     */
    def optimizedynamicallocation(dynamicenabled:boolean=false,initialexecutors:int=3,minexecutors:int=0,maxexecutors:int=6):builder
    /**
     * 特指在没有指定分区数时,对分区数的配置。
     * 并行度和初始启用的executors的数量一致,避免额外开销。
     *
     * @param parallelism
     * @param shufflecompressenabled
     * @param maxsizemb
     * @param shuffleserviceenabled
     * @return
     */
    def optimizeshuffle(parallelism:int=3,shufflecompressenabled:boolean=false,maxsizemb:int=128,shuffleserviceenabled:boolean=true):builder
    /**
     * 推测执行,将运行时间长的任务,放到队列中,等待运行时间短的任务运行完成后,再运行。
     * @param enabled
     * @param interval spark检查任务执行时间的时间间隔,单位是秒。
     * @param quantile 如果某个任务的执行时间超过指定分位数(如75%的任务执行时间),则认为该任务执行时间过长,需要启动推测执行。
     */
    def optimizespeculation(enabled:boolean=false,interval:int=15,quantile:float=0.75f):builder
    def warehousedir(hdfs:string):builder
    def end():sparksession
  }
}
2.3 mysqlconfigfactory
package core

import core.mysqlconfigfactory.{getter, setter}
import core.validator.check
import java.util.properties

class mysqlconfigfactory {
  def build():setter={
    new setter {
      val conf = new properties();
      override def setdriver(drivercla: string): setter = {
        check("name_of_mysql_driver_class",drivercla,"com\\.mysql(\\.cj)?\\.jdbc\\.driver")
        conf.setproperty("driver",drivercla)
        this
      }

      override def seturl(url: string): setter = {
        check("url_to_connect_mysql", url, "jdbc:mysql://([a-z]\\w+|\\d{1,3}(\\.\\d{1,3}){3}):\\d{4,5}/[a-z]\\w+(\\?.+)?")
        conf.setproperty("url", url)
        this
      }

      override def setuser(user: string): setter = {
        check("user_to_connect_mysql", user)
        conf.setproperty("user", user)
        this
      }

      override def setpassword(password: string): setter = {
        check("password_to_connect_mysql", password)
        conf.setproperty("password", password)
        this
      }

      override def finish(): getter = {
        new getter {
          override def geturl: string = conf.getproperty("url")
          override def getconf: properties = conf
        }
      }
    }
  }
}
object mysqlconfigfactory {
  def apply(): mysqlconfigfactory = new mysqlconfigfactory()
  trait getter{
    def geturl:string
    def getconf:properties
  }
  trait setter {
    def setdriver(drivercla:string):setter
    def seturl(url:string):setter
    def setuser(user:string):setter
    def setpassword(password:string):setter
    def finish():getter
  }
}

2.4 测试调用
package test

import core.{mysqlconfigfactory, sparkfactory}
import org.apache.spark.sql.sparksession

object test {
  def main(args: array[string]): unit = {
    // spark on hive
    val spark: sparksession = sparkfactory()
      .build()
      .baseconfig("ebs_01")
      .optimizedriver()
      .optimizeexecutor()
      .optimizelimit()
      .optimizeserializer()
      .optimizenetabout()
      .optimizedynamicallocation()
      .optimizeshuffle()
      .optimizespeculation()
      .warehousedir("hdfs://single01:9000/hive312/warehouse")
      .end()

    spark.table("yb12211.transaction")
      .show(10)
	// spark on mysql
    val getter: mysqlconfigfactory.getter = mysqlconfigfactory().build()
      .setdriver("com.mysql.cj.jdbc.driver")
      .seturl("jdbc:mysql://single01:3306/yb12211?usessl=false&servertimezone=utc&allowpublickeyretrieval=true")
      .setuser("root")
      .setpassword("123456")
      .finish()

    spark.read.jdbc(getter.geturl, "test_table1_for_hbase_import", getter.getconf)

    spark.stop()
  }
}

在这里插入图片描述

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com