当前位置: 代码网 > it编程>数据库>Mysql > Flink调优详解:案例解析(第42天)

Flink调优详解:案例解析(第42天)

2024年07月28日 Mysql 我要评论
本文主要详解常见的Flink优化策略。

系列文章目录

一、flink-任务参数配置
二、flink-sql调优
三、阿里云flink调优


前言

本文主要详解常见的flink优化策略。

官网地址:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/

一、flink-任务参数配置

1.1 运行时参数
  • 异步维度join
# 默认值:100
# 值类型:integer
# 流批任务:流、批任务都支持
# 用处:异步 lookup join 中最大的异步 io 执行数目
table.exec.async-lookup.buffer-capacity: 100
  • 开启微批
# 默认值:false
# 值类型:boolean
# 流批任务:流任务支持
# 用处:minibatch 优化是一种专门针对 unbounded 流任务的优化(即非窗口类应用),其机制是在 `允许的延迟时间间隔内` 以及 `达到最大缓冲记录数` 时触发以减少 `状态访问` 的优化,从而节约处理时间。下面两个参数一个代表 `允许的延迟时间间隔`,另一个代表 `达到最大缓冲记录数`。
table.exec.mini-batch.enabled: false

# 默认值:0 ms
# 值类型:duration
# 流批任务:流任务支持
# 用处:此参数设置为多少就代表 minibatch 机制最大允许的延迟时间。注意这个参数要配合 `table.exec.mini-batch.enabled` 为 true 时使用,而且必须大于 0 ms
table.exec.mini-batch.allow-latency: 0 ms

# 默认值:-1
# 值类型:long
# 流批任务:流任务支持
# 用处:此参数设置为多少就代表 minibatch 机制最大缓冲记录数。注意这个参数要配合 `table.exec.mini-batch.enabled` 为 true 时使用,而且必须大于 0
table.exec.mini-batch.size: -1


注意: 
	1- 如果想开启微批,那么首先需要将第一个参数值设置为true;接着可以设置后面两个参数。只要程序满足后面2个参数中的任意一个条件就会触发微批处理。
	2- 微批处理能够提升程序处理数据的吞吐量,但是会导致数据的处理时效性降低
  • 并行度的设置
# 默认值:-1
# 值类型:integer
# 流批任务:流、批任务都支持
# 用处:可以用此参数设置 flink sql 中算子的并行度,这个参数的优先级 `高于` streamexecutionenvironment 中设置的并行度优先级,如果这个值设置为 -1,则代表没有设置,会默认使用 streamexecutionenvironment 设置的并行度
table.exec.resource.default-parallelism: -1
  • 数据异常时的处理方式
# 默认值:error
# 值类型:enum【error, drop】
# 流批任务:流、批任务都支持
# 用处:表上的 not null 列约束强制不能将 null 值插入表中。flink 支持 `error`(默认)和 `drop` 配置。默认情况下,当 null 值写入 not null 列时,flink 会产生运行时异常。用户可以将行为更改为 `drop`,直接删除此类记录,而不会引发异常。
table.exec.sink.not-null-enforcer: error
  • 上游cdc去重
# 默认值:false
# 值类型:boolean
# 流批任务:流任务
# 用处:接入了 cdc 的数据源,上游 cdc 如果产生重复的数据,可以使用此参数在 flink 数据源算子进行去重操作,去重会引入状态开销
table.exec.source.cdc-events-duplicate: false
  • 设置空闲等待
# 默认值:0 ms
# 值类型:duration
# 流批任务:流任务
# 用处:如果此参数设置为 60 s,当 source 算子在 60 s 内未收到任何元素时,这个 source 将被标记为临时空闲,此时下游任务就不依赖此 source 的 watermark 来推进整体的 watermark 了。
# 默认值为 0 时,代表未启用检测源空闲。
table.exec.source.idle-timeout: 0 ms
  • 设置状态有效期
# 默认值:0 ms
# 值类型:duration
# 流批任务:流任务
# 用处:指定空闲状态(即未更新的状态)将保留多长时间。尤其是在 unbounded 场景中很有用。默认 0 ms 为不清除空闲状态
table.exec.state.ttl: 0 ms

推荐: ttl time to live失效时间。保留窗口附近的1-3个state状态数据。举例: 例如滚动窗口大小是5秒,那么这个参数推荐设置为5000 - 15000毫秒之间

上述的参数中,常用的有:开启微批、设置状态有效期

1.2 优化器参数
  • 开启两阶段聚合
#  默认值:auto
#  值类型:string
#  流批任务:流、批任务都支持
#  用处:聚合阶段的策略。和 mapreduce 的 combiner 功能类似,可以在数据 shuffle 前做一些提前的聚合,可以选择以下三种方式
#  two_phase:强制使用具有 localaggregate 和 globalaggregate 的两阶段聚合。请注意,如果聚合函数不支持优化为两个阶段,flink 仍将使用单阶段聚合。
#  两阶段优化在计算 count,sum 时很有用,但是在计算 count distinct 时需要注意,key 的稀疏程度,如果 key 不稀疏,那么很可能两阶段优化的效果会适得其反
#  one_phase:强制使用只有 completeglobalaggregate 的一个阶段聚合。
#  auto:聚合阶段没有特殊的执行器。选择 two_phase 或者 one_phase 取决于优化器的成本。
table.optimizer.agg-phase-strategy: auto

注意: 两阶段聚合必须配合微批处理的参数一起进行设置。
  • 开启分桶
#  默认值:false
#  值类型:boolean
#  流批任务:流任务
#  用处:避免 group by 计算 count distinct\sum distinct 数据时的 group by 的 key 较少导致的数据倾斜,比如 group by 中一个 key 的 distinct 要去重 500w 数据,而另一个 key 只需要去重 3 个 key,那么就需要先需要按照 distinct 的 key 进行分桶。将此参数设置为 true 之后,下面的 table.optimizer.distinct-agg.split.bucket-num 可以用于决定分桶数是多少
#  后文会介绍具体的案例
table.optimizer.distinct-agg.split.enabled: false

#  默认值:1024
#  值类型:integer
#  流批任务:流任务
#  用处:避免 group by 计算 count distinct 数据时的 group by 较少导致的数据倾斜。加了此参数之后,会先根据 group by key 结合 hash_code(distinct_key)进行分桶,然后再自动进行合桶。
#  后文会介绍具体的案例
table.optimizer.distinct-agg.split.bucket-num: 1024
  • 重用执行计划
#  默认值:true
#  值类型:boolean
#  流批任务:流任务
#  用处:如果设置为 true,flink 优化器将会尝试找出重复的自计划并重用。默认为 true 不需要改动
table.optimizer.reuse-sub-plan-enabled: true
  • source资源重用
#  默认值:true
#  值类型:boolean
#  流批任务:流任务
#  用处:如果设置为 true,flink 优化器会找出重复使用的 table source 并且重用。默认为 true 不需要改动
table.optimizer.reuse-source-enabled: true
  • 开启谓词下推
#  默认值:true
#  值类型:boolean
#  流批任务:流任务
#  用处:如果设置为 true,flink 优化器将会做谓词下推到 filterabletablesource 中,将一些过滤条件前置,提升性能。默认为 true 不需要改动
table.optimizer.source.predicate-pushdown-enabled: true
1.3 表参数
  • 开启dml异步
#  默认值:false
#  值类型:boolean
#  流批任务:流、批任务都支持
#  用处:dml sql(即执行 insert into 操作)是异步执行还是同步执行。默认为异步(false),即可以同时提交多个 dml sql 作业,如果设置为 true,则为同步,第二个 dml 将会等待第一个 dml 操作执行结束之后再执行
table.dml-sync: false
  • 设置方法的最大长度不超过64kb
#  默认值:64000
#  值类型:integer
#  流批任务:流、批任务都支持
#  用处:flink sql 会通过生产 java 代码来执行具体的 sql 逻辑,但是 jvm 限制了一个 java 方法的最大长度不能超过 64kb,但是某些场景下 flink sql 生产的 java 代码会超过 64kb,这时 jvm 就会直接报错。因此此参数可以用于限制生产的 java 代码的长度来避免超过 64kb,从而避免 jvm 报错。
table.generated-code.max-length: 64000
  • 本地时区
#  默认值:default
#  值类型:string
#  流批任务:流、批任务都支持
#  用处:在使用天级别的窗口时,通常会遇到时区问题。举个例子,flink 开一天的窗口,默认是按照 utc 零时区进行划分,那么在北京时区划分出来的一天的窗口是第一天的早上 8:00 到第二天的早上 8:00,但是实际场景中想要的效果是第一天的早上 0:00 到第二天的早上 0:00 点。因此可以将此参数设置为 gmt+08:00 来解决这个问题。
table.local-time-zone: default
  • 编译器
#  默认值:default
#  值类型:enum【blink、old】
#  流批任务:流、批任务都支持
#  用处:flink sql planner,默认为 blink planner,也可以选择 old planner,但是推荐使用 blink planner
table.planner: blink
  • sql方言
#  默认值:default
#  值类型:string
#  流批任务:流、批任务都支持
#  用处:flink 解析一个 sql 的解析器,目前有 flink sql 默认的解析器和 hive sql 解析器,其区别在于两种解析器支持的语法会有不同,比如 hive sql 解析器支持 between and、rlike 语法,flink sql 不支持
table.sql-dialect: default

二、flink-sql调优

2.1 mini-batch聚合

作用:微批处理能够提升程序处理数据的吞吐量,但是会导致数据的处理时效性降低

在这里插入图片描述

sql中参数配置如下:

# 默认值:false
# 值类型:boolean
# 流批任务:流任务支持
# 用处:minibatch 优化是一种专门针对 unbounded 流任务的优化(即非窗口类应用),其机制是在 `允许的延迟时间间隔内` 以及 `达到最大缓冲记录数` 时触发以减少 `状态访问` 的优化,从而节约处理时间。下面两个参数一个代表 `允许的延迟时间间隔`,另一个代表 `达到最大缓冲记录数`。
table.exec.mini-batch.enabled: false

# 默认值:0 ms
# 值类型:duration
# 流批任务:流任务支持
# 用处:此参数设置为多少就代表 minibatch 机制最大允许的延迟时间。注意这个参数要配合 `table.exec.mini-batch.enabled` 为 true 时使用,而且必须大于 0 ms
table.exec.mini-batch.allow-latency: 0 ms

# 默认值:-1
# 值类型:long
# 流批任务:流任务支持
# 用处:此参数设置为多少就代表 minibatch 机制最大缓冲记录数。注意这个参数要配合 `table.exec.mini-batch.enabled` 为 true 时使用,而且必须大于 0
table.exec.mini-batch.size: -1
2.2 两阶段聚合

作用:1- 提前对数据进行局部聚合操作,能够减少后续数据处理量;2-能够一定程度避免数据倾斜的问题

在这里插入图片描述

flinksql中的配置:

#  默认值:auto
#  值类型:string
#  流批任务:流、批任务都支持
#  用处:聚合阶段的策略。和 mapreduce 的 combiner 功能类似,可以在数据 shuffle 前做一些提前的聚合,可以选择以下三种方式
#  two_phase:强制使用具有 localaggregate 和 globalaggregate 的两阶段聚合。请注意,如果聚合函数不支持优化为两个阶段,flink 仍将使用单阶段聚合。
#  两阶段优化在计算 count,sum 时很有用,但是在计算 count distinct 时需要注意,key 的稀疏程度,如果 key 不稀疏,那么很可能两阶段优化的效果会适得其反
#  one_phase:强制使用只有 completeglobalaggregate 的一个阶段聚合。
#  auto:聚合阶段没有特殊的执行器。选择 two_phase 或者 one_phase 取决于优化器的成本。
table.optimizer.agg-phase-strategy: auto


注意: 如果想要使用两阶段聚合,那么必须同时开始mini-batch微批处理
2.3 分桶

作用:能够一定程度避免数据倾斜的问题

在这里插入图片描述

flinksql的配置如下:

#  默认值:false
#  值类型:boolean
#  流批任务:流任务
#  用处:避免 group by 计算 count distinct\sum distinct 数据时的 group by 的 key 较少导致的数据倾斜,比如 group by 中一个 key 的 distinct 要去重 500w 数据,而另一个 key 只需要去重 3 个 key,那么就需要先需要按照 distinct 的 key 进行分桶。将此参数设置为 true 之后,下面的 table.optimizer.distinct-agg.split.bucket-num 可以用于决定分桶数是多少
#  后文会介绍具体的案例
table.optimizer.distinct-agg.split.enabled: false

#  默认值:1024
#  值类型:integer
#  流批任务:流任务
#  用处:避免 group by 计算 count distinct 数据时的 group by 较少导致的数据倾斜。加了此参数之后,会先根据 group by key 结合 hash_code(distinct_key)进行分桶,然后再自动进行合桶。
#  后文会介绍具体的案例
table.optimizer.distinct-agg.split.bucket-num: 1024
2.4 filter去重(了解)
--普通的写法
select
 day,
 count(distinct user_id) as total_uv,
 count(distinct case when flag in ('android', 'iphone') then user_id else null end) as app_uv,
 count(distinct case when flag in ('wap', 'other') then user_id else null end) as web_uv
from t
group by day


--filter优化写法
select
 day,
 count(distinct user_id) as total_uv,
 count(distinct user_id) filter (where flag in ('android', 'iphone')) as app_uv,
 count(distinct user_id) filter (where flag in ('web', 'other')) as web_uv
from t
group by day

filter子句能够将三个状态合并成一个共享的状态。方便程序的读取等操作。能够提升效率。

在这里插入图片描述

三、阿里云flink调优

flink支持智能调优和定时调优两种调优模式。

在这里插入图片描述

3.1 智能调优

简单理解,就是阿里云flink智能来进行调优。默认会从并发度和内存来进行调优。

  • 智能调优会调整作业的并发度来满足作业流量变化所需要的吞吐。

    智能调优会监控消费源头数据的延迟变化情况、taskmanager(tm) cpu实际使用率和各个算子处理数据能力来调整作业的并发度。详情如下:

    • 作业延迟delay指标正常(不超过60s),不修改当前作业并发。
    • 作业延迟delay指标超过默认阈值60s,分以下两种情况来调整并发度:
      • 延迟正在下降,不进行并发度调整。
      • 延迟增加并且连续上升3分钟(默认值), 默认调整作业并发度到当前实际tps的两倍,但不超过设置最大的资源(默认值为64 cu)。
    • 作业不存在延迟指标。
      • 作业某vertex节点连续6分钟实际处理数据时间占比超过80%,调大作业并发度使得slot使用率降低到50%,但不超过设置最大的资源(默认为64 cu)。
      • 所有tm的平均利用率连续6分钟超过80%,调高并发度使tm的cpu使用率降低到50%。
    • 所有tm的最大cpu使用率连续24小时低于20%,且vertex的实际处理数据时间低于20%时,调低作业的并发度使cpu和vertex实际处理的时间占比提高到50%。
  • 智能调优也会监控作业的内存使用和failover情况,来调整作业的内存配置。详情如下:

    • 在jobmanager gc频繁或者发生oom异常时,会调高jm的内存,默认最大调整到16 gib。
    • 在tm gc频繁或者发生oom异常、heartbeattimeout异常时,会调高tm的内存,默认最大调整到16 gib。
    • 在tm内存使用率超过95%时,会调大tm的内存。
    • 在tm的实际内存使用率连续24小时低于30%时,降低tm内存的配置,默认最小调整到1.6 gib。
3.2 定时调优

定时调优计划描述了资源和时间点的对应关系,一个定时调优计划中可以包含多组资源和时间点的关系。在使用定时调优计划时,您需要明确知道各个时间段的资源使用情况,根据业务时间区间特征,设置对应的资源。

例如,某业务全天早09:00~19:00是业务高峰,19:00到第二天09:00是业务低峰。此时您可以使用定时调优功能,在高峰时间段使用30 cu,在业务低峰时使用10 cu。

小结:在使用阿里云调优时,可以使用一些开源的参数。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com