当前位置: 代码网 > 服务器>服务器>Linux > Apache SeaTunnel 将 CDC 数据流转换为 Append-Only 模式的详细过程

Apache SeaTunnel 将 CDC 数据流转换为 Append-Only 模式的详细过程

2025年11月20日 Linux 我要评论
rowkindextractor 是 apache seatunnel 的一个转换插件,它能将 cdc 数据流转为 append-only 模式,并提取原始 rowkind 信息为新字段。本文将介绍

rowkindextractor 是 apache seatunnel 的一个转换插件,它能将 cdc 数据流转为 append-only 模式,并提取原始 rowkind 信息为新字段。本文将介绍 rowkindextractor 的核心功能,其在 cdc 数据同步场景下的使用方法,以及配置选项、注意事项及多种应用示例。

rowkindextractor

rowkindextractor 转换插件用于将 cdc(change data capture)数据流转换为 append-only(仅追加)模式,同时将原始的 rowkind 信息提取为一个新的字段。

核心功能:

  • 将所有数据行的 rowkind 统一改为 +i(insert),实现 append-only 模式
  • 将原始的 rowkind 信息(insert、update_before、update_after、delete)保存到新增的字段中
  • 支持短格式和完整格式两种输出方式

为什么需要这个插件?

在 cdc 数据同步场景中,数据行带有 rowkind 标记(+i、-u、+u、-d),表示不同的变更类型。但某些下游系统(如数据湖、分析系统)只支持 append-only 模式,不支持 update 和 delete 操作。此时需要:

  1. 将所有数据转换为 insert 类型(append-only)
  2. 将原始的变更类型保存为普通字段,供后续分析使用

转换示例:

输入(cdc 数据):
  rowkind: -d (delete)
  数据: id=1, name="test1", age=20
输出(append-only 数据):
  rowkind: +i (insert)
  数据: id=1, name="test1", age=20, row_kind="delete"

典型应用场景

  • 将 cdc 数据写入只支持 append 的数据湖
  • 需要在数据仓库中保留完整的变更历史记录
  • 需要对不同类型的变更进行统计分析

配置选项

custom_field_name [string]

指定新增字段的名称,该字段用于存储原始的 rowkind 信息。

默认值:row_kind

注意事项:

  • 字段名不能与原有字段重名,否则会报错
  • 建议使用有意义的名称,如 operation_type、change_type、cdc_op 等

示例:

custom_field_name = "operation_type"  # 使用自定义字段名

transform_type [enum]

指定 rowkind 字段值的输出格式。

可选值:

默认值:short

各值含义:

选择建议:

  • short 格式:节省存储空间,适合对存储敏感的场景
  • full 格式:可读性更好,适合需要人工查看或分析的场景

示例:

transform_type = full  # 使用完整格式

完整示例

  • 示例 1:使用默认配置(short 格式)

使用默认配置,将 cdc 数据转换为 append-only 模式,rowkind 以短格式保存。

env {
  parallelism = 1
  job.mode = "streaming"
}
source {
  mysql-cdc {
    plugin_output = "cdc_source"
    server-id = 5652
    username = "root"
    password = "your_password"
    table-names = ["mydb.users"]
    url = "jdbc:mysql://localhost:3306/mydb"
  }
}
transform {
  rowkindextractor {
    plugin_input = "cdc_source"
    plugin_output = "append_only_data"
    # 使用默认配置:
    # custom_field_name = "row_kind"
    # transform_type = short
  }
}
sink {
  console {
    plugin_input = "append_only_data"
  }
}

数据转换过程:

输入数据(cdc 格式):
  1. rowkind=+i, id=1, name="张三", age=25
  2. rowkind=-u, id=1, name="张三", age=25
  3. rowkind=+u, id=1, name="张三", age=26
  4. rowkind=-d, id=1, name="张三", age=26
输出数据(append-only 格式):
  1. rowkind=+i, id=1, name="张三", age=25, row_kind="+i"
  2. rowkind=+i, id=1, name="张三", age=25, row_kind="-u"
  3. rowkind=+i, id=1, name="张三", age=26, row_kind="+u"
  4. rowkind=+i, id=1, name="张三", age=26, row_kind="-d"
  • 示例 2:使用 full 格式和自定义字段名

使用完整格式输出 rowkind,并自定义字段名称。

env {
  parallelism = 1
  job.mode = "streaming"
}
source {
  mysql-cdc {
    plugin_output = "cdc_source"
    server-id = 5652
    username = "root"
    password = "your_password"
    table-names = ["mydb.orders"]
    url = "jdbc:mysql://localhost:3306/mydb"
  }
}
transform {
  rowkindextractor {
    plugin_input = "cdc_source"
    plugin_output = "append_only_data"
    custom_field_name = "operation_type"  # 自定义字段名
    transform_type = full                 # 使用完整格式
  }
}
sink {
  iceberg {
    plugin_input = "append_only_data"
    catalog_name = "iceberg_catalog"
    database = "mydb"
    table = "orders_history"
    # iceberg 表会包含 operation_type 字段,记录每条数据的变更类型
  }
}
数据转换过程:
输入数据(cdc 格式):
  1. rowkind=+i, order_id=1001, amount=100.00
  2. rowkind=-u, order_id=1001, amount=100.00
  3. rowkind=+u, order_id=1001, amount=150.00
  4. rowkind=-d, order_id=1001, amount=150.00
输出数据(append-only 格式,full 格式):
  1. rowkind=+i, order_id=1001, amount=100.00, operation_type="insert"
  2. rowkind=+i, order_id=1001, amount=100.00, operation_type="update_before"
  3. rowkind=+i, order_id=1001, amount=150.00, operation_type="update_after"
  4. rowkind=+i, order_id=1001, amount=150.00, operation_type="delete"
  • 示例 3:完整的测试示例(使用 fakesource)

使用 fakesource 生成测试数据,演示各种 rowkind 的转换效果。

env {
  parallelism = 1
  job.mode = "batch"
}
source {
  fakesource {
    plugin_output = "fake_cdc_data"
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primarykey {
        name = "pk_id"
        columnnames = [pk_id]
      }
    }
    rows = [
      {
        kind = insert
        fields = [1, "a", 100]
      },
      {
        kind = insert
        fields = [2, "b", 100]
      },
      {
        kind = update_before
        fields = [1, "a", 100]
      },
      {
        kind = update_after
        fields = [1, "a_updated", 95]
      },
      {
        kind = update_before
        fields = [2, "b", 100]
      },
      {
        kind = update_after
        fields = [2, "b_updated", 98]
      },
      {
        kind = delete
        fields = [1, "a_updated", 95]
      }
    ]
  }
}
transform {
  rowkindextractor {
    plugin_input = "fake_cdc_data"
    plugin_output = "transformed_data"
    custom_field_name = "change_type"
    transform_type = full
  }
}
sink {
  console {
    plugin_input = "transformed_data"
  }
}

预期输出:

+i, pk_id=1, name="a", score=100, change_type="insert"
+i, pk_id=2, name="b", score=100, change_type="insert"
+i, pk_id=1, name="a", score=100, change_type="update_before"
+i, pk_id=1, name="a_updated", score=95, change_type="update_after"
+i, pk_id=2, name="b", score=100, change_type="update_before"
+i, pk_id=2, name="b_updated", score=98, change_type="update_after"
+i, pk_id=1, name="a_updated", score=95, change_type="delete"

到此这篇关于apache seatunnel 将 cdc 数据流转换为 append-only 模式的详细过程的文章就介绍到这了,更多相关apache seatunnel append-only 模式内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com