目录
前言
总结 flink 读写 hbase
版本
-
flink 1.15.4
-
hbase 2.0.2
-
hudi 0.13.0
官方文档
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/hbase/
jar包
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hbase-2.2/1.15.4/flink-sql-connector-hbase-2.2-1.15.4.jar
sql
hbase shell创建hbase表
hbase shell
create 'flink_hbase_table', 'cf'
flink 写 hbase
create table flink_hbase_table(
id int,
cf row<name string,price double,ts bigint, dt string>,
primary key (id) not enforced
) with (
'connector' = 'hbase-2.2',
'table-name' = 'flink_hbase_table',
'zookeeper.quorum' = 'indata-192-168-44-128.indata.com:2181,indata-192-168-44-129.indata.com:2181,indata-192-168-44-130.indata.com:2181',
'zookeeper.znode.parent' = '/hbase-secure'
);
insert into flink_hbase_table values (1,row('hudi',10.1,1000,'2023-10-10'));
flink 读 hbase
select * from flink_hbase_table;
select id,name,price,ts,dt from flink_hbase_table;
hbase shell 验证数据
hbase(main):002:0> scan 'flink_hbase_table'
row column+cell
\x00\x00\x00\x01 column=cf:dt, timestamp=1697160801719, value=2023-10-10
\x00\x00\x00\x01 column=cf:name, timestamp=1697160801719, value=hudi
\x00\x00\x00\x01 column=cf:price, timestamp=1697160801719, value=@$333333
\x00\x00\x00\x01 column=cf:ts, timestamp=1697160801719, value=\x00\x00\x00\x00\x00\x00\x03\xe8
1 row(s)
took 0.4339 seconds
参数
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 必选 | (none) | string | 指定使用的连接器, 支持的值如下 :hbase-1.4: 连接 hbase 1.4.x 集群hbase-2.2: 连接 hbase 2.2.x 集群(我的hbase版本为2.0.2) |
table-name | 必选 | (none) | string | 连接的 hbase 表名。默认该表在 "default" 命名空间下,指定命名空间下的表需要使用 "namespace:table"。 |
zookeeper.quorum | 必选 | (none) | string | hbase zookeeper quorum 信息。 |
zookeeper.znode.parent | 可选 | /hbase | string | hbase 集群的 zookeeper 根目录。 |
properties.* | 可选 | (无) | string | 可以设置任意 hbase 的配置项。后缀名必须匹配在 hbase 配置文档 中定义的配置键。flink 将移除 "properties." 配置键前缀并将变换后的配置键和值传入底层的 hbase 客户端。例如您可以设置 'properties.hbase.security.authentication' = 'kerberos' 等kerberos认证参数。 |
hudi包兼容性
前提:在开启了kerberos的环境上
当flink lib下面存在hudi(0.13.0版本)包时会出现flink连接不上hbase的现象,具体表现为:
1、flink 查询 hbase 时,会抛异常:
ava.net.sockettimeoutexception: calltimeout=60000, callduration=74175: call to indata-192-168-44-128.indata.com/192.168.44.128:16020 failed on local exception: org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.exceptions.connectionclosedexception: connection closed row 'flink_hbase_table,,' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=indata-192-168-44-128.indata.com,16020,1695447819772, seqnum=-1
at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.rpcretryingcallerimpl.callwithretries(rpcretryingcallerimpl.java:159)
at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.resultboundedcompletionservice$queueingfuture.run(resultboundedcompletionservice.java:80)
at java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1149)
at java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:624)
at java.lang.thread.run(thread.java:748)
caused by: org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.exceptions.connectionclosedexception: call to indata-192-168-44-128.indata.com/192.168.44.128:16020 failed on local exception: org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.exceptions.connectionclosedexception: connection closed
2、flink 写 hbase 时,不报异常,但是会卡住,卡15分钟(正常执行时间20s)左右显示任务完成,实际上没有写成功。
原因
出现这种现象的原因是hudi包下存在hbase-site.xml,其中安全认证相关的配置和kerberos环境不一致。
获取hbase配置的逻辑和优先级
优先级:
-
1、用户自定义参数 优先级最高 (sql中配置的)
-
2、环境变量 优先级第二
环境变量一共有两个:hbase_conf_dir
和hbase_home
,其中hbase_conf_dir
的优先级要高于hbase_home
,
这两个环境变量下有两个配置文件 hbase-site.xml 和 hbase-default.xml 其中hbase-site.xml 优先级要高于 hbase-default.xml,也就是一共有四个优先级:
2.1hbase_conf_dir
/conf/hbase-site.xml
2.2hbase_conf_dir
/conf/hbase-default.xml
2.3hbase_home
/conf/hbase-site.xml
2.4hbase_home
/conf/hbase-default.xml -
3、classpath 优先级最低,其中也有两个配置文件 hbase-site.xml 和 hbase-default.xml,hbase-site.xml 优先级要高于 hbase-default.xml
如hudi包中就存在 hbase-site.xml 和 hbase-default.xml
另外classpath可能有多个目录,多个目录之间也有优先级,对于我们环境有两个classpath存在hbase-site.xml,一个flink lib路径下面的hudi包,一个是/etc/hbase/conf。
为啥是/etc/hbase/conf,具体逻辑在flink bin/config.sh中:
# try and set hbase_conf_dir to some common default if it's not set
if [ -z "$hbase_conf_dir" ]; then
if [ -d "/etc/hbase/conf" ]; then
echo "setting hbase_conf_dir=/etc/hbase/conf because no hbase_conf_dir was set."
hbase_conf_dir="/etc/hbase/conf"
fi
fi
因为我们环境默认没有配置hbase_conf_dir,并且存在/etc/hbase/conf,所以就会走到这个逻辑,我们在启动sql-client时也会看到:
bash setting hbase_conf_dir=/etc/hbase/conf because no hbase_conf_dir was set
那又是在哪里将这里的hbase_conf_dir加到classpath中的呢?它是在sql-client.sh中通过java -classpath 参数添加的。
cc_classpath优先级高于internal_hadoop_classpaths ,flink lib属于cc_classpath,/etc/hbase/conf 属于internal_hadoop_classpaths
/etc/hbase/conf 属于internal_hadoop_classpaths:
flink lib属于cc_classpath:
因为上面这个函数(constructflinkclasspath
)的逻辑我不太确定,我们可以在sql-clent.sh中打印: echo $cc_classpath
读取hbase配置的逻辑可以查看源码,本文先不分析这块源码,可能会在下一篇文章补充分析部分源码,先只截个图:hbase2dynamictablefactory
prepareruntimeconfiguration
(hbasesinkfunction
和 hbaserowdatalookupfunction
中都有这个方法,分别是写和读)
解决方法
知道了hbase配置的优先级,弄明白了hudi包中的hbase-site.xml为啥会影响flink读写hbase,也就知道如何解决这个问题,我们只需要根据优先级设置正确的hbase配置参数就好了。比如只有classpath中存在hbase配置那么我们就需要修改classpath中的hbase配置为正确配置。如果不想修改classpath中的配置文件或者觉得这样做不合适,我们可以设置更好优先级的配置,比如设置环境变量hbase_conf_dir
和 hbase_home
指向正确的hbase配置,另外我们也可以通过在sql参数中配置正确的参数,因为用户参数级别最高,这样配置优点是比较灵活,缺点是需要用户每次都多写一下额外的配置。
解决方法1
删除hudi包里面的hbase-site.xml(hudi-flink1.15-bundle-0.13.0.jar),这样就会去加载我们服务器环境上的正确的hbase-site.xml (/etc/hbase/conf)
解决方法2
修改hudi包里面的hbase-site.xml中的kerberos配置:
# 经测试我们环境只需要这两个配置,可以根据自己的环境调整
hbase.security.authentication true
hbase.regionserver.kerberos.principal hbase/_host@indata.com
解决方法3
通过在建表语句中添加配置
create table flink_hbase_table(
id int,
cf row<name string,price double,ts bigint, dt string>,
primary key (id) not enforced
) with (
'connector' = 'hbase-2.2',
'table-name' = 'flink_hbase_table',
'zookeeper.quorum' = 'indata-192-168-44-128.indata.com:2181,indata-192-168-44-129.indata.com:2181,indata-192-168-44-130.indata.com:2181',
'zookeeper.znode.parent' = '/hbase-secure',
'properties.hbase.security.authentication' = 'kerberos',
--'properties.hbase.master.kerberos.principal' = 'hbase/_host@indata.com',
'properties.hbase.regionserver.kerberos.principal' = 'hbase/_host@indata.com'
);
不启用kerberos的配置:
create table dkl(
id int,
cf row<name string,price double,ts bigint, dt string>,
primary key (id) not enforced
) with (
'connector' = 'hbase-2.2',
'table-name' = 'flink_hbase_table',
'zookeeper.quorum' = 'indata-192-168-44-128.indata.com:2181,indata-10-110-105-163.indata.com:2181,indata-10-110-105-164.indata.com:2181',
'zookeeper.znode.parent' = '/hbase-unsecure',
'properties.hbase.security.authentication' = 'simple'
);
解决方法4
配置环境变量hbase_conf_dir
和 hbase_home
中的其中一个即可:
# 配置环境变量指向正确的hbase配置路径
export hbase_conf_dir = /etc/hbase/conf
export hbase_home = /etc/hbase/conf
解决方法5
通过修改源码,添加参数支持通过参数配置:
'hbase.conf.dir'='/opt/dkl/hbase/conf'
打包:
## 修改代码,需要先检查代码格式
mvn spotless:apply -pl flink-connectors/flink-connector-hbase-2.2
## 先编译 hbase-base ,因为 hbase-2.2 依赖 hbase-base中的代码
mvn clean install -dskiptests -pl flink-connectors/flink-connector-hbase-base
mvn clean install -dskiptests -pl flink-connectors/flink-connector-hbase-2.2
## 最后将 flink-sql-connector-hbase-2.2 打包,最后打出来的包名为 flink-sql-connector-hbase-2.2-1.15.4.jar
mvn clean package -dskiptests -pl flink-connectors/flink-sql-connector-hbase-2.2
总结
本文总结了flink sql 读写 hbase 的参数配置,解决了在kerberos环境下因 hudi 包 hbase-site.xml 配置冲突引起的异常,学习总结了 flink sql 读写 hbase 时加载 hbase 配置的优先级,但是没有详细的分析源码中的逻辑,可能会在后面的文章中补充相关的源码分析~
发表评论