当前位置: 代码网 > it编程>游戏开发>ar > 【Flink 核心篇】Flink 的八种分区策略(源码解读)

【Flink 核心篇】Flink 的八种分区策略(源码解读)

2024年08月06日 ar 我要评论
Flink 包含 8 种分区策略,这 8 种分区策略(分区器)分别如下面所示,本文将从源码的角度解读每个分区器的实现方式。

flink 包含 8 种分区策略,这 8 种分区策略(分区器)分别如下面所示,本文将从源码的角度解读每个分区器的实现方式。

在这里插入图片描述

  • globalpartitioner
  • shufflepartitioner
  • rebalancepartitioner
  • rescalepartitioner
  • broadcastpartitioner
  • forwardpartitioner
  • keygroupstreampartitioner
  • custompartitionerwrapper

1.继承关系图

1.1 接口:channelselector

public interface channelselector<t extends ioreadablewritable> {

    /**
     * 初始化channels数量,channel可以理解为下游operator的某个实例(并行算子的某个subtask).
     */
    void setup(int numberofchannels);

    /**
     *根据当前的record以及channel总数,
     *决定应将record发送到下游哪个channel。
     *不同的分区策略会实现不同的该方法。
     */
    int selectchannel(t record);

    /**
    *是否以广播的形式发送到下游所有的算子实例
     */
    boolean isbroadcast();
}

1.2 抽象类:streampartitioner

public abstract class streampartitioner<t> implements
        channelselector<serializationdelegate<streamrecord<t>>>, serializable {
    private static final long serialversionuid = 1l;

    protected int numberofchannels;

    @override
    public void setup(int numberofchannels) {
        this.numberofchannels = numberofchannels;
    }

    @override
    public boolean isbroadcast() {
        return false;
    }

    public abstract streampartitioner<t> copy();
}

1.3 继承关系图

在这里插入图片描述

2.分区策略

2.1 globalpartitioner

该分区器会将所有的数据都发送到下游的某个算子实例(subtask id = 0)。

在这里插入图片描述

/**
 * 发送所有的数据到下游算子的第一个task(id = 0)
 * @param <t>
 */
@internal
public class globalpartitioner<t> extends streampartitioner<t> {
    private static final long serialversionuid = 1l;

    @override
    public int selectchannel(serializationdelegate<streamrecord<t>> record) {
        //只返回0,即只发送给下游算子的第一个task
        return 0;
    }

    @override
    public streampartitioner<t> copy() {
        return this;
    }

    @override
    public string tostring() {
        return "global";
    }
}

2.2 shufflepartitioner

随机选择一个下游算子实例进行发送。

在这里插入图片描述

/**
 * 随机的选择一个channel进行发送
 * @param <t>
 */
@internal
public class shufflepartitioner<t> extends streampartitioner<t> {
    private static final long serialversionuid = 1l;

    private random random = new random();

    @override
    public int selectchannel(serializationdelegate<streamrecord<t>> record) {
        //产生[0,numberofchannels)伪随机数,随机发送到下游的某个task
        return random.nextint(numberofchannels);
    }

    @override
    public streampartitioner<t> copy() {
        return new shufflepartitioner<t>();
    }

    @override
    public string tostring() {
        return "shuffle";
    }
}

2.3 broadcastpartitioner

发送到下游所有的算子实例。

在这里插入图片描述

/**
 * 发送到所有的channel
 */
@internal
public class broadcastpartitioner<t> extends streampartitioner<t> {
    private static final long serialversionuid = 1l;
    /**
     * broadcast模式是直接发送到下游的所有task,所以不需要通过下面的方法选择发送的通道
     */
    @override
    public int selectchannel(serializationdelegate<streamrecord<t>> record) {
        throw new unsupportedoperationexception("broadcast partitioner does not support select channels.");
    }

    @override
    public boolean isbroadcast() {
        return true;
    }

    @override
    public streampartitioner<t> copy() {
        return this;
    }

    @override
    public string tostring() {
        return "broadcast";
    }
}

2.4 rebalancepartitioner

通过循环的方式依次发送到下游的 task

在这里插入图片描述

/**
 *通过循环的方式依次发送到下游的task
 * @param <t>
 */
@internal
public class rebalancepartitioner<t> extends streampartitioner<t> {
    private static final long serialversionuid = 1l;

    private int nextchanneltosendto;

    @override
    public void setup(int numberofchannels) {
        super.setup(numberofchannels);
        //初始化channel的id,返回[0,numberofchannels)的伪随机数
        nextchanneltosendto = threadlocalrandom.current().nextint(numberofchannels);
    }

    @override
    public int selectchannel(serializationdelegate<streamrecord<t>> record) {
        //循环依次发送到下游的task,比如:nextchanneltosendto初始值为0,numberofchannels(下游算子的实例个数,并行度)值为2
        //则第一次发送到id = 1的task,第二次发送到id = 0的task,第三次发送到id = 1的task上...依次类推
        nextchanneltosendto = (nextchanneltosendto + 1) % numberofchannels;
        return nextchanneltosendto;
    }

    public streampartitioner<t> copy() {
        return this;
    }

    @override
    public string tostring() {
        return "rebalance";
    }
}

2.5 rescalepartitioner

基于上下游 operator 的并行度,将记录以循环的方式输出到下游 operator 的每个实例。

举例:

  • 上游并行度是 2,下游是 4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。
  • 若上游并行度是 4,下游并行度是 2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。

在这里插入图片描述

@internal
public class rescalepartitioner<t> extends streampartitioner<t> {
    private static final long serialversionuid = 1l;

    private int nextchanneltosendto = -1;

    @override
    public int selectchannel(serializationdelegate<streamrecord<t>> record) {
        if (++nextchanneltosendto >= numberofchannels) {
            nextchanneltosendto = 0;
        }
        return nextchanneltosendto;
    }

    public streampartitioner<t> copy() {
        return this;
    }

    @override
    public string tostring() {
        return "rescale";
    }
}

flink 中的执行图可以分成四层:streamgraphjobgraphexecutiongraph物理执行图

  • streamgraph:是根据用户通过 stream api 编写的代码生成的最初的图。用来表示程序的拓扑结构。
  • jobgraph:streamgraph 经过优化后生成了 jobgraph,提交给 jobmanager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化 / 反序列化 / 传输消耗。
  • executiongraph:jobmanager 根据 jobgraph 生成 executiongraph。executiongraph 是 jobgraph 的并行化版本,是调度层最核心的数据结构。
  • 物理执行图:jobmanager 根据 executiongraph 对 job 进行调度后,在各个 taskmanager 上部署 task 后形成的 “”,并不是一个具体的数据结构。

streamingjobgraphgenerator 就是 streamgraph 转换为 jobgraph。在这个类中,把 forwardpartitionerrescalepartitioner 列为 pointwise 分配模式,其他的为 all_to_all 分配模式。代码如下:

if (partitioner instanceof forwardpartitioner || partitioner instanceof rescalepartitioner) {
            jobedge = downstreamvertex.connectnewdatasetasinput(
                headvertex,
                // 上游算子(生产端)的实例(subtask)连接下游算子(消费端)的一个或者多个实例(subtask)
                distributionpattern.pointwise,
                resultpartitiontype);
        } else {
            jobedge = downstreamvertex.connectnewdatasetasinput(
                headvertex,
                // 上游算子(生产端)的实例(subtask)连接下游算子(消费端)的所有实例(subtask)
                distributionpattern.all_to_all,
                resultpartitiontype);
        }

2.6 forwardpartitioner

发送到下游对应的第一个 task,保证上下游算子并行度一致,即上游算子与下游算子是 1 : 1 1:1 1:1 的关系。

在这里插入图片描述

/**
 * 发送到下游对应的第一个task
 * @param <t>
 */
@internal
public class forwardpartitioner<t> extends streampartitioner<t> {
    private static final long serialversionuid = 1l;

    @override
    public int selectchannel(serializationdelegate<streamrecord<t>> record) {
        return 0;
    }

    public streampartitioner<t> copy() {
        return this;
    }

    @override
    public string tostring() {
        return "forward";
    }
}

在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用 forwardpartitioner,否则使用 rebalancepartitioner,对于 forwardpartitioner,必须保证上下游算子并行度一致,否则会抛出异常。

//在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用forwardpartitioner,否则使用rebalancepartitioner
if (partitioner == null && upstreamnode.getparallelism() == downstreamnode.getparallelism()) {
    partitioner = new forwardpartitioner<object>();
} else if (partitioner == null) {
    partitioner = new rebalancepartitioner<object>();
}

if (partitioner instanceof forwardpartitioner) {
    //如果上下游的并行度不一致,会抛出异常
    if (upstreamnode.getparallelism() != downstreamnode.getparallelism()) {
        throw new unsupportedoperationexception("forward partitioning does not allow " +
            "change of parallelism. upstream operation: " + upstreamnode + " parallelism: " + upstreamnode.getparallelism() +
            ", downstream operation: " + downstreamnode + " parallelism: " + downstreamnode.getparallelism() +
            " you must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
    }
}

2.7 keygroupstreampartitioner

根据 key 的分组索引选择发送到相对应的下游 subtask

在这里插入图片描述

  • org.apache.flink.streaming.runtime.partitioner.keygroupstreampartitioner
/**
 * 根据key的分组索引选择发送到相对应的下游subtask
 * @param <t>
 * @param <k>
 */
@internal
public class keygroupstreampartitioner<t, k> extends streampartitioner<t> implements configurablestreampartitioner {
...

    @override
    public int selectchannel(serializationdelegate<streamrecord<t>> record) {
        k key;
        try {
            key = keyselector.getkey(record.getinstance().getvalue());
        } catch (exception e) {
            throw new runtimeexception("could not extract key from " + record.getinstance().getvalue(), e);
        }
        //调用keygrouprangeassignment类的assignkeytoparalleloperator方法,代码如下所示
        return keygrouprangeassignment.assignkeytoparalleloperator(key, maxparallelism, numberofchannels);
    }
...
}
  • org.apache.flink.runtime.state.keygrouprangeassignment
public final class keygrouprangeassignment {
...

    /**
     * 根据key分配一个并行算子实例的索引,该索引即为该key要发送的下游算子实例的路由信息,
     * 即该key发送到哪一个task
     */
    public static int assignkeytoparalleloperator(object key, int maxparallelism, int parallelism) {
        preconditions.checknotnull(key, "assigned key must not be null!");
        return computeoperatorindexforkeygroup(maxparallelism, parallelism, assigntokeygroup(key, maxparallelism));
    }

    /**
     *根据key分配一个分组id(keygroupid)
     */
    public static int assigntokeygroup(object key, int maxparallelism) {
        preconditions.checknotnull(key, "assigned key must not be null!");
        //获取key的hashcode
        return computekeygroupforkeyhash(key.hashcode(), maxparallelism);
    }

    /**
     * 根据key分配一个分组id(keygroupid),
     */
    public static int computekeygroupforkeyhash(int keyhash, int maxparallelism) {

        //与maxparallelism取余,获取keygroupid
        return mathutils.murmurhash(keyhash) % maxparallelism;
    }

    //计算分区index,即该key group应该发送到下游的哪一个算子实例
    public static int computeoperatorindexforkeygroup(int maxparallelism, int parallelism, int keygroupid) {
        return keygroupid * parallelism / maxparallelism;
    }
...

2.8 custompartitionerwrapper

通过 partitioner 实例的 partition 方法(自定义的)将记录输出到下游。

public class custompartitionerwrapper<k, t> extends streampartitioner<t> {
    private static final long serialversionuid = 1l;

    partitioner<k> partitioner;
    keyselector<t, k> keyselector;

    public custompartitionerwrapper(partitioner<k> partitioner, keyselector<t, k> keyselector) {
        this.partitioner = partitioner;
        this.keyselector = keyselector;
    }

    @override
    public int selectchannel(serializationdelegate<streamrecord<t>> record) {
        k key;
        try {
            key = keyselector.getkey(record.getinstance().getvalue());
        } catch (exception e) {
            throw new runtimeexception("could not extract key from " + record.getinstance(), e);
        }
		//实现partitioner接口,重写partition方法
        return partitioner.partition(key, numberofchannels);
    }

    @override
    public streampartitioner<t> copy() {
        return this;
    }

    @override
    public string tostring() {
        return "custom";
    }
}

比如:

public class custompartitioner implements partitioner<string> {
      // key: 根据key的值来分区
      // numpartitions: 下游算子并行度
      @override
      public int partition(string key, int numpartitions) {
         return key.length() % numpartitions;//在此处定义分区策略
      }
  }
(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com