当前位置: 代码网 > it编程>数据库>MsSqlserver > Flink 内容分享(七):Flink 读写 HBase 总结

Flink 内容分享(七):Flink 读写 HBase 总结

2024年07月28日 MsSqlserver 我要评论
总结 Flink 读写 HBase本文总结了Flink SQL 读写 HBase 的参数配置,解决了在kerberos环境下因 hudi 包 hbase-site.xml 配置冲突引起的异常,学习总结了 Flink SQL 读写 HBase 时加载 HBase 配置的优先级,但是没有详细的分析源码中的逻辑,可能会在后面的文章中补充相关的源码分析~

目录

前言

版本

官方文档

jar包

sql

hbase shell创建hbase表

flink 写 hbase

flink 读 hbase

hbase shell 验证数据

参数

hudi包兼容性

原因

获取hbase配置的逻辑和优先级

解决方法

解决方法1

解决方法2

解决方法3

解决方法4

解决方法5

总结


前言

总结 flink 读写 hbase

版本

  • flink 1.15.4

  • hbase 2.0.2

  • hudi 0.13.0

官方文档

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/hbase/

jar包

https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hbase-2.2/1.15.4/flink-sql-connector-hbase-2.2-1.15.4.jar

sql

hbase shell创建hbase表

hbase shell
create 'flink_hbase_table', 'cf'

flink 写 hbase

create table flink_hbase_table(
id int,
cf row<name string,price double,ts bigint, dt string>,
primary key (id) not enforced
) with (
'connector' = 'hbase-2.2',
'table-name' = 'flink_hbase_table',
'zookeeper.quorum' = 'indata-192-168-44-128.indata.com:2181,indata-192-168-44-129.indata.com:2181,indata-192-168-44-130.indata.com:2181',
'zookeeper.znode.parent' = '/hbase-secure'
);

insert into flink_hbase_table values (1,row('hudi',10.1,1000,'2023-10-10'));

flink 读 hbase

select * from flink_hbase_table;

图片

 

select id,name,price,ts,dt from flink_hbase_table;

图片

 

hbase shell 验证数据

hbase(main):002:0> scan 'flink_hbase_table'
row                                                                   column+cell
 \x00\x00\x00\x01                                                     column=cf:dt, timestamp=1697160801719, value=2023-10-10
 \x00\x00\x00\x01                                                     column=cf:name, timestamp=1697160801719, value=hudi
 \x00\x00\x00\x01                                                     column=cf:price, timestamp=1697160801719, value=@$333333
 \x00\x00\x00\x01                                                     column=cf:ts, timestamp=1697160801719, value=\x00\x00\x00\x00\x00\x00\x03\xe8
1 row(s)
took 0.4339 seconds

参数

参数是否必选默认值数据类型描述
connector必选(none)string指定使用的连接器, 支持的值如下 :hbase-1.4: 连接 hbase 1.4.x 集群hbase-2.2: 连接 hbase 2.2.x 集群(我的hbase版本为2.0.2)
table-name必选(none)string连接的 hbase 表名。默认该表在 "default" 命名空间下,指定命名空间下的表需要使用 "namespace:table"。
zookeeper.quorum必选(none)stringhbase zookeeper quorum 信息。
zookeeper.znode.parent可选/hbasestringhbase 集群的 zookeeper 根目录。
properties.*可选(无)string可以设置任意 hbase 的配置项。后缀名必须匹配在 hbase 配置文档 中定义的配置键。flink 将移除 "properties." 配置键前缀并将变换后的配置键和值传入底层的 hbase 客户端。例如您可以设置 'properties.hbase.security.authentication' = 'kerberos' 等kerberos认证参数。

hudi包兼容性

前提:在开启了kerberos的环境上

当flink lib下面存在hudi(0.13.0版本)包时会出现flink连接不上hbase的现象,具体表现为:
1、flink 查询 hbase 时,会抛异常:

ava.net.sockettimeoutexception: calltimeout=60000, callduration=74175: call to indata-192-168-44-128.indata.com/192.168.44.128:16020 failed on local exception: org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.exceptions.connectionclosedexception: connection closed row 'flink_hbase_table,,' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=indata-192-168-44-128.indata.com,16020,1695447819772, seqnum=-1
    at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.rpcretryingcallerimpl.callwithretries(rpcretryingcallerimpl.java:159)
    at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.resultboundedcompletionservice$queueingfuture.run(resultboundedcompletionservice.java:80)
    at java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1149)
    at java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:624)
    at java.lang.thread.run(thread.java:748)
caused by: org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.exceptions.connectionclosedexception: call to indata-192-168-44-128.indata.com/192.168.44.128:16020 failed on local exception: org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.exceptions.connectionclosedexception: connection closed

2、flink 写 hbase 时,不报异常,但是会卡住,卡15分钟(正常执行时间20s)左右显示任务完成,实际上没有写成功。

图片

 

原因

出现这种现象的原因是hudi包下存在hbase-site.xml,其中安全认证相关的配置和kerberos环境不一致。

获取hbase配置的逻辑和优先级

优先级:

  • 1、用户自定义参数 优先级最高 (sql中配置的)

  • 2、环境变量 优先级第二
    环境变量一共有两个:hbase_conf_dir 和 hbase_home,其中 hbase_conf_dir 的优先级要高于hbase_home
    这两个环境变量下有两个配置文件 hbase-site.xml 和 hbase-default.xml 其中hbase-site.xml 优先级要高于 hbase-default.xml,也就是一共有四个优先级:
    2.1 hbase_conf_dir/conf/hbase-site.xml
    2.2 hbase_conf_dir/conf/hbase-default.xml
    2.3 hbase_home/conf/hbase-site.xml
    2.4 hbase_home/conf/hbase-default.xml

  • 3、classpath 优先级最低,其中也有两个配置文件 hbase-site.xml 和 hbase-default.xml,hbase-site.xml 优先级要高于 hbase-default.xml
    如hudi包中就存在 hbase-site.xml 和 hbase-default.xml
    另外classpath可能有多个目录,多个目录之间也有优先级,对于我们环境有两个classpath存在hbase-site.xml,一个flink lib路径下面的hudi包,一个是/etc/hbase/conf。
    为啥是/etc/hbase/conf,具体逻辑在flink bin/config.sh中:

   # try and set hbase_conf_dir to some common default if it's not set
    if [ -z "$hbase_conf_dir" ]; then
        if [ -d "/etc/hbase/conf" ]; then
            echo "setting hbase_conf_dir=/etc/hbase/conf because no hbase_conf_dir was set."
            hbase_conf_dir="/etc/hbase/conf"
        fi
    fi

因为我们环境默认没有配置hbase_conf_dir,并且存在/etc/hbase/conf,所以就会走到这个逻辑,我们在启动sql-client时也会看到:
  bash setting hbase_conf_dir=/etc/hbase/conf because no hbase_conf_dir was set

图片

 

那又是在哪里将这里的hbase_conf_dir加到classpath中的呢?它是在sql-client.sh中通过java -classpath 参数添加的。
 

图片

 

cc_classpath优先级高于internal_hadoop_classpaths    ,flink lib属于cc_classpath,/etc/hbase/conf 属于internal_hadoop_classpaths

/etc/hbase/conf 属于internal_hadoop_classpaths:

图片

 

flink lib属于cc_classpath:

图片

图片

 

因为上面这个函数(constructflinkclasspath)的逻辑我不太确定,我们可以在sql-clent.sh中打印: echo $cc_classpath

图片

 

读取hbase配置的逻辑可以查看源码,本文先不分析这块源码,可能会在下一篇文章补充分析部分源码,先只截个图:
hbase2dynamictablefactory

图片


prepareruntimeconfiguration (hbasesinkfunction和 hbaserowdatalookupfunction 中都有这个方法,分别是写和读)

图片

 

解决方法

知道了hbase配置的优先级,弄明白了hudi包中的hbase-site.xml为啥会影响flink读写hbase,也就知道如何解决这个问题,我们只需要根据优先级设置正确的hbase配置参数就好了。比如只有classpath中存在hbase配置那么我们就需要修改classpath中的hbase配置为正确配置。如果不想修改classpath中的配置文件或者觉得这样做不合适,我们可以设置更好优先级的配置,比如设置环境变量hbase_conf_dir 和 hbase_home指向正确的hbase配置,另外我们也可以通过在sql参数中配置正确的参数,因为用户参数级别最高,这样配置优点是比较灵活,缺点是需要用户每次都多写一下额外的配置。

解决方法1

删除hudi包里面的hbase-site.xml(hudi-flink1.15-bundle-0.13.0.jar),这样就会去加载我们服务器环境上的正确的hbase-site.xml (/etc/hbase/conf)

解决方法2

修改hudi包里面的hbase-site.xml中的kerberos配置:

# 经测试我们环境只需要这两个配置,可以根据自己的环境调整
hbase.security.authentication true
hbase.regionserver.kerberos.principal hbase/_host@indata.com

解决方法3

通过在建表语句中添加配置

create table flink_hbase_table(
id int,
cf row<name string,price double,ts bigint, dt string>,
primary key (id) not enforced
) with (
  'connector' = 'hbase-2.2',
  'table-name' = 'flink_hbase_table',
  'zookeeper.quorum' = 'indata-192-168-44-128.indata.com:2181,indata-192-168-44-129.indata.com:2181,indata-192-168-44-130.indata.com:2181',
  'zookeeper.znode.parent' = '/hbase-secure',
  'properties.hbase.security.authentication' = 'kerberos',
  --'properties.hbase.master.kerberos.principal' = 'hbase/_host@indata.com',
  'properties.hbase.regionserver.kerberos.principal' = 'hbase/_host@indata.com'
);

不启用kerberos的配置:

create table dkl(
id int,
cf row<name string,price double,ts bigint, dt string>,
primary key (id) not enforced
) with (
'connector' = 'hbase-2.2',
'table-name' = 'flink_hbase_table',
'zookeeper.quorum' = 'indata-192-168-44-128.indata.com:2181,indata-10-110-105-163.indata.com:2181,indata-10-110-105-164.indata.com:2181',
'zookeeper.znode.parent' = '/hbase-unsecure',
'properties.hbase.security.authentication' = 'simple'
);

解决方法4

配置环境变量hbase_conf_dir 和 hbase_home中的其中一个即可:

# 配置环境变量指向正确的hbase配置路径
export hbase_conf_dir = /etc/hbase/conf
export hbase_home = /etc/hbase/conf

解决方法5

通过修改源码,添加参数支持通过参数配置:

'hbase.conf.dir'='/opt/dkl/hbase/conf'

打包:

## 修改代码,需要先检查代码格式
mvn spotless:apply  -pl flink-connectors/flink-connector-hbase-2.2
## 先编译 hbase-base ,因为 hbase-2.2 依赖 hbase-base中的代码
mvn clean install -dskiptests -pl flink-connectors/flink-connector-hbase-base
mvn clean install -dskiptests -pl flink-connectors/flink-connector-hbase-2.2
## 最后将 flink-sql-connector-hbase-2.2 打包,最后打出来的包名为 flink-sql-connector-hbase-2.2-1.15.4.jar
mvn clean package -dskiptests -pl flink-connectors/flink-sql-connector-hbase-2.2

总结

本文总结了flink sql 读写 hbase 的参数配置,解决了在kerberos环境下因 hudi 包 hbase-site.xml 配置冲突引起的异常,学习总结了 flink sql 读写 hbase 时加载 hbase 配置的优先级,但是没有详细的分析源码中的逻辑,可能会在后面的文章中补充相关的源码分析~

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com