) engine=innodb auto_increment=5 default charset=utf8 collate=utf8_bin;
编码
-
使用《flink的sink实战之二:kafka》中创建的flinksinkdemo工程;
-
在pom.xml中增加mysql的依赖:
mysql
mysql-connector-java
8.0.11
- 创建和数据库的student表对应的实体类student.java:
package com.bolingcavalry.customize;
public class student {
private int id;
private string name;
private int age;
public int getid() {
return id;
}
public void setid(int id) {
this.id = id;
}
public string getname() {
return name;
}
public void setname(string name) {
this.name = name;
}
public int getage() {
return age;
}
public void setage(int age) {
this.age = age;
}
public student(string name, int age) {
this.name = name;
this.age = age;
}
}
- 创建自定义sink类mysqlsinkfunction.java,这是本文的核心,有关数据库的连接、断开、写入数据都集中在此:
package com.bolingcavalry.customize;
import org.apache.flink.configuration.configuration;
import org.apache.flink.streaming.api.functions.sink.richsinkfunction;
import java.sql.connection;
import java.sql.drivermanager;
import java.sql.preparedstatement;
import java.util.concurrent.timeunit;
import java.util.concurrent.locks.reentrantlock;
public class mysqlsinkfunction extends richsinkfunction {
preparedstatement preparedstatement;
private connection connection;
private reentrantlock reentrantlock = new reentrantlock();
@override
public void open(configuration parameters) throws exception {
super.open(parameters);
//准备数据库相关实例
buildpreparedstatement();
}
@override
public void close() throws exception {
super.close();
try{
if(null!=preparedstatement) {
preparedstatement.close();
preparedstatement = null;
}
} catch(exception e) {
e.printstacktrace();
}
try{
if(null!=connection) {
connection.close();
connection = null;
}
} catch(exception e) {
e.printstacktrace();
}
}
@override
public void invoke(student value, context context) throws exception {
preparedstatement.setstring(1, value.getname());
preparedstatement.setint(2, value.getage());
preparedstatement.executeupdate();
}
/**
-
准备好connection和preparedstatement
-
获取mysql连接实例,考虑多线程同步,
-
不用synchronize是因为获取数据库连接是远程操作,耗时不确定
-
@return
*/
private void buildpreparedstatement() {
if(null==connection) {
boolean haslock = false;
try {
haslock = reentrantlock.trylock(10, timeunit.seconds);
if(haslock) {
class.forname(“com.mysql.cj.jdbc.driver”);
connection = drivermanager.getconnection(“jdbc:mysql://192.168.50.43:3306/flinkdemo?useunicode=true&characterencoding=utf-8&usessl=false&servertimezone=utc”, “root”, “123456”);
}
if(null!=connection) {
preparedstatement = connection.preparestatement(“insert into student (name, age) values (?, ?)”);
}
} catch (exception e) {
//生产环境慎用
e.printstacktrace();
} finally {
if(haslock) {
reentrantlock.unlock();
}
}
}
}
}
-
上述代码很简单,只需要注意在创建连接的时候用到了锁来控制多线程同步,以及高版本mysql驱动对应的driver和uri的写法与以前5.x版本的区别;
-
创建任务类studentsink.java,用来创建一个flink任务,里面通过arraylist创建了一个数据集,然后直接addsink,为了看清dag,调用disablechaining方法取消了operator chain:
package com.bolingcavalry.customize;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import java.util.arraylist;
import java.util.list;
public class studentsink {
public static void main(string[] args) throws exception {
final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();
//并行度为1
env.setparallelism(1);
list list = new arraylist<>();
list.add(new student(“aaa”, 11));
list.add(new student(“bbb”, 12));
list.add(new student(“ccc”, 13));
list.add(new student(“ddd”, 14));
list.add(new student(“eee”, 15));
list.add(new student(“fff”, 16));
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、oppo等大厂,18年进入阿里一直到现在。
深知大多数java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上java开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!
如果你觉得这些内容对你有帮助,可以扫码获取!!(备注java获取)

总结
我们总是喜欢瞻仰大厂的大神们,但实际上大神也不过凡人,与菜鸟程序员相比,也就多花了几分心思,如果你再不努力,差距也只会越来越大。
面试题多多少少对于你接下来所要做的事肯定有点帮助,但我更希望你能透过面试题去总结自己的不足,以提高自己核心技术竞争力。每一次面试经历都是对你技术的扫盲,面试后的复盘总结效果是极好的!
《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》
032d36.jpg" alt=“img” style=“zoom: 33%;” />
总结
我们总是喜欢瞻仰大厂的大神们,但实际上大神也不过凡人,与菜鸟程序员相比,也就多花了几分心思,如果你再不努力,差距也只会越来越大。
面试题多多少少对于你接下来所要做的事肯定有点帮助,但我更希望你能透过面试题去总结自己的不足,以提高自己核心技术竞争力。每一次面试经历都是对你技术的扫盲,面试后的复盘总结效果是极好的!
[外链图片转存中…(img-hkz9aazx-1713376775067)]
《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》
发表评论