当前位置: 代码网 > it编程>数据库>Mysql > Flink的sink实战之四:自定义

Flink的sink实战之四:自定义

2024年08月02日 Mysql 我要评论
我们总是喜欢瞻仰大厂的大神们,但实际上大神也不过凡人,与菜鸟程序员相比,也就多花了几分心思,如果你再不努力,差距也只会越来越大。面试题多多少少对于你接下来所要做的事肯定有点帮助,但我更希望你能透过面试题去总结自己的不足,以提高自己核心技术竞争力。每一次面试经历都是对你技术的扫盲,面试后的复盘总结效果是极好的!《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》点击传送门即可获取!” />

) engine=innodb auto_increment=5 default charset=utf8 collate=utf8_bin;

编码

  1. 使用《flink的sink实战之二:kafka》中创建的flinksinkdemo工程;

  2. 在pom.xml中增加mysql的依赖:

mysql

mysql-connector-java

8.0.11

  1. 创建和数据库的student表对应的实体类student.java:

package com.bolingcavalry.customize;

public class student {

private int id;

private string name;

private int age;

public int getid() {

return id;

}

public void setid(int id) {

this.id = id;

}

public string getname() {

return name;

}

public void setname(string name) {

this.name = name;

}

public int getage() {

return age;

}

public void setage(int age) {

this.age = age;

}

public student(string name, int age) {

this.name = name;

this.age = age;

}

}

  1. 创建自定义sink类mysqlsinkfunction.java,这是本文的核心,有关数据库的连接、断开、写入数据都集中在此:

package com.bolingcavalry.customize;

import org.apache.flink.configuration.configuration;

import org.apache.flink.streaming.api.functions.sink.richsinkfunction;

import java.sql.connection;

import java.sql.drivermanager;

import java.sql.preparedstatement;

import java.util.concurrent.timeunit;

import java.util.concurrent.locks.reentrantlock;

public class mysqlsinkfunction extends richsinkfunction {

preparedstatement preparedstatement;

private connection connection;

private reentrantlock reentrantlock = new reentrantlock();

@override

public void open(configuration parameters) throws exception {

super.open(parameters);

//准备数据库相关实例

buildpreparedstatement();

}

@override

public void close() throws exception {

super.close();

try{

if(null!=preparedstatement) {

preparedstatement.close();

preparedstatement = null;

}

} catch(exception e) {

e.printstacktrace();

}

try{

if(null!=connection) {

connection.close();

connection = null;

}

} catch(exception e) {

e.printstacktrace();

}

}

@override

public void invoke(student value, context context) throws exception {

preparedstatement.setstring(1, value.getname());

preparedstatement.setint(2, value.getage());

preparedstatement.executeupdate();

}

/**

  • 准备好connection和preparedstatement

  • 获取mysql连接实例,考虑多线程同步,

  • 不用synchronize是因为获取数据库连接是远程操作,耗时不确定

  • @return

*/

private void buildpreparedstatement() {

if(null==connection) {

boolean haslock = false;

try {

haslock = reentrantlock.trylock(10, timeunit.seconds);

if(haslock) {

class.forname(“com.mysql.cj.jdbc.driver”);

connection = drivermanager.getconnection(“jdbc:mysql://192.168.50.43:3306/flinkdemo?useunicode=true&characterencoding=utf-8&usessl=false&servertimezone=utc”, “root”, “123456”);

}

if(null!=connection) {

preparedstatement = connection.preparestatement(“insert into student (name, age) values (?, ?)”);

}

} catch (exception e) {

//生产环境慎用

e.printstacktrace();

} finally {

if(haslock) {

reentrantlock.unlock();

}

}

}

}

}

  1. 上述代码很简单,只需要注意在创建连接的时候用到了锁来控制多线程同步,以及高版本mysql驱动对应的driver和uri的写法与以前5.x版本的区别;

  2. 创建任务类studentsink.java,用来创建一个flink任务,里面通过arraylist创建了一个数据集,然后直接addsink,为了看清dag,调用disablechaining方法取消了operator chain:

package com.bolingcavalry.customize;

import org.apache.flink.streaming.api.environment.streamexecutionenvironment;

import java.util.arraylist;

import java.util.list;

public class studentsink {

public static void main(string[] args) throws exception {

final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();

//并行度为1

env.setparallelism(1);

list list = new arraylist<>();

list.add(new student(“aaa”, 11));

list.add(new student(“bbb”, 12));

list.add(new student(“ccc”, 13));

list.add(new student(“ddd”, 14));

list.add(new student(“eee”, 15));

list.add(new student(“fff”, 16));
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、oppo等大厂,18年进入阿里一直到现在。

深知大多数java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上java开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!

如果你觉得这些内容对你有帮助,可以扫码获取!!(备注java获取)

img

总结

我们总是喜欢瞻仰大厂的大神们,但实际上大神也不过凡人,与菜鸟程序员相比,也就多花了几分心思,如果你再不努力,差距也只会越来越大。

面试题多多少少对于你接下来所要做的事肯定有点帮助,但我更希望你能透过面试题去总结自己的不足,以提高自己核心技术竞争力。每一次面试经历都是对你技术的扫盲,面试后的复盘总结效果是极好的!

《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》
032d36.jpg" alt=“img” style=“zoom: 33%;” />

总结

我们总是喜欢瞻仰大厂的大神们,但实际上大神也不过凡人,与菜鸟程序员相比,也就多花了几分心思,如果你再不努力,差距也只会越来越大。

面试题多多少少对于你接下来所要做的事肯定有点帮助,但我更希望你能透过面试题去总结自己的不足,以提高自己核心技术竞争力。每一次面试经历都是对你技术的扫盲,面试后的复盘总结效果是极好的!

[外链图片转存中…(img-hkz9aazx-1713376775067)]

《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com