当前位置: 代码网 > it编程>数据库>MsSqlserver > 大数据最新FlinkCDC全量及增量采集SqlServer数据_flink cdc sql server

大数据最新FlinkCDC全量及增量采集SqlServer数据_flink cdc sql server

2024年07月31日 MsSqlserver 我要评论
TABLE_CATALOGTABLE_SCHEMATABLE_NAME TABLE_TYPEtest dbo user_info BASE TABLEtest dbo systranschemas BASE TABLEtest cdc change_tables BASE TABLEtest cdc ddl


table_catalog table_schema table_name table_type
test dbo user_info base table
test dbo systranschemas base table
test cdc change_tables base table
test cdc ddl_history base table
test cdc lsn_time_mapping base table
test cdc captured_columns base table
test cdc index_columns base table
test dbo orders base table
test cdc dbo_orders_ct base table


#### 二、具体实现


##### 2.1 flik-cdc采集sqlserver主程序


添加依赖包:



    <dependency>
        <groupid>com.ververica</groupid>
        <artifactid>flink-connector-sqlserver-cdc</artifactid>
        <version>3.0.0</version>
    </dependency>

编写主函数:



public static void main(string[] args) throws exception {

    streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();

    // 设置全局并行度
    env.setparallelism(1);
    // 设置时间语义为processingtime
    env.getconfig().setautowatermarkinterval(0);
    // 每隔60s启动一个检查点
    env.enablecheckpointing(60000, checkpointingmode.exactly\_once);
    // checkpoint最小间隔
    env.getcheckpointconfig().setminpausebetweencheckpoints(1000);
    // checkpoint超时时间
    env.getcheckpointconfig().setcheckpointtimeout(60000);
    // 同一时间只允许一个checkpoint
    // env.getcheckpointconfig().setmaxconcurrentcheckpoints(1);
    // flink处理程序被cancel后,会保留checkpoint数据
    // env.getcheckpointconfig().setexternalizedcheckpointcleanup(checkpointconfig.externalizedcheckpointcleanup.retain\_on\_cancellation);


    sourcefunction<string> sqlserversource = sqlserversource.<string>builder()
            .hostname("localhost")
            .port(1433)
            .username("sa")
            .password("")
            .database("test")
            .tablelist("dbo.t\_info")
            .startupoptions(startupoptions.initial())
            .debeziumproperties(getdebeziumproperties())
            .deserializer(new customerdeserializationschemasqlserver())
            .build();

    datastreamsource<string> datastreamsource = env.addsource(sqlserversource, "\_transaction\_log\_source");
    datastreamsource.print().setparallelism(1);
    env.execute("sqlserver-cdc-test");

}


    public static properties getdebeziumproperties() {
    properties properties = new properties();
    properties.put("converters", "sqlserverdebeziumconverter");
    properties.put("sqlserverdebeziumconverter.type", "sqlserverdebe
(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com