当前位置: 代码网 > it编程>软件设计>搜素引擎 > Flink CDC详解

Flink CDC详解

2024年08月06日 搜素引擎 我要评论
CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。Flink CDC本质是一组数据源连接器,使用更改数据捕获(CDC)从不同的数据库中摄取更改。Apache Flink®的CDC连接器集成了Debezium作为捕获数据更改的引擎,所以它可以充分利用Debezium的能力。

flink cdc

一 cdc简介
1.1 cdc定义

cdc 的全称是 change data capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 cdc 。目前通常描述的 cdc 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。

1.2 cdc应用场景
  • **数据同步:**用于备份,容灾;

  • **数据分发:**一个数据源分发给多个下游系统;

  • **数据采集:**面向数据仓库 / 数据湖的 etl 数据集成,是非常重要的数据源。

1.3 cdc实现机制
  • 基于查询的 cdc机制:

    • 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据;
    • 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
    • 不保障实时性,基于离线调度存在天然的延迟。
  • 基于日志的 cdc机制:

    • 实时消费日志,流处理,例如 mysql 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;
    • 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
    • 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。
1.4 开源cdc工具对比

图片

从上图可知:

  • 基于日志的cdc机制,除canal都可以很好的做到增量同步。

  • 基于查询的cdc机制,增量和断点几乎都不支持,除sqoop支持增量方式。

  • 从全量同步维度看,除canal不支持全量同步,其它的cdc都支持。

  • 从全量+增量角度看,基于日志方式的cdc都是支持的。

  • 从架构角度去看,该表将架构分为单机和分布式,这里的分布式架构不单纯体现在数据读取能力的水平扩展上,更重要的是在大数据场景下分布式系统接入能力。例如 flink cdc 的数据入湖或者入仓的时候,下游通常是分布式的系统,如 hive、hdfs、iceberg、hudi 等,那么从对接入分布式系统能力上看,flink cdc 的架构能够很好地接入此类系统。

  • 在数据转换 / 数据清洗能力上,当数据进入到 cdc 工具的时候是否能较方便的对数据做一些过滤或者清洗,甚至聚合?

    • 在 flink cdc 上操作相当简单,可以通过 flink sql 去操作这些数据;
    • 像 datax、debezium 等则需要通过脚本或者模板去做,所以用户的使用门槛会比较高。
  • 在生态方面,这里指的是下游的一些数据库或者数据源的支持。flink cdc 下游有丰富的 connector,例如写入到 tidb、mysql、pg、hbase、kafka、clickhouse 等常见的一些系统,也支持各种自定义 connector。

二 flink cdc简介
2.1 flink cdc介绍

flink cdc本质是一组数据源连接器,使用更改数据捕获(cdc)从不同的数据库中摄取更改。apache flink®的cdc连接器集成了debezium作为捕获数据更改的引擎,所以它可以充分利用debezium的能力。

image-20230411175634843

2.2 flink cdc connector(连接器)
connectordatabasedriver
mongodb-cdcmongodb: 3.6, 4.x, 5.0mongodb driver: 4.3.1
mysql-cdcmysql: 5.6, 5.7, 8.0.xrds mysql: 5.6, 5.7, 8.0.xpolardb mysql: 5.6, 5.7, 8.0.xaurora mysql: 5.6, 5.7, 8.0.xmariadb: 10.xpolardb x: 2.0.1jdbc driver: 8.0.27
oceanbase-cdcoceanbase ce: 3.1.xoceanbase ee (mysql mode): 2.x, 3.xjdbc driver: 5.1.4x
oracle-cdcoracle: 11, 12, 19oracle driver: 19.3.0.0
postgres-cdcpostgresql: 9.6, 10, 11, 12jdbc driver: 42.2.12
sqlserver-cdcsqlserver: 2012, 2014, 2016, 2017, 2019jdbc driver: 7.2.2.jre8
tidb-cdctidb: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0jdbc driver: 8.0.27
db2-cdcdb2: 11.5db2 driver: 11.5.0.0
2.3 flink cdc && flink版本

flink®cdc connectors与flink®的版本配套关系如下表所示:

flink® cdc versionflink® version
1.0.01.11.*
1.1.01.11.*
1.2.01.12.*
1.3.01.12.*
1.4.01.13.*
2.0.*1.13.*
2.1.*1.13.*
2.2.*1.13., 1.14.
2.3.*1.13., 1.14., 1.15.*, 1.16.0
2.4 flink cdc特点
  • 支持读取数据库快照,即使发生故障,也可以只读取一次binlog。

  • cdc连接器用于datastream api,用户可以在一个作业中对多个数据库和表进行更改,而无需部署debezium和kafka。

  • 用于表/sql api的cdc连接器,用户可以使用sql ddl创建cdc源来监视单个表上的更改。

三 flink cdc发展
3.1 发展历程
  • 2020 年 7 月由云邪提交了第一个 commit,这是基于个人兴趣孵化的项目;

  • 2020 年 7 中旬支持了 mysql-cdc;

  • 2020 年 7 月末支持了 postgres-cdc;

  • 2021年2月27,release-1.2.0发布,支持flink version to 1.12.1,同时支持debezium version to 1.4.1.final版本。

  • 2021年5月12,release-1.4.0发布,支持flink version to 1.13.0。

  • 2021年8月11,release-2.0.0发布,支持flink version to 1.13.1,支持mysql-cdc 2.0,提供并行读取,无锁和检查点功能。

  • 2021年11月15,release-2.1.0发布,新增mongodb-cdc和oracle-cdc,同时吸引一大堆贡献者。

  • 2022年3月27,release-2.2.0发布,兼容flink version to 1.14,同时新增tidb-cdc,sql-server cdc,oceanbase cdc等。

  • 2022年11月10,release-2.3.0发布,当前最新稳定版本。

3.2 背景
dynamic table & changelog stream

大家都知道 flink 有两个基础概念:dynamic table 和 changelog stream。

img

  • dynamic table 就是 flink sql 定义的动态表,动态表和流的概念是对等的。参照上图,流可以转换成动态表,动态表也可以转换成流。
  • 在 flink sql中,数据在从一个算子流向另外一个算子时都是以 changelog stream 的形式,任意时刻的 changelog stream 可以翻译为一个表,也可以翻译为一个流。

联想下 mysql 中的表和 binlog 日志,就会发现:mysql 数据库的一张表所有的变更都记录在 binlog 日志中,如果一直对表进行更新,binlog 日志流也一直会追加,数据库中的表就相当于 binlog 日志流在某个时刻点物化的结果;日志流就是将表的变更数据持续捕获的结果。这说明 flink sql 的 dynamic table 是可以非常自然地表示一张不断变化的 mysql 数据库表。

图片

在此基础上,我们调研了一些 cdc 技术,最终选择了 debezium 作为 flink cdc 的底层采集工具。debezium 支持全量同步,也支持增量同步,也支持全量 + 增量的同步,非常灵活,同时基于日志的 cdc 技术使得提供 exactly-once 成为可能。

将 flink sql 的内部数据结构 rowdata 和 debezium 的数据结构进行对比,可以发现两者是非常相似的。

  • 每条 rowdata 都有一个元数据 rowkind,包括 4 种类型, 分别是插入 (insert)、更新前镜像 (update_before)、更新后镜像 (update_after)、删除 (delete),这四种类型和数据库里面的 binlog 概念保持一致。
  • 而 debezium 的数据结构,也有一个类似的元数据 op 字段, op 字段的取值也有四种,分别是 c、u、d、r,各自对应 create、update、delete、read。对于代表更新操作的 u,其数据部分同时包含了前镜像 (before) 和后镜像 (after)。

通过分析两种数据结构,flink 和 debezium 两者的底层数据是可以非常方便地对接起来的,大家可以发现 flink 做 cdc 从技术上是非常合适的。

3.3 传统 cdc etl 分析

我们来看下传统 cdc 的 etl 分析链路,如下图所示:

图片

传统的基于 cdc 的 etl 分析中,数据采集工具是必须的,国外用户常用 debezium,国内用户常用阿里开源的 canal,采集工具负责采集数据库的增量数据,一些采集工具也支持同步全量数据。采集到的数据一般输出到消息中间件如 kafka,然后 flink 计算引擎再去消费这一部分数据写入到目的端,目的端可以是各种 db,数据湖,实时数仓和离线数仓。

3.4 基于 flink cdc 的 etl 分析

在使用了 flink cdc 之后,除了组件更少,维护更方便外,另一个优势是通过 flink sql 极大地降低了用户使用门槛,可以看下面的例子:

图片

该例子是通过 flink cdc 去同步数据库数据并写入到 tidb,用户直接使用 flink sql 创建了产品和订单的 mysql-cdc 表,然后对数据流进行 join 加工,加工后直接写入到下游数据库。通过一个 flink sql 作业就完成了 cdc 的数据分析,加工和同步。

图片

大家会发现这是一个纯 sql 作业,这意味着只要会 sql 的 bi,业务线同学都可以完成此类工作。与此同时,用户也可以利用 flink sql 提供的丰富语法进行数据清洗、分析、聚合。

图片

此外,利用 flink sql 双流 join、维表 join、udtf 语法可以非常容易地完成数据打宽,以及各种业务逻辑加工。

flink cdc 1.x痛点
  • 全量 + 增量读取的过程需要保证所有数据的一致性:因此需要通过加锁保证,但是加锁在数据库层面上是一个十分高危的操作。底层 debezium 在保证数据一致性时,需要对读取的库或表加锁,全局锁可能导致数据库锁住,表级锁会锁住表的读,dba 一般不给锁权限。

    flink cdc 1.x 可以不加锁,能够满足大部分场景,但牺牲了一定的数据准确性。flink cdc 1.x 默认加全局锁,虽然能保证数据一致性,加锁的时间不确定,但存在上述 hang 住数据的风险

  • 不支持水平扩展:因为 flink cdc 底层是基于 debezium,起架构是单节点,所以flink cdc 只支持单并发。在全量阶段读取阶段,如果表非常大 (亿级别),读取时间在小时甚至天级别,用户不能通过增加资源去提升作业速度。

  • 全量读取阶段不支持 checkpoint:cdc 读取分为两个阶段,全量读取和增量读取,目前全量读取阶段是不支持 checkpoint 的,因此会存在一个问题:当我们同步全量数据时,假设需要 5 个小时,当我们同步了 4 小时的时候作业失败,这时候就需要重新开始,再读取 5 个小时。

flink cdc 2.0设计

2.0 的设计方案,核心要解决上述的三个问题,即支持无锁、水平扩展和checkpoint。

这篇论文里描述的无锁算法如下图所示:

图片

左边是 chunk 的切分算法描述,chunk 的切分算法其实和很多数据库的分库分表原理类似,通过表的主键对表中的数据进行分片。假设每个 chunk 的步长为 10,按照这个规则进行切分,只需要把这些 chunk 的区间做成左开右闭或者左闭右开的区间,保证衔接后的区间能够等于表的主键区间即可。

右边是每个 chunk 的无锁读算法描述,该算法的核心思想是在划分了 chunk 后,对于每个 chunk 的全量读取和增量读取,在不用锁的条件下完成一致性的合并。chunk 的切分如下图所示:

图片

因为每个 chunk 只负责自己主键范围内的数据,不难推导,只要能够保证每个 chunk 读取的一致性,就能保证整张表读取的一致性,这便是无锁算法的基本原理。

flink cdc未来规划

图片

关于 cdc 项目的未来规划,我们希望围绕稳定性,进阶 feature 和生态集成三个方面展开。

  • 稳定性

    • 通过社区的方式吸引更多的开发者,公司的开源力量提升 flink cdc 的成熟度;
    • 支持 lazy assigning。lazy assigning 的思路是将 chunk 先划分一批,而不是一次性进行全部划分。当前 source reader 对数据读取进行分片是一次性全部划分好所有 chunk,例如有 1 万个 chunk,可以先划分 1 千个 chunk,而不是一次性全部划分,在 sourcereader 读取完 1 千 chunk 后再继续划分,节约划分 chunk 的时间。
  • 进阶 feature

    • 支持 schema evolution。这个场景是:当同步数据库的过程中,突然在表中添加了一个字段,并且希望后续同步下游系统的时候能够自动加入这个字段;
    • 支持 watermark pushdown 通过 cdc 的 binlog 获取到一些心跳信息,这些心跳的信息可以作为一个 watermark,通过这个心跳信息可以知道到这个流当前消费的一些进度;
    • 支持 meta 数据,分库分表的场景下,有可能需要元数据知道这条数据来源哪个库哪个表,在下游系统入湖入仓可以有更多的灵活操作;
    • 整库同步:用户要同步整个数据库只需一行 sql 语法即可完成,而不用每张表定义一个 ddl 和 query。
  • 生态集成

    • 集成更多上游数据库,如 oracle,ms sqlserver。cloudera 目前正在积极贡献 oracle-cdc connector;
    • 在入湖层面,hudi 和 iceberg 写入上有一定的优化空间,例如在高 qps 入湖的时候,数据分布有比较大的性能影响,这一点可以通过与生态打通和集成继续优化。
四 table & sql api应用

使用提供的连接器设置flink集群需要几个步骤。

  1. 使用1.12+版本和java 8+安装flink集群。
  2. 从下载页面下载连接器sql jar(或者自己构建)。
  3. 将下载的jar放在flink_home/lib/目录下。
  4. 重新启动flink集群。

这个例子展示了如何在flink sql client中创建mysql cdc源并对其执行查询。

-- creates a mysql cdc table source
create table mysql_binlog (
 id int not null,
 name string,
 description string,
 weight decimal(10,3),
 primary key(id) not enforced
) with (
 'connector' = 'mysql-cdc',
 'hostname' = 'localhost',
 'port' = '3306',
 'username' = 'flinkuser',
 'password' = 'flinkpw',
 'database-name' = 'inventory',
 'table-name' = 'products'
);

-- read snapshot and binlog data from mysql, and do some transformation, and show on the client
select id, upper(name), description, weight from mysql_binlog;
五 datastream api应用

包括以下maven依赖项(可通过maven central获得):

<dependency>
  <groupid>com.ververica</groupid>
  <!-- add the dependency matching your database -->
  <artifactid>flink-connector-mysql-cdc</artifactid>
  <!-- the dependency is available only for stable releases, snapshot dependency need build by yourself. -->
  <version>2.4-snapshot</version>
</dependency>
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import com.ververica.cdc.debezium.jsondebeziumdeserializationschema;
import com.ververica.cdc.connectors.mysql.source.mysqlsource;

public class mysqlbinlogsourceexample {
  public static void main(string[] args) throws exception {
    mysqlsource<string> mysqlsource = mysqlsource.<string>builder()
            .hostname("yourhostname")
            .port(yourport)
            .databaselist("yourdatabasename") // set captured database
            .tablelist("yourdatabasename.yourtablename") // set captured table
            .username("yourusername")
            .password("yourpassword")
            .deserializer(new jsondebeziumdeserializationschema()) // converts sourcerecord to json string
            .build();
    
    streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();
    
    // enable checkpoint
    env.enablecheckpointing(3000);
    
    env
      .fromsource(mysqlsource, watermarkstrategy.nowatermarks(), "mysql source")
      // set 4 parallel source tasks
      .setparallelism(4)
      .print().setparallelism(1); // use parallelism 1 for sink to keep message ordering
    
    env.execute("print mysql snapshot + binlog");
  }
}
六 flink cdc connector
6.1 mysql cdc 连接器

mysql cdc 连接器允许从 mysql 数据库读取快照数据和增量数据。如下描述了如何设置 mysql cdc 连接器来对 mysql 数据库运行 sql 查询。

支持的数据库

connectordatabasedriver
mysql-cdcmysql: 5.6, 5.7, 8.0.xrds mysql: 5.6, 5.7, 8.0.xpolardb mysql: 5.6, 5.7, 8.0.xaurora mysql: 5.6, 5.7, 8.0.xmariadb: 10.xpolardb x: 2.0.1jdbc driver: 8.0.21

为了设置 mysql cdc 连接器,下表提供了使用构建自动化工具(如 maven 或 sbt )和带有 sql jar 包的 sql 客户端的两个项目的依赖关系信息。

maven dependency

<dependency>
  <groupid>com.ververica</groupid>
  <artifactid>flink-connector-mysql-cdc</artifactid>
  <!-- 请使用已发布的版本依赖,snapshot版本的依赖需要本地自行编译。 -->
  <version>2.2.1</version>
</dependency>

sql client jar

下载链接仅在已发布版本可用,请在文档网站左下角选择浏览已发布的版本。

下载 flink-sql-connector-mysql-cdc-2.2.1.jar<flink_home>/lib/ 目录下。

为每个 reader 设置不同的 server id

每个用于读取 binlog 的 mysql 数据库客户端都应该有一个唯一的 id,称为 server id。 mysql 服务器将使用此 id 来维护网络连接和 binlog 位置。 因此,如果不同的作业共享相同的 server id, 则可能导致从错误的 binlog 位置读取数据。 因此,建议通过为每个 reader 设置不同的 server id sql hints, 假设 source 并行度为 4, 我们可以使用 select * from source_table /*+ options('server-id'='5401-5404') */ ; 来为 4 个 source readers 中的每一个分配唯一的 server id。

设置 mysql 会话超时时间

当为大型数据库创建初始一致快照时,你建立的连接可能会在读取表时碰到超时问题。你可以通过在 mysql 侧配置 interactive_timeout 和 wait_timeout 来缓解此类问题。

  • interactive_timeout: 服务器在关闭交互连接之前等待活动的秒数。 更多信息请参考 mysql documentations.
  • wait_timeout: 服务器在关闭非交互连接之前等待活动的秒数。 更多信息请参考 mysql documentations.
6.2 如何创建 mysql cdc 表
#	启动flink集群
start-cluster.sh
# 启动flink sql客户端
sql-client.sh

mysql中表结构定义:

create table `orders` (
  `order_id` int not null,
  `order_date` datetime default null,
  `customer_name` varchar(255) default null,
  `price` decimal(10,5) default null,
  `product_id` int default null,
  `order_status` tinyint(1) default null,
  primary key (`order_id`)
) engine=innodb default charset=utf8mb4 collate=utf8mb4_0900_ai_ci;

mysql cdc 表可以定义如下:

-- 每 3 秒做一次 checkpoint,用于测试,生产配置建议5到10分钟                      
flink sql> set 'execution.checkpointing.interval' = '3s';   
-- 在 flink sql中注册 mysql 表 'orders'
flink sql> create table orders (
     order_id int,
     order_date timestamp(0),
     customer_name string,
     price decimal(10, 5),
     product_id int,
     order_status boolean,
     primary key(order_id) not enforced
     ) with (
     'connector' = 'mysql-cdc',
     'hostname' = 'qianfeng01',
     'port' = '3306',
     'username' = 'root',
     'password' = '123456',
     'database-name' = 'mydb',
     'table-name' = 'orders');
  
-- 从订单表读取全量数据(快照)和增量数据(binlog)
flink sql> select * from orders;
6.3 支持的元数据

下表中的元数据可以在 ddl 中作为只读(虚拟)meta 列声明。

keydatatypedescription
table_namestring not null当前记录所属的表名称。
database_namestring not null当前记录所属的库名称。
op_tstimestamp_ltz(3) not null当前记录表在数据库中更新的时间。 如果从表的快照而不是 binlog 读取记录,该值将始终为0。

下述创建表示例展示元数据列的用法:

-- 在 flink sql中注册 mysql 表 'products'
flink sql> create table products (
    db_name string metadata from 'database_name' virtual,
    table_name string metadata  from 'table_name' virtual,
    operation_ts timestamp_ltz(3) metadata from 'op_ts' virtual,
    order_id int,
    order_date timestamp(0),
    customer_name string,
    price decimal(10, 5),
    product_id int,
    order_status boolean,
    primary key(order_id) not enforced
) with (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);

-- 订单状态实时分布
flink sql> select order_status,count(order_id) from products group by order_status;
6.4 动态加表

扫描新添加的表功能使你可以添加新表到正在运行的作业中,新添加的表将首先读取其快照数据,然后自动读取其变更日志。

想象一下这个场景:一开始, flink 作业监控表 [product, user, address], 但几天后,我们希望这个作业还可以监控表 [order, custom],这些表包含历史数据,我们需要作业仍然可以复用作业的已有状态,动态加表功能可以优雅地解决此问题。

以下操作显示了如何启用此功能来解决上述场景。 使用现有的 flink cdc source 作业,如下:

    mysqlsource<string> mysqlsource = mysqlsource.<string>builder()
        .hostname("yourhostname")
        .port(yourport)
        .scannewlyaddedtableenabled(true) // 启用扫描新添加的表功能
        .databaselist("db") // 设置捕获的数据库
        .tablelist("db.product, db.user, db.address") // 设置捕获的表 [product, user, address]
        .username("yourusername")
        .password("yourpassword")
        .deserializer(new jsondebeziumdeserializationschema()) // 将 sourcerecord 转换为 json 字符串
        .build();
   // 你的业务代码

如果我们想添加新表 [order, custom] 对于现有的 flink 作业,只需更新 tablelist() 将新增表 [order, custom] 加入并从已有的 savepoint 恢复作业。

step 1: 使用 savepoint 停止现有的 flink 作业。

$ ./bin/flink stop $existing_flink_job_id
suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
savepoint completed. path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab

step 2: 更新现有 flink 作业的表列表选项。

  1. 更新 tablelist() 参数.
  2. 编译更新后的作业,示例如下:
    mysqlsource<string> mysqlsource = mysqlsource.<string>builder()
        .hostname("yourhostname")
        .port(yourport)
        .scannewlyaddedtableenabled(true) 
        .databaselist("db") 
        .tablelist("db.product, db.user, db.address, db.order, db.custom") // 设置捕获的表 [product, user, address ,order, custom]
        .username("yourusername")
        .password("yourpassword")
        .deserializer(new jsondebeziumdeserializationschema()) // 将 sourcerecord 转换为 json 字符串
        .build();
   // 你的业务代码

step 3: 从 savepoint 还原更新后的 flink 作业。

$ ./bin/flink run \
      --detached \ 
      --fromsavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
      ./flinkcdcexample.jar
七 flink cdc案例一
7.1 基于 flink cdc 构建 mysql 和 postgres 的 streaming etl

这篇教程将展示如何基于 flink cdc 快速构建 mysql 和 postgres 的流式 etl。本教程的演示都将在 flink sql cli 中进行,只涉及 sql,无需一行 java/scala 代码,也无需安装 ide。

假设我们正在经营电子商务业务,商品和订单的数据存储在 mysql 中,订单对应的物流信息存储在 postgres 中。 对于订单表,为了方便进行分析,我们希望让它关联上其对应的商品和物流信息,构成一张宽表,并且实时把它写到 elasticsearch 中。

接下来的内容将介绍如何使用 flink mysql/postgres cdc 来实现这个需求,系统的整体架构如下图所示: flink cdc streaming etl

7.2 在 flink sql cli 中使用 flink ddl 创建表

首先,开启 checkpoint,每隔3秒做一次 checkpoint

-- flink sql                   
flink sql> set execution.checkpointing.interval = 3s;

然后, 对于数据库中的表 products, orders, shipments, 使用 flink sql cli 创建对应的表,用于同步这些底层数据库表的数据

-- flink sql
flink sql> create table products (
    id int,
    name string,
    description string,
    primary key (id) not enforced
  ) with (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'products'
  );

flink sql> create table orders (
   order_id int,
   order_date timestamp(0),
   customer_name string,
   price decimal(10, 5),
   product_id int,
   order_status boolean,
   primary key (order_id) not enforced
 ) with (
   'connector' = 'mysql-cdc',
   'hostname' = 'localhost',
   'port' = '3306',
   'username' = 'root',
   'password' = '123456',
   'database-name' = 'mydb',
   'table-name' = 'orders'
 );

flink sql> create table shipments (
   shipment_id int,
   order_id int,
   origin string,
   destination string,
   is_arrived boolean,
   primary key (shipment_id) not enforced
 ) with (
   'connector' = 'postgres-cdc',
   'hostname' = 'localhost',
   'port' = '5432',
   'username' = 'postgres',
   'password' = 'postgres',
   'database-name' = 'postgres',
   'schema-name' = 'public',
   'table-name' = 'shipments'
 );

最后,创建 enriched_orders 表, 用来将关联后的订单数据写入 elasticsearch 中

-- flink sql
flink sql> create table enriched_orders (
   order_id int,
   order_date timestamp(0),
   customer_name string,
   price decimal(10, 5),
   product_id int,
   order_status boolean,
   product_name string,
   product_description string,
   shipment_id int,
   origin string,
   destination string,
   is_arrived boolean,
   primary key (order_id) not enforced
 ) with (
     'connector' = 'elasticsearch-7',
     'hosts' = 'http://localhost:9200',
     'index' = 'enriched_orders'
 );
7.3 关联订单数据并且将其写入 elasticsearch 中

使用 flink sql 将订单表 order 与 商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 elasticsearch 中

-- flink sql
flink sql> insert into enriched_orders
 select o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
 from orders as o
 left join products as p on o.product_id = p.id
 left join shipments as s on o.order_id = s.order_id;
八 flink cdc案例二
8.1 基于 flink cdc 同步 mysql 分库分表构建实时数据湖

在 oltp 系统中,为了解决单表数据量大的问题,通常采用分库分表的方式将单个大表进行拆分以提高系统的吞吐量。 但是为了方便数据分析,通常需要将分库分表拆分出的表在同步到数据仓库、数据湖时,再合并成一个大表。

这篇教程将展示如何使用 flink cdc 构建实时数据湖来应对这种场景,本教程的演示基于 docker,只涉及 sql,无需一行 java/scala 代码,也无需安装 ide,你可以很方便地在自己的电脑上完成本教程的全部内容。

接下来将以数据从 mysql 同步到 iceberg 为例展示整个流程,架构图如下所示:

image-20230607220414588

你也可以使用不同的 source 比如 oracle/postgres 和 sink 比如 hudi 来构建自己的 etl 流程。

8.2 准备教程所需要的组件

接下来的教程将以 docker-compose 的方式准备所需要的组件。

使用下面的内容创建一个 docker-compose.yml 文件:

version: '2.1'
services:
  sql-client:
    user: flink:flink
    image: yuxialuo/flink-sql-client:1.13.2.v1 
    depends_on:
      - jobmanager
      - mysql
    environment:
      flink_jobmanager_host: jobmanager
      mysql_host: mysql
    volumes:
      - shared-tmpfs:/tmp/iceberg
  jobmanager:
    user: flink:flink
    image: flink:1.13.2-scala_2.11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        flink_properties=
        jobmanager.rpc.address: jobmanager
    volumes:
      - shared-tmpfs:/tmp/iceberg
  taskmanager:
    user: flink:flink
    image: flink:1.13.2-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        flink_properties=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberoftaskslots: 2
    volumes:
      - shared-tmpfs:/tmp/iceberg
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - mysql_root_password=123456
      - mysql_user=mysqluser
      - mysql_password=mysqlpw

volumes:
  shared-tmpfs:
    driver: local
    driver_opts:
      type: "tmpfs"
      device: "tmpfs"

该 docker compose 中包含的容器有:

  • sql-client: flink sql client, 用来提交 sql 查询和查看 sql 的执行结果
  • flink cluster:包含 flink jobmanager 和 flink taskmanager,用来执行 flink sql
  • mysql:作为分库分表的数据源,存储本教程的 user

注意:

  1. 为了简化整个教程,本教程需要的 jar 包都已经被打包进 sql-client 容器中了,镜像的构建脚本可以在 github 上找到。 如果你想要在自己的 flink 环境运行本教程,需要下载下面列出的包并且把它们放在 flink 所在目录的 lib 目录下,即 flink_home/lib/

    下载链接只对已发布的版本有效, snapshot 版本需要本地编译

    目前支持 flink 1.13 的 iceberg-flink-runtime jar 包还没有发布,所以我们在这里提供了一个支持 flink 1.13 的 iceberg-flink-runtime jar 包,这个 jar 包是基于 iceberg 的 master 分支打包的。 当 iceberg 0.13.0 版本发布后,你也可以在 apache official repository 下载到支持 flink 1.13 的 iceberg-flink-runtime jar 包。

  2. 本教程接下来用到的容器相关的命令都需要在 docker-compose.yml 所在目录下执行

docker-compose.yml 所在目录下执行下面的命令来启动本教程需要的组件:

docker-compose up -d

该命令将以 detached 模式自动启动 docker compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的容器是否正常启动了,也可以通过访问 http://localhost:8081/ 来查看 flink 是否运行正常。

image-20230609103715578

8.3 准备数据
  1. 进入 mysql 容器中

    docker-compose exec mysql mysql -uroot -p123456
    
  2. 创建数据和表,并填充数据

    创建两个不同的数据库,并在每个数据库中创建两个表,作为 user 表分库分表下拆分出的表。

     create database db_1;
     use db_1;
     create table user_1 (
       id integer not null primary key,
       name varchar(255) not null default 'flink',
       address varchar(1024),
       phone_number varchar(512),
       email varchar(255)
     );
     insert into user_1 values (110,"user_110","shanghai","123567891234","user_110@foo.com");
    
     create table user_2 (
       id integer not null primary key,
       name varchar(255) not null default 'flink',
       address varchar(1024),
       phone_number varchar(512),
       email varchar(255)
     );
    insert into user_2 values (120,"user_120","shanghai","123567891234","user_120@foo.com");
    
    create database db_2;
    use db_2;
    create table user_1 (
      id integer not null primary key,
      name varchar(255) not null default 'flink',
      address varchar(1024),
      phone_number varchar(512),
      email varchar(255)
    );
    insert into user_1 values (110,"user_110","shanghai","123567891234", null);
    
    create table user_2 (
      id integer not null primary key,
      name varchar(255) not null default 'flink',
      address varchar(1024),
      phone_number varchar(512),
      email varchar(255)
    );
    insert into user_2 values (220,"user_220","shanghai","123567891234","user_220@foo.com");
    
8.4 在 flink sql cli 中使用 flink ddl 创建表

首先,使用如下的命令进入 flink sql cli 容器中:

docker-compose exec sql-client ./sql-client

我们可以看到如下界面:

image-20230609104142018

然后,进行如下步骤:

  1. 开启 checkpoint,每隔3秒做一次 checkpoint

    checkpoint 默认是不开启的,我们需要开启 checkpoint 来让 iceberg 可以提交事务。 并且,mysql-cdc 在 binlog 读取阶段开始前,需要等待一个完整的 checkpoint 来避免 binlog 记录乱序的情况。

    -- flink sql                   
    flink sql> set execution.checkpointing.interval = 3s;
    
  2. 创建 mysql 分库分表 source 表

    创建 source 表 user_source 来捕获mysql中所有 user 表的数据,在表的配置项 database-name , table-name 使用正则表达式来匹配这些表。 并且,user_source 表也定义了 metadata 列来区分数据是来自哪个数据库和表。

    -- flink sql
    flink sql> create table user_source (
        database_name string metadata virtual,
        table_name string metadata virtual,
        `id` decimal(20, 0) not null,
        name string,
        address string,
        phone_number string,
        email string,
        primary key (`id`) not enforced
      ) with (
        'connector' = 'mysql-cdc',
        'hostname' = 'mysql',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'db_[0-9]+',
        'table-name' = 'user_[0-9]+'
      );
    
  3. 创建 iceberg sink 表

    创建 sink 表 all_users_sink,用来将数据加载至 iceberg 中。 在这个 sink 表,考虑到不同的 mysql 数据库表的 id 字段的值可能相同,我们定义了复合主键 (database_name, table_name, id)。

    -- flink sql
    flink sql> create table all_users_sink (
        database_name string,
        table_name    string,
        `id`          decimal(20, 0) not null,
        name          string,
        address       string,
        phone_number  string,
        email         string,
        primary key (database_name, table_name, `id`) not enforced
      ) with (
        'connector'='iceberg',
        'catalog-name'='iceberg_catalog',
        'catalog-type'='hadoop',  
        'warehouse'='file:///tmp/iceberg/warehouse',
        'format-version'='2'
      );
    
8.5 流式写入 iceberg
  1. 使用下面的 flink sql 语句将数据从 mysql 写入 iceberg 中

    -- flink sql
    flink sql> insert into all_users_sink select * from user_source;
    

    上述命令将会启动一个流式作业,源源不断将 mysql 数据库中的全量和增量数据同步到 iceberg 中。 在 flink ui 上可以看到这个运行的作业:

    image-20230609104414829

    然后我们就可以使用如下的命令看到 iceberg 中的写入的文件:

    docker-compose exec sql-client tree /tmp/iceberg/warehouse/default_database/
    

    如下所示:

    image-20230609104555552

    在你的运行环境中,实际的文件可能与上面的截图不相同,但是整体的目录结构应该相似。

  2. 使用下面的 flink sql 语句查询表 all_users_sink 中的数据

    -- flink sql
    flink sql> select * from all_users_sink;
    

    在 flink sql cli 中我们可以看到如下查询结果:

    image-20230609104650340

  3. 修改 mysql 中表的数据,iceberg 中的表 all_users_sink 中的数据也将实时更新:

    (3.1) 在 db_1.user_1 表中插入新的一行

    --- db_1
    insert into db_1.user_1 values (111,"user_111","shanghai","123567891234","user_111@foo.com");
    

    (3.2) 更新 db_1.user_2 表的数据

    --- db_1
    update db_1.user_2 set address='beijing' where id=120;
    

    (3.3) 在 db_2.user_2 表中删除一行

    --- db_2
    delete from db_2.user_2 where id=220;
    

    每执行一步,我们就可以在 flink client cli 中使用 select * from all_users_sink 查询表 all_users_sink 来看到数据的变化。

    最后的查询结果如下所示:

    image-20230609105124489

    从 iceberg 的最新结果中可以看到新增了(db_1, user_1, 111)的记录,(db_1, user_2, 120)的地址更新成了 beijing,且(db_2, user_2, 220)的记录被删除了,与我们在 mysql 做的数据更新完全一致。

8.6 环境清理

本教程结束后,在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器:

docker-compose down
参考

阿里云云栖号:https://baijiahao.baidu.com/s?id=1708018647118048692&wfr=spider&for=pc

flink cdc官网:https://ververica.github.io/flink-cdc-connectors/release-2.2/content/about.html

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com