当前位置: 代码网 > 科技>人工智能>数据分析 > 大数据最新Doris实战——结合Flink构建极速易用的实时数仓_flink doris(1),实践出真知

大数据最新Doris实战——结合Flink构建极速易用的实时数仓_flink doris(1),实践出真知

2024年08月05日 数据分析 我要评论
原文大佬的这篇Doris+Flink构建实时数仓的实战文章整体写的很深入,这里直接摘抄下来用作学习和知识沉淀。本篇文章介绍如何基于Doris和Flink快速构建一个极速易用的实时数仓,包括数据同步、数据集成、数仓分层、数据更新、性能提升等方面的具体应用方案。

img
img

网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。

一个人可以走的很快,但一群人才能走的更远!不论你是正从事it行业的老鸟或是对it行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

3.5.2 部分列更新

3.6 如何进一步提升查询性能

3.6.1 智能物化视图

3.6.2 分区分桶裁剪

3.6.3 索引查询加速

3.6.4 执行层查询加速

四、行业最佳实践

4.1 跨境电商

4.2 运营服务商

4.3 供应链企业

原文大佬的这篇doris+flink构建实时数仓的实战文章整体写的很深入,这里直接摘抄下来用作学习和知识沉淀。

本篇文章介绍如何基于doris和flink快速构建一个极速易用的实时数仓,包括数据同步、数据集成、数仓分层、数据更新、性能提升等方面的具体应用方案。

一、实时数仓的需求与挑战

先介绍一下传统的数据架构如何设计的、又存在哪些痛点问题。下图为传统的数据架构,如果从数据流的角度分析传统的数据处理架构,会发现从源端采集到的业务数据和日志数据,主要分为实时和离线两条链路:

  • 在实时数据部分,通过binlog方式,将业务数据库中的数据变更(cdc,change data capture)采集到实时数仓。同时,通过flume -kafka-sink对日志数据进行实时采集,当不同来源的数据都采集到实时存储系统后,便可以基于实时存储系统来构建实时数仓。在实时数仓的内部,仍然会遵守传统数仓分层理论,将数据分为 ods 层、dwd 层、dws 层、 ads 层以实现最大程度的模型复用。
  • 在离线数据部分,通过datax定时同步的方式,批量同步业务库rds中的数据。当不同来源的数据进入到离线数仓后,便可以在离线数仓内部,依赖spark sql 或hive sql 对数据进⾏定时处理,分离出不同层级(ods 、dwd 、ads 等)的数据,并将这些数据存在⼀个存储介质上,⼀般会采用如 hdfs 的分布式文件系统或者 s3 对象存储上。通过这样的⽅式,离线数仓便构建起来了。与此同时,为了保障数据的一致性,通常需要开启数据清洗任务,使用离线数据对实时数据进行清洗和定期覆盖,保障数据最终的一致性

从技术架构的角度对传统数据技术栈进行分析,会发现为了迎合不同场景的需求,往往会采用多种技术栈,例如对于 olap 场景的多维分析,一般使⽤ doris 或 kylin、 druid。除此之外,为应对半结构化数据的分析需求,例如日志分析与检索场景,通常会使⽤ es 进行分析;面对高并发点查询的 data serving 场景会使⽤ hbase等。其中涉及到的数据组件有数十种,高昂的使用成本和组件间兼容、维护及扩展带来的繁重压力成为企业必须要面临的问题。

从上述介绍即可知道,传统的数据架构存在几个核心的痛点问题:

  • 传统数据架构组件繁多,维护复杂,运维难度非常高。
  • 计算、存储和研发成本都较高,与行业降本提效的趋势背道而驰。
  • 同时维护两套数据仓库(实时数仓和离线数仓)和两套计算(实时数据量和实时计算任务),数据时效性和一致性无法保证。

在此背景下,需要构建⼀个“极速、易用、统一、实时”的数据架构来解决这些痛点:

  • 极速:更快的查询速度,最大化提升业务分析人员的效率;
  • 易用:对于用户侧的使用和运维侧的管控,都提供了极简的使用体验;
  • 统一:异构数据与分析场景的统一,半结构化和结构化数据可以统一存储,多分析场景可以统一技术栈
  • 实时:端到端的高时效性保证,发挥实时数据的价值

**二、**构建极速易用的实时数仓架构

采取doris和flink 来构建极速易用的实时数仓,具体架构如下图所示。多种数据源的数据经过flink cdc 集成或flink job加工件处理后,入库到doris或者hive等湖仓中,最终基于doris提供统一的查询服务。

在数据同步上,通过flink cdc将rds的数据实时同步到doris。通过routine load将kafka等消息系统的数据实时同步到doris,在数仓分层上,ods层通常选择明细模型构建,dwd层可以通过sql调度任务,对ods数据抽取并获取,dws和ads层则可以通过物化视图和rollup进行构建。在数据湖上, doris ⽀持为 hive、iceberg 、hudi 以及delta lake(todo)提供联邦分析和湖仓加速的能⼒。在数据应用上,apache doris 既可以承载批量数据加工处理的需求,也可以承载高吞吐的 ad-hoc(数据探索) 和高并发点查询等多种应⽤场景。

三、解决方案

3.1 如何实现数据的增量与全量同步

3.1.1 增量及全量数据同步

在全量数据和增量的同步上,采取了flink cdc来实现。其原理非常简答,flink cdc实现了基于snapshot的全量数据同步,基于 binlog的实时增量数据同步。全量数据同步和增量数据同步可以自动切换,因此在数据迁移过程中,只需要配置好同步的表即可。当flink任务启动时,优先进行历史表的数据同步,同步完成后自动切换成实时同步

3.1.2 数据一致性保证

如何保证数据一致性是大家重点关注的问题之一,那么在新架构是如何实现的呢?

flink cdc通过flink checkpoint 机制结合doris两阶段提交可以实现端到端的exactly once语义,具体过程分为四步:

  • 事务开启(flink job启动及doris事务开启):当flink任务启动后,doris sink 会发起 precommit 请求,随后开启写⼊事务。
  • 数据传输(flink job的运行和数据传输):在flink job运行过程中,doris sink不断从上游算子获取数据,并通过 http chunked 的⽅式持续将数据传输到 doris。
  • 事务预提交:当flink开始进行checkpoint时,flink会发起checkpoint请求,此时flink各个算子会进行barrier对齐和快照保存,doris sink发出停止 stream load 写⼊的请求,并发起一个事务提交请求到doris。这步完成后,这批数据已经完全写入doris be中,但是be没有进行数据发布前对用户是不可见的。
  • 事务提交:当flink的checkpoint完成之后,将通知各个算子,doris发起一次事务提交到doris be,be对此次写入的数据进行发布,最终完成数据流的写入。

综上可知,利用 flink cdc结合doris 两阶段事务提交保证了数据写入一致性。需要注意的是,在该过程中可能遇到一个问题:如果事务预提交成功、但 flink checkpoint 失败了该怎么办?针对该问题,doris 内部支持对写⼊数据进⾏回滚(rollback),从⽽保证数据最终的⼀致性

3.1.3 ddl 和 dml 同步

随着业务的发展,部分用户可能存在rds schema的变更需求。当rds表结构发生变更时,用户期望flink cdc不但能够将数据变化同步到doris,也希望将 rds 表结构的变更同步到 doris,⽤户则无需担⼼ rds 表结构和 doris 表结构不⼀致的问题。

  • light schema change

apache doris 1.2.0 已经实现了  light schema change 功能,可满⾜ ddl 同步需求,快速⽀持 schema 的变更

light schema change的实现原理相对简单,对数据表的加减列操作,不再需要同步更改数据文件,仅需要再fe中更新元数据即可,从而实现毫秒级的schema change 操作。由于 light schema change 只修改了 fe的元数据,并没有同步给 be。因此会产⽣ be 和 fe schema 不⼀致的问题。为了解决这种问题,我们对 be 的写出流程进⾏了修改,具体包含三个⽅⾯。

经过对 light schema change 写出流程的优化后, 单个 schema chang 从 310 毫秒降低到了 7 毫秒,整体性能有近百倍的提升,彻底的解决了海量数据的 schema change 变化难的问题。

  • flink cdc dml 和ddl同步

有了 上述light schema change 的保证,flink cdc 能够同时⽀持dml 和ddl 的数据同步。那么是如何实现的呢?

解决了数据同步过程中源数据⼀致性的保证、全量数据和增量数据的同步以及 ddl 数据的变更后,一个完整的数据同步⽅案就基本形成了。

3.2 如何基于flink实现多种数据集成

除了上文中所提及的基于 flink cdc 进行数据增量/全量同步外,我们还可以基于 flink job 和 doris 来构建多种不同的数据集成方式:

  • 将mysql中两个表的数据同步到flink后,在flink内部进行多流join完成数据打宽,后将大宽表同步到doris中。
  • 对上游的kafka数据进行清洗,在flinkjob完成清洗后,通过doris-sink写入到doris中。
  • 对mysql数据和kafka数据在flink内部进行多流join,将join后的宽表结果写入doris中。
  • 在doris侧预先创建宽表,将上游rds中的数据根据key写入,使用doris的部分列更新将多列数据写入到doris的大宽表中。

3.3 如何选择数据模型

doris针对不同场景,提供了不同的数据模型,分别为明细模型、聚合模型、主键模型。

3.3.1 duplicate 明细模型

在某些多维分析场景下,数据既没有主键,也没有聚合需求,duplicate 数据模型可以满足这类需求。明细模型主要用于需要保留原始数据的场景,如日志分析,用户行为分析等场景。明细模型适合任意维度的 ad-hoc 查询(即席查询)。虽然同样无法利用预聚合的特性,但是不受聚合模型的约束,可以发挥列存模型的优势(只读取相关列,而不需要读取所有 key 列)。

3.3.2 aggregate 聚合模型

在企业实际业务中有很多需要对数据进行统计和汇总操作的场景,如需要分析网站和 app 访问流量、统计用户的访问总时长、访问总次数,或者像厂商需要为广告主提供广告点击的总流量、展示总量、消费统计等指标。在这些不需要召回明细数据的场景,通常可以使用聚合模型,比如上图中需要根据门店 id 和时间对每个门店的销售额实时进行统计。

3.3.3 unique key 主键模型

在某些场景下用户对数据更新和数据全局唯一性有去重的需求,通常使用unique key 模型。在 unique 模型中,会根据表中的主键进⾏upsert 操作:对于已有的主键做 update 操作,更新 value 列,没有的主键做 insert 操作,比如图中我们以订单id为唯一主键,对订单上的其他数据(时间和状态)进行更新。

3.4 如何构建数仓分层

由于数据量级普遍较大,如果直接查询数仓中的原始数据,需要访问的表数量和底层文件的数量都较多,不同层级对数据或指标做不同粒度的抽象,通过复用数据模型来简化数据管理压力,利用血缘关系来定位数据链路的异常,同时进一步提升数据分析的效率,在doris 中可以通过以下多种思路来构建数据仓库分层:

3.4.1 微批调度

通过insert into select 可以将原始表的数据进行处理和过滤并写入到目标表中,这种sql抽取数据的行为 一把是以微批形式进行(例如15分钟一次的etl计算任务),通常发生在从ods到dwd层数据的抽取过程中,因此需要借助外部的调度工具例如dolphinscheduler等来对etl sql进行调度。

3.4.2 物化视图与rollup

物化视图本质是一个预先计算的过程,可以在base表上,创建不同的物化视图或rollup来对base表进行聚合计算。通常在明细层到汇总层(例如dwd层到dws层或 dws层到ads层)的汇聚过程中,可以使用物化视图,以此实现指标的高度聚合。同时物化视图的计算是实时进行的,因此站在计算的角度,也可以将物化视图理解为一个单表上的实时计算过程

3.4.3 多表物化视图

doris 2.0将实现多表物化视图这一功能,可以将带有 join 的查询结果固化以供用户直接查询,支持定时自动或手动触发的方式进行全量更新查询结果。基于多表物化视图这一功能的实现,可以做更复杂的数据流处理,比如数据源侧有 tablea、tableb、tablec,在多表物化视图的情况下,用户就可以将 tablea 和 tableb 的数据进行实时join 计算后物化到 mv1 中。在这个角度上来看,多表物化视图更像一个多流数据实时 join 的过程。

3.5 如何应对数据更新

在实时数仓构建的过程中,还需面临高并发写入和实时更新的挑战,如何在亿级数据中快速找到需要更新的数据,并对其进行更新,⼀直都是大数据领域不断追寻的答案。

3.5.1 高并发数据更新

在doris中通过unique key 模型来满足数据更新的需求,同时通过mvcc多版本并发机制来实现数据的读写隔离。当新数据写入时,如果不存在相同key的数据则会直接写入,如果有相同key的数据则增加版本,此时数据将多个版本的形式存在。后台会启动异步的compaction进程对历史版本的数据进行清理,当用户在查询时,doris会将最新版本对应的数据返回给用户,这种设计解决了海量数据的更新问题。

在doris中提供了merge-on-read和merge-on-write两种数据更新模式。

在此我们以订单数据的写入为例介绍 merge-on-read 的数据写入与查询流程,三条订单数据均以 append 的形式写⼊ doris 表中:

  • 数据 insert:首先写入 id 为 1,2,3 的三条数据;
  • 数据 update:当我们将订单 1 的 cost 更新为 30 时,其实是写⼊⼀条 id 为 1,cost 为 30 的新版本数据,数据通过 append的形式写⼊ doris;
  • 数据 delete:当我们对订单 2 的数据进行删除时,仍然通过 append ⽅式,将数据多版本写⼊ doris ,并将 _doris_delete_sign 字段变为 1 ,则表示这条数据被删除了。当 doris 读取数据时,发现最新版本的数据被标记删除,就会将该数据从查询结果中进⾏过滤。

merge-on-read的特点是写入速度比较快,但是在数据读取过程中由于需要进行多路归并排序,存在着大量非必要的cpu计算资源消耗和io开销。

在1.2.0 版本中,doris在原有的unique key数据模型上增加了 merge-on-write的数据更新模式。merge-on-write兼顾了写入和查询性能,在写入过程中引入了delete bitmap数据结构,使用delete bitmap标记rowset中某一行是否被删除,为了保持unique key原有的语义,delete bitmap也支持多版本。另外使用了兼顾性能和存储空间的 row bitmap,将bitmap中的memtable一起存储在be中,每个segment会对应⼀个 bitmap。

写入流程:

  • deltawriter先将数据flush到磁盘
  • 批量检查所有 key,在点查过程中经过区间树,查找到对应的 rowset。
  • 在 rowset 内部通过 bloomfilter 和 index进行高效查询。

当查询到 key 对应的 rowset 后,便会覆盖 rowset key 对应的 bitmap,接着在 publish 阶段更新 bitmap,从而保证批量点查 key 和更新 bitmap 期间不会有新的可见 rowset,以保证 bitmap 在更新过程中数据的正确性。除此之外,如果某个 segment 没有被修改,则不会有对应版本的 bitmap 记录。

查询流程:

img
img
img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

[外链图片转存中…(img-iiy7lv1j-1715048683015)]
[外链图片转存中…(img-xhy6bfv9-1715048683015)]
[外链图片转存中…(img-9tho9ckf-1715048683016)]

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com