
rowkindextractor 是 apache seatunnel 的一个转换插件,它能将 cdc 数据流转为 append-only 模式,并提取原始 rowkind 信息为新字段。本文将介绍 rowkindextractor 的核心功能,其在 cdc 数据同步场景下的使用方法,以及配置选项、注意事项及多种应用示例。
rowkindextractor
rowkindextractor 转换插件用于将 cdc(change data capture)数据流转换为 append-only(仅追加)模式,同时将原始的 rowkind 信息提取为一个新的字段。
核心功能:
- 将所有数据行的 rowkind 统一改为 +i(insert),实现 append-only 模式
- 将原始的 rowkind 信息(insert、update_before、update_after、delete)保存到新增的字段中
- 支持短格式和完整格式两种输出方式
为什么需要这个插件?
在 cdc 数据同步场景中,数据行带有 rowkind 标记(+i、-u、+u、-d),表示不同的变更类型。但某些下游系统(如数据湖、分析系统)只支持 append-only 模式,不支持 update 和 delete 操作。此时需要:
- 将所有数据转换为 insert 类型(append-only)
- 将原始的变更类型保存为普通字段,供后续分析使用
转换示例:
输入(cdc 数据): rowkind: -d (delete) 数据: id=1, name="test1", age=20 输出(append-only 数据): rowkind: +i (insert) 数据: id=1, name="test1", age=20, row_kind="delete"
典型应用场景:
- 将 cdc 数据写入只支持 append 的数据湖
- 需要在数据仓库中保留完整的变更历史记录
- 需要对不同类型的变更进行统计分析
配置选项

custom_field_name [string]
指定新增字段的名称,该字段用于存储原始的 rowkind 信息。
默认值:row_kind
注意事项:
- 字段名不能与原有字段重名,否则会报错
- 建议使用有意义的名称,如 operation_type、change_type、cdc_op 等
示例:
custom_field_name = "operation_type" # 使用自定义字段名
transform_type [enum]
指定 rowkind 字段值的输出格式。
可选值:

默认值:short
各值含义:

选择建议:
- short 格式:节省存储空间,适合对存储敏感的场景
- full 格式:可读性更好,适合需要人工查看或分析的场景
示例:
transform_type = full # 使用完整格式
完整示例
- 示例 1:使用默认配置(short 格式)
使用默认配置,将 cdc 数据转换为 append-only 模式,rowkind 以短格式保存。
env {
parallelism = 1
job.mode = "streaming"
}
source {
mysql-cdc {
plugin_output = "cdc_source"
server-id = 5652
username = "root"
password = "your_password"
table-names = ["mydb.users"]
url = "jdbc:mysql://localhost:3306/mydb"
}
}
transform {
rowkindextractor {
plugin_input = "cdc_source"
plugin_output = "append_only_data"
# 使用默认配置:
# custom_field_name = "row_kind"
# transform_type = short
}
}
sink {
console {
plugin_input = "append_only_data"
}
}数据转换过程:
输入数据(cdc 格式): 1. rowkind=+i, id=1, name="张三", age=25 2. rowkind=-u, id=1, name="张三", age=25 3. rowkind=+u, id=1, name="张三", age=26 4. rowkind=-d, id=1, name="张三", age=26 输出数据(append-only 格式): 1. rowkind=+i, id=1, name="张三", age=25, row_kind="+i" 2. rowkind=+i, id=1, name="张三", age=25, row_kind="-u" 3. rowkind=+i, id=1, name="张三", age=26, row_kind="+u" 4. rowkind=+i, id=1, name="张三", age=26, row_kind="-d"
- 示例 2:使用 full 格式和自定义字段名
使用完整格式输出 rowkind,并自定义字段名称。
env {
parallelism = 1
job.mode = "streaming"
}
source {
mysql-cdc {
plugin_output = "cdc_source"
server-id = 5652
username = "root"
password = "your_password"
table-names = ["mydb.orders"]
url = "jdbc:mysql://localhost:3306/mydb"
}
}
transform {
rowkindextractor {
plugin_input = "cdc_source"
plugin_output = "append_only_data"
custom_field_name = "operation_type" # 自定义字段名
transform_type = full # 使用完整格式
}
}
sink {
iceberg {
plugin_input = "append_only_data"
catalog_name = "iceberg_catalog"
database = "mydb"
table = "orders_history"
# iceberg 表会包含 operation_type 字段,记录每条数据的变更类型
}
}数据转换过程: 输入数据(cdc 格式): 1. rowkind=+i, order_id=1001, amount=100.00 2. rowkind=-u, order_id=1001, amount=100.00 3. rowkind=+u, order_id=1001, amount=150.00 4. rowkind=-d, order_id=1001, amount=150.00 输出数据(append-only 格式,full 格式): 1. rowkind=+i, order_id=1001, amount=100.00, operation_type="insert" 2. rowkind=+i, order_id=1001, amount=100.00, operation_type="update_before" 3. rowkind=+i, order_id=1001, amount=150.00, operation_type="update_after" 4. rowkind=+i, order_id=1001, amount=150.00, operation_type="delete"
- 示例 3:完整的测试示例(使用 fakesource)
使用 fakesource 生成测试数据,演示各种 rowkind 的转换效果。
env {
parallelism = 1
job.mode = "batch"
}
source {
fakesource {
plugin_output = "fake_cdc_data"
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primarykey {
name = "pk_id"
columnnames = [pk_id]
}
}
rows = [
{
kind = insert
fields = [1, "a", 100]
},
{
kind = insert
fields = [2, "b", 100]
},
{
kind = update_before
fields = [1, "a", 100]
},
{
kind = update_after
fields = [1, "a_updated", 95]
},
{
kind = update_before
fields = [2, "b", 100]
},
{
kind = update_after
fields = [2, "b_updated", 98]
},
{
kind = delete
fields = [1, "a_updated", 95]
}
]
}
}
transform {
rowkindextractor {
plugin_input = "fake_cdc_data"
plugin_output = "transformed_data"
custom_field_name = "change_type"
transform_type = full
}
}
sink {
console {
plugin_input = "transformed_data"
}
}预期输出:
+i, pk_id=1, name="a", score=100, change_type="insert"
+i, pk_id=2, name="b", score=100, change_type="insert"
+i, pk_id=1, name="a", score=100, change_type="update_before"
+i, pk_id=1, name="a_updated", score=95, change_type="update_after"
+i, pk_id=2, name="b", score=100, change_type="update_before"
+i, pk_id=2, name="b_updated", score=98, change_type="update_after"
+i, pk_id=1, name="a_updated", score=95, change_type="delete"
到此这篇关于apache seatunnel 将 cdc 数据流转换为 append-only 模式的详细过程的文章就介绍到这了,更多相关apache seatunnel append-only 模式内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论