作者:陶运道
目录
第一部分 纯ddl编程模式
第二部分 table api编程模式
第一部分 纯ddl编程模式
ddl形式实现kafka->flink->hbase,具体的流程,流程如下:
flink将kafka主题user_behavior内容,通过flink sql client,存入hbase表venn中。
一、开发环境
组件 版本
flink(ha) 1.17.2
zookeeper 3.4.5
hadoop 3.2.3
hbase(ha) 2.4.15
kafka(ha) 3.2.1
本次用到包flink 的lib目录中
sql连接器下载地址
本次解决问题是用了flink-sql-connector-hbase-2.2-1.17.2.jar
flink1.17.有两个版本连接器 1.4 和2.2 ,本次采用2.2、
下载网站及说明
n order to use the hbase connector the following dependencies are required for both projects using a build automation tool (such as maven or sbt) and sql client with sql jar bundles.
hbase version | maven dependency | sql client jar |
1.4.x | <dependency>
</dependency> | |
2.2.x | <dependency>
</dependency> |
二、实践用到操作
1.hbase命令
命令行 作用
hbase shell 进入命令行客户端
create 'venn','cf' 新建表格venn,其中cf是列簇
scan 'venn',{limit=>1} 查看新建的表格中的数据内容
2.kafka命令
kafka-topics.sh --list -bootstrap-server master:9092 查看topic
kafka-console-producer.sh --bootstrap-server master:9092 --topic user_behavior
kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic user_behavior
kafka-topics.sh --bootstrap-server master:9092 --delete --topic user_behavior
三、实验步骤
1.产生主题消息
注意:(1)数据示例如下(传一条按下回车
(2)格式为json 所有字段值均加””
kafka-console-producer.sh --bootstrap-server master:9092 --topic user_behavior
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "894923", "item_id":"3076029", "category_id": "1879194", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "834377", "item_id":"4541270", "category_id": "3738615", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "315321", "item_id":"942195", "category_id": "4339722", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "625915", "item_id":"1162383", "category_id": "570735", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "578814", "item_id":"176722", "category_id": "982926", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "873335", "item_id":"1256540", "category_id": "1451783", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "429984", "item_id":"4625350", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "866796", "item_id":"534083", "category_id": "4203730", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "937166", "item_id":"321683", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
2. 创建hbase表
#hbase shell 进行hbase命令行
hbase(main):014:0> create 'venn','cf' 创建表,有一个列族cf
3. 启动flink sql client将kafka主题写入hbase
(1)加入连接器到flink的lib目录中,并启动flink集群
(2)启动有关服务
#start-all.sh //启动hadoop集群
#start-cluster.sh //启动flink集群
# start-hbase.sh 启动hbase服务
注意附带要启动flink集群,本实验室standalone模式
# 启动kafka服务
#sql-client.sh 启动flink-sql窗口
(3)读 kafka(json)消息 并写入表sourcetable
flink-sql>create table user_log(
user_id varchar,
item_id varchar,
category_id varchar,
behavior varchar,
ts timestamp(3)
) with ( 'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'master:9092',
'properties.group.id' = 'flinkconsumer',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
flink sql> select * from user_log;
注: flink与kafka连接所需要连接器
(4)创建表sinktable 用于连接hbase表
create table user_log_sink (
user_id varchar,
cf row(item_id varchar, category_id varchar, behavior varchar, ts timestamp(3))
)with (
'connector' = 'hbase-2.2',
'table-name' = 'venn',
'zookeeper.quorum' = 'master:2181'
);
注 意:
以下格式是hbase连接器老版本格式。在flink1.17.2版本上练习时为此错误花费好长时间
create table user_log_sink (
user_id varchar,
cf row(item_id varchar, category_id varchar, behavior varchar, ts timestamp(3))
) with (
'connector.type' = 'hbase',
'connector.version' = ' hbase-2.2', -- hbase vesion
'connector.table-name' = 'venn', -- hbase table name
'connector.zookeeper.quorum' = 'master:2181', -- zookeeper quorum
'connector.zookeeper.znode.parent' = '/hbase' -- hbase znode in zookeeper
'connector.write.buffer-flush.max-size' = '10mb', -- max flush size
'connector.write.buffer-flush.max-rows' = '1000', -- max flush rows
'connector.write.buffer-flush.interval' = '2s' -- max flush interval
);
(5)提交任务,将user_log表内容插入到user_log_sink,同时存入hbase
insert into user_log_sink
select user_id,
row(item_id, category_id, behavior, ts ) as cf
from user_log;
(6)查询出现问题
查询插入结果要利用 hbase shell查询,不能利用语句select * from user_log_sink;查询
注意查询是本次插入的记录,而不是hbase表记录。这一点要清楚。
错误1:对于以上版本,则出现以下错误,如下:
[error] could not execute sql statement. reason:
org.apache.flink.table.factories.nomatchingtablefactoryexception: could not find a suitable table factory for 'org.apache.flink.table.factories.tablesourcefactory' in
the classpath.
错误2:出现以下错误
ava.lang.nosuchmethoderror: org.apache.flink.table.types.logical.utils.logicaltypechecks.hasfamily(lorg/apache/flink/table/types/logical/logicaltype;lorg/apache/flink/table/types/logical/logicaltypefamily;)z
是版本号错误 flink-sql-connector-hbase-2.2-1.17.2.jar 1.17.2与flink版本号不对
结束语
(1)主题消息按行组织 格式:{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"字段1": "值1", "字段2":"值2", …… , "字段n": "值n"}
主题字段与hbase表中键、列名相对应,否则无法正确写入表格
(2)令人头痛的是 有两点:是flink与hbase连接器,关键版本对应关系要试。
flink1.17.2 对应flink-sql-connector-hbase-2.2-1.17.2.jar
(3)网上案例,很少有做出来的。原因就在于版本不同,代码也不近相同,但不失参考价值。唯一方法就是多找几份参考。
(4)对于一大题,可拆成若干步,如上面可拆成连接kafka,连接hbase,插入三步,缩小问题范围。
第二部分 table api编程模式
一、代码
import org.apache.flink.streaming.api.timecharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.streamtableenvironment
object scala_1{
def main(args: array[string]): unit = {
val env = streamexecutionenvironment.getexecutionenvironment
//默认流时间方式
env.setstreamtimecharacteristic(timecharacteristic.processingtime)
//构建streamtableenvironment
val tenv = streamtableenvironment.create(env)
val table_sql =
"""create table st (
|`user_id` varchar,
|`item_id` varchar,
|`category_id` varchar,
|`behavior` varchar,
|`ts` timestamp(3))
|with ('connector' = 'kafka',
|'topic' = 'user_behavior',
|'properties.bootstrap.servers' = 'master:9092',
|'properties.group.id' = 'testgroup',
|'scan.startup.mode' = 'earliest-offset',
|'format' = 'json')""".stripmargin
val f=tenv.executesql(table_sql)
tenv.todatastream(tenv.sqlquery("select * from st")) //语句不能少,相当于刷新,否则无法为下沉表sk提供流数据
val f2=tenv.executesql(
"""
create table sk (
user_id varchar,
cf row(item_id varchar, category_id varchar, behavior varchar, ts timestamp(3))
)with (
'connector' = 'hbase-2.2',
'table-name' = 'venn',
'zookeeper.quorum' = 'master:2181'
) """.stripmargin)
tenv.executesql(
"""
|insert into sk
|select user_id,row(item_id,category_id,behavior,ts) as cf from st
""".stripmargin)
env.execute()
}
}
二、 pom.xlm
<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0"
xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelversion>4.0.0</modelversion>
<groupid>org.example</groupid>
<artifactid>flinktutorial</artifactid>
<version>1.0-snapshot</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceencoding>utf-8</project.build.sourceencoding>
<scala.version>2.12</scala.version>
<hdfs.version>3.2.3</hdfs.version>
<flink.version>1.17.1</flink.version>
<kafka.version>3.2.1</kafka.version>
<flink-connector-redis.verion>1.1.5</flink-connector-redis.verion>
<mysql.version>5.1.47</mysql.version>
<mysqlconnect.version>5.1.47</mysqlconnect.version>
<lang3.version>3.9</lang3.version>
</properties>
<dependencies>
<dependency>
<groupid>com.esotericsoftware</groupid>
<artifactid>kryo-shaded</artifactid>
<version>4.0.2</version>
</dependency>
<!--scala-->
<dependency>
<groupid>org.scala-lang</groupid>
<artifactid>scala-reflect</artifactid>
<version>${scala.version}.12</version>
</dependency>
<dependency>
<groupid>org.scala-lang</groupid>
<artifactid>scala-compiler</artifactid>
<version>${scala.version}.12</version>
</dependency>
<dependency>
<groupid>org.scala-lang</groupid>
<artifactid>scala-library</artifactid>
<version>${scala.version}.12</version>
</dependency>
<!--kafka-->
<dependency>
<groupid>org.apache.kafka</groupid>
<artifactid>kafka_${scala.version}</artifactid>
<version>${kafka.version}</version>
</dependency>
<!--flink 实时处理-->
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-runtime-web</artifactid>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-clients</artifactid>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-streaming-scala_${scala.version}</artifactid>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-sql-connector-kafka</artifactid>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-table-common</artifactid>
<version>1.17.2</version>
</dependency>
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-table-api-scala-bridge_2.12</artifactid>
<version>1.17.2</version>
</dependency>
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-table-planner_2.12</artifactid>
<version>1.17.2</version>
</dependency>
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-json</artifactid>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-connector-hive_${scala.version}
</artifactid>
<version>${flink.version}</version>
</dependency>
<!--mysql连接器-->
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-connector-jdbc_2.12</artifactid>
<version>1.14.0</version>
</dependency>
<!-- hadoop相关-->
<dependency>
<groupid>org.apache.hadoop</groupid>
<artifactid>hadoop-client</artifactid>
<version>${hdfs.version}</version>
</dependency>
<dependency>
<groupid>org.apache.hadoop</groupid>
<artifactid>hadoop-auth</artifactid>
<version>${hdfs.version}</version>
</dependency>
<!—在table api下以下两个hbase连接器都可用,之一也可-->
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-sql-connector-hbase-2.2</artifactid>
<version>1.17.2</version>
</dependency>
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-connector-hbase-2.2</artifactid>
<version>1.17.2</version>
</dependency>
</dependencies>
</project>
发表评论