Flink provides a rich set of client operations for submitting jobs and interacting with them. This article covers five of them: the flink command line, the Scala shell, the SQL client, the RESTful API, and the web UI.
In the bin directory of the Flink installation you can find flink, start-scala-shell.sh, sql-client.sh, and other scripts; these are the entry points for the client operations.
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220012360.png)
Common flink CLI actions (use --help to see the full help text):
run: run a job
-d: run the job in detached mode
-c: the entry class; required if the jar's manifest does not specify one
-m: address of the JobManager (master) to connect to; use this option to pick a JobManager other than the one in the configuration file, or to address a YARN cluster
-p: parallelism of the program; overrides the default value in the configuration file
-s: path of a savepoint to restore the job from (e.g., hdfs:///flink/savepoint-1537)
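These options can be combined. For example (the entry class and jar path here are hypothetical placeholders):
[root@hadoop1 flink-1.10.1]# bin/flink run -d -p 2 -c com.example.MyJob -m 127.0.0.1:8081 /path/to/my-job.jar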
[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID dce7b69ad15e8756766967c46122736f
In the web UI you can now see the job we just submitted; its default parallelism is 1.
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220038572.jpg)
Click the job to see its details.
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220044372.jpg)
Click Task Managers on the left and open the Stdout tab to see the job's output.
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220064343.jpg)
Alternatively, the *.out files under the log directory of a TaskManager node contain the same output.
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220076885.jpg)
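For example, you can tail that file directly (the exact file name varies with the user, the TaskManager index, and the host):
[root@hadoop2 flink-1.10.1]# tail -f log/flink-*-taskexecutor-*.out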
list: list jobs
-m <arg>: address of the JobManager (master) to connect to
[root@hadoop1 flink-1.10.1]# bin/flink list -m 127.0.0.1:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
09.07.2020 16:44:09 : dce7b69ad15e8756766967c46122736f : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
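list also accepts -r (show only running jobs), -s (show only scheduled jobs), and -a (show all jobs, including completed ones), for example:
[root@hadoop1 flink-1.10.1]# bin/flink list -a -m 127.0.0.1:8081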
stop: stop a job
This requires the JobManager's ip:port and the JobID. In Flink 1.10, stop suspends the job with a savepoint, so the attempt below fails, most likely because no default savepoint directory has been configured yet. (In earlier Flink versions, a job could only be stopped if all of its sources were stoppable, i.e., implemented the StoppableFunction interface.)
[root@hadoop1 flink-1.10.1]# bin/flink stop -m 127.0.0.1:8081 dce7b69ad15e8756766967c46122736f
Suspending job "dce7b69ad15e8756766967c46122736f" with a savepoint.
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.util.FlinkException: Could not stop with a savepoint job "dce7b69ad15e8756766967c46122736f".
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:458)
The StoppableFunction interface, which supports stopping a job gracefully, looks like this:
/**
 * Functions that must be stoppable, such as streaming source functions, implement this interface.
 * The stop() method is called when the job receives a stop signal; upon receiving it,
 * the source must stop emitting new data and shut down gracefully.
 */
@PublicEvolving
public interface StoppableFunction {
    /**
     * Stops the source. Unlike cancel(), this is a request for the source to stop gracefully:
     * pending data may still be sent out; the source does not have to halt immediately.
     */
    void stop();
}
cancel: cancel a job
If state.savepoints.dir is configured in conf/flink-conf.yaml, cancelling with -s writes a savepoint to that default directory; otherwise no savepoint is saved. (The cluster must be restarted for the setting to take effect.)
state.savepoints.dir: file:///tmp/savepoint
Run the cancel command to cancel the job:
[root@hadoop1 flink-1.10.1]# bin/flink cancel -m 127.0.0.1:8081 -s e8ce0d111262c52bf8228d5722742d47
DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use "stop" instead.
Cancelling job e8ce0d111262c52bf8228d5722742d47 with savepoint to default savepoint directory.
Cancelled job e8ce0d111262c52bf8228d5722742d47. Savepoint stored in file:/tmp/savepoint/savepoint-e8ce0d-f7fa96a085d8.
You can also specify the savepoint directory explicitly when cancelling:
[root@hadoop1 flink-1.10.1]# bin/flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint f58bb4c49ee5580ab5f27fdb24083353
DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use "stop" instead.
Cancelling job f58bb4c49ee5580ab5f27fdb24083353 with savepoint to /tmp/savepoint.
Cancelled job f58bb4c49ee5580ab5f27fdb24083353. Savepoint stored in file:/tmp/savepoint/savepoint-f58bb4-127b7e84910e.
The differences between cancelling and stopping a (streaming) job:
● A cancel() call immediately invokes cancel() on the job's operators to cancel them as quickly as possible. If the operators do not stop after the cancel() call, Flink starts interrupting the operator threads periodically until all of them have stopped.
● A stop() call is the more graceful way to end a running streaming job. It is only available for jobs whose sources implement the StoppableFunction interface. When the user requests a stop, all sources of the job receive a stop() call, and the job ends only once every source has shut down cleanly. This allows the job to finish processing all in-flight data; see the example below.
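In Flink 1.10, stop takes a savepoint as part of stopping; the target directory can be passed with -p (the JobID below is a placeholder):
[root@hadoop1 flink-1.10.1]# bin/flink stop -m 127.0.0.1:8081 -p /tmp/savepoint <jobid>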
Triggering a savepoint
To produce a savepoint file, you trigger the savepoint manually. As shown below, you pass the JobID of the running job and the directory to write to, and the command reports back the path of the savepoint file it created.
[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 216c427d63e3754eb757d2cc268a448d
[root@hadoop1 flink-1.10.1]# bin/flink savepoint -m 127.0.0.1:8081 216c427d63e3754eb757d2cc268a448d /tmp/savepoint/
Triggering savepoint for job 216c427d63e3754eb757d2cc268a448d.
Waiting for response...
Savepoint completed. Path: file:/tmp/savepoint/savepoint-216c42-154a34cf6bfd
You can resume your program from this savepoint with the run command.
Differences between savepoints and checkpoints:
● Checkpoints are made incrementally; each one is quick and small, and once enabled in the program they are triggered automatically, so the user does not need to be involved. Savepoints are full snapshots; each one takes longer and is larger, and the user must trigger it explicitly.
● Checkpoints are used automatically when a job fails over; the user does not specify them. Savepoints are typically used for version upgrades, bug fixes, A/B tests, and similar scenarios, and must be specified by the user.
Starting from a specified savepoint
[root@hadoop1 flink-1.10.1]# bin/flink run -d -s /tmp/savepoint/savepoint-f58bb4-127b7e84910e/ examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 1a5c5ce279e0e4bd8609f541b37652e2
In the JobManager log you can see the checkpoint ID being reset to the ID from the savepoint file we specified.
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220087170.jpg)
modify: change a job's parallelism
Here we edit conf/flink-conf.yaml on the master to raise the number of task slots to 4, then distribute the file to the two slave nodes with xsync.
taskmanager.numberOfTaskSlots: 4
The cluster must be restarted for the new value to take effect: stop and then start the cluster.
[root@hadoop1 flink-1.10.1]# bin/stop-cluster.sh && bin/start-cluster.sh
Stopping taskexecutor daemon (pid: 8236) on host hadoop2.
Stopping taskexecutor daemon (pid: 8141) on host hadoop3.
Stopping standalonesession daemon (pid: 22633) on host hadoop1.
Starting cluster.
Starting standalonesession daemon on host hadoop1.
Starting taskexecutor daemon on host hadoop2.
Starting taskexecutor daemon on host hadoop3.
Start a job:
[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 2e833a438da7d8052f14d5433910515a
The web UI now shows 8 task slots in total, with 1 slot in use and 7 free.
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220092470.jpg)
At this point the default parallelism is 1.
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220101783.jpg)
The modify action has been removed from the command line in Flink 1.10; it still worked in Flink 1.7.
[root@hadoop1 flink-1.10.1]# bin/flink modify -p 4 cc22cc3d09f5d65651d637be6fb0a1c3
"modify" is not a valid action.
info: show a program's execution plan
[root@hadoop1 flink-1.10.1]# bin/flink info examples/streaming/TopSpeedWindowing.jar
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":2,"type":"Timestamps/Watermarks","pact":"Operator","contents":"Timestamps/Watermarks","parallelism":1,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","parallelism":1,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":1,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------
Copy the JSON output and paste it into http://flink.apache.org/visualizer/ to generate an execution graph like this one:
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220114167.jpg)
You can compare it against the physical execution plan that actually runs.
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220123196.jpg)
SQL Client (Beta)
Start the Flink SQL client:
[root@hadoop1 flink-1.10.1]# bin/sql-client.sh embedded
Run a SELECT query; press q to leave the result screen shown below.
Flink SQL> select 'hello world';
SQL Query Result (Table)
Table program finished. Page: Last of 1 Updated: 16:37:04.649
EXPR$0
hello world
Q Quit + Inc Refresh G Goto Page N Next Page O Open Row
R Refresh - Dec Refresh L Last Page P Prev Page
Open http://hadoop1:8081 and you can see that the query job produced by this SELECT has already finished. The query reads from a custom source backed by a fixed data set, writes through a stream collect sink, and emits exactly one row.
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220136022.jpg)
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220145679.jpg)
explain: view the execution plan of a SQL statement.
Flink SQL> explain select name, count(*) as cnt from (values ('Bob'), ('Alice'), ('Greg'), ('Bob')) as NameTable(name) group by name;
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], cnt=[COUNT()])
+- LogicalValues(type=[RecordType(VARCHAR(5) name)], tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])
== Optimized Logical Plan ==
GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
+- Exchange(distribution=[hash[name]])
+- Values(type=[RecordType(VARCHAR(5) name)], tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])
== Physical Execution Plan ==
Stage 13 : Data Source
content : Source: Values(tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])
Stage 15 : Operator
content : GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
ship_strategy : HASH
Result display
The SQL client supports two modes for maintaining and displaying query results.
Table mode
Materializes the query result in memory and displays it as a paginated table. Enable it with the following command, as in the example below:
Flink SQL> SET execution.result-mode=table;
[INFO] Session property has been set.
Flink SQL> select name, count(*) as cnt from (values ('Bob'), ('Alice'), ('Greg'), ('Bob')) as NameTable(name) group by name;
SQL Query Result (Table)
Table program finished. Page: Last of 1 Updated: 16:55:08.589
name cnt
Alice 1
Greg 1
Bob 2
Q Quit + Inc Refresh G Goto Page N Next Page O Open Row
R Refresh - Dec Refresh L Last Page P Prev Page
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220152951.jpg)
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220161793.jpg)
Changelog mode
Does not materialize the query result; instead it directly displays the additions and retractions produced by the continuous query. In the example below, a leading '-' marks a retraction message.
Flink SQL> SET execution.result-mode=changelog;
[INFO] Session property has been set.
Flink SQL> select name, count(*) as cnt from (values ('Bob'), ('Alice'), ('Greg'), ('Bob')) as NameTable(name) group by name;
SQL Query Result (Changelog)
Table program finished. Updated: 16:58:05.777
+/- name cnt
+ Bob 1
+ Alice 1
+ Greg 1
- Bob 1
+ Bob 2
Q Quit + Inc Refresh O Open Row
R Refresh - Dec Refresh
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220171891.jpg)
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220181892.jpg)
Environment files
create table: create a table with a DDL statement:
Flink SQL> create table pvuv_sink (
> dt varchar,
> pv bigint,
> uv bigint
> );
[INFO] Table has been created.
show tables: list all table names:
Flink SQL> show tables;
pvuv_sink
describe <table>: show the details of a table:
Flink SQL> describe pvuv_sink;
root
|-- dt: STRING
|-- pv: BIGINT
|-- uv: BIGINT
INSERT and the other DML statements work the same way as in a relational database, so we will not go through each of them; a brief sketch follows.
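As a rough sketch only (source_table and its columns are hypothetical, and pvuv_sink would also need a connector configured in its DDL before this could actually run):
Flink SQL> insert into pvuv_sink
> select dt, count(*) as pv, count(distinct user_id) as uv
> from source_table
> group by dt;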
RESTful API
Next, let's look at how to upload a jar and run a job through the REST API.
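The same flow can be driven with curl against Flink 1.10's REST endpoints; the jar ID and JobID below are placeholders for the values returned by the earlier calls:
[root@hadoop1 flink-1.10.1]# curl -X POST -H "Expect:" -F "jarfile=@examples/streaming/TopSpeedWindowing.jar" http://127.0.0.1:8081/jars/upload
[root@hadoop1 flink-1.10.1]# curl -X POST http://127.0.0.1:8081/jars/<jar-id>/run
[root@hadoop1 flink-1.10.1]# curl http://127.0.0.1:8081/jobs
[root@hadoop1 flink-1.10.1]# curl -X PATCH "http://127.0.0.1:8081/jobs/<jobid>?mode=cancel"
The screenshots below show the same steps in the web UI, which calls these endpoints under the hood.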
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220209703.jpg)
Show Plan displays the execution graph.
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220206318.jpg)
After the job is submitted, you can cancel it by clicking Cancel Job on the page.
![](https://images.3wcode.com/3wcode/20240802/b_0_202408022220216132.jpg)