flink 系列文章
一、flink 专栏
flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
-
1、flink 部署系列
本部分介绍flink的部署、配置相关基础内容。 -
2、flink基础系列
本部分介绍flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 -
3、flik table api和sql基础系列
本部分介绍flink table api和sql的基本用法,比如table api和sql创建库、表用法、查询、窗口函数、catalog等等内容。 -
4、flik table api和sql提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。 -
5、flink 监控系列
本部分和实际的运维、监控工作相关。
二、flink 示例专栏
flink 示例专栏是 flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:flink 系列文章汇总索引
本文给出了通过table api 和sql 的两种方式创建视图,也就是虚表。同时为了更接近实用,通过table api 创建了一张hive的表,然后在该表上创建视图进行示例。
如果需要了解更多内容,可以在本人flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
本文依赖hive、hadoop、kafka环境好用,代码中示例的hive配置文件路径根据你自己的环境而设置。
本文更详细的内容可参考文章:
17、flink 之table api: table api 支持的操作(1)
17、flink 之table api: table api 支持的操作(2)
本专题分为以下几篇文章:
【flink番外篇】9、flink table api 支持的操作示例(1)-通过table api和sql创建表
【flink番外篇】9、flink table api 支持的操作示例(2)- 通过table api 和 sql 创建视图
【flink番外篇】9、flink table api 支持的操作示例(3)- 通过api查询表和使用窗口函数的查询
【flink番外篇】9、flink table api 支持的操作示例(4)- table api 对表的查询、过滤操作
【flink番外篇】9、flink table api 支持的操作示例(5)- 表的列操作
【flink番外篇】9、flink table api 支持的操作示例(6)- 表的聚合(group by、distinct、groupby/over window aggregation)操作
【flink番外篇】9、flink table api 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、flink table api 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、flink table api 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、flink table api 支持的操作示例(10)- 表的orderby、offset 和 fetch、insert操作
【flink番外篇】9、flink table api 支持的操作示例(11)- group windows(tumbling、sliding和session)操作
【flink番外篇】9、flink table api 支持的操作示例(12)- over windows(有界和无界的over window)操作
【flink番外篇】9、flink table api 支持的操作示例(13)- row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、flink table api 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、flink table api 支持的操作示例(1)-完整版
【flink番外篇】9、flink table api 支持的操作示例(2)-完整版
一、maven依赖
本文maven依赖参考文章:【flink番外篇】9、flink table api 支持的操作示例(1)-通过table api和sql创建表 中的依赖,为节省篇幅不再赘述。
二、示例:通过table api 和 sql 创建视图
1、示例:通过sql创建视图
本示例是通过sql创建一个简单的表,然后再通过sql创建一个视图,最后查询视图并输出结果。
import static org.apache.flink.table.api.expressions.$;
import static org.apache.flink.table.api.expressions.and;
import static org.apache.flink.table.api.expressions.lit;
import static org.apache.flink.table.expressions.apiexpressionutils.unresolvedcall;
import java.sql.timestamp;
import java.time.duration;
import java.util.arrays;
import java.util.collections;
import java.util.hashmap;
import java.util.list;
import org.apache.flink.api.common.eventtime.watermarkstrategy;
import org.apache.flink.api.common.typeinfo.typehint;
import org.apache.flink.api.common.typeinfo.typeinformation;
import org.apache.flink.api.java.tuple.tuple2;
import org.apache.flink.api.java.tuple.tuple3;
import org.apache.flink.streaming.api.datastream.datastream;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import org.apache.flink.streaming.connectors.kafka.table.kafkaconnectoroptions;
import org.apache.flink.table.api.datatypes;
import org.apache.flink.table.api.environmentsettings;
import org.apache.flink.table.api.over;
import org.apache.flink.table.api.schema;
import org.apache.flink.table.api.table;
import org.apache.flink.table.api.tabledescriptor;
import org.apache.flink.table.api.tableenvironment;
import org.apache.flink.table.api.tumble;
import org.apache.flink.table.api.bridge.java.streamtableenvironment;
import org.apache.flink.table.catalog.catalogdatabaseimpl;
import org.apache.flink.table.catalog.catalogview;
import org.apache.flink.table.catalog.column;
import org.apache.flink.table.catalog.objectpath;
import org.apache.flink.table.catalog.resolvedcatalogview;
import org.apache.flink.table.catalog.resolvedschema;
import org.apache.flink.table.catalog.hive.hivecatalog;
import org.apache.flink.table.functions.builtinfunctiondefinitions;
import org.apache.flink.types.row;
import com.google.common.collect.lists;
/**
* @author alanchan
*
*/
public class testtableapidemo {
/**
* @param args
* @throws exception
*/
public static void main(string[] args) throws exception {
streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();
streamtableenvironment tenv = streamtableenvironment.create(env);
// sql 创建输入表
string sourcesql = "create table alan_kafkatable (\r\n" +
" `event_time` timestamp(3) metadata from 'timestamp',\r\n" +
" `partition` bigint metadata virtual,\r\n" +
" `offset` bigint metadata virtual,\r\n" +
" `user_id` bigint,\r\n" +
" `item_id` bigint,\r\n" +
" `behavior` string\r\n" +
") with (\r\n" +
" 'connector' = 'kafka',\r\n" +
" 'topic' = 'user_behavior',\r\n" +
" 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',\r\n" +
" 'properties.group.id' = 'testgroup',\r\n" +
" 'scan.startup.mode' = 'earliest-offset',\r\n" +
" 'format' = 'csv'\r\n" +
");";
tenv.executesql(sourcesql);
//
string sql = "select user_id , behavior from alan_kafkatable group by user_id ,behavior ";
table resultquery = tenv.sqlquery(sql);
tenv.createtemporaryview("alan_kafkaview", resultquery);
string queryviewsql = " select * from alan_kafkaview ";
table queryviewresult = tenv.sqlquery(queryviewsql);
datastream<tuple2<boolean, row>> resultds = tenv.toretractstream(queryviewresult, row.class);
// 6、sink
resultds.print();
// 7、执行
env.execute();
// kafka中输入测试数据
// 1,1001,login
// 1,2001,p_read
// 程序运行控制台输入如下
// 3> (true,+i[1, login])
// 14> (true,+i[1, p_read])
}
}
2、示例:通过table api创建视图
本示例是通过table api创建一个hive的表,将数据写入hive,然后再创建视图,最后查询视图输出。
本示例依赖hive、hadoop、kafka环境好用,代码中示例的hive配置文件路径根据你自己的环境而设置。
import static org.apache.flink.table.api.expressions.$;
import static org.apache.flink.table.api.expressions.and;
import static org.apache.flink.table.api.expressions.lit;
import static org.apache.flink.table.expressions.apiexpressionutils.unresolvedcall;
import java.sql.timestamp;
import java.time.duration;
import java.util.arrays;
import java.util.collections;
import java.util.hashmap;
import java.util.list;
import org.apache.flink.api.common.eventtime.watermarkstrategy;
import org.apache.flink.api.common.typeinfo.typehint;
import org.apache.flink.api.common.typeinfo.typeinformation;
import org.apache.flink.api.java.tuple.tuple2;
import org.apache.flink.api.java.tuple.tuple3;
import org.apache.flink.streaming.api.datastream.datastream;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import org.apache.flink.streaming.connectors.kafka.table.kafkaconnectoroptions;
import org.apache.flink.table.api.datatypes;
import org.apache.flink.table.api.environmentsettings;
import org.apache.flink.table.api.over;
import org.apache.flink.table.api.schema;
import org.apache.flink.table.api.table;
import org.apache.flink.table.api.tabledescriptor;
import org.apache.flink.table.api.tableenvironment;
import org.apache.flink.table.api.tumble;
import org.apache.flink.table.api.bridge.java.streamtableenvironment;
import org.apache.flink.table.catalog.catalogdatabaseimpl;
import org.apache.flink.table.catalog.catalogview;
import org.apache.flink.table.catalog.column;
import org.apache.flink.table.catalog.objectpath;
import org.apache.flink.table.catalog.resolvedcatalogview;
import org.apache.flink.table.catalog.resolvedschema;
import org.apache.flink.table.catalog.hive.hivecatalog;
import org.apache.flink.table.functions.builtinfunctiondefinitions;
import org.apache.flink.types.row;
import com.google.common.collect.lists;
/**
* @author alanchan
*
*/
public class testtableapidemo {
/**
* @param args
* @throws exception
*/
public static void main(string[] args) throws exception {
streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();
streamtableenvironment tenv = streamtableenvironment.create(env);
// sql 创建输入表
string sourcesql = "create table alan_kafkatable (\r\n" +
" `event_time` timestamp(3) metadata from 'timestamp',\r\n" +
" `partition` bigint metadata virtual,\r\n" +
" `offset` bigint metadata virtual,\r\n" +
" `user_id` bigint,\r\n" +
" `item_id` bigint,\r\n" +
" `behavior` string\r\n" +
") with (\r\n" +
" 'connector' = 'kafka',\r\n" +
" 'topic' = 'user_behavior',\r\n" +
" 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',\r\n" +
" 'properties.group.id' = 'testgroup',\r\n" +
" 'scan.startup.mode' = 'earliest-offset',\r\n" +
" 'format' = 'csv'\r\n" +
");";
tenv.executesql(sourcesql);
// 创建视图
string catalogname = "alan_hive";
string defaultdatabase = "default";
string databasename = "viewtest_db";
string hiveconfdir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
hivecatalog hivecatalog = new hivecatalog(catalogname, defaultdatabase, hiveconfdir);
tenv.registercatalog(catalogname, hivecatalog);
tenv.usecatalog(catalogname);
hivecatalog.createdatabase(databasename, new catalogdatabaseimpl(new hashmap(), hiveconfdir) {
}, true);
tenv.usedatabase(databasename);
string viewname = "alan_kafkaview";
string originalquery = "select user_id , behavior from alan_kafkatable group by user_id ,behavior ";
string expandedquery = "select user_id , behavior from " + databasename + "." + "alan_kafkatable group by user_id ,behavior ";
string comment = "this is a comment";
objectpath path = new objectpath(databasename, viewname);
createview(originalquery, expandedquery, comment, hivecatalog, path);
// 查询视图
string queryviewsql = " select * from alan_kafkaview ";
table queryviewresult = tenv.sqlquery(queryviewsql);
datastream<tuple2<boolean, row>> resultds = tenv.toretractstream(queryviewresult, row.class);
// 6、sink
resultds.print();
// 7、执行
env.execute();
// kafka中输入测试数据
// 1,1001,login
// 1,2001,p_read
// 程序运行控制台输入如下
// 3> (true,+i[1, login])
// 14> (true,+i[1, p_read])
}
static void createview(string originalquery, string expandedquery, string comment, hivecatalog hivecatalog, objectpath path) throws exception {
resolvedschema resolvedschema = new resolvedschema(
arrays.aslist(
column.physical("user_id", datatypes.int()),
column.physical("behavior", datatypes.string())),
collections.emptylist(),
null);
catalogview origin = catalogview.of(
schema.newbuilder().fromresolvedschema(resolvedschema).build(),
comment,
originalquery,
expandedquery,
collections.emptymap());
catalogview view = new resolvedcatalogview(origin, resolvedschema);
hivecatalog.createtable(path, view, false);
}
}
以上,本文给出了通过table api 和sql 的两种方式创建视图,也就是虚表。同时为了更接近实用,通过table api 创建了一张hive的表,然后在该表上创建视图进行示例。
如果需要了解更多内容,可以在本人flink 专栏中了解更新系统的内容。
本文更详细的内容可参考文章:
17、flink 之table api: table api 支持的操作(1)
17、flink 之table api: table api 支持的操作(2)
本专题分为以下几篇文章:
【flink番外篇】9、flink table api 支持的操作示例(1)-通过table api和sql创建表
【flink番外篇】9、flink table api 支持的操作示例(2)- 通过table api 和 sql 创建视图
【flink番外篇】9、flink table api 支持的操作示例(3)- 通过api查询表和使用窗口函数的查询
【flink番外篇】9、flink table api 支持的操作示例(4)- table api 对表的查询、过滤操作
【flink番外篇】9、flink table api 支持的操作示例(5)- 表的列操作
【flink番外篇】9、flink table api 支持的操作示例(6)- 表的聚合(group by、distinct、groupby/over window aggregation)操作
【flink番外篇】9、flink table api 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、flink table api 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、flink table api 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、flink table api 支持的操作示例(10)- 表的orderby、offset 和 fetch、insert操作
【flink番外篇】9、flink table api 支持的操作示例(11)- group windows(tumbling、sliding和session)操作
【flink番外篇】9、flink table api 支持的操作示例(12)- over windows(有界和无界的over window)操作
【flink番外篇】9、flink table api 支持的操作示例(13)- row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、flink table api 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、flink table api 支持的操作示例(1)-完整版
【flink番外篇】9、flink table api 支持的操作示例(2)-完整版
发表评论