当前位置: 代码网 > it编程>编程语言>Java > 项目实战--Spring Boot 3整合Flink实现大数据文件处理

项目实战--Spring Boot 3整合Flink实现大数据文件处理

2024年08月01日 Java 我要评论
性能优化策略利用Spring Boot 3.+和Flink构建一个高效的大数据文件处理应用
一、应用背景

公司大数据项目中,需要构建和开发高效、可靠的数据处理子系统,实现大数据文件处理、整库迁移、延迟与乱序处理、数据清洗与过滤、实时数据聚合、增量同步(cdc)、状态管理与恢复、反压问题处理、数据分库分表、跨数据源一致性以及实时异常检测与告警等功能,确保数据的准确性、一致性和实时性。采用spring boot 3.+和flink平台上进行数据治理的方案。

二、方案优势

由于是大数据项目,因此在处理大规模数据集时,文件处理能力直接影响到数据驱动决策的效果,高效的大数据文件处理既要能保证数据的时效性和准确性,也要能提升整体系统的性能和可靠性。
spring boot 3.+和flink结合使用,在处理大数据文件时有不少独特的优势。
首先,这两者能够相互补充,带来高效和便捷的文件处理能力的原因在于:

1)统一的开发体验:
spring boot 3.+和flink结合使用,可以在同一项目中综合应用两者的优势。spring boot可以负责微服务的治理、api的管理和调度,而flink则专注于大数据的实时处理和分析。两者的结合能够提供一致的开发体验和简化的集成方式。

(2)动态扩展和高可用性:
微服务架构下,spring boot提供的良好扩展性和flink的高可用性,使得系统可以在需求增长时动态扩展,确保系统稳定运行。flink的容错机制配合spring boot的服务治理能力,可以有效提高系统的可靠性。

(3)灵活的数据传输和处理:
通过spring boot的rest api和消息队列,可以轻松地将数据传输到flink进行处理,flink处理完毕后还可以将结果返回到spring boot处理的后续业务逻辑中。这种灵活的处理方式使得整个数据处理流程更为高效且可控。
三、实现步骤

1.首先配置spring boot 3.x和flink的开发环境。在pom.xml中添加必要的依赖:

<dependencies>
    <!-- spring boot 依赖 -->
    <dependency>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter</artifactid>
    </dependency>
    <dependency>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter-web</artifactid>
    </dependency>
    
    <!-- apache flink 依赖 -->
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-java</artifactid>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-streaming-java_2.11</artifactid>
        <version>1.14.0</version>
    </dependency>

    <!-- 其他必要依赖 -->
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-connector-filesystem_2.11</artifactid>
        <version>1.14.0</version>
    </dependency>
</dependencies>

2.数据的读取、处理和写入流程
2.1 数据读取
数据源选择:(项目中使用的是hdfs,故后续文档展示从hdfs中并行读取数据)

(1)本地文件系统:适用于中小规模数据处理,开发和调试方便。
(2)分布式文件系统(hdfs):适用于大规模数据处理,具备高扩展性和容错能力。
(3)云存储(s3):适用于云环境下的数据处理,支持弹性存储和高可用性。

为提高读取性能,采用多线程并行读取和数据分片等策略。

import org.apache.flink.api.common.functions.flatmapfunction;
import org.apache.flink.api.java.tuple.tuple2;
import org.apache.flink.util.collector;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;

public class hdfsdatareader {
    public static void main(string[] args) throws exception {
        streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();
        // 从 hdfs 中读取数据,并通过并行流的方式对数据进行处理和统计。
        datastream<string> text = env.readtextfile("hdfs://localhost:9000/resources/datafile");
        datastream<tuple2<string, integer>> wordcounts = text
            .flatmap(new flatmapfunction<string, tuple2<string, integer>>() {
                @override
                public void flatmap(string value, collector<tuple2<string, integer>> out) {
                    for (string word : value.split("\\s")) {
                        out.collect(new tuple2<>(word, 1));
                    }
                }
            })
            .keyby(0)
            .sum(1);

        wordcounts.writeastext("hdfs:///path/to/output/file", filesystem.writemode.overwrite);
        env.execute("hdfs data reader");
    }
}

2.2 数据处理
数据清洗和预处理是大数据处理中重要的一环,包括步骤:

数据去重:移除重复的数据,确保数据唯一性。
数据过滤:排除不符合业务规则的无效数据。
数据转换:将数据格式转换为统一的规范格式,便于后续处理。

进行简单的数据清洗操作:

datastream<string> cleaneddata = inputstream
    .filter(new filterfunction<string>() {
        @override
        public boolean filter(string value) {
            // 过滤空行和不符合格式的数据
            return value != null && !value.trim().isempty() && value.matches("regex");
        }
    })
    .map(new mapfunction<string, string>() {
        @override
        public string map(string value) {
            // 数据格式转换
            return transformdata(value);
        }
    });

在数据清洗之后,需要对数据进行各种聚合和分析操作,如统计分析、分类聚类等。这是大数据处理的核心部分,flink 提供丰富的内置函数和算子来帮助实现这些功能。

对数据进行简单的聚合统计:

datastream<tuple2<string, integer>> aggregateddata = cleaneddata
    .flatmap(new flatmapfunction<string, tuple2<string, integer>>() {
        @override
        public void flatmap(string value, collector<tuple2<string, integer>> out) {
            for (string word : value.split("\\s+")) {
                out.collect(new tuple2<>(word, 1));
            }
        }
    })
    .keyby(0)
    .sum(1);

2.3 数据写入
处理后的数据需要高效地写入目标存储系统,常见的数据存储包括文件系统、数据库和消息队列等。选择合适的存储系统不仅有助于提升整体性能,同时也有助于数据的持久化和后续分析。

文件系统:适用于批处理结果的落地存储。
数据库:适用于结构化数据的存储和查询。
消息队列:适用于实时流处理结果的传输和消费。

为提高写入性能,可以采取分区写入、批量写入和压缩等策略。
使用分区写入和压缩技术将处理后的数据写入文件系统:

outputstream
    .map(new mapfunction<tuple2<string, integer>, string>() {
        @override
        public string map(tuple2<string, integer> value) {
            // 数据转换为字符串格式
            return value.f0 + "," + value.f1;
        }
    })
    .writeastext("file:output/tag/datafile", filesystem.writemode.overwrite)
    .setparallelism(4) // 设置并行度
    .setwritemodewriteparallelism(filesystem.writemode.no_overwrite); // 设置写入模式和压缩

3.性能优化
3.1 并行度设置
flink 支持高度并行的数据处理,通过设置并行度可以提高整体处理性能。
设置flink的全局并行度和算子级并行度:

env.setparallelism(8); // 设置全局并行度

datastream<tuple2<string, integer>> result = inputstream
    .flatmap(new tokenizer())
    .keyby(0)
    .sum(1)
    .setparallelism(4); // 设置算子级并行度

3.2 资源管理
合理管理计算资源,避免资源争用,可以显著提高数据处理性能。在实际开发中,可以通过配置flink的taskmanager资源配额(如内存、cpu)来优化资源使用:

# flink 配置文件 (flink-conf.yaml)
taskmanager.memory.process.size: 2048m
taskmanager.memory.framework.heap.size: 512m
taskmanager.numberoftaskslots: 4

3.3 数据切分和批处理
对于大文件处理,可以采用数据切分技术,将大文件拆分为多个小文件进行并行处理,避免单个文件过大导致的处理瓶颈。同时,使用批处理可以减少网络和i/o操作,提高整体效率。

datastream<string> partitionedstream = inputstream
    .rebalance() // 重新分区
    .mappartition(new mappartitionfunction<string, string>() {
        @override
        public void mappartition(iterable<string> values, collector<string> out) {
            for (string value : values) {
                out.collect(value);
            }
        }
    })
    .setparallelism(env.getparallelism());

3.4 使用缓存和压缩

对于高频访问的数据,可将中间结果缓存到内存中,以减少重复计算和i/o操作。此外,在写入前对数据进行压缩(如 gzip)可以减少存储空间和网络传输时间。

四、完整示例

通过一个完整的示例来实现spring boot 3.+和flink大数据文件的读取和写入。涵盖上述从数据源读取文件、数据处理、数据写入到目标文件的过程。

首先,通过spring initializer创建一个新的spring boot项目(spring boot 3需要jdk17+),添加以下依赖:

<dependencies>
    <!-- spring boot 依赖 -->
    <dependency>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter</artifactid>
    </dependency>
    
    <dependency>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter-web</artifactid>
    </dependency>
    
    <!-- apache flink 依赖 -->
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-java</artifactid>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-streaming-java_2.11</artifactid>
        <version>1.14.0</version>
    </dependency>

    <!-- 其他必要依赖 -->
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-connector-filesystem_2.11</artifactid>
        <version>1.14.0</version>
    </dependency>
</dependencies>

定义一个配置类来管理文件路径和其他配置项:

import org.springframework.context.annotation.configuration;

@configuration
public class fileprocessingconfig {
    // 输入文件路径
    public static final string input_file_path = "fhdfs://localhost:9000/resources/datafile";

    // 输出文件路径
    public static final string output_file_path = "file:output/tag/datafile";
}

在业务逻辑层定义文件处理操作:

import org.apache.flink.api.common.functions.mapfunction;
import org.apache.flink.streaming.api.datastream.datastream;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import org.apache.flink.core.fs.filesystem;
import org.springframework.stereotype.service;

@service
public class fileprocessingservice {

    public void processfiles() throws exception {
        // 创建flink执行环境
        streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();

        // 配置数据源,读取文件
        datastream<string> inputstream = env.readtextfile(fileprocessingconfig.input_file_path);

        // 数据处理逻辑,将数据转换为大写
        datastream<string> processedstream = inputstream.map(new mapfunction<string, string>() {
            @override
            public string map(string value) {
                return value.touppercase();
            }
        });

        // 将处理后的数据写入文件
        processedstream.writeastext(fileprocessingconfig.output_file_path, filesystem.writemode.overwrite);

        // 启动flink任务
        env.execute("file processing job");
    }
}

在主应用程序类中启用spring调度任务:

import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
import org.springframework.scheduling.annotation.enablescheduling;
import org.springframework.scheduling.annotation.scheduled;
import org.springframework.beans.factory.annotation.autowired;

@enablescheduling
@springbootapplication
public class fileprocessingapplication {

    @autowired
    private fileprocessingservice fileprocessingservice;

    public static void main(string[] args) {
        springapplication.run(fileprocessingapplication.class, args);
    }

    // 定时任务,每分钟执行一次
    @scheduled(fixedrate = 60000)
    public void schedulefileprocessingtask() {
        try {
            fileprocessingservice.processfiles();
        } catch (exception e) {
            e.printstacktrace();
        }
    }
}

优化数据处理部分,加入更多处理步骤,包括数据校验和过滤来确保数据的质量和准确性。

import org.apache.flink.api.common.functions.filterfunction;
import org.apache.flink.api.common.functions.flatmapfunction;
import org.apache.flink.api.common.functions.mapfunction;
import org.apache.flink.api.java.tuple.tuple2;
import org.apache.flink.util.collector;

public class enhancedfileprocessingservice {

    public void processfiles() throws exception {
        streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();

        datastream<string> inputstream = env.readtextfile(fileprocessingconfig.input_file_path);

        // 数据预处理:数据校验和过滤
        datastream<string> filteredstream = inputstream.filter(new filterfunction<string>() {
            @override
            public boolean filter(string value) {
                // 过滤长度小于5的字符串
                return value != null && value.trim().length() > 5;
            }
        });

        // 数据转换:将每行数据拆分为单词
        datastream<tuple2<string, integer>> wordstream = filteredstream.flatmap(new flatmapfunction<string, tuple2<string, integer>>() {
            @override
            public void flatmap(string value, collector<tuple2<string, integer>> out) {
                for (string word : value.split("\\w+")) {
                    out.collect(new tuple2<>(word, 1));
                }
            }
        });

        // 数据聚合:统计每个单词的出现次数
        datastream<tuple2<string, integer>> wordcounts = wordstream
                .keyby(value -> value.f0)
                .sum(1);

        // 将结果转换为字符串并写入输出文件
        datastream<string> resultstream = wordcounts.map(new mapfunction<tuple2<string, integer>, string>() {
            @override
            public string map(tuple2<string, integer> value) {
                return value.f0 + ": " + value.f1;
            }
        });

        resultstream.writeastext(fileprocessingconfig.output_file_path, filesystem.writemode.overwrite);

        env.execute("enhanced file processing job");
    }
}

增加以下步骤:

数据校验和过滤:过滤掉长度小于5的行,确保数据质量。
数据转换:将每行数据拆分为单词,并为每个单词附加计数1。
数据聚合:统计每个单词的出现次数。
结果写入:将统计结果写入输出文件。

对flink的资源配置进行优化,有效管理 taskmanager 的内存和并行度,以确保文件处理任务的高效执行:

# flink 配置文件 (flink-conf.yaml)
taskmanager.memory.process.size: 4096m
taskmanager.memory.framework.heap.size: 1024m
taskmanager.numberoftaskslots: 4
parallelism.default: 4
五、增量同步与变更数据捕获(cdc)

增量同步是将数据源中发生变化的数据增量同步到目标系统,而不是全量同步,从而提高数据处理效率。cdc技术用于捕获数据库中的数据变化,并将这些变化实时传输到目标系统。
应用场景:

数据仓库实时更新
微服务间数据同步
实时分析和监控
数据备份和恢复
5.1 实现步骤

配置 flink 的 cdc 连接器。这里以 mysql 为例,使用 flink-connector-mysql-cdc。

<dependency>
    <groupid>com.ververica</groupid>
    <artifactid>flink-connector-mysql-cdc</artifactid>
    <version>2.0.0</version>
</dependency>

创建 flink 应用程序,配置 cdc 连接器并实现增量数据处理。

import com.ververica.cdc.connectors.mysql.mysqlsource;
import com.ververica.cdc.connectors.mysql.table.startupoptions;
import org.apache.flink.api.common.eventtime.watermarkstrategy;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import org.apache.flink.streaming.api.functions.source.sourcefunction;
import org.apache.flink.streaming.api.datastream.datastream;

public class flinkcdcexample {

    public static void main(string[] args) throws exception {
        // 创建flink执行环境
        streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();

        // 配置mysql cdc source
        sourcefunction<string> sourcefunction = mysqlsource.<string>builder()
            .hostname("localhost")
            .port(3306)
            .databaselist("test_db") // 设置需要监控的数据库
            .tablelist("test_db.test_table") // 设置需要监控的表
            .username("root")
            .password("password")
            .deserializer(new jsondebeziumdeserializationschema()) // 设置反序列化器
            .startupoptions(startupoptions.initial())
            .build();

        // 创建数据流
        datastream<string> stream = env.addsource(sourcefunction)
            .assigntimestampsandwatermarks(watermarkstrategy.formonotonoustimestamps());

        // 处理数据流
        stream.print();

        // 启动flink作业
        env.execute("flink cdc example");
    }
}

application.properties 中配置数据库连接信息:

spring.datasource.url=jdbc:mysql://localhost:3306/test_db
spring.datasource.username=root
spring.datasource.password=password

spring boot 启动主类,启动 flink 作业:

import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
import org.springframework.context.applicationcontext;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;

@springbootapplication
public class springbootflinkapplication {

    public static void main(string[] args) {
        applicationcontext ctx = springapplication.run(springbootflinkapplication.class, args);
        streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();

        // 配置flink作业
        flinkcdcexample.setupflinkjob(env);

        try {
            env.execute("spring boot flink cdc example");
        } catch (exception e) {
            e.printstacktrace();
        }
    }
}

在 flinkcdcexample 类中配置增量数据处理逻辑:

import com.ververica.cdc.connectors.mysql.mysqlsource;
import com.ververica.cdc.connectors.mysql.table.startupoptions;
import org.apache.flink.api.common.eventtime.watermarkstrategy;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import org.apache.flink.streaming.api.functions.source.sourcefunction;
import org.apache.flink.streaming.api.datastream.datastream;

public class flinkcdcexample {

    public static void setupflinkjob(streamexecutionenvironment env) {
        sourcefunction<string> sourcefunction = mysqlsource.<string>builder()
            .hostname("localhost")
            .port(3306)
            .databaselist("test_db")
            .tablelist("test_db.test_table")
            .username("root")
            .password("password")
            .deserializer(new jsondebeziumdeserializationschema())
            .startupoptions(startupoptions.initial())
            .build();

        datastream<string> stream = env.addsource(sourcefunction)
            .assigntimestampsandwatermarks(watermarkstrategy.formonotonoustimestamps());

        stream.print();
    }
}
5.2 注意事项

数据一致性保障:

使用事务来确保数据一致性
使用幂等操作来处理重复数据
定期进行数据校验

性能与实时性优化:

调整flink的并行度
优化cdc连接器的配置,如批量读取大小和读取间隔
使用更高性能的序列化和反序列化器

以上流程可以提高增量同步和变更数据捕获(cdc)数据处理效率,也能保证数据的一致性和实时性。

好,ok,刹国!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com