说明:本文仅供参考借鉴,博主也是初次接触 Flink,如有不足请见谅。
Flink 版本 1.17.0,Flink CDC 版本 2.3.0,亲测可用,可实现实时同步。
首先看下我的程序结构吧
准备
1、依赖
<properties>
    <flink.version>1.17.0</flink.version>
    <flink.cdc.version>2.3.0</flink.cdc.version>
    <logback-classic.version>1.1.11</logback-classic.version>
    <slf4j-api.version>1.7.25</slf4j-api.version>
</properties>
<!-- Dependency management (BOM imports): pins versions for child dependencies -->
<dependencyManagement>
    <dependencies>
        <!-- Spring Cloud microservices BOM -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2021.0.4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!-- Spring Cloud Alibaba microservices BOM -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>2021.0.4.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!-- Spring Boot dependency BOM -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.7.3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <!-- Bootstrap starter (loads bootstrap.yml for Spring Cloud config) -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-bootstrap</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-freemarker</artifactId>
        <version>2.4.4</version>
    </dependency>
    <dependency>
        <groupId>org.mybatis</groupId>
        <artifactId>mybatis</artifactId>
        <version>3.5.9</version>
    </dependency>
    <!-- SQL Server JDBC driver (target/source database access) -->
    <dependency>
        <groupId>com.microsoft.sqlserver</groupId>
        <artifactId>mssql-jdbc</artifactId>
        <version>9.4.1.jre8</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.2</version>
    </dependency>
    <!-- Flink core + streaming APIs -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink CDC connectors (Debezium-based) -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-sqlserver-cdc</artifactId>
        <version>${flink.cdc.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-oracle-cdc</artifactId>
        <version>${flink.cdc.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.33</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>5.3.22</version>
    </dependency>
    <!-- Flink web UI (optional, handy for local debugging) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.oracle.database.jdbc</groupId>
        <artifactId>ojdbc8</artifactId>
        <version>12.2.0.1</version>
    </dependency>
    <dependency>
        <groupId>com.ibm.db2</groupId>
        <artifactId>jcc</artifactId>
        <version>11.5.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.26</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
2、打包插件
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <!-- run shade goal on package phase -->
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.flink:force-shading</exclude>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <!-- <exclude>org.slf4j:*</exclude> -->
                                <!-- <exclude>org.apache.logging.log4j:*</exclude> -->
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <!-- Strip signature files so the fat jar is not rejected as tampered -->
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers combine.children="append">
                            <!-- Merges META-INF/services files so multiple connectors' SPI entries don't overwrite each other -->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            <!-- Sets the Main-Class manifest entry -->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.hbos.FlinkCdcApplication</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
    <resources>
        <resource>
            <directory>src/main/resources</directory>
            <includes>
                <include>application.yml</include>
                <include>**/*.ftl</include>
                <include>**/*.xml</include>
                <include>**/*.properties</include>
            </includes>
            <filtering>true</filtering>
        </resource>
    </resources>
</build>
编写主类
/**
 * Spring Boot entry point for the CDC service.
 *
 * <p>Starts the application context; the Flink job itself is launched by the
 * {@code SourceConfig} bean during context initialization.
 */
@SpringBootApplication(excludeName = {"org.springframework.security"})
@Slf4j
public class FlinkCdcApplication {
    public static void main(String[] args) {
        SpringApplication.run(FlinkCdcApplication.class, args);
        log.info("cdc服务启动成功");
        System.out.println("==========================" + "\n" +
                "(♥◠‿◠)ノ゙cdc服务启动成功" + "\n"
                + "==========================");
    }
}
编写源表 SourceConfig 类配置
package com.hbos.cdc.config;
import com.hbos.cdc.entity.sourcedataproperties;
import com.hbos.cdc.listener.datachangesink;
import com.ververica.cdc.connectors.sqlserver.sqlserversource;
import com.ververica.cdc.connectors.sqlserver.table.startupoptions;
import com.ververica.cdc.debezium.debeziumsourcefunction;
import com.ververica.cdc.debezium.jsondebeziumdeserializationschema;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import org.apache.kafka.connect.json.decimalformat;
import org.apache.kafka.connect.json.jsonconverterconfig;
import org.springframework.beans.factory.annotation.qualifier;
import org.springframework.boot.context.properties.configurationproperties;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import java.util.hashmap;
import java.util.map;
import java.util.properties;
@configuration
public class sourceconfig {
@bean(name = "sourceinfo")
@configurationproperties(prefix = "source")
public sourcedataproperties sourceinfo() {
return new sourcedataproperties();
}
@bean
public streamexecutionenvironment source(@qualifier("sourceinfo") sourcedataproperties sourceinfo) throws exception {
map config = new hashmap();
config.put(jsonconverterconfig.decimal_format_config, decimalformat.numeric.name());
jsondebeziumdeserializationschema jdd = new jsondebeziumdeserializationschema(false, config);
//通过flinkcdc构建sourcefunction
debeziumsourcefunction<string> sourcefunction = sqlserversource.<string>builder()
.hostname(sourceinfo.gethost())
.port(sourceinfo.getport())
.username(sourceinfo.getusername())
.password(sourceinfo.getpassword())
.database(sourceinfo.getdatabase()) //设置捕获的库名
.tablelist(sourceinfo.gettablelist()) //设置捕获的表,也可以去掉表,那就证明监控某库下所有的表
.deserializer(jdd) //转换成json
/*
* initial初始化快照,即全量导入后增量导入(检测更新数据写入)
* latest:只进行增量导入(不读取历史变化)
*/
.startupoptions(startupoptions.initial())//启动模式
.build();
streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();
env.setparallelism(5); //全局并行度为5
env.addsource(sourcefunction).addsink(new datachangesink()).setparallelism(2);
//4、启动任务
env.execute("jhygcdc1");
return env;
}
}
因为flink的结构和springboot的结构不一样,所以要想把bean交给spring管理那就需要手动去创建
创建 Bean 工具类,也就是 SpringBeanUtil 类
@component
public class springbeanutil implements applicationcontextaware {
private static applicationcontext applicationcontext;
public void setapplicationcontext(applicationcontext applicationcontext)
throws beansexception {
springbeanutil.applicationcontext = applicationcontext;
}
public static <t> t getbean(class<t> clazz) {
return applicationcontext.getbean(clazz);
}
public static object getbean(string name) throws beansexception {
return applicationcontext.getbean(name);
}
}
创建源表实体和目标表实体
源实体 SourceDataProperties 类
//@component
@data
//@configurationproperties(prefix = "source")
public class sourcedataproperties {
private string driverclassname;
private string url;
private string host;
private int port;
private string username;
private string password;
private string database;
private string tablelist;
}
目标实体 TargetDataProperties 类
/**
 * Connection settings for the target database, bound from {@code target.*}
 * application properties.
 */
@Component
@Data
@ConfigurationProperties(prefix = "target")
public class TargetDataProperties {
    private String driverClassName;
    private String url;
    private String host;
    // NOTE(review): declared String here while SourceDataProperties uses int —
    // confirm whether this asymmetry is intentional before unifying.
    private String port;
    private String username;
    private String password;
    private String database;
    // @Value("${cdc.datasource.target.schema}")
    // private String schema;
    //
    // @Value("${cdc.datasource.target.tableList}")
    // private String tableList;
}
Sink 类
也就是处理业务逻辑的代码,内容较多这里不再粘贴,你们可以按自己的业务逻辑自行实现,
也可以借鉴我的实现(原文附带的地址在此处缺失)。
发表评论