
Flink CDC: Reading MySQL Change Data

August 6, 2024

pom

Code

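Note that enabling checkpointing makes a difference (a savepoint works as well: specify it with -s savepath when starting the Flink job).

Without checkpointing, if the job restarts, it reads all of the data again.

With checkpointing enabled, a restarted job uses the saved state (the binlog position it had reached) to read only the changes made since then.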
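To make the difference concrete, here is a small conceptual model in plain Java. It is not the Flink or Flink CDC API: CdcRestartModel, BinlogEvent and startJob are hypothetical names, and the saved position stands in for the binlog offset that Flink CDC keeps in checkpointed state. It assumes the StartupOptions.initial() startup mode.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

// Conceptual model only (hypothetical names, not the Flink CDC API).
final class CdcRestartModel {

    // One binlog entry: its position and a text description of the change.
    record BinlogEvent(long position, String change) {}

    // Returns what a (re)started job emits before it starts tailing new binlog events.
    static List<String> startJob(List<String> currentRows,
                                 List<BinlogEvent> binlog,
                                 OptionalLong checkpointedPosition) {
        List<String> emitted = new ArrayList<>();
        if (checkpointedPosition.isEmpty()) {
            // No checkpoint or savepoint: the initial mode snapshots every existing row
            // again (op=r), so downstream sees all the data a second time.
            for (String row : currentRows) {
                emitted.add("r:" + row);
            }
            return emitted;
        }
        // Restored from a checkpoint: skip the snapshot and replay only the binlog
        // events recorded after the saved position.
        long saved = checkpointedPosition.getAsLong();
        for (BinlogEvent event : binlog) {
            if (event.position() > saved) {
                emitted.add(event.change());
            }
        }
        return emitted;
    }
}
```

Without a saved position the model re-emits the current table contents; with one it emits only the later changes, which is the behavior described above.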


import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCdcTest {

    public static void main(String[] args) throws Exception {

        // 1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1.1 Enable checkpointing (uncomment so that a restarted job resumes from the saved binlog position)
//        env.enableCheckpointing(5000);
//        env.getCheckpointConfig().setCheckpointTimeout(10000);
//        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));

        // 2. Build the SourceFunction with Flink CDC
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("9.134.70.1")
                .port(3306)
                .username("root")
                .password("xxxxxxx")
                .databaseList("cc")
                .tableList("cc.student")                                  // table names are qualified with the database name
//                .deserializer(new StringDebeziumDeserializationSchema())  // emits SourceRecord.toString()
                .deserializer(new JsonDebeziumDeserializationSchema())     // emits the Debezium JSON (before/after/source/op)
                .startupOptions(StartupOptions.initial())                   // snapshot existing rows first, then read the binlog
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        // 3. Print the records
        dataStreamSource.print("==flinkcdc==");

        // 4. Launch the job
        env.execute("flinkcdc");

    }

}

MySQL

Database table

Insert a row.

Printed log: op=c means create.

Update the row (age=1 -> age=2): op=u means update.

Flink log output:

Delete the row: op=d means delete.
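Each op code also determines which row images the Debezium change event fills in: c and r carry only "after", d carries only "before", and u carries both. A small helper enum can capture this when post-processing the JSON; DebeziumOp and its methods are hypothetical names, not part of Flink CDC.

```java
// Hypothetical helper; the codes match the "op" field of Debezium change events.
enum DebeziumOp {
    READ("r"), CREATE("c"), UPDATE("u"), DELETE("d");

    private final String code;

    DebeziumOp(String code) {
        this.code = code;
    }

    static DebeziumOp fromCode(String code) {
        for (DebeziumOp op : values()) {
            if (op.code.equals(code)) {
                return op;
            }
        }
        throw new IllegalArgumentException("unknown op: " + code);
    }

    // "before" holds the row as it was; it is null for snapshot reads and inserts.
    boolean hasBefore() {
        return this == UPDATE || this == DELETE;
    }

    // "after" holds the row as it is now; it is null for deletes.
    boolean hasAfter() {
        return this != DELETE;
    }
}
```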

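Because the printed log is quite verbose, we can lightly repackage each record with fastjson and pass it to a sink, which updates the downstream data in real time depending on whether the change is an insert, update or delete. There is also op=r, which marks rows read during the initial snapshot.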
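A minimal sketch of the downstream side, assuming each record has already been reduced (for example with fastjson) to its op code plus the before/after row images, modelled here as Map&lt;String, Object&gt;. DownstreamApplier, ChangeEvent and the in-memory table are hypothetical stand-ins for a real sink, and the id column is assumed to be the primary key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sink stand-in: keeps the latest image of each row, keyed by "id".
final class DownstreamApplier {

    // One change event after parsing; before/after may be null depending on op.
    record ChangeEvent(String op, Map<String, Object> before, Map<String, Object> after) {}

    private final Map<Object, Map<String, Object>> table = new HashMap<>();

    void apply(ChangeEvent event) {
        switch (event.op()) {
            // Snapshot read or insert: store the new row image.
            case "r", "c" -> table.put(event.after().get("id"), event.after());
            // Update: drop the old key (in case the key changed), then store the new image.
            case "u" -> {
                table.remove(event.before().get("id"));
                table.put(event.after().get("id"), event.after());
            }
            // Delete: only "before" is populated.
            case "d" -> table.remove(event.before().get("id"));
            default -> throw new IllegalArgumentException("unknown op: " + event.op());
        }
    }

    // Immutable copy of the current downstream state.
    Map<Object, Map<String, Object>> snapshot() {
        return Map.copyOf(table);
    }
}
```

In a real job the same switch would live inside a sink and issue the corresponding insert, update or delete against the target store.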

You can also use Flink SQL:


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkSqlCdc {
    public static void main(String[] args) throws Exception {

        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Create the CDC table with Flink SQL DDL
        //    Column types should match the MySQL table cc.student; Flink only accepts
        //    primary keys declared NOT ENFORCED.
        //    'latest-offset' skips the snapshot and reads only new binlog changes.
        tableEnv.executeSql("CREATE TABLE user_info ( " +
                " id STRING PRIMARY KEY NOT ENFORCED, " +
                " name STRING, " +
                " age INT " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'scan.startup.mode' = 'latest-offset', " +
                " 'hostname' = '9.134.70.1', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = 'xxxxx', " +
                " 'database-name' = 'cc', " +
                " 'table-name' = 'student' " +
                ")");

        // 3. Query the table and convert the result to a retract stream
        //    (true = add the row, false = retract a previously emitted row)
        Table table = tableEnv.sqlQuery("SELECT * FROM user_info");
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        // 4. Launch the job
        env.execute("flinksqlcdc");

    }

}
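In the retract stream each element is a Tuple2&lt;Boolean, Row&gt;: true adds a row and false retracts one emitted earlier, so an update shows up as a retraction of the old row followed by an addition of the new one, and a delete as a single retraction. The sketch below replays such a stream into a list in plain Java; RetractMaterializer and Change are hypothetical names, and rows are modelled as List&lt;Object&gt; instead of org.apache.flink.types.Row so it runs without Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical consumer that rebuilds the current result from (flag, row) pairs.
final class RetractMaterializer {

    // Mirrors Tuple2<Boolean, Row>: isAdd corresponds to f0, row to f1.
    record Change(boolean isAdd, List<Object> row) {}

    static List<List<Object>> materialize(List<Change> changes) {
        List<List<Object>> result = new ArrayList<>();
        for (Change change : changes) {
            if (change.isAdd()) {
                result.add(change.row());       // true: the row joins the result
            } else {
                result.remove(change.row());    // false: remove one matching earlier row
            }
        }
        return result;
    }
}
```

Replaying insert, update (age 1 to 2) and delete this way leaves an empty result, matching the state of the MySQL table.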

Performing the same insert, update and delete operations, Flink prints the following log:
