当前位置: 代码网 > it编程>数据库>Redis > Flink借助Kafka实现端到端精准一次_flink 如何保障数据一致性

Flink借助Kafka实现端到端精准一次_flink 如何保障数据一致性

2024年08月01日 Redis 我要评论
那么在检查点保存的时候,系统故障,保存检查点失败,那么恢复检查点的时候,恢复的是上次成功保存检查点的数据偏移量,如果不对新数据进行保护,可能就会把之前有可能。
  • 两阶段提交2pc(还行)
⭐️(3)借助kafka实现端到端一致

1)各个端如何进行修改:

2)示例代码:

pom文件中引入依赖:

<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>

    </properties>
    <dependencies>
        <dependency>
            <groupid>org.apache.flink</groupid>
            <artifactid>flink-connector-kafka_2.12</artifactid>
            <version>${flink.version}</version>
        </dependency>
        <!--1.12.0需要在服务器上引入如下两个jar包-->
        <dependency>
            <groupid>org.apache.flink</groupid>
            <artifactid>flink-connector-base</artifactid>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupid>org.apache.kafka</groupid>
            <artifactid>kafka-clients</artifactid>
            <version>2.4.1</version>
        </dependency>
        <!--flink通用连接kafka 可以在写代码时不用指定版本 2.12代表scala版本-->
        <dependency>
            <groupid>org.apache.flink</groupid>
            <artifactid>flink-connector-kafka_2.12</artifactid>
            <version>${flink.version}</version>
        </dependency>

        <!-- 引入flink相关依赖-->
        <dependency>
            <groupid>org.apache.flink</groupid>
            <artifactid>flink-java</artifactid>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupid>org.apache.flink</groupid>
            <artifactid>flink-streaming-java_${scala.binary.version}</artifactid>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupid>org.apache.flink</groupid>
            <artifactid>flink-clients_${scala.binary.version}</artifactid>
            <version>${flink.version}</version>
        </dependency>
        <!-- 引入日志管理相关依赖-->
        <dependency>
            <groupid>org.slf4j</groupid>
            <artifactid>slf4j-api</artifactid>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupid>org.slf4j</groupid>
            <artifactid>slf4j-log4j12</artifactid>
            <version>${slf4j.version}</version>
        </dependency>


    </dependencies>

  • flink内部

    • 调用setcheckpointinterval设定多久一次存储检查点
    • 调用setcheckpointingmode设置语义是精准一次
//1.flink内部保证精准一次:设置检查点
checkpointconfig checkpointconfig = env.getcheckpointconfig();
//10s内保存一次检查点
checkpointconfig.setcheckpointinterval(10000);
//保存检查点的超时时间为5秒钟
checkpointconfig.setcheckpointtimeout(5000);
//新一轮检查点开始前最少等待上一轮保存15秒才开始
checkpointconfig.setminpausebetweencheckpoints(15000);
//设置作业失败后删除检查点(默认)
checkpointconfig.enableexternalizedcheckpoints(checkpointconfig.externalizedcheckpointcleanup.delete_on_cancellation);
//设置检查点模式为精准一次(默认)
checkpointconfig.setcheckpointingmode(checkpointingmode.exactly_once);

  • 输入端

    • flinkkafkaconsumer必须调用setcommitoffsetsoncheckpoints方法

    执行checkpoint的时候提交offset到checkpoint,当重启flink时,flink作业会告诉kafka我要从哪个offset开始消费,这样我们的数据也就恢复

properties props_source = new properties();
props_source.setproperty("bootstrap.servers", "node1:9092");
props_source.setproperty("group.id", "flink");
props_source.setproperty("auto.offset.reset", "latest");
//会开启一个后台线程每隔5s检测一下kafka的分区情况
props_source.setproperty("flink.partition-discovery.interval-millis", "5000");
flinkkafkaconsumer<string> kafkasource = new flinkkafkaconsumer<>("flink\_kafka", new simplestringschema(), props_source);
kafkasource.setstartfromlatest();
//2.输入端保证:执行checkpoint的时候提交offset到checkpoint(flink用)
kafkasource.setcommitoffsetsoncheckpoints(true);
datastreamsource<string> kafkads = env.addsource(kafkasource);

  • 输出端

    • 设置flink等待事务的超时时间
    • 设置语义为精准一次
       properties props_sink = new properties();
        props_sink.setproperty("bootstrap.servers", "node1:9092");
        //3.1 设置事务超时时间,也可在kafka配置中设置
        props_sink.setproperty("transaction.timeout.ms", 1000 \* 5 + "");
        flinkkafkaproducer<string> kafkasink = new flinkkafkaproducer<>(
                "flink\_kafka2",
                new keyedserializationschemawrapper<string>(new simplestringschema()),
                props_sink,
                //3.2 设置输出的的语义为精准一次
                flinkkafkaproducer.semantic.exactly_once
        );
        result.addsink(kafkasink);

  • 完整代码
package com.ming.test1;
import org.apache.commons.lang3.systemutils;
import org.apache.flink.api.common.functions.flatmapfunction;
import org.apache.flink.api.common.functions.richmapfunction;
import org.apache.flink.api.common.serialization.simplestringschema;
import org.apache.flink.api.java.tuple.tuple;
import org.apache.flink.api.java.tuple.tuple2;
import org.apache.flink.runtime.state.filesystem.fsstatebackend;
import org.apache.flink.streaming.api.checkpointingmode;
import org.apache.flink.streaming.api.datastream.datastreamsource;
import org.apache.flink.streaming.api.datastream.keyedstream;
import org.apache.flink.streaming.api.datastream.singleoutputstreamoperator;
import org.apache.flink.streaming.api.environment.checkpointconfig;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import org.apache.flink.streaming.connectors.kafka.flinkkafkaconsumer;
import org.apache.flink.streaming.connectors.kafka.flinkkafkaproducer;
import org.apache.flink.streaming.connectors.kafka.internals.keyedserializationschemawrapper;
import org.apache.flink.util.collector;

import java.util.properties;
import java.util.random;

public class test {
    public static void main(string[] args) throws exception {
        streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();

        //1.flink内部保证精准一次:设置检查点
        checkpointconfig checkpointconfig = env.getcheckpointconfig();
        //10s内保存一次检查点
        checkpointconfig.setcheckpointinterval(10000);
        //保存检查点的超时时间为5秒钟
        checkpointconfig.setcheckpointtimeout(5000);
        //新一轮检查点开始前最少等待上一轮保存15秒才开始
        checkpointconfig.setminpausebetweencheckpoints(15000);
        //设置作业失败后删除检查点(默认)
        checkpointconfig.enableexternalizedcheckpoints(checkpointconfig.externalizedcheckpointcleanup.delete_on_cancellation);
        //设置检查点模式为精准一次(默认)
        checkpointconfig.setcheckpointingmode(checkpointingmode.exactly_once);

            env.setstatebackend(new fsstatebackend("file:///users/xingxuanming/downloads/flink-checkpoint/checkpoint"));

        properties props_source = new properties();
        props_source.setproperty("bootstrap.servers", "node1:9092");
        props_source.setproperty("group.id", "flink");
        props_source.setproperty("auto.offset.reset", "latest");
        //会开启一个后台线程每隔5s检测一下kafka的分区情况
        props_source.setproperty("flink.partition-discovery.interval-millis", "5000");
        flinkkafkaconsumer<string> kafkasource = new flinkkafkaconsumer<>("flink\_kafka", new simplestringschema(), props_source);
        kafkasource.setstartfromlatest();
        //2.输入端保证:执行checkpoint的时候提交offset到checkpoint(flink用)
        kafkasource.setcommitoffsetsoncheckpoints(true);
        datastreamsource<string> kafkads = env.addsource(kafkasource);

        singleoutputstreamoperator<tuple2<string, integer>> wordandoneds = kafkads.flatmap(new flatmapfunction<string, tuple2<string, integer>>() {
            @override
            public void flatmap(string value, collector<tuple2<string, integer>> out) throws exception {
                //value就是每一行
                string[] words = value.split(" ");
                for (string word : words) {
                    random random = new random();
                    int i = random.nextint(5);
                    if (i > 3) {
                        system.out.println("出bug了...");
                        throw new runtimeexception("出bug了...");
                    }
                    out.collect(tuple2.of(word, 1));
                }
            }
        });
        keyedstream<tuple2<string, integer>, tuple> groupedds = wordandoneds.keyby(0);
        singleoutputstreamoperator<tuple2<string, integer>> aggresult = groupedds.sum(1);
        singleoutputstreamoperator<string> result = (singleoutputstreamoperator<string>) aggresult.map(new richmapfunction<tuple2<string, integer>, string>() {
            @override
            public string map(tuple2<string, integer> value) throws exception {
                return value.f0 + ":::" + value.f1;
            }
        });

        //3.输出端保证
        properties props_sink = new properties();
        props_sink.setproperty("bootstrap.servers", "node1:9092");
        //3.1 设置事务超时时间,也可在kafka配置中设置
        props_sink.setproperty("transaction.timeout.ms", 1000 \* 5 + "");
        flinkkafkaproducer<string> kafkasink = new flinkkafkaproducer<>(
                "flink\_kafka2",
                new keyedserializationschemawrapper<string>(new simplestringschema()),
                props_sink,
                //3.2 设置输出的的语义为精准一次
                flinkkafkaproducer.semantic.exactly_once
        );
        result.addsink(kafkasink);

        env.execute();
       }


进行测试:

先确认下,这里不用关闭应用程序,再重启应用程序来模拟故障,而是程序里模拟故障,用随机数直接throw exeception

进入到kafka的bin目录下

+ 启动zookeeper

 ./zookeeper-server-start.sh …/config/zookeeper.properties &
+ 启动kafka

 ./kafka-server-start.sh …/config/server.properties &
+ 创建主题

 kafka-topics.sh --bootstrap-server localhost:2181 --create --replication-factor 2 --partitions 3 --topic flink\_kafka2
+ 打开控制台生产者

 kafka-console-producer.sh --broker-list localhost:9092 --topic flink\_kafka
+ 打开控制台消费者

 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink\_kafka2来看最后的效果:

此代码主要功能是对输入的字符串做分区和求和操作,例如,传入hello,world,hello三条数据,按照字符串进行分区,分为hello,world两个分区,求和是按照分区内所有汇总的字段进行求和,最后结果是(hello,2) (world,1)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tjbpojhl-1681811696794)(image-20230414195452057.png)] 我们假设,来了三条数据,hello,world,hello,处理了两条数据,此时算子状态为(hello,1) (world,1) 第三条数据来的时候“恰好”故障,检查点保存的偏移量是第一条数据hello,算子里保存的是(hello,1) 那么继续从第二条数据world开始消费,第二条数据处理成功,此时算子里保存的(hello,1) (world,1),第三条数据处理成功,最终效果为(hello,2) (world,1)
3)最后补充下完整的流程:

二、前置知识

⭐️1.检查点(快照)

(1)概念:
(2)拓展问题:

2.如何保存以及恢复的?

举个例子,看看完整的流程图:

3.为什么检查点保存时机选择所有算子任务处理完同一批数据

首先需要思考的一个问题:在算子a保存检查点的过程中,算子b正在处理新的数据,出现宕机,那我恢复的时候虽然明确知道哪条数据没有处理完,但是不确定执行到哪一个算子了,所以只能从头source放入数据,然后到算子a执行,但是算子a可能算出来这条数据的中间状态进行保存了, 相当于算子a要对这条数据处理两次。

举例:比如数据a,要进行处理1(求和),处理2(输出到存储文件)阶段,发生故障的时候在处理2阶段,处理1阶段已经保存了处理1阶段的值,难道恢复的时候,处理1阶段还要对数据的a再操作一次吗?

那我不选择处理完同一批数据,有没有什么其他解决方案?有,比较粗暴的方案。

4.那怎么确认所有算子都处理完同一批数据?直接按照最后一个sink算子任务输出成功的数据作为偏移量不可以吗?

5.保存检查点的时候,新来的数据到底该怎么办?

6.那么你既然要缓存不处理,不还是粗暴的方案?不让新数据处理执行,有什么意义吗?

7.那如果任务某个算子任务是并发情况呢?你怎么传递这个特殊标记?

8.既然选择所有任务都处理完一个相同数据,如果有其中一个任务没保存下状态,其他任务都保存了怎么办?

(3)检查点算法:

2)分布式快照算法

(4)使用:
⭐️(5)保存点

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、oppo等大厂,18年进入阿里一直到现在。

深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
img
img
img
img
img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以添加vx:vip204888 (备注大数据获取)
img

stream = env

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、oppo等大厂,18年进入阿里一直到现在。

深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
[外链图片转存中…(img-bkgkizwo-1712936660165)]
[外链图片转存中…(img-b6bejswu-1712936660165)]
[外链图片转存中…(img-wrffjipm-1712936660165)]
[外链图片转存中…(img-obsabyrw-1712936660166)]
[外链图片转存中…(img-u0swljhz-1712936660166)]

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以添加vx:vip204888 (备注大数据获取)
[外链图片转存中…(img-fyf4ruyt-1712936660166)]

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com