当前位置: 代码网 > 科技>人工智能>数据分析 > Flink系列-5、Flink DataSet API介绍

Flink系列-5、Flink DataSet API介绍

2024年08月06日 数据分析 我要评论
Flink DataSet API介绍

版权声明:本文为博主原创文章,遵循 cc 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。

官方网址https://flink.apache.org/

学习资料https://flink-learning.org.cn/
在这里插入图片描述

flink dataset api编程模型

在这里插入图片描述

flink dataset api编程基本步骤

  • 获取执行环境(executionenvironment)
  • 加载/创建初始数据集
  • 对数据集进行各种转换操作(生成新的dataset)
  • 指定将计算的结果输出
  • 提交任务(可选)

输入数据集data source

data sources 是什么呢?就字面意思其实就可以知道 数据来源
flink 做为一款流式计算框架,它可用来做批处理,也可以用来做流处理,这个 data sources 就是数据的来源地。
flink在批处理中常见的source主要有两大类。

  • 基于本地集合的source(collection-based-source)
  • 基于文件的source(file-based-source)

基于本地集合的source(collection-based-source)

在flink最常见的创建dataset方式有三种:

  • 使用env.fromelements(),这种方式也支持tuple,自定义对象等复合形式。
    注意:类型要一致,不一致可以用object接收,但是使用会报错,比如:env.fromelements(“haha”, 1);
    在这里插入图片描述
    源码注释中有写:
    在这里插入图片描述
  • 使用env.fromcollection(),这种方式支持多种collection的具体类型,如list,set,queue
  • 使用env.generatesequence()方法创建基于sequence的dataset

代码

package batch.source;

import org.apache.flink.api.java.executionenvironment;
import org.apache.flink.api.java.operators.datasource;

import java.util.*;
import java.util.concurrent.arrayblockingqueue;

/**
 * @author lwh
 * @date 2023/4/11
 * @description
 **/
public class testsource {
    public static void main(string[] args) throws exception {
        executionenvironment env = executionenvironment.getexecutionenvironment();

        // 1. 从元素中获取
        datasource<string> source1 = env.fromelements("haha", "heihei");
        source1.print();

        // 2. 从元素中获取,但是元素类型不同
//        datasource<object> source2 = env.fromelements("haha", 1);
//        source2.print();

        // 3. 从list中获取
        list<string> list = new arraylist<>();
        list.add("haha");
        list.add("heihei");
        datasource<string> listsource = env.fromcollection(list);
        listsource.print();

        // 4. 从set中获取
        set<string> set = new hashset<>();
        set.add("lalala");
        set.add("guaguagua");
        datasource<string> setsource = env.fromcollection(set);
        setsource.print();

        // 5. 从队列中获取
        queue<string> queue = new arrayblockingqueue<>(10);
        queue.add("spark");
        queue.add("hadoop");
        datasource<string> queuesource = env.fromcollection(queue);
        queuesource.print();

        // 6. 生成序列
        datasource<long> seqsource = env.generatesequence(1, 10);
        seqsource.print();

    }
}

在代码中指定并行度

指定全局并行度
env.setparallelism(12);
获得全局并行度
env.getparallelism();

指定算子设置并行度

在这里插入图片描述

获得指定算子的并行度
eventsource.getparallelism();

基于文件的source(file-based-source)

flink支持直接从外部文件存储系统中读取文件的方式来创建source数据源,flink支持的方式有以下几种:

  • 读取本地文件数据
  • 读取hdfs文件数据
  • 读取csv文件数据
  • 读取压缩文件
  • 遍历目录

下面分别演示每个数据源的加载方式。

读取本地文件

flink的批处理可以直接通过readtextfile()方法读取文件来创建数据源,方法如下:

package batch.source;

import org.apache.flink.api.java.executionenvironment;
import org.apache.flink.api.java.operators.datasource;

/**
 * @author lwh
 * @date 2023/4/11
 * @description
 **/
public class batchfromfiledemo {
    public static void main(string[] args) throws exception {
        //1:初始化运行环境
        executionenvironment env = executionenvironment.getexecutionenvironment();

        //2:读取本地文件数据源
        datasource<string> locallines = env.readtextfile("./data/input/wordcount.txt");

        //3:打印数据
        locallines.print();

    }
}

读取hdfs数据

package batch.source;

import org.apache.flink.api.java.executionenvironment;
import org.apache.flink.api.java.operators.datasource;

/**
 * @author lwh
 * @date 2023/4/11
 * @description
 **/
public class hdfsfilesource {
    public static void main(string[] args) throws exception {
        executionenvironment env = executionenvironment.getexecutionenvironment();

        // read hdfs file
        datasource<string> hdfssource = env.readtextfile("hdfs://node1:8020/input/license.txt");
        hdfssource.print();

    }
}

读取csv数据

package entity;

/**
 * @author lwh
 * @date 2023/4/11
 * @description
 **/
public class student {
    private integer id;

    private string name;

    private integer sort;

    private double score;

    public integer getid() {
        return id;
    }

    public void setid(integer id) {
        this.id = id;
    }

    public string getname() {
        return name;
    }

    public void setname(string name) {
        this.name = name;
    }

    public integer getsort() {
        return sort;
    }

    public void setsort(integer sort) {
        this.sort = sort;
    }

    public double getscore() {
        return score;
    }

    public void setscore(double score) {
        this.score = score;
    }

    @override
    public string tostring() {
        return "student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", sort=" + sort +
                ", score=" + score +
                '}';
    }
}

package batch.source;

import entity.student;
import org.apache.flink.api.java.executionenvironment;
import org.apache.flink.api.java.io.csvreader;
import org.apache.flink.api.java.operators.datasource;

/**
 * @author lwh
 * @date 2023/4/11
 * @description
 **/
public class csvfilesource {
    public static void main(string[] args) throws exception {
        executionenvironment env = executionenvironment.getexecutionenvironment();

        /**
         * 参数说明:
         * fielddelimiter设置分隔符,默认的是","
         * ignorefirstline忽略第一行
         * includefields是设置选取哪几列,我这里是第四列不选取的。
         * pojotype和后面字段名,就是对应列。字段名是不需要对应原始文件header字段名,
         * 但必须与pojo例如userinfo.class里的字段名一一对应
         */

        csvreader csvreader = env.readcsvfile("data/input/score.csv");
        datasource<student> csvsource = csvreader.fielddelimiter(",")
                .ignorefirstline()
                .includefields(true, true, true, true)
                .pojotype(student.class, "id", "name", "sort", "score");
        csvsource.print();

    }
}

在这里插入图片描述

基于文件的source(遍历目录)

flink支持对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式。
对于从文件中读取数据,当读取的多个文件夹的时候,嵌套的文件默认是不会被读取的,只会读取第一个文件,其他的都会被忽略。所以我们需要使用recursive.file.enumeration进行递归读取

package batch.source;

import org.apache.flink.api.java.executionenvironment;
import org.apache.flink.api.java.operators.datasource;
import org.apache.flink.configuration.configuration;

/**
 * @author lwh
 * @date 2023/4/11
 * @description 递归读取目录内的文件
 **/
public class recursivefilereadsource {
    public static void main(string[] args) throws exception {
        executionenvironment env = executionenvironment.getexecutionenvironment();

        // 生成配置参数
        configuration param = new configuration();
        param.setboolean("recursive.file.enumeration", true);

        // 读取目录配合递归
        datasource<string> source = env.readtextfile("./data").withparameters(param);

        source.print();

    }
}

在这里插入图片描述

读取压缩文件

对于以下压缩类型,不需要指定任何额外的inputformat方法,flink可以自动识别并且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。

压缩格式扩展名
deflatedeflate
gzip.gz,.gzip
bzip2.bz2
xz.xz
package batch.source;

import org.apache.flink.api.java.executionenvironment;
import org.apache.flink.api.java.operators.datasource;

/**
 * @author lwh
 * @date 2023/4/11
 * @description 读取压缩文件:gz,xz
 **/
public class compressfilesource {
    public static void main(string[] args) throws exception {
        executionenvironment env = executionenvironment.getexecutionenvironment();

        // 读取压缩文件
        datasource<string> gzsource = env.readtextfile("data/input/test.gz");
        gzsource.print();

    }
}

注意:读取压缩文件,不能并行处理,因此加载解压的时间会稍微有点长。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com