袋鼠云在实时数据湖上的探索实践
导读 本文将分享袋鼠云在实时数据湖上的探索与实践。
主要内容包括以下五大部分:
-
背景介绍
-
实时数据湖解读
-
探索与实践
-
发展规划
-
问答环节
分享嘉宾|郝卫亮 袋鼠云 大数据引擎开发工程师
编辑整理|李欣卫
内容校对|李瑶
出品社区|DataFun
01背景介绍
首先来介绍一下袋鼠云,以及引入数据湖的背景。
1. 关于袋鼠云
袋鼠云是领先的数字化基础软件与应用服务商,基于 Data+AI 技术,提供云原生数字基础设施、数字孪生和可观测运维等一站式大数据产品和服务,涵盖从数据采集、数据开发与治理、数据资产、数据安全、数据服务到数据分析与洞察、数据可视化等数据智能应用,帮助客户和伙伴发展新质生产力,助力产业数字化转型。目前为止共服务客户 5000+,覆盖行业 50+,省级区域触达 30+。
2. 数栈
数栈是袋鼠云自研的一站式大数据基础软件,可供企业进行数据仓库的开发和提高数据挖掘的效率。从数栈架构图中可以看到,数栈向下对接了很多的存储与计算引擎,包括 Hadoop、CDH、HDP 等,向上提供了离线和实时开发,以满足日常开发需求,另外还具有数据资产管理的功能。对外提供 API 服务,可以将生产的数据对外提供。
3. 痛点分析
未引入数据湖之前,数栈为企业提供的方案是基于 lambda 架构的。数据从 RDB 数据实时采集到 Kafka 再进行实时加工,另外通过 T+1 的模式将数据周期同步到 Hive 中进行离线加工,这样就需要维护两套存储与计算组件。由于实时路线中Kafka 的数据是顺序读的,当出现问题时往往需要随机查询,Kafka 在这方面比较弱势,在开发和问题排查上较为低效。两条链路中使用的计算引擎分别为 Spark 和 Flink,这样就容易导致计算的数据口径不一致。基于这些原因,我们决定引入数据湖来解决上述问题。
02实时数据湖解读
1. 数据湖解读
(1)提供了多样化的分析能力,不限于批处理、流处理、交互式查询和机器学习;
(2)提供了 ACID 事物能力,可以更好的保障数据质量;
(3)提供了完善的数据管理能力,包括数据格式、数据 schema 等;
(4)提供了存储介质可扩展的能力,支持 HDFS、对象存储等。
数据湖提供的这些能力在价值体现上可用四个字来形容,那就是"降本增效"。数据湖在存储层实现了流批一体,既可以作为流存储,也可以作为批存储,这样就不会像 lambda 架构中流批分开存储,节省了存储成本。使用数据湖组件 Iceberg或 Hudi 取代 Kafka,作为流批一体的存储,节省了存储成本,也减少了运维成本,提高了开发效率。基于数据湖的开放优势,允许批处理、允许流处理、也允许机器学习查询,这样也优化了数据挖掘的处理链路。
2. 数据湖技术开源方案
当前比较流行的数据湖方案有 4 个:Iceberg、Hudi、Delta 和 Paimon。其中,Hudi 在国内使用最广泛,功能最丰富。Paimon 是后起之秀。Hudi 优于 Iceberg 的地方有很多,比如小文件治理上,Iceberg 没有小文件自动治理功能,Hudi 有自动治理功能,尤其在流处理上。正如 Hudi 定义中所提到的,Hudi 是一个事务性的数据湖平台,具有多种功能,能够一键使用这些功能。
3. 基于数据湖的数栈解决方案
数栈采用自研的 ChunJun 组件进行数据采集实时入湖,然后使用 Flink 实时计算组件进行数据的实时加工,在进行实时数据湖建设的时候,流计算平台也支持了批模式的 Flink,这样便可以实现批流一体。自研的 EasyLake 数据湖管理平台也可以实现对湖的管理。
这样就实现了流批一体的存储和基于 Flink 批流一体的数据加工。实时链路由数据湖组件 Hudi 或 Iceberg 替换掉了 Kafka,在开发和问题排查上实现了效率的提升。因为在流数据加工上只使用了 Flink 这一个计算组件,这样就避免了多个组件计算结果口径不一致的问题。数据存储上因为只使用了 Hudi 或 Iceberg 一种数据湖存储,去掉了 Kafka 和 Hive 存储,降低了存储成本。
以上就是数栈基于数据湖的解决方案。
03探索与实践
1. 数栈基于数据湖的实践
建设数据湖首先需要做入湖操作。实时数据湖的第一步是实时入湖,实时数据入湖有两种,一种是接入 Kafka 做 insert 操作,一种是使用 CDC 技术直接采集 RDB 数据库。实时入湖方面支持 Iceberg 和 Hudi 实时入湖,流计算方面支持 Iceberg 和 Hudi 的加工,也支持 Flink 批模式,在此基础上支持了 Iceberg/Hudi 指标的展示,如表大小、文件数等;离线平台支持 Spark 对接 Iceberg 和 Hudi;数据湖管理平台 EasyLake 支持一键转湖表、湖表治理和 Unified Catalog。作为数据湖平台,需要集成多种数据湖表格式,比如 Iceberg、Hudi,以后还会对接 Paimon,这样就需要在上层提供一个统一的 Catalog 来屏蔽不同的数据湖 API。
数据从实时入湖到实时 ETL,为了在 ETL 阶段进行提效,后续将会在 ETL 加速-物化视图上进行探索。
2. 实时入湖
CDC 数据实时入湖具有四大特性:实时性高、历史数据量大、强一致性和Schema 动态演进。
3. CDC 实时入湖方案
CDC 实时入湖方案基于自研组件 ChunJun 进行开发。在此之前 ChunJun 组件已经完成 Oracle、MySQL、PG 和 SQL Server 的 CDC 数据采集,接入数据湖后只需进行 Hudi sink 和 Iceberg sink 的开发即可完成业务库到数据湖的实时数据接入。
(1)自主可控:此功能是自研功能,可自由增减功能,实现自主可控;
(2)全增量一体化:接入业务库并完成一次全量消费后,可通过一键操作切换到增量消费;
(3)分钟级延迟;
(4)链路短:只基于 ChunJun 组件接入业务库,无需额外组件,数据接入更快捷方便;
(5)对业务稳定性无影响:CDC Source 扩展了限制速率的功能,可以通过设置不同的速率减小对业务库的压力。
4. 实时入湖落地遇到的问题
- 小文件问题优化-设置合理的 Checkpoint Interval
数据从业务库实时采集入湖,在入湖后会根据 Checkpoint 设置的时间来产生数据文件,当 Checkpoint 设置的时间到达时接入的数据会产生一个数据文件。所以在设置 Checkpoint 时间间隔时不能设得过小,过小就会产生小文件问题。Checkpoint 间隔设置过小还可能会影响任务的稳定性,在实际生产中,当把 Checkpoint 设置为 30 秒时,数据出现了重复问题,经过排查是因为 Hudi 中的一个 bug 导致出现两个 sink,重复接入数据。经过多次实验,将 Checkpoint 设置为 1-5 分钟比较合适,既不会产生过多小文件,也能保证任务的稳定性。
- 小文件问题优化-小文件治理
当 Checkpoint 设置为 1 分钟的时候还会出现比较少的小文件问题,因此,基于EasyLake 开发了小文件治理的功能。数据实时入湖产生文件在前,然后再基于配置的小文件治理规则进行周期性的小文件治理。治理包括数据文件治理和快照文件治理,也支持 Hudi MOR 增量文件合并。因此,可以将 EasyLake 看作为数据湖的护城河。
- Hudi 适配 Flink1.12
当前 ChunJun 支持 Flink1.10、Flink1.12 和 Flink1.16,因为大部分用户使用 Flink1.12,所以针对 Hudi 进行二次开发及适配。适配方法是将 Flink1.13 进行复制修改版本号,改成 1.13 版本,修改版本号后的Flink 不兼容的地方主要在 Catalog 的 API 上,经过兼容性修改并测试后就完成了适配。
- 跨集群入湖
多套 Hadoop 集群的情况下会出现跨集群读取数据的情形,在建设数据湖的时候需要做到多套集群数据入湖的能力。
- 跨集群入湖方案
5. ETL 加速探索-物化视图
在实时数据湖中主要包含三类任务:实时 ETL、离线 ETL 和 OLAP。很多场景下实时 ETL 会使用到离线数据。数仓加工从 ODS 层到 ADS 层聚合操作会越来越多,IO 越来越密集,多个任务 SQL 中也会有很多相同逻辑的 SQL 代码片段。对于相同的 SQL 逻辑片段可以单独抽取出来形成物化视图,其他依赖于此的任务就能够加速计算。因此流数据计算也会实现从入湖到加工到出湖的计算流加速。
基于目前的探索,在数据湖中落地物化视图需要进行四方面的工作:
- 数据湖上创建的物化视图需要进行平台化的管理,如果没有规范化的管理,那么创建的物化视图将变成垃圾数据;
- 需要 Spark 支持对数据湖表格式管理物化视图的创建和重写;
- 需要 Trino 支持对数据湖表格式管理物化视图的创建和重写,因为数栈使用了Trino 实现 OLAP;
- 需要 Flink 支持对数据湖物化视图的创建。
平台化物化视图管理和 Flink 物化视图管理目前还在研发阶段,Spark 和 Trino 已完成。
物化视图实质是一张特殊的表,它的创建和刷新等功能开发都相对简单,难点在于业务 SQL 自动匹配物化视图的重写过程。物化视图其实是 SQL 查询的重写,SQL 语句在进行解析的时候首先会生成抽象语法树,然后再生成逻辑执行计划,逻辑执行计划会关联一些规则,物化视图的重写就是在这里生成的。
物化视图的重写分两阶段,第一阶段是物化视图的匹配,第二阶段是逻辑的重写。针对复杂 SQL 的匹配,会将复杂 SQL 分解成多个子查询,对子查询进行遍历匹配。在对子查询进行视图匹配时,因为视图数量比较多,遍历匹配比较耗时,这里采用了倒排索引的思想。因为每个视图都是一段 SQL,每个 SQL 都有主表,针对这个主表进行索引查询,将查询结果再进行视图匹配,匹配它们的节点、project、filter、agg 和 join 这些算子,匹配成功后进行下一步的重写。比如 project,就是 SQL 查询的列,匹配的物化视图的列需要包含子查询的列才能重写。最后,在物化视图匹配完成后,物理执行计划进行执行操作,当执行操作失败时需要有失败回退的操作,以保证 SQL 执行的稳定性。
04发展规划
最后来分享一下未来发展规划。
- 增加平台的易用性,使针对 Hudi 和 Iceberg 的管理更加简便。比如,对 snapshot 的读取列表提供可视化管理功能。
- 引入 Paimon,目前 Paimon 势头迅猛,因为它与 Flink 天然的兼容性,未来很可能会被广泛应用。
- 提升入湖性能。深入并增强内核,提升入湖的性能。
- 安全性探索:数据湖提供了共享思维,数据共享需要考虑安全性,又因为支持多引擎查询,所以在多个查询引擎查询数据时数据的安全性更需要考虑。
05
问答环节
Q1:是否支持 Oracle 从库?
A1:不支持从库。
Q2:实时湖方案最终生成的简单指标延迟多久?
A2:需要根据数据流加工任务数来计算,一个任务延迟 1 分钟,5 个就是 5 分钟左右。
Q3:实时入湖 Hudi 元数据表怎么管理,Schema 变化如何同步元数据信息。
A3:Hudi 的 Schema 数据维护在存储里面,而不像 Hive 那样维护在 metastore 表中。Hudi 的 Schema 维护了版本信息,即使数据变化也可以读取旧数据。
Q4:Iceberg 和 Paimon 除了流方面还有其他方面的差异吗?
A4:Paimon 的入湖性能优于 Iceberg;小文件管理方面 Paimon 也更优秀,因为 Paimon 支持自动的小文件治理,Iceberg 没有自动的,需要额外治理。离线方面还未测试。
Q5:ChunJun 支持 CDC 方式从 Iceberg 到 MySQL、TiDB 吗?
A5:支持。因为 Iceberg 本身就支持 CDC 增量消费快照,所以 ChunJun 也直接能支持,需要选择 Iceberg 的 v2 表。
Q6:ChunJun SQL 血缘解析有开源方案吗?
A6:暂时没有。数栈内部是采用自研 SQLParser 组件实现的,是基于 Calcite 实现的。
Q7:Iceberg 和 Paimon 批对比计划何时研究?
A7:这一两个月。目前主要精力放在 Paimon 的流处理上。