
Reading HBase with Spark + Phoenix


Normally there should be plenty of reference articles online for this topic, but I still spent quite a while fiddling with it, so I'm writing it down here as a memo for myself.

Phoenix is a SQL skin over HBase: it lets you work with HBase using plain SQL statements, which is much friendlier than HBase's native API. Spark can also access HBase directly through the native API, but that is fairly cumbersome. The steps below show how to combine Spark with Phoenix.
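To make the "SQL skin" idea concrete, here is a minimal sketch (not part of the setup below; the ZooKeeper address and the student schema are my assumptions for illustration) that talks to Phoenix through its plain JDBC driver, creating and filling the kind of student table the Spark job later in this post reads. It needs the phoenix-client jar described below on the classpath.

import java.sql.DriverManager

object PhoenixSqlSketch {
  def main(args: Array[String]): Unit = {
    // jdbc:phoenix:<zookeeper-quorum>:<port> -- replace with your own cluster address
    val conn = DriverManager.getConnection("jdbc:phoenix:10.12.4.51:2181")
    val stmt = conn.createStatement()

    // Phoenix DDL creates (or maps) the underlying HBase table for you
    stmt.execute("CREATE TABLE IF NOT EXISTS student (id BIGINT PRIMARY KEY, name VARCHAR, age INTEGER, addr VARCHAR)")

    // Phoenix uses UPSERT instead of INSERT
    stmt.executeUpdate("UPSERT INTO student VALUES (1001, 'zhangsan', 10, 'tianjin')")
    conn.commit() // Phoenix connections do not auto-commit by default

    stmt.close()
    conn.close()
  }
}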

I'm using Scala. First, add the dependencies to pom.xml:

        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>5.0.0-HBase-2.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>5.0.0-HBase-2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.4.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>2.4.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>2.4.12</version>
        </dependency>

The versions added here must match the HBase cluster you are going to access!

Next, download the Phoenix release from the official site (Overview | Apache Phoenix).

Unpack it and copy phoenix-server-hbase-2.4-5.1.3.jar from the archive (your version may differ from mine; it depends on the HBase version installed on your Hadoop cluster) into the hbase/lib/ directory, then restart HBase.

Then copy the extracted phoenix-client-hbase-2.4-5.1.3.jar into your project's resources directory, and also put the Hadoop configuration files under resources/conf/. Now we can start writing code.

import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource

val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .getOrCreate()

// Load data from TABLE1
val df = spark.sqlContext
  .read
  .format("phoenix")
  .options(Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
  .load

df.filter(df("COL1") === "test_row_1" && df("ID") === 1L)
  .select(df("ID"))
  .show

This is the code provided on the Phoenix website, but it did not work for me: it failed saying that org.apache.phoenix.spark.datasource.v2.PhoenixDataSource could not be found. I don't know whether I pulled in the wrong dependency or something else went wrong, so my code below makes a few changes on top of the example above.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.log4j.Logger


object SparkPhoenixHbase {
  @transient lazy val log = Logger.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    readFromHbaseWithPhoenix()
  }

  def readFromHbaseWithPhoenix(): Unit = {
    // Load the cluster configuration files placed under resources/conf/
    val hadoopConf = new Configuration()
    hadoopConf.addResource(new Path("conf/core-site.xml"))
    hadoopConf.addResource(new Path("conf/hdfs-site.xml"))
    hadoopConf.addResource(new Path("conf/mapred-site.xml"))
    hadoopConf.addResource(new Path("conf/yarn-site.xml"))
    hadoopConf.addResource(new Path("conf/hbase-site.xml"))

    val conf = new SparkConf()
      .setAppName("phoenix-spark-hbase")
      .setMaster("local[*]")
    conf.set("spark.driver.extraClassPath", "/resources/phoenix-client-hbase-2.4-5.1.3.jar")
    conf.set("spark.executor.extraClassPath", "/resources/phoenix-client-hbase-2.4-5.1.3.jar")

    // Copy every Hadoop/HBase setting into the Spark configuration
    val it = hadoopConf.iterator()
    while (it.hasNext) {
      val entry = it.next()
      conf.set(entry.getKey, entry.getValue)
    }

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("phoenix-hbase")
      .config(conf)
      .getOrCreate()

    val phoenixConfig = Map(
      "url" -> "jdbc:phoenix:10.12.4.51:2181", // the address of the ZooKeeper quorum on your Hadoop cluster
      "driver" -> "org.apache.phoenix.jdbc.PhoenixDriver"
    )

    val df = spark.read
      .format("jdbc")
      .options(phoenixConfig)
      .option("dbtable", "student")
      .load()

    df.show()

    spark.close()
  }
}
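Once the DataFrame is loaded, it can be queried like any other Spark table. A small follow-up sketch, to be placed before spark.close() (Spark resolves column names case-insensitively by default, so the lowercase names below are just an assumption about how the table was defined):

// Register the Phoenix-backed DataFrame and query it with ordinary Spark SQL
df.createOrReplaceTempView("student")
spark.sql("SELECT name, age FROM student WHERE id = 1001").show()

// The same query with the DataFrame API, in the spirit of the official example
df.filter(df("id") === 1001L).select(df("name"), df("age")).show()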

It's best to configure logging in the project, otherwise you won't see the error messages produced during execution.
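If you don't want to maintain a log4j.properties file, a minimal programmatic alternative (my assumption, using the same log4j 1.x API the code above already imports) is to call this at the top of main(), before the SparkSession is created:

import org.apache.log4j.{BasicConfigurator, Level, Logger}

// Attach log4j's default console appender and log at INFO, so Spark's
// progress and error messages actually show up on the console
BasicConfigurator.configure()
Logger.getRootLogger.setLevel(Level.INFO)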

The output of a successful run looks like this:

2024-01-18 08:53:52,487 INFO [org.apache.spark.executor.Executor] : Finished task 0.0 in stage 0.0 (TID 0). 1509 bytes result sent to driver
2024-01-18 08:53:52,493 INFO [org.apache.spark.scheduler.TaskSetManager] : Finished task 0.0 in stage 0.0 (TID 0) in 580 ms on desktop-ft30h9d (executor driver) (1/1)
2024-01-18 08:53:52,494 INFO [org.apache.spark.scheduler.TaskSchedulerImpl] : Removed TaskSet 0.0, whose tasks have all completed, from pool 
2024-01-18 08:53:52,500 INFO [org.apache.spark.scheduler.DAGScheduler] : ResultStage 0 (show at SparkPhoenixHbase.scala:70) finished in 0.774 s
2024-01-18 08:53:52,502 INFO [org.apache.spark.scheduler.DAGScheduler] : Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
2024-01-18 08:53:52,502 INFO [org.apache.spark.scheduler.TaskSchedulerImpl] : Killing all running tasks in stage 0: Stage finished
2024-01-18 08:53:52,504 INFO [org.apache.spark.scheduler.DAGScheduler] : Job 0 finished: show at SparkPhoenixHbase.scala:70, took 0.808840 s
2024-01-18 08:53:52,538 INFO [org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator] : Code generated in 14.3886 ms
+----+--------+---+-------+
|  id|    name|age|   addr|
+----+--------+---+-------+
|1001|zhangsan| 10|tianjin|
+----+--------+---+-------+

// Seeing this means it worked; my HBase student table contains just this one row

2024-01-18 08:53:52,555 INFO [org.sparkproject.jetty.server.AbstractConnector] : Stopped Spark@4108fa66{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
2024-01-18 08:53:52,556 INFO [org.apache.spark.ui.SparkUI] : Stopped Spark web UI at http://desktop-ft30h9d:4040
2024-01-18 08:53:52,566 INFO [org.apache.spark.MapOutputTrackerMasterEndpoint] : MapOutputTrackerMasterEndpoint stopped!
2024-01-18 08:53:52,581 INFO [org.apache.spark.storage.memory.MemoryStore] : MemoryStore cleared
2024-01-18 08:53:52,581 INFO [org.apache.spark.storage.BlockManager] : BlockManager stopped
2024-01-18 08:53:52,587 INFO [org.apache.spark.storage.BlockManagerMaster] : BlockManagerMaster stopped
2024-01-18 08:53:52,589 INFO [org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint] : OutputCommitCoordinator stopped!
2024-01-18 08:53:52,595 INFO [org.apache.spark.SparkContext] : Successfully stopped SparkContext
2024-01-18 08:53:59,207 INFO [org.apache.spark.util.ShutdownHookManager] : Shutdown hook called
2024-01-18 08:53:59,207 INFO [org.apache.spark.util.ShutdownHookManager] : Deleting directory C:\Users\shell\AppData\Local\Temp\spark-344ef832-7438-47dd-9126-725e6c2d8af4
