
Reading a Kafka Topic with Flink and Writing It to HBase


Author: 陶运道

Contents

    Part 1: Pure DDL approach
    Part 2: Table API approach

Part 1: Pure DDL Approach

This part implements Kafka -> Flink -> HBase purely with DDL. The overall flow is as follows:

(Flow diagram: https://images.3wcode.com/3wcode/20240805/b_0_202408052359163552.png)

Flink reads the contents of the Kafka topic user_behavior and, through the Flink SQL client, stores them in the HBase table venn.

I. Development environment

Component     Version
Flink (HA)    1.17.2
ZooKeeper     3.4.5
Hadoop        3.2.3
HBase (HA)    2.4.15
Kafka (HA)    3.2.1

The connector jar used here must be placed in Flink's lib directory.

SQL connector download

This walkthrough solved the problem with flink-sql-connector-hbase-2.2-1.17.2.jar.

Flink 1.17 provides HBase connectors for two HBase versions, 1.4 and 2.2; this article uses 2.2.

Download site and notes: HBase | Apache Flink

"In order to use the HBase connector the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles."

HBase version    Maven dependency    SQL Client JAR

1.4.x:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hbase-1.4</artifactId>
  <version>1.17.2</version>
</dependency>
(SQL Client JAR: download)

2.2.x:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hbase-2.2</artifactId>
  <version>1.17.2</version>
</dependency>
(SQL Client JAR: download)

II. Commands used in this exercise

1. HBase commands

Command                       Purpose
hbase shell                   enter the HBase shell
create 'venn','cf'            create table venn, where cf is a column family
scan 'venn', {LIMIT => 1}     view the data in the new table

2. Kafka commands

kafka-topics.sh --list --bootstrap-server master:9092                                               # list topics
kafka-console-producer.sh --bootstrap-server master:9092 --topic user_behavior                      # produce to the topic
kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic user_behavior    # consume the topic
kafka-topics.sh --bootstrap-server master:9092 --delete --topic user_behavior                       # delete the topic

III. Walkthrough

1. Produce messages to the topic

Notes: (1) Sample data follows; press Enter after each line to send one message.
       (2) The format is JSON, and every field value is quoted.

kafka-console-producer.sh --bootstrap-server master:9092 --topic user_behavior

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "894923", "item_id":"3076029", "category_id": "1879194", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "834377", "item_id":"4541270", "category_id": "3738615", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "315321", "item_id":"942195", "category_id": "4339722", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "625915", "item_id":"1162383", "category_id": "570735", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "578814", "item_id":"176722", "category_id": "982926", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "873335", "item_id":"1256540", "category_id": "1451783", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "429984", "item_id":"4625350", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "866796", "item_id":"534083", "category_id": "4203730", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "937166", "item_id":"321683", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

2. Create the HBase table

# hbase shell                              -- enter the HBase shell
hbase(main):014:0> create 'venn','cf'      -- create the table with one column family, cf
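The shell is the quickest route; if the table should be created from code instead, a minimal sketch with the HBase 2.x Java client called from Scala (a hypothetical helper object; quorum host and table/column-family names as used throughout this article) could look like this:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}

object CreateVennTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "master")               // zookeeper host
    conf.set("hbase.zookeeper.property.clientPort", "2181")    // zookeeper client port

    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin
    val name = TableName.valueOf("venn")
    // table 'venn' with a single column family 'cf', as in the shell command above
    if (!admin.tableExists(name)) {
      admin.createTable(
        TableDescriptorBuilder.newBuilder(name)
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
          .build())
    }
    admin.close()
    connection.close()
  }
}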

3. Start the Flink SQL client and write the Kafka topic into HBase

(1) Copy the connector jar into Flink's lib directory, then start the Flink cluster.

(2) Start the related services:

# start-all.sh       -- start the Hadoop cluster
# start-cluster.sh   -- start the Flink cluster (this exercise uses standalone mode)
# start-hbase.sh     -- start HBase
# (start the Kafka service)
# sql-client.sh      -- open the Flink SQL client

(3) Create a source table over the Kafka topic (JSON messages)

Flink SQL> create table user_log (
    user_id varchar,
    item_id varchar,
    category_id varchar,
    behavior varchar,
    ts timestamp(3)
) with (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'master:9092',
    'properties.group.id' = 'flinkconsumer',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

Flink SQL> select * from user_log;

Note: this step also needs the Flink Kafka connector jar in the lib directory.

(4) Create the sink table user_log_sink, which maps to the HBase table

create table user_log_sink (
    user_id varchar,
    cf row(item_id varchar, category_id varchar, behavior varchar, ts timestamp(3))
) with (
    'connector' = 'hbase-2.2',
    'table-name' = 'venn',
    'zookeeper.quorum' = 'master:2181'
);
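How this maps onto HBase: in Flink's HBase connector, the single atomic-typed column (user_id) is taken as the rowkey, and every ROW-typed column maps to the column family of the same name, with its nested fields becoming the column qualifiers. Each Kafka message therefore lands in venn as rowkey user_id with cells cf:item_id, cf:category_id, cf:behavior and cf:ts, which is why the message fields must line up with the table created earlier.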

Note:

The DDL below uses the legacy option style of the old HBase connector. Hitting this on Flink 1.17.2 cost a long time before the error was understood.

create table user_log_sink (
    user_id varchar,
    cf row(item_id varchar, category_id varchar, behavior varchar, ts timestamp(3))
) with (
    'connector.type' = 'hbase',
    'connector.version' = 'hbase-2.2',                -- hbase version
    'connector.table-name' = 'venn',                  -- hbase table name
    'connector.zookeeper.quorum' = 'master:2181',     -- zookeeper quorum
    'connector.zookeeper.znode.parent' = '/hbase',    -- hbase znode in zookeeper
    'connector.write.buffer-flush.max-size' = '10mb', -- max flush size
    'connector.write.buffer-flush.max-rows' = '1000', -- max flush rows
    'connector.write.buffer-flush.interval' = '2s'    -- max flush interval
);

(5) Submit the job: rows from user_log are inserted into user_log_sink and thereby written to HBase

insert into user_log_sink
select user_id,
    row(item_id, category_id, behavior, ts) as cf
from user_log;

(6) Verifying the result, and pitfalls

The inserted results must be checked with the HBase shell, e.g. scan 'venn', {LIMIT => 5}; they could not be read back with select * from user_log_sink;. Be clear that what you are checking are the records just inserted, not the pre-existing contents of the HBase table.

Error 1: with the legacy DDL above, the following error appears:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.factories.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Error 2: the following error appears:

java.lang.NoSuchMethodError: org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasFamily(Lorg/apache/flink/table/types/logical/LogicalType;Lorg/apache/flink/table/types/logical/LogicalTypeFamily;)Z

This is a version mismatch: the 1.17.2 in flink-sql-connector-hbase-2.2-1.17.2.jar must match the version of the running Flink cluster.

Closing remarks

(1) Topic messages are organized one per line, in the form
{"field1": "value1", "field2": "value2", ..., "fieldN": "valueN"}
for example: {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
The message fields must correspond to the rowkey and column names of the HBase table, otherwise the data cannot be written correctly.

(2) The real headache is the Flink-to-HBase connector: the version correspondence has to be found by trial. Flink 1.17.2 pairs with flink-sql-connector-hbase-2.2-1.17.2.jar.

(3) Few of the examples found online work as published, because versions differ and so does the code; they are still worth consulting. The only remedy is to gather several references.

(4) Break a big task into steps. The one above splits into connecting to Kafka, connecting to HBase, and inserting, which narrows the scope of any problem.

Part 2: Table API Approach

I. Code

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object scala_1 {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // default stream time characteristic (processing time)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    // build the StreamTableEnvironment
    val tenv = StreamTableEnvironment.create(env)

    // source table over the Kafka topic
    val table_sql =
      """create table st (
        |`user_id` varchar,
        |`item_id` varchar,
        |`category_id` varchar,
        |`behavior` varchar,
        |`ts` timestamp(3))
        |with ('connector' = 'kafka',
        |'topic' = 'user_behavior',
        |'properties.bootstrap.servers' = 'master:9092',
        |'properties.group.id' = 'testgroup',
        |'scan.startup.mode' = 'earliest-offset',
        |'format' = 'json')""".stripMargin

    val f = tenv.executeSql(table_sql)

    // this line must not be omitted -- it acts like a refresh; without it
    // the sink table sk receives no stream data
    tenv.toDataStream(tenv.sqlQuery("select * from st"))

    // sink table mapped to the HBase table venn
    val f2 = tenv.executeSql(
      """create table sk (
        |  user_id varchar,
        |  cf row(item_id varchar, category_id varchar, behavior varchar, ts timestamp(3))
        |) with (
        |  'connector' = 'hbase-2.2',
        |  'table-name' = 'venn',
        |  'zookeeper.quorum' = 'master:2181'
        |)""".stripMargin)

    // executeSql submits the INSERT job on its own
    tenv.executeSql(
      """insert into sk
        |select user_id, row(item_id, category_id, behavior, ts) as cf from st""".stripMargin)

    env.execute()
  }
}
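A note on running this: inside an IDE, the Kafka, JSON and HBase connector dependencies from the pom below must be on the classpath; when the packaged job is submitted to the cluster, the same SQL connector jars must sit in Flink's lib directory, exactly as in Part 1.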

II. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flinktutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.12</scala.version>
        <hdfs.version>3.2.3</hdfs.version>
        <flink.version>1.17.1</flink.version>
        <kafka.version>3.2.1</kafka.version>
        <flink-connector-redis.version>1.1.5</flink-connector-redis.version>
        <mysql.version>5.1.47</mysql.version>
        <mysqlconnect.version>5.1.47</mysqlconnect.version>
        <lang3.version>3.9</lang3.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.esotericsoftware</groupId>
            <artifactId>kryo-shaded</artifactId>
            <version>4.0.2</version>
        </dependency>

        <!-- scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}.12</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}.12</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}.12</version>
        </dependency>

        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <!-- flink stream processing -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.17.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>1.17.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>1.17.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- mysql connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>1.14.0</version>
        </dependency>

        <!-- hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hdfs.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hdfs.version}</version>
        </dependency>

        <!-- under the Table API, either of the two HBase connectors below works; one alone is enough -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-hbase-2.2</artifactId>
            <version>1.17.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2</artifactId>
            <version>1.17.2</version>
        </dependency>
    </dependencies>
</project>
