一、概述
flink cdc 是一个基于 apache flink 的数据捕获工具,能够实时捕获和处理数据库的变动事件。通过集成 flink cdc,可以实时追踪 mysql 数据库中的数据变动,构建高效的数据处理和分析应用。本文将介绍如何在 springboot 项目中集成 flink cdc,并实现对 mysql 数据变动的实时追踪。
二、准备工作
1. 环境准备
- jdk 1.8+
- maven 3.6+
- mysql 数据库
- apache flink 1.12+
- springboot 2.5+
2. 创建 mysql 数据库和表
create database test_db;
use test_db;
create table users (
id int auto_increment primary key,
name varchar(255) not null,
email varchar(255) not null,
created_at timestamp default current_timestamp
);
三、集成步骤
1. 引入依赖
在 springboot 项目的 pom.xml 中添加必要的依赖:
<dependencies>
<!-- spring boot dependencies -->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter</artifactid>
</dependency>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-data-jpa</artifactid>
</dependency>
<!-- flink dependencies -->
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-java</artifactid>
<version>1.12.0</version>
</dependency>
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-streaming-java_2.12</artifactid>
<version>1.12.0</version>
</dependency>
<!-- flink cdc dependencies -->
<dependency>
<groupid>com.ververica</groupid>
<artifactid>flink-connector-mysql-cdc</artifactid>
<version>2.0.0</version>
</dependency>
</dependencies>
2. 配置 flink cdc
在 springboot 项目中创建 flink cdc 配置类:
import com.ververica.cdc.connectors.mysql.mysqlsource;
import com.ververica.cdc.connectors.mysql.table.startupoptions;
import org.apache.flink.api.common.eventtime.watermarkstrategy;
import org.apache.flink.streaming.api.datastream.datastreamsource;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
@configuration
public class flinkcdcconfig {
@bean
public datastreamsource<string> mysqlsource(streamexecutionenvironment env) {
mysqlsource<string> source = mysqlsource.<string>builder()
.hostname("localhost")
.port(3306)
.databaselist("test_db")
.tablelist("test_db.users")
.username("root")
.password("password")
.deserializer(new jsondebeziumdeserializationschema())
.startupoptions(startupoptions.initial())
.build();
return env.fromsource(source, watermarkstrategy.nowatermarks(), "mysql source");
}
}
3. 创建 flink 作业
在 springboot 项目中创建 flink 作业:
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import org.springframework.boot.commandlinerunner;
import org.springframework.stereotype.component;
@component
public class flinkjobrunner implements commandlinerunner {
private final streamexecutionenvironment env;
private final datastreamsource<string> mysqlsource;
public flinkjobrunner(streamexecutionenvironment env, datastreamsource<string> mysqlsource) {
this.env = env;
this.mysqlsource = mysqlsource;
}
@override
public void run(string... args) throws exception {
mysqlsource.print();
env.execute("flink cdc job");
}
}
4. 启动 springboot 应用
运行 springboot 应用,启动后会自动执行 flink 作业,并打印 mysql 数据库中 users 表的变动。
四、验证和测试
1. 插入测试数据
向 mysql 数据库中插入数据:
insert into users (name, email) values ('alice', 'alice@example.com');
insert into users (name, email) values ('bob', 'bob@example.com');
2. 验证输出
查看 springboot 应用的控制台输出,确认是否正确捕获并打印了 mysql 数据库中的变动。
到此这篇关于springboot 通过集成 flink cdc 来实时追踪 mysql 数据变动的文章就介绍到这了,更多相关springboot flink cdc 追踪mysql 数据变动内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论