当前位置: 代码网 > it编程>编程语言>Java > SpringBoot整合Flink CDC,实时追踪mysql数据变动

SpringBoot整合Flink CDC,实时追踪mysql数据变动

2024年07月28日 Java 我要评论
Flink CDC(Flink Change Data Capture)是一种基于数据库日志的CDC技术,它实现了一个全增量一体化的数据集成框架。与Flink计算框架相结合,Flink CDC能够高效地实现海量数据的实时集成。其核心功能在于实时监视数据库或数据流中的数据变动,并将这些变动抽取出来,以便进行进一步的处理和分析。借助Flink CDC,用户可以轻松地构建实时数据管道,实时响应和处理数据变动,为实时分析、实时报表和实时决策等场景提供有力支持。
❃博主首页 : ,同名公众号 :「码到三十五」,wx号 : 「liwu0213」
☠博主专栏 :
♝博主的话 : 搬的每块砖,皆为峰峦之基;公众号搜索「码到三十五」关注这个爱发技术干货的coder,一起筑基

前言

flink cdc(flink change data capture)是一种基于数据库日志的cdc技术,它实现了一个全增量一体化的数据集成框架。与flink计算框架相结合,flink cdc能够高效地实现海量数据的实时集成。其核心功能在于实时监视数据库或数据流中的数据变动,并将这些变动抽取出来,以便进行进一步的处理和分析。借助flink cdc,用户可以轻松地构建实时数据管道,实时响应和处理数据变动,为实时分析、实时报表和实时决策等场景提供有力支持。

flink cdc的应用场景广泛,包括但不限于实时数据仓库更新、实时数据同步和迁移以及实时数据处理等。它还能确保数据一致性,并在数据发生变更时准确地进行捕获和处理。此外,flink cdc支持与多种数据源进行集成,如mysql、postgresql、oracle等,并提供了相应的连接器,便于数据的捕获和处理。

接下来,将详细介绍mysql cdc的使用。mysql cdc连接器允许从mysql数据库中读取快照数据和增量数据。

1. mysql开启binlog

mysql中开启binlog功能,需要修改配置文件中(如linux的/etc/my.cnf或windows的\my.ini)的[mysqld]部分设置相关参数:

[mysqld]
server-id=1
# 设置日志格式为行级格式
binlog-format=row
# 设置binlog日志文件的前缀
log-bin=mysql-bin
# 指定需要记录二进制日志的数据库
binlog_do_db=testjpa

除了开启binlog功能外,还需要为flink cdc配置相应的权限,以确保其能够正常连接到mysql并读取数据。这包括授予flink cdc连接mysql的用户必要的权限,如select、replication slave、replication client、show view等。这些权限是flink cdc读取数据和元数据所必需的。

检查是否已开启binlog功能:

mysql> show variables like 'log_bin';
+---------------+-------+
| variable_name | value |
+---------------+-------+
| log_bin       | on    |
+---------------+-------+

至此,mysql的相关配置已完成。

2. 创建spring boot项目

首先,你需要创建一个spring boot项目。可以使用spring initializr(https://start.spring.io/)来快速生成项目。

3. 添加依赖

pom.xml中添加apache flink和flink cdc的依赖。以下是必要的依赖:

<dependencies>
    <!-- flink dependency -->
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-java</artifactid>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-streaming-java_2.12</artifactid>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-connector-mysql-cdc</artifactid>
        <version>2.0.0</version>
    </dependency>
    <!-- spring boot dependencies -->
    <dependency>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter</artifactid>
    </dependency>
</dependencies>

4. 配置flink和mysql cdc

在spring boot的application.ymlapplication.properties文件中配置flink和mysql数据库连接:

flink:
  checkpoint:
    interval: 10000
  parallelism: 1

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/your_database
    username: your_username
    password: your_password

5. 实现数据实时追踪

创建一个服务类来实现数据的实时追踪:

import org.apache.flink.streaming.api.datastream.datastream;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import org.apache.flink.table.api.bridge.java.streamtableenvironment;
import org.springframework.stereotype.service;

@service
public class flinkcdcservice {

    public void startdatastreaming() {
        final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();
        final streamtableenvironment tableenv = streamtableenvironment.create(env);

        // 使用flink cdc连接mysql
        string name = "inventory";
        tableenv.executesql("create table " + name + " (" +
            "  id int," +
            "  name string," +
            "  description string," +
            "  weight decimal(10, 3)" +
            ") with (" +
            "  'connector' = 'mysql-cdc'," +
            "  'hostname' = 'localhost'," +
            "  'port' = '3306'," +
            "  'username' = 'your_username'," +
            "  'password' = 'your_password'," +
            "  'database-name' = 'your_database'," +
            "  'table-name' = 'your_table'" +
            ")");

        // 查询并打印结果
        datastream<string> datastream = tableenv.sqlquery("select * from " + name).execute().print();

        try {
            env.execute("flink cdc demo");
        } catch (exception e) {
            e.printstacktrace();
        }
    }
}

6. 启动spring boot应用

在你的spring boot应用的启动类中调用flinkcdcservicestartdatastreaming方法来启动数据追踪:

import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.commandlinerunner;
import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;

@springbootapplication
public class flinkcdcapplication implements commandlinerunner {

    @autowired
    private flinkcdcservice flinkcdcservice;

    public static void main(string[] args) {
        springapplication.run(flinkcdcapplication.class, args);
    }

    @override
    public void run(string... args) throws exception {
        flinkcdcservice.startdatastreaming();
    }
}

7. 运行并测试

运行spring boot应用,并在mysql数据库中做出一些数据变动。你应该能在控制台看到实时打印的数据变动。


关注公众号[码到三十五]获取更多技术干货 !

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com