- 两阶段提交2pc(还行)
⭐️(3)借助kafka实现端到端一致
1)各个端如何进行修改:
2)示例代码:
pom文件中引入依赖:
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.13.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-connector-kafka_2.12</artifactid>
<version>${flink.version}</version>
</dependency>
<!--1.12.0需要在服务器上引入如下两个jar包-->
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-connector-base</artifactid>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupid>org.apache.kafka</groupid>
<artifactid>kafka-clients</artifactid>
<version>2.4.1</version>
</dependency>
<!--flink通用连接kafka 可以在写代码时不用指定版本 2.12代表scala版本-->
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-connector-kafka_2.12</artifactid>
<version>${flink.version}</version>
</dependency>
<!-- 引入flink相关依赖-->
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-java</artifactid>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-streaming-java_${scala.binary.version}</artifactid>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-clients_${scala.binary.version}</artifactid>
<version>${flink.version}</version>
</dependency>
<!-- 引入日志管理相关依赖-->
<dependency>
<groupid>org.slf4j</groupid>
<artifactid>slf4j-api</artifactid>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupid>org.slf4j</groupid>
<artifactid>slf4j-log4j12</artifactid>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
-
flink内部
- 调用setcheckpointinterval设定多久一次存储检查点
- 调用setcheckpointingmode设置语义是精准一次
//1.flink内部保证精准一次:设置检查点
checkpointconfig checkpointconfig = env.getcheckpointconfig();
//10s内保存一次检查点
checkpointconfig.setcheckpointinterval(10000);
//保存检查点的超时时间为5秒钟
checkpointconfig.setcheckpointtimeout(5000);
//新一轮检查点开始前最少等待上一轮保存15秒才开始
checkpointconfig.setminpausebetweencheckpoints(15000);
//设置作业失败后删除检查点(默认)
checkpointconfig.enableexternalizedcheckpoints(checkpointconfig.externalizedcheckpointcleanup.delete_on_cancellation);
//设置检查点模式为精准一次(默认)
checkpointconfig.setcheckpointingmode(checkpointingmode.exactly_once);
-
输入端
- flinkkafkaconsumer必须调用setcommitoffsetsoncheckpoints方法
执行checkpoint的时候提交offset到checkpoint,当重启flink时,flink作业会告诉kafka我要从哪个offset开始消费,这样我们的数据也就恢复
properties props_source = new properties();
props_source.setproperty("bootstrap.servers", "node1:9092");
props_source.setproperty("group.id", "flink");
props_source.setproperty("auto.offset.reset", "latest");
//会开启一个后台线程每隔5s检测一下kafka的分区情况
props_source.setproperty("flink.partition-discovery.interval-millis", "5000");
flinkkafkaconsumer<string> kafkasource = new flinkkafkaconsumer<>("flink\_kafka", new simplestringschema(), props_source);
kafkasource.setstartfromlatest();
//2.输入端保证:执行checkpoint的时候提交offset到checkpoint(flink用)
kafkasource.setcommitoffsetsoncheckpoints(true);
datastreamsource<string> kafkads = env.addsource(kafkasource);
-
输出端
- 设置flink等待事务的超时时间,
- 设置语义为精准一次
properties props_sink = new properties();
props_sink.setproperty("bootstrap.servers", "node1:9092");
//3.1 设置事务超时时间,也可在kafka配置中设置
props_sink.setproperty("transaction.timeout.ms", 1000 \* 5 + "");
flinkkafkaproducer<string> kafkasink = new flinkkafkaproducer<>(
"flink\_kafka2",
new keyedserializationschemawrapper<string>(new simplestringschema()),
props_sink,
//3.2 设置输出的的语义为精准一次
flinkkafkaproducer.semantic.exactly_once
);
result.addsink(kafkasink);
- 完整代码
package com.ming.test1;
import org.apache.commons.lang3.systemutils;
import org.apache.flink.api.common.functions.flatmapfunction;
import org.apache.flink.api.common.functions.richmapfunction;
import org.apache.flink.api.common.serialization.simplestringschema;
import org.apache.flink.api.java.tuple.tuple;
import org.apache.flink.api.java.tuple.tuple2;
import org.apache.flink.runtime.state.filesystem.fsstatebackend;
import org.apache.flink.streaming.api.checkpointingmode;
import org.apache.flink.streaming.api.datastream.datastreamsource;
import org.apache.flink.streaming.api.datastream.keyedstream;
import org.apache.flink.streaming.api.datastream.singleoutputstreamoperator;
import org.apache.flink.streaming.api.environment.checkpointconfig;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import org.apache.flink.streaming.connectors.kafka.flinkkafkaconsumer;
import org.apache.flink.streaming.connectors.kafka.flinkkafkaproducer;
import org.apache.flink.streaming.connectors.kafka.internals.keyedserializationschemawrapper;
import org.apache.flink.util.collector;
import java.util.properties;
import java.util.random;
public class test {
public static void main(string[] args) throws exception {
streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();
//1.flink内部保证精准一次:设置检查点
checkpointconfig checkpointconfig = env.getcheckpointconfig();
//10s内保存一次检查点
checkpointconfig.setcheckpointinterval(10000);
//保存检查点的超时时间为5秒钟
checkpointconfig.setcheckpointtimeout(5000);
//新一轮检查点开始前最少等待上一轮保存15秒才开始
checkpointconfig.setminpausebetweencheckpoints(15000);
//设置作业失败后删除检查点(默认)
checkpointconfig.enableexternalizedcheckpoints(checkpointconfig.externalizedcheckpointcleanup.delete_on_cancellation);
//设置检查点模式为精准一次(默认)
checkpointconfig.setcheckpointingmode(checkpointingmode.exactly_once);
env.setstatebackend(new fsstatebackend("file:///users/xingxuanming/downloads/flink-checkpoint/checkpoint"));
properties props_source = new properties();
props_source.setproperty("bootstrap.servers", "node1:9092");
props_source.setproperty("group.id", "flink");
props_source.setproperty("auto.offset.reset", "latest");
//会开启一个后台线程每隔5s检测一下kafka的分区情况
props_source.setproperty("flink.partition-discovery.interval-millis", "5000");
flinkkafkaconsumer<string> kafkasource = new flinkkafkaconsumer<>("flink\_kafka", new simplestringschema(), props_source);
kafkasource.setstartfromlatest();
//2.输入端保证:执行checkpoint的时候提交offset到checkpoint(flink用)
kafkasource.setcommitoffsetsoncheckpoints(true);
datastreamsource<string> kafkads = env.addsource(kafkasource);
singleoutputstreamoperator<tuple2<string, integer>> wordandoneds = kafkads.flatmap(new flatmapfunction<string, tuple2<string, integer>>() {
@override
public void flatmap(string value, collector<tuple2<string, integer>> out) throws exception {
//value就是每一行
string[] words = value.split(" ");
for (string word : words) {
random random = new random();
int i = random.nextint(5);
if (i > 3) {
system.out.println("出bug了...");
throw new runtimeexception("出bug了...");
}
out.collect(tuple2.of(word, 1));
}
}
});
keyedstream<tuple2<string, integer>, tuple> groupedds = wordandoneds.keyby(0);
singleoutputstreamoperator<tuple2<string, integer>> aggresult = groupedds.sum(1);
singleoutputstreamoperator<string> result = (singleoutputstreamoperator<string>) aggresult.map(new richmapfunction<tuple2<string, integer>, string>() {
@override
public string map(tuple2<string, integer> value) throws exception {
return value.f0 + ":::" + value.f1;
}
});
//3.输出端保证
properties props_sink = new properties();
props_sink.setproperty("bootstrap.servers", "node1:9092");
//3.1 设置事务超时时间,也可在kafka配置中设置
props_sink.setproperty("transaction.timeout.ms", 1000 \* 5 + "");
flinkkafkaproducer<string> kafkasink = new flinkkafkaproducer<>(
"flink\_kafka2",
new keyedserializationschemawrapper<string>(new simplestringschema()),
props_sink,
//3.2 设置输出的的语义为精准一次
flinkkafkaproducer.semantic.exactly_once
);
result.addsink(kafkasink);
env.execute();
}
进行测试:
先确认下,这里不用关闭应用程序,再重启应用程序来模拟故障,而是程序里模拟故障,用随机数直接throw exeception
进入到kafka的bin目录下
+ 启动zookeeper
./zookeeper-server-start.sh …/config/zookeeper.properties &
+ 启动kafka
./kafka-server-start.sh …/config/server.properties &
+ 创建主题
kafka-topics.sh --bootstrap-server localhost:2181 --create --replication-factor 2 --partitions 3 --topic flink\_kafka2
+ 打开控制台生产者
kafka-console-producer.sh --broker-list localhost:9092 --topic flink\_kafka
+ 打开控制台消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink\_kafka2来看最后的效果:
此代码主要功能是对输入的字符串做分区和求和操作,例如,传入hello,world,hello三条数据,按照字符串进行分区,分为hello,world两个分区,求和是按照分区内所有汇总的字段进行求和,最后结果是(hello,2) (world,1)
我们假设,来了三条数据,hello,world,hello,处理了两条数据,此时算子状态为(hello,1) (world,1) 第三条数据来的时候“恰好”故障,检查点保存的偏移量是第一条数据hello,算子里保存的是(hello,1) 那么继续从第二条数据world开始消费,第二条数据处理成功,此时算子里保存的(hello,1) (world,1),第三条数据处理成功,最终效果为(hello,2) (world,1)
3)最后补充下完整的流程:
二、前置知识
⭐️1.检查点(快照)
(1)概念:
(2)拓展问题:
2.如何保存以及恢复的?
举个例子,看看完整的流程图:
3.为什么检查点保存时机选择所有算子任务处理完同一批数据?
首先需要思考的一个问题:在算子a保存检查点的过程中,算子b正在处理新的数据,出现宕机,那我恢复的时候虽然明确知道哪条数据没有处理完,但是不确定执行到哪一个算子了,所以只能从头source放入数据,然后到算子a执行,但是算子a可能算出来这条数据的中间状态进行保存了, 相当于算子a要对这条数据处理两次。
举例:比如数据a,要进行处理1(求和),处理2(输出到存储文件)阶段,发生故障的时候在处理2阶段,处理1阶段已经保存了处理1阶段的值,难道恢复的时候,处理1阶段还要对数据的a再操作一次吗?
那我不选择处理完同一批数据,有没有什么其他解决方案?有,比较粗暴的方案。
4.那怎么确认所有算子都处理完同一批数据?直接按照最后一个sink算子任务输出成功的数据作为偏移量不可以吗?
5.保存检查点的时候,新来的数据到底该怎么办?
6.那么你既然要缓存不处理,不还是粗暴的方案?不让新数据处理执行,有什么意义吗?
7.那如果任务某个算子任务是并发情况呢?你怎么传递这个特殊标记?
8.既然选择所有任务都处理完一个相同数据,如果有其中一个任务没保存下状态,其他任务都保存了怎么办?
(3)检查点算法:
2)分布式快照算法
(4)使用:
⭐️(5)保存点
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、oppo等大厂,18年进入阿里一直到现在。
深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加vx:vip204888 (备注大数据获取)
stream = env
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、oppo等大厂,18年进入阿里一直到现在。
深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
[外链图片转存中…(img-bkgkizwo-1712936660165)]
[外链图片转存中…(img-b6bejswu-1712936660165)]
[外链图片转存中…(img-wrffjipm-1712936660165)]
[外链图片转存中…(img-obsabyrw-1712936660166)]
[外链图片转存中…(img-u0swljhz-1712936660166)]
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加vx:vip204888 (备注大数据获取)
[外链图片转存中…(img-fyf4ruyt-1712936660166)]
发表评论