pom
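A minimal sketch of the dependencies the code below relies on, assuming Flink 1.13.x (Scala 2.12 builds) and the com.ververica Flink CDC 2.x connector; the version numbers are assumptions, so match them to your cluster:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>
        <!-- needed for the Flink SQL variant further down -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.0</version>
        </dependency>
        <!-- only needed for the fastjson wrapper mentioned below -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>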
Code
Note that running with checkpointing enabled differs from running without it (a savepoint works too: pass -s savepath when starting the Flink job).
Without it, a restarted job re-reads all of the data from scratch.
With it, a restarted job uses the retained state to resume where it left off and reads only the changed data.
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDCTest {

    public static void main(String[] args) throws Exception {
        //1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 Enable checkpointing
        // env.enableCheckpointing(5000);
        // env.getCheckpointConfig().setCheckpointTimeout(10000);
        // env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //
        // env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));

        //2. Build the SourceFunction via Flink CDC
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("9.134.70.1")
                .port(3306)
                .username("root")
                .password("xxxxxxx")
                .databaseList("cc")
                .tableList("cc.student")
                // .deserializer(new StringDebeziumDeserializationSchema())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        //3. Print the data
        dataStreamSource.print("==FlinkCDC==");

        //4. Start the job
        env.execute("FlinkCDC");
    }
}
MySQL
The database table:
Insert a row:
The printed log shows op=c, i.e. a create.
Update a row (age=1 -> age=2): op=u, i.e. an update.
Flink prints the log:
Delete the row: op=d, i.e. a delete.
Since the raw log is quite verbose, we can wrap it with fastjson and hand the result to a sink, which then applies the insert/update/delete events to the downstream store in real time. There is also op=r, which marks rows read during the initial snapshot. A sketch of such a wrapper follows.
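A minimal sketch of that fastjson wrapper, written as a custom DebeziumDeserializationSchema. The class name CustomerDeserialization and the field layout of the output JSON are our own choices, not something the connector prescribes:

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) {
        JSONObject result = new JSONObject();

        // topic has the form mysql_binlog_source.<database>.<table>
        String[] fields = sourceRecord.topic().split("\\.");
        result.put("database", fields[1]);
        result.put("tableName", fields[2]);

        Struct value = (Struct) sourceRecord.value();

        // "before" image of the row (null for inserts)
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            for (Field field : before.schema().fields()) {
                beforeJson.put(field.name(), before.get(field));
            }
        }
        result.put("before", beforeJson);

        // "after" image of the row (null for deletes)
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            for (Field field : after.schema().fields()) {
                afterJson.put(field.name(), after.get(field));
            }
        }
        result.put("after", afterJson);

        // map Debezium's op (c/u/d/r) to a readable type string
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        result.put("type", operation.toString().toLowerCase());

        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

It plugs into the builder in place of JsonDebeziumDeserializationSchema: .deserializer(new CustomerDeserialization()).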
Flink SQL can be used as well:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkSQLCDC {

    public static void main(String[] args) throws Exception {
        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2. Build the CDC table with Flink SQL DDL
        //   (the primary key must be declared NOT ENFORCED for the mysql-cdc connector)
        tableEnv.executeSql("CREATE TABLE user_info ( " +
                " id STRING, " +
                " name STRING, " +
                " age INT, " +
                " PRIMARY KEY (id) NOT ENFORCED " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'scan.startup.mode' = 'latest-offset', " +
                " 'hostname' = '9.134.70.1', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = 'xxxxx', " +
                " 'database-name' = 'cc', " +
                " 'table-name' = 'student' " +
                ")");

        //3. Query the data and convert the result to a stream
        Table table = tableEnv.sqlQuery("select * from user_info");
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        //4. Start the job
        env.execute("FlinkSQLCDC");
    }
}
The same inserts, updates, and deletes now come out as a retract stream: an insert arrives as (true, row), a delete as (false, row), and an update as a (false, oldRow) retraction followed by a (true, newRow) accumulate.
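For example, to forward only the accumulate messages (inserts and the new value of each update) to a downstream sink, one could filter on the Boolean flag. A sketch continuing from the retractStream above; it additionally needs import org.apache.flink.api.common.typeinfo.Types:

        retractStream
                .filter(t -> t.f0)              // true = accumulate, false = retract
                .map(t -> t.f1.toString())
                .returns(Types.STRING)          // help Flink's type extraction for the lambda
                .print("upsert");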