
Flink CDC: Syncing a SQL Server Source Table to a SQL Server or MySQL Target Table

Synchronizing a source table to a target table

Note: this article is only meant as a reference; it was also my first time working with Flink, so please bear with me.

Versions used: Flink 1.17.0 and Flink CDC 2.3.0. With this combination, real-time synchronization works reliably; I have tested it myself.

First, here is the structure of my program.

Preparation

1. Dependencies

    <properties>
        <flink.version>1.17.0</flink.version>
        <flink.cdc.version>2.3.0</flink.cdc.version>
        <logback-classic.version>1.1.11</logback-classic.version>
        <slf4j-api.version>1.7.25</slf4j-api.version>
    </properties>

    <!-- Dependency management -->
    <dependencyManagement>
        <dependencies>

            <!-- Spring Cloud -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2021.0.4</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <!-- Spring Cloud Alibaba -->
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>2021.0.4.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <!-- Spring Boot dependency management -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.7.3</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- bootstrap starter -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bootstrap</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.7.3</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>
            <version>2.4.4</version>
        </dependency>

        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.5.9</version>
        </dependency>

        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>mssql-jdbc</artifactId>
            <version>9.4.1.jre8</version>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-sqlserver-cdc</artifactId>
            <version>${flink.cdc.version}</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-oracle-cdc</artifactId>
            <version>${flink.cdc.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>5.3.22</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.oracle.database.jdbc</groupId>
            <artifactId>ojdbc8</artifactId>
            <version>12.2.0.1</version>
        </dependency>

        <dependency>
            <groupId>com.ibm.db2</groupId>
            <artifactId>jcc</artifactId>
            <version>11.5.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

    </dependencies>


2. Packaging plugins

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
<!--                                    <exclude>org.slf4j:*</exclude>-->
<!--                                    <exclude>org.apache.logging.log4j:*</exclude>-->
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <!-- TODO: prevents identically named service files from different connectors from overwriting each other -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <!-- specify the main class -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.hbos.FlinkCdcApplication</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>application.yml</include>
                    <include>**/*.ftl</include>
                    <include>**/*.xml</include>
                    <include>**/*.properties</include>
                </includes>
                <filtering>true</filtering>
            </resource>
        </resources>
    </build>

Writing the main class

@SpringBootApplication(excludeName = {"org.springframework.security"})
@Slf4j
public class FlinkCdcApplication {

    public static void main(String[] args) {

        SpringApplication.run(FlinkCdcApplication.class, args);
        log.info("CDC service started successfully");
        System.out.println("==========================" + "\n" +
                "(♥◠‿◠)ノ゙ CDC service started successfully" + "\n"
                + "==========================");
    }

}

Writing the SourceConfig class for the source table

package com.hbos.cdc.config;

import com.hbos.cdc.entity.SourceDataProperties;
import com.hbos.cdc.listener.DataChangeSink;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@Configuration
public class SourceConfig {

    @Bean(name = "sourceInfo")
    @ConfigurationProperties(prefix = "source")
    public SourceDataProperties sourceInfo() {
        return new SourceDataProperties();
    }

    @Bean
    public StreamExecutionEnvironment source(@Qualifier("sourceInfo") SourceDataProperties sourceInfo) throws Exception {

        Map<String, Object> config = new HashMap<>();
        config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
        JsonDebeziumDeserializationSchema jdd = new JsonDebeziumDeserializationSchema(false, config);

        // Build the SourceFunction with Flink CDC
        DebeziumSourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
                .hostname(sourceInfo.getHost())
                .port(sourceInfo.getPort())
                .username(sourceInfo.getUsername())
                .password(sourceInfo.getPassword())
                .database(sourceInfo.getDatabase()) // database to capture
                .tableList(sourceInfo.getTableList()) // tables to capture; omit this to monitor every table in the database
                .deserializer(jdd) // deserialize change events to JSON
                /*
                 * initial: take an initial snapshot (full load), then capture incremental changes
                 * latest: capture incremental changes only (no historical data)
                 */
                .startupOptions(StartupOptions.initial()) // startup mode
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5); // global parallelism of 5

        env.addSource(sourceFunction).addSink(new DataChangeSink()).setParallelism(2);
        // Start the job
        env.execute("jhygcdc1");
        return env;
    }
}

Because a Flink job is not structured like a regular Spring Boot application, operators created by Flink are not managed by the Spring container; to use Spring-managed beans from that code, they have to be looked up manually.

Create a bean utility class, SpringBeanUtil:

@Component
public class SpringBeanUtil implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        SpringBeanUtil.applicationContext = applicationContext;
    }

    public static <T> T getBean(Class<T> clazz) {
        return applicationContext.getBean(clazz);
    }

    public static Object getBean(String name) throws BeansException {
        return applicationContext.getBean(name);
    }
}
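
A hypothetical usage sketch: from code that Flink instantiates itself (for example inside a sink's open() method), where @Autowired injection is not available, beans can be fetched by type or by the bean name registered in SourceConfig; TargetDataProperties is defined further below.

// Hypothetical lookup from Flink-managed code where @Autowired does not work:
TargetDataProperties targetProps = SpringBeanUtil.getBean(TargetDataProperties.class); // by type
Object sourceProps = SpringBeanUtil.getBean("sourceInfo"); // by bean name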

Creating the source-table and target-table entities

Source entity: the SourceDataProperties class

//@Component
@Data
//@ConfigurationProperties(prefix = "source")
public class SourceDataProperties {

    private String driverClassName;

    private String url;

    private String host;

    private int port;

    private String username;

    private String password;

    private String database;

    private String tableList;

}

Target entity: the TargetDataProperties class

@Component
@Data
@ConfigurationProperties(prefix = "target")
public class TargetDataProperties {

    private String driverClassName;

    private String url;

    private String host;

    private String port;

    private String username;

    private String password;

    private String database;

//    @Value("${cdc.datasource.target.schema}")
//    private String schema;
//
//    @Value("${cdc.datasource.target.tableList}")
//    private String tableList;

}
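
Both property classes are populated from application.yml through @ConfigurationProperties relaxed binding (the "source" prefix is bound in SourceConfig, the "target" prefix directly on TargetDataProperties). A minimal configuration sketch; every host, port, credential, database name, and the table list below are placeholders to replace with your own values:

source:
  driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
  url: jdbc:sqlserver://localhost:1433;databaseName=source_db
  host: localhost
  port: 1433
  username: sa
  password: your_password
  database: source_db
  table-list: dbo.demo_table

target:
  driver-class-name: com.mysql.cj.jdbc.Driver
  url: jdbc:mysql://localhost:3306/target_db?useSSL=false&serverTimezone=UTC
  host: localhost
  port: 3306
  username: root
  password: your_password
  database: target_db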

The sink class

This is the class that implements the actual business logic. It is rather long, so I will not paste it here; implement whatever logic your own scenario requires.

You can also take mine as a reference, at the following link:

Flink CDC business-logic layer handling (CSDN blog): https://blog.csdn.net/weixin_50448162/article/details/140635326?spm=1001.2014.3001.5502
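
For orientation only, below is a minimal sketch of what such a sink could look like. It is not the author's actual DataChangeSink implementation: the target table demo_table, its columns (id, name), the package locations, and the MySQL-style upsert are hypothetical. It only illustrates parsing the Debezium JSON envelope (op, before, after) produced by JsonDebeziumDeserializationSchema and writing to the target over plain JDBC, fetching the Spring-managed connection settings through SpringBeanUtil because Flink, not Spring, creates the sink instance.

package com.hbos.cdc.listener;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.hbos.cdc.entity.TargetDataProperties;
import com.hbos.cdc.util.SpringBeanUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Illustrative sink only: mirrors Debezium change events into a hypothetical
// MySQL target table demo_table(id, name).
public class DataChangeSink extends RichSinkFunction<String> {

    private transient Connection connection;
    private transient Gson gson;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // The sink is created by Flink, not by Spring, so look the bean up manually.
        TargetDataProperties target = SpringBeanUtil.getBean(TargetDataProperties.class);
        connection = DriverManager.getConnection(target.getUrl(), target.getUsername(), target.getPassword());
        gson = new Gson();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        JsonObject event = gson.fromJson(value, JsonObject.class);
        // Debezium envelope: op is c (insert), u (update), d (delete) or r (snapshot read)
        String op = event.get("op").getAsString();

        if ("d".equals(op)) {
            JsonObject before = event.getAsJsonObject("before");
            try (PreparedStatement ps = connection.prepareStatement(
                    "DELETE FROM demo_table WHERE id = ?")) {
                ps.setLong(1, before.get("id").getAsLong());
                ps.executeUpdate();
            }
        } else {
            JsonObject after = event.getAsJsonObject("after");
            // MySQL upsert so that snapshot rows, inserts and updates are handled the same way
            try (PreparedStatement ps = connection.prepareStatement(
                    "INSERT INTO demo_table (id, name) VALUES (?, ?) "
                            + "ON DUPLICATE KEY UPDATE name = VALUES(name)")) {
                ps.setLong(1, after.get("id").getAsLong());
                ps.setString(2, after.get("name").getAsString());
                ps.executeUpdate();
            }
        }
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
        super.close();
    }
}

For a SQL Server target, the upsert would use a MERGE statement instead of ON DUPLICATE KEY UPDATE, and a production sink would typically batch writes and reuse prepared statements rather than preparing one per event.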
