目录
七、distributedplan计划(生成分布式逻辑阶段)
8.2 computescanrangeassignment阶段
8.3 computefragmentexecparams阶段
前言
下文主要介绍了doris sql解析的原理,即doris在接受sql后都做了什么?阐述了词法分析,语法分析,生成单机逻辑计划,生成分布式逻辑计划,生成分布式物理计划的过程。对应代码实现是parse, analyze, singleplan, distributedplan, schedule这五个部分。
doris sql解析原理也可以借助另外一篇文章进行辅助理解:
doris源码——查询源码解析-csdn博客文章浏览阅读191次,点赞6次,收藏7次。doris源码——查询源码解析https://blog.csdn.net/shwaitme/article/details/136281615?csdn_share_tail=%7b%22type%22%3a%22blog%22%2c%22rtype%22%3a%22article%22%2c%22rid%22%3a%22136281615%22%2c%22source%22%3a%22shwaitme%22%7d
一、doris简介
apache doris是一个基于mpp架构的高性能、实时的分析型数据库,能够较好的满足报表分析、即席查询、统一数仓构建等使用场景。doris整体架构非常简单,只有fe和be两类进程。fe主要负责用户请求的接入、查询解析规划、元数据的管理、节点管理相关工作。be主要负责数据存储、查询计划的执行。
在 doris 的存储引擎中,用户数据被水平划分为若干个数据分片(tablet,也称作数据分桶)。每个tablet 包含若干数据行。多个 tablet 在逻辑上归属于不同的分区(partition)。一个 tablet 只属于一个 partition。而一个 partition 包含若干个 tablet。tablet 是数据移动、复制等操作的最小物理存储单元。
官网链接指路:
二、sql解析简介
sql解析指的是:一条sql语句经过一系列的解析最后生成一个完整的物理执行计划的过程。解析过程主要包括以下四个步骤:词法分析,语法分析,生成逻辑计划,生成物理计划。
- 词法分析:解析原始sql文本,拆分token
- 语法分析:将token转换成抽象语法树(ast)
- 逻辑查询计划:
- 物理查询计划:在逻辑查询计划的基础上,根据数据的存储方式和机器的分布情况生成实际的执行计划
doris sql 解析架构具体介绍如下:
2.1 词法分析
词法分析主要负责将字符串形式的sql识别成一个个token,为语法分析做准备。
select ...... from ...... where ....... group by ..... order by ......
sql的token 可以分为如下几类:
○ 关键字(select、from、where)
○ 操作符(+、-、>=)
○ 开闭合标志((、case)
○ 占位符(?)
○ 注释
○ 空格
2.2 语法分析
语法分析主要负责根据语法规则,将词法分析生成的token转成抽象语法树(abstract syntax tree),如图2所示。
2.3 逻辑计划
逻辑计划负责将抽象语法树转成代数关系。代数关系是一棵算子树,每个节点代表一种对数据的计算方式,整棵树代表了数据的计算方式以及流动方向
2.4 物理计划
物理计划是在逻辑计划的基础上,根据机器的分布,数据的分布,决定去哪些机器上执行哪些计算操作。doris系统的sql解析也是采用这些步骤,只不过根据doris系统结构的特点和数据的存储方式,进行了细化和优化,最大化发挥机器的计算能力。
三、doris sql解析的总体架构
doris sql解析具体包括了五个步骤:词法分析,语法分析,生成单机逻辑计划,生成分布式逻辑计划,生成物理执行计划。具体代码实现上包含以下五个步骤:parse, analyze, singleplan, distributedplan, schedule。
四、parse阶段
parse阶段主要涉及三部分工作:
- 构建词法解析器
- 词法分析,将 doris sql中的关键词识别成一个个token
- 进行语法解析,将词法分析生成的token转成抽象语法树ast
五、analyze阶段
sql 语句被解析成ast之后,会被交给 stmtexecutor进行一些前期的处理和语义分析,为生成单机逻辑计划做准备,大概会做下面的事情:
- 元信息的识别和解析
识别和解析sql中涉及的 cluster, database, table, column 等元信息,确定需要对哪个集群的哪个数据库的哪些表的哪些列进行计算。
- sql 的合法性检查
窗口函数不能 distinct,投影列是否有歧义,where语句中不能含有grouping操作等。
- sql 重写
比如将 select * 扩展成 select 所有列,count distinct转成bitmap或者hll函数等。
- 函数处理
检查sql中包含的函数和系统定义的函数是否一致,包括参数类型,参数个数等。
- table与column别名处理
- 类型检查和转换
例如:二元表达式两边的类型不一致时,需要对其中一个类型进行转换(bigint和decimal比较,bigint类型需要转换成decimal)
总结,对ast进行analyze后会再进行一次rewrite操作,进行精简或者是转成统一的处理方式
六、singleplan阶段(生成单机逻辑plan阶段)
此阶段主要是根据ast抽象语法树生成算子数。树上的每个节点都是一个算子。如下图所示,scannode代表着对一个表的扫描操作,将一个表的数据读出来。hashjoinnode代表着join操作,将小表广播到大表所在的每个节点,内存中构建哈希表,然后遍历大表每条记录做关联。project算子表示投影操作,代表着最后需要输出的列,下图中的sql表示只用输出citycode这一列。
singleplan阶段主要做了如下几项工作:
- slot 物化:指确定一个表达式对应的列需要 scan 和计算,比如聚合节点的聚合函数表达式和 group by 表达式需要进行物化(slot:计算槽,是一个资源单位, 只有给 task 分配了一个 slot 之后, 这个task才可以运行)
- 投影下推:be在scan 时只会scan必须读取的列
- 谓词下推:在满足语义正确的前提下将过滤条件尽可能下推到scan节点
- 分区,分桶裁剪:根据过滤条件中的信息,确定需要扫描哪些分区,哪些桶的tablet
- join reorder:对于 join操作, doris会根据行数调整表的顺序,将大表放在前面。在保证结果不变的情况,通过规则计算最优(最少资源)join 操作
- sort + limit 优化成 topn:对于order by limit语句会转换成topn的操作节点
- materializedview 选择:会根据查询需要的列,过滤,排序和 join 的列,行数,列数等因素选择最佳的物化视图
-
向量化执行引擎选择:基于现代cpu的特点,重新设计列式存储系统的sql执行引擎,从而提高了cpu在sql执行时的效率,提升了sql查询的性能
-
runtime filter join:doris 在进行hash join 计算时会将小表广播到大表所在的各个节点上,构建一个内存哈希表,然后流式读出大表的数据进行hash join。而 runtimefilter是在右表生成哈希表的时候,动态生成一个基于哈希表数据的过滤条件,将该过滤条件下推到大表的数据扫描节点,从而减少扫描的数据量,避免不必要的i/o和网络传输。
七、distributedplan计划(生成分布式逻辑阶段)
7.1 distributedplan 概述
(1) 根据分布式环境,将单机的plannode树(plannode : 逻辑算子)拆分成分布式planfragment树(planfragment用来表示独立的执行单元)
(2)每个 planfragment 由 plannodetree 和 data sink 组成。plan分布式化的方法是增加 exchangenode,plannodetree执行计划树会以 exchangenode为边界拆分为 planfragment。 exchangenode主要是用于be之间的数据交换与共享,类似 spark 和 mr 中的 shuffle。
(3)distributedplan阶段的主要目标是最大化并行度和数据本地化。主要方法是将能够并行执行的节点拆分出去单独建立一个planfragment,用exchangenode代替被拆分出去的节点,用来接收数据。拆分出去的节点增加一个datasinknode,用来将计算之后数据传送到exchangenode中,做进一步的处理。
(4)distributedplanner中最主要的工作是决定join的分布式执行策略:broadcast join,shuffle join,bucket shuffle join,colocate join以及增加 aggregation 的 merge 阶段。
7.2 四种join算法:
对于查询操作来说,join操作是最常见的一种操作。doris目前支持4种join算法:broadcast join,shuffle join,bucket shuffle join,colocate join。
7.2.1 broadcast join
小表进行条件过滤后,将其广播到大表所在的各个节点上,形成一个内存hash 表,然后流式读出大表的数据hash join。doris会自动尝试进行 broadcast join,如果预估小表过大则会自动切换至 shuffle join。
7.2.2 shuffle join
大表和大表join时,一般采用hash partition join。它遍历表中的所有数据,计算key的哈希值,然后对集群数取模,选到哪台机器,就将数据发送到这台机器进行hash join操作。
7.2.3 bucket shuffle join
当join列是左表的分桶列,可以采用bucket shuffle join算法。下图中的hash(column) % n 中的n指的是左表的桶数,column代表的是join 列,同时也是分桶列。这样左表数据不移动,右表数据根据分区计算的结果发送到左表扫表的节点就可以完成join的计算。即只需网络传输一份右表数据就可以了,极大减少了数据的网络传输。
7.2.4 colocate join
两个表在创建的时候就指定了数据分布保持一致,那么当两个表的join key与分桶的key一致时,就会采用colocate join算法。由于两个表的数据分布是一样的,那么hash join操作就相当于在本地,不涉及到任何的数据传输,极大提高查询性能。
总结:上面这 4 种join方式灵活度是从高到低的,它对这个数据分布的要求是越来越严格,但 join计算的性能也是越来越好的。
7.3 分布式逻辑计划的核心流程
7.3.1 plannode
如果是plannode, 自底向上创建planfragment。
7.3.2 scannode
如果是scannode,则直接创建一个planfragment,planfragment的rootplannode是这个scannode。
7.3.3 hashjoinnode
如果是hashjoinnode,则首先计算下broadcastcost(成本),根据不同的条件判断选择哪种join算法。
(1)如果使用colocate join,由于join操作都在本地,就不需要拆分。设置hashjoinnode的左子节点为leftfragment的rootplannode,右子节点为rightfragment的rootplannode,与leftfragment共用一个planfragment,删除掉rightfragment。
(2)如果使用bucket shuffle join,需要将右表的数据发送给左表。所以先创建了一个exchangenode,设置hashjoinnode的左子节点为leftfragment的rootplannode,右子节点为这个exchangenode,与leftfragment共用一个planfragment,并且指定rightfragment数据发送的目的地为这个exchangenode。
(3)如果使用broadcast join,需要将右表的数据发送给左表。所以先创建了一个exchangenode,设置hashjoinnode的左子节点为leftfragment的rootplannode,右子节点为这个exchangenode,与leftfragment共用一个planfragment,并且指定rightfragment数据发送的目的地为这个exchangenode。
(4)如果使用hash partition join(也就是shuffle joun),左表和右边的数据都要切分,需要将左右节点都拆分出去,分别创建left exchangenode, right exchangenode,hashjoinnode指定左右节点为left exchangenode和 right exchangenode。单独创建一个planfragment,指定rootplannode为这个hashjoinnode。最后指定leftfragment, rightfragment的数据发送目的地为left exchangenode, right exchangenode。
八、schedule阶段
该阶段是根据分布式逻辑计划,创建分布式物理计划。主要解决以下问题:
-
哪个 be 执行哪个 planfragment
-
每个tablet选择哪个副本去查询
-
如何进行多实例并发
创建分布式物理计划的核心流程有:
8.1 prepare阶段
给每个planfragment创建一个fragmentexecparams结构,用来表示planfragment执行时所需的所有参数;如果一个planfragment包含有datasinknode,则找到数据发送的目的planfragment,然后指定目的planfragment的fragmentexecparams的输入为该planfragment的fragmentexecparams。
8.2 computescanrangeassignment阶段
对fragment合理分配,尽可能保证每个be节点的请求都是平均,针对不同类型的join进行不同的处理。
- computescanrangeassignmentbycolocate
针对colocate join 进行处理,由于join得两个表桶中的数据分布是一样的,他们是基于桶的join操作,所以在这里确定每个桶选择哪个host。在给host分配桶的时候,尽量保证每个host分配到的桶基本平均。
- computescanrangeassignmentbybucket
对bucket shuffle join进行处理,也只是基于桶的操作,所以在这里是确定每个桶选择哪个host。在给host分配桶时,同样需要尽量保证每个host分配到的桶基本平均。
- computescanrangeassignmentbyscheduler
针对其他类型的join进行处理。确定每个scannode读取tablet哪个副本。一个scannode会读取多个tablet,每个tablet有多个副本。为了使sca操作尽可能的分散到多台机器上去,提高并发性能,减少io压力,doris 采用了round-robin算法,使tablet的扫描尽可能分散到多台机器上去。例如100个tablet需要扫描,每个tablet有3个副本,假设集群有10台机器,在分配时,保障每台机器扫10个tablet。
8.3 computefragmentexecparams阶段
处理fragment执行参数,这个阶段解决planfragment下发到哪个be上执行, 以及如何处理实例并发问题。
8.4 create result receiver阶段
result receiver是查询完成后,最终数据需要输出的地方。
8.5 to thrift阶段
根据所有planfragment创建的rpc请求下发到be端执行,一个完整的sql解析过程完成了。
综上所述:
九、总结
本篇文章介绍了sql解析的通用流程:词法分析,语法分析,生成逻辑计划,生成物理计划。从总体上阐述了doris在sql解析这块的总体架构,从代码和算法层面上解析parse, analyze, singleplan, distributedplan, schedule五步骤的内容。
doris遵守了sql解析的常用方法,根据底层存储架构,以及分布式的特点,在sql解析这块进行了大量的优化,实现了最大化的计算并行度、最小化的数据网络传输,最大化减少需扫描的数据量,给sql执行层面减少了很多负担。
补充:hive sql编译成mapreduce任务的过程 见文章:
参考文章:
发表评论