table_catalog table_schema table_name table_type
test dbo user_info base table
test dbo systranschemas base table
test cdc change_tables base table
test cdc ddl_history base table
test cdc lsn_time_mapping base table
test cdc captured_columns base table
test cdc index_columns base table
test dbo orders base table
test cdc dbo_orders_ct base table
#### 二、具体实现
##### 2.1 flik-cdc采集sqlserver主程序
添加依赖包:
<dependency>
<groupid>com.ververica</groupid>
<artifactid>flink-connector-sqlserver-cdc</artifactid>
<version>3.0.0</version>
</dependency>
编写主函数:
public static void main(string[] args) throws exception {
streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();
// 设置全局并行度
env.setparallelism(1);
// 设置时间语义为processingtime
env.getconfig().setautowatermarkinterval(0);
// 每隔60s启动一个检查点
env.enablecheckpointing(60000, checkpointingmode.exactly\_once);
// checkpoint最小间隔
env.getcheckpointconfig().setminpausebetweencheckpoints(1000);
// checkpoint超时时间
env.getcheckpointconfig().setcheckpointtimeout(60000);
// 同一时间只允许一个checkpoint
// env.getcheckpointconfig().setmaxconcurrentcheckpoints(1);
// flink处理程序被cancel后,会保留checkpoint数据
// env.getcheckpointconfig().setexternalizedcheckpointcleanup(checkpointconfig.externalizedcheckpointcleanup.retain\_on\_cancellation);
sourcefunction<string> sqlserversource = sqlserversource.<string>builder()
.hostname("localhost")
.port(1433)
.username("sa")
.password("")
.database("test")
.tablelist("dbo.t\_info")
.startupoptions(startupoptions.initial())
.debeziumproperties(getdebeziumproperties())
.deserializer(new customerdeserializationschemasqlserver())
.build();
datastreamsource<string> datastreamsource = env.addsource(sqlserversource, "\_transaction\_log\_source");
datastreamsource.print().setparallelism(1);
env.execute("sqlserver-cdc-test");
}
public static properties getdebeziumproperties() {
properties properties = new properties();
properties.put("converters", "sqlserverdebeziumconverter");
properties.put("sqlserverdebeziumconverter.type", "sqlserverdebe
发表评论