如何使用 Arctic 更好地玩转 Apache Iceberg
导读: 数据基础设施发展的脚步从未停歇,当前风头正盛的是湖仓一体(Lakehouse)。Arctic 是由网易开源的搭建在 Apache Iceberg 等数据湖表格式之上的湖仓管理系统。本文将分享如何使用 Arctic 更好的玩转 Apache Iceberg。
下面的介绍分为四个部分:
-
Apache Iceberg 简介及原理
-
Apache Iceberg 在网易的实践
-
Arctic 如何高效治理 Apache Iceberg
-
Arctic 未来规划
分享嘉宾|周劲松 网易 平台开发专家
编辑整理|陈西 前程无忧
出品社区|DataFun
01/Apache Iceberg 简介及原理
1. Apache Iceberg 简介
Apache Iceberg 的定位非常清晰,它把自己定位成面向超大体量数据分析场景下的表格式。在大数据领域里面,巨量的数据被分散存储在不同的文件里,文件落在分布式存储系统内。传统的数据湖直接面向文件去做操作,这样会导致数据治理非常困难。Hive 提出了在表上用 SQL 对表进行操作,通过表和 SQL 这种大家更容易接受的方式,更快速地去处理数据、理解数据。Hive 是面向于离线开发去设计的,随着对数据处理时效性需求的不断提高,涌出了很多新的表格式,比较出名的有 Apache Iceberg、Apache Hudi 和 Delta。以 Apache Iceberg 为例,这类新型的表格式在功能和性能上相较于 Hive 都有不少增强。
(1)更易于管理与使用
① Schema evolution
以前 Hive 变更表结构会有很多限制,因为它的表结构和文件里面的数据格式是一种松耦合的方式,所以只能安全的在表上增加字段,或增加数据长度,这些操作一定要保证能向前兼容。在实际的生产场景中,业务系统可能有很复杂的表结构变更需求,比如删除字段、重命名等。以前在大数据系统里面要去应对这样的场景,是比较麻烦的。Apache Iceberg 很好的支持了 schema evaluation ,可以做到像变更数据库中的表结构一样的体验。
② Hidden partitioning
Iceberg 仍然沿用了数据库分区的使用习惯。Hive 表把分区定义成了一种特殊的字段,在查询的时候,一定要在分区字段值上加上条件,才能起到分区过滤。Hidden partitioning 回归了传统数据库的使用习惯,也就是分区是基于表上已有字段的一些额外划分规则,而不再是一些额外定义的字段。在生成数据和读取数据的时候,不再需要指定分区字段的值,它们会基于依赖的字段自动计算而来。
③ Partition evolution
Hive 的分区发生变化,就要对整个表进行重写,才能用到新的分区。Iceberg 的分区规则发生变化,后写的数据就会应用后面的分区规则,前面的数据是不需要重写的,表的管理更加友好。
④ Time travel
时间旅行使得用户可以看到任意时刻表中的数据。比如今天下午两点钟的时候,也可以看今天上午 10 点钟的数据,通过 time travel,可以方便地定位到当初的数据的版本,可以完成对历史数据的分析。
⑤ Rollback
把数据回退到历史的版本,当出现错误的数据写入的时候,rollback 会变得非常有用。
(2)流批统一
① Streaming/Batch Write & Read
Hive 是完全面向于离线去设计的,一个表同时只有写或者读,写和读都是批量操作。Iceberg 在读写接口上都做了流和批的兼容,流批统一的特性解决了生产场景中的 Lambda 架构带来的流批分割的问题。流批一体的基础是存储层面上是流批统一,基于此计算层面才能向着使用一套代码完成流批开发的目标。
② Serialization isolation
相比离线写入,流式处理的写入变得非常频繁,随之而来一个表上同时有读写操作变得非常普遍,良好的隔离能力能够保证在表上并发的读写操作都能得到正确的结果。Iceberg 多版本的机制提供了 serialization 的隔离级别。
③ Multiple concurrent writers
除了读写上的并发,流批一体后流与批也会同时写入一张表,Iceberg 基于乐观并发控制逻辑,在并发写入并无数据冲突时能够同时提交成功。
(3)高效与易扩展
① Scan planning is fast
元数据不再存储在单点的服务器上,而是分散的存储在不同的元数据文件里,并行的对元数据进行处理可以获得更好的读取性能。
② Advanced filtering
在元数据层面上增加了很多统计信息,比如说文件中每个字段的最大/最小值、这些元数据能够在数据查询时更快的过滤掉无需扫描的数据。
③ Works with any cloud Store
Iceberg 在研发之初就以支持各种存储设备为目标,比如说各种分布式文件系统或者对象存储。Iceberg 在访问云上的对象存储时,直接使用对象存储的 API,无需包装或转换成 HDFS 协议,最大可能减少性能衰减,能得到云上最好的性能。
2. Apache Iceberg 核心原理
(1)Catalog
Iceberg 使用 catalog 来管理所有的表,catalog 中存储了当前有哪些表,以及这些表如何按照 namespace 拆分。Iceberg 提供了丰富的 catalog 实现,也提供了API 方便你来扩展自己的 catalog 实现,以便你把表的元数据存储在任何想存储的地方。
① HadoopCatalog,直接使用文件目录管理表信息。
② HiveCatalog,将表的元数据信息存储在 Hive Metastore中。
③ JdbcCatalog,把表的元数据信息存储在数据库中。
(2)Metadata,存储了一个表内部的元数据
① Snapshots,每次数据写入都会生成一个新的 snapshot ,而读取是基于某个确定的 snapshot 来操作的,所以写入和读取可以并发进行,互不影响。在写入的时候,也可以通过 snapshot 去判断出是不是和其他的写入发生了冲突,因为 snapshot 是统一的一条线,提交的时候可以通过这条线做冲突的判断。snapshot 存储有个很重要的概念叫 current snapshot,是当前最新的版本,如果要读取历史版本的数据,也可指定时间或者 snaoshot id 来完成 time travel。
② Manifest list,存储的是 manifest file 列表,用来链接大量的 manifest file。
③ Manifest file,存储当前快照下的所有 data file,每行存储一个文件的信息。Manifest file 使用 avro 格式,为了提升读取文件信息的并发度,manifest file 一般会有多个。
④ Schemas
表所有的历史的 schema 信息都会存储在这里,每个 schema 会有一个自己的 schema id,每个写入的文件都有自己的 schema id 信息,当表结构发生变更,新老文件用的是不同的 schema id,这样就不需要把所有的数据重写。
⑤ Partition specs
分区配置,核心是各种 transform。Hive 的分区是配置分区字段,然后写入和读取的时候要读写这些分区字段。Iceberg 则是在已有的表字段上配置各种转换函数,比如表里有一个 timestamp 字段,需要用 timestamp 字段按天做分区,那就在 timestamp 字段上使用天的转换函数。写数据的时候,不需要再写入额外转换之后天的信息,只需要写入 timestamp,Iceberg 内部就会把这 timestamp 字段的值转换成分区的天的信息,在查询的时候,只要在 timestamp 字段做一个范围查询就能实现高效的基于分区的数据过滤。
3. Apache Iceberg format V2
Iceberg 现阶段有两个版本的 table format,面向大体量数据分析的 format version 1 与 提供行级更新能力的 format version 2, format version 2 相较于 format version 1,功能上有较多增强。
(1)核心的功能
① Row-level Delete,行级更新删除能力。以前数据写入都是文件级别的,更新文件里面的部分数据,需要对这个文件进行完整的重写,把老的文件删除,把新的文件写进去。这种方式的缺点是开销大,Flink 流式场景下基本不可用。format V2 新增行级更新删除,实现方式是新增了一种 delete file 文件类型,文件里记录了表中需要删除的数据信息。比如去做一个 update 操作,只需去找出这次 update 需要更新的数据在原来的文件里面的什么位置并记录到 position delete file 中即可,或者更简单直接地将这次 update 的条件信息记录在 equality delete file 中,再将更新后的内容写入新的 data file 中,将产生的 delete file 和 data file 提交到表中即完成了本次的 update 操作。
② Position delete file,通过被删除数据的位置信息来标记删除内容的 delete file,position delete file 记录被删除数据的 file path 和 position,并且在文件内所有数据会按照顺序排序后写入,这样在与 data file 合并时会有更好的合并性能。
③ Equality delete file,通过被删除数据的等值条件来标记输出内容的 delete file,比如 ID = 5 的数据要删除, Equality delete file存储的就是 ID = 5 这样的信息,在 Merge-on-Read 时就会把 ID = 5 的数据全部过滤掉。Iceberg Flink connector 当前会混合写入 position delete file 和 equality delete file。
④ File sequence,用来标记文件的版本。delete file 只对比自己 file sequence 更小的文件起作用。比如 file sequence 等于 2 的 delete file 只对 file sequence 小于等于 1 的 data file 起作用。每次写入的文件拥有相同的 file sequence,且随着写入顺序递增。
⑤ Rewrite files (compaction),把数据合并,把要删除的数据真实地删除掉,得到删除之后的 datafile,这样的 datafile 读取更高效。这个叫做 Iceberg 的 rewrite 过程。需要注意 rewrite 不会修改文件的 file sequence 信息。
4. Apache Iceberg Roadmap
Apache Iceberg 在 2022 年 11 月 3 日发布了 V1.0.0 版本,并仍然保持着每 2-3 个月发布一个大版本的节奏。V1.0.0 的发布标志着 Apache Iceberg 逐渐走向成熟。
(1)V1.0.0
① 支持在 Spark 引擎里通过 merge on read 的方式进行 update 和 merge 操作。这意味着不再只有 Flink 会写入 delete file,spark 也可以在更新少量数据时通过写入 delete file 来加快更新速度。
② Z-order 排序规则。数据在重写时,可以指定 z-order 排序规则。z-order 和普通排序的差异在于,z-order 不再限制查询条件一定包含排序字段里前面的所有字段,使用排序字段里的任意字段组合都可以通过 z-order 帮助完成数据过滤。
③ 提供了一种新的文件格式,Puffin 文件,它在未来将被用来存储二级索引、统计信息等。
(2)V1.1.0
① Apache Iceberg 提供了类似 git 的多分支管理功能,以帮组用户在表上同时进行不同的更新。
② 支持报表上查询的统计信息,记录查询过程每个步骤的耗时与详细信息,帮助分析表上的查询性能并进一步调优。
③ 正在开发的 Change Data Capture 提供获取表上更新日志的能力,无论是在流式场景还是批处理场景都非常实用。
④ 二级索引的支持也正在开发中,到时 Iceberg 将能提供基于 bloom filter、bitmap 等多种数据结构的二级索引,帮助提升表上的查询性能。
--
02/Apache Iceberg 在网易的实践
1. Apache Iceberg 网易云音乐的实践
**(1)场景:**网易云音乐的用户行为日志。
(2)特点
这个场景下,用户在网易云音乐上的每一个操作,都会产生一个行为日志,所以数据量是很大。日志数据单日就会产生 25-30TB 数据,数据被分散在超过 11 万个文件、100 多个分区中。
(3)提升
以前用 Hive 对这部分数据进行清洗一般需要两个小时,但其中有一半的时间都花在了 plan 阶段。plan 阶段负责找出需要清洗的文件,并将文件组装成多个任务,具体包括从 Hive Metastore 中发现满足条件的分区,并到 Name Node 找出每个分区下的文件集合。
架构迁移到 Iceberg 之后,发现清洗任务的 plan 过程提升很多,提高到了 5-10 分钟。执行过程还是 60 分钟,那总共的执行时间就缩短了 40%。同时减少了任务对 Hive Metastore 和 Name Node 的系统压力,保证了整个系统的稳定性。
2. Apache Iceberg 网易严选的实践
(1)原始架构
网易严选是网易的电商平台,它的很多业务数据是存储在数据库里的,比如大量的订单数据、商品数据。在数据库数据实时入湖的场景下以往迭代了多套方案,比如说用 DataX 做数据的全量抽取,Canal 做数据的增量抽取,并且每天将新增的 binlog 信息和前一天的历史数据去做合并,然后生成新一天的 ODS 数据。通过 kafka、kudu 搭建了实时数仓,实时数仓还需要同步到离线数仓。整体架构和技术方案非常复杂。
(2)Iceberg 流批统一架构
通过 Iceberg 的流批统一功能,严选完成了对以往架构的升级,做到数据入湖的流批融合。数据库数据不再需要 DataX 频繁地去数据库里面抽取,所有的数据都可以来自于实时链路,binlog 数据通过 kafka 实时更新到 Iceberg。binlog 中的更新、删除消息可以直接写入 Iceberg 中,故不再需要基于 binlog 的 merge 任务。
(3)提升
① 离线数据的延迟缩短为 5 分钟,可以做准实时的分析。
② OBS 数据的产出提前了半小时。
③ 500+ 任务,得到了一定数据体量的验证。
--
03/Arctic 如何高效治理 Apache Iceberg
1. Arctic 是什么
Arctic 是搭建在开源表格式上的湖仓管理服务。
(1)从架构层面讲。架构最底层是 storages 存储系统,中间层是 service 和 format ,最上层是计算引擎。Arctic 在 format 层提供了针对流式场景下功能更全面的的 Mixed streaming format,并在 service 层提供了数据湖表管理与自动优化服务。
(2)Arctic 的一个核心功能是完成数据湖中表的自动优化(self-optimizing)。Apache Iceberg 等表格式提供了表上丰富的写入与读取能力以帮助用户更灵活地使用数据湖满足自己的业务需求,但随着表上的操作变得越来越复杂,如何保证表中数据的质量以提供稳定高效的查询性能将变成一个不得不解决的问题,更具体来说需要解决的问题包括,实时写入引入的文件碎片问题,数据行级更新带来的数据冗余问题,多版本支持带来的数据清理问题。Arctic 的自动优化功能即提供了一种透明、自动的服务帮助解决上面提到的所有问题。
(3)基于 Apache Iceberg 表格式 Arctic 还提出了一种针对实时场景功能更完善的 Mixed streaming format,它在现有数据湖表格式之上还加入了毫秒级数据延迟的 LogStore,并更好地解决了实时与批量并发写入场景下的冲突解决问题。
2. Iceberg 表治理类型
Arctic 的一个核心功能是自动完成 Iceberg 表的优化。Iceberg 表上常见的维护操作包括:
(1)Expire Snapshots,清除过期文件
Iceberg 有很多的历史版本,会占用大量的存储资源,虽然它有一定的价值,但是也应该得到定期的清理。比如设定最多保留三天,超过三天数据就应该被清理。Expire Snapshots 就是实现这样的操作。
(2)Delete orphan files,清理垃圾文件
计算引擎在写入 Iceberg 的时候,如果它的写入任务失败了,可能会残留一些垃圾文件在那边。这些文件并没有提交进去,也不会影响到这个表的正确读取,但是浪费了一些存储资源。这些数据应该在后面定期地去做数据清理,把浪费的存储资源给释放出来。
(3)Compact data files,合并数据文件
① 合并碎片文件。在实时场景下,这个非常重要,实时场景下会频繁地往表中提交数据,这样就会产生很多小文件,这些小文件需要进行治理,以提升表上查询性能并减少对存储系统的压力。
② 减少数据冗余。在使用了行级更新的场景下,删除操作通过独立的 delete file 来标记,这就会造成数据冗余,冗余数据过多会大大降低表上的查询性能。故需要通过 compact data files 这样的操作把 delete 文件和 data file 做合并。
(4)Rewrite manifests,重写元数据
当表上的提交越来越多,manifest file 也可能会越来越多,过多的元数据文件同样会影响表上的读取性能,重写元数据操作会对元数据进行重新整理。
3. Arctic 治理 Iceberg 表
(1)全托管
① 定时触发
对历史快照、孤儿文件清理等操作适合定时触发。
② 按需触发
小文件合并、数据冗余合并,适合按需触发。
系统会实时统计表上的小文件与冗余文件情况,只有当满足触发条件时才会进行表上的合并。
(2)可观测
优化历史、资源消耗、优化收益都会在平台里面展示出来,比如花了多少资源,文件从多少个合并成多少个,文件的数据量、平均文件大小得到了多少的提升、数据量压缩后得到了多少的消减的,都能直观地展示出来。
(3)易运维
① 架构松耦合
Arctic 的引入对以往 Iceberg 的使用流程不会有任何影响,Arctic 作为一个独立服务进行搭建,只要将 Iceberg catalog 信息注册到 Arctic 中,Arctic 能自动发现已经存在的表,并根据配置自动完成表的优化。
② 配额隔离
Arctic 使用统一的资源完成所有表的自动优化以取得更高的资源使用率,同时支持为表配置优先级和资源配额,以得到更合理的资源分配。
③ 易扩展
Arctic service 作为中心化的服务只负责优化任务的调度,具体优化任务的执行支持 Yarn/K8s 等多种调度方式,优化任务的实现也通过了基于 Java app/Flink Job/Spark Job 等多种形式。
4. Arctic 小文件治理分类
(1)Minor optimize
实时场景会产生大量碎片文件,它们大多只有 KB 级别的大小,这些碎片文件需要更频繁地合并成更大的文件以快速提升表上的查询性能。
(2)Major optimize 减少数据冗余
当通过 minor optimize 将文件大小提升到一个更大的量级,比如 20-50MB 的量级,如果这时候再频繁地合入数据将造成较大的合并代价。这时候触发重写的条件将变成减少数据的冗余度,即当文件中有较多的数据被删除时再进行重写。
(3)Full optimize 全表重写
对整个表所有的数据做一次完整重写,将文件大小合并到理想大小,并完全去除掉所有的 delete file,这个过程中还可能进行数据的全局排序,全表重写后理论表上将获得最好的查询性能。
5. Arctic 小文件治理过程
(1)整体架构
Arctic 中自动优化的大体流程为 table monitor 监控新的表加入和已知表中的文件情况,当发现了这些表的文件情况达到触发优化的条件,optimize planner 就会针对该表生成优化任务,再通过 task scheduler 把优化任务分发到不同的 optimizer 中,然后这些任务在 optimizer 里面完成文件优化,之后把结果上报给 result handler,其中 optimizer 只负责文件的重写,不负责提交。result handler 负责收集结果后提交。optimizer manager 负责管理 optimizer 的生命周期,进行扩容或缩容的操作。
(2)Plan 过程
① 以分区为单位,热点分区会更频繁得进行文件优化。
② 配额限制资源,一个表不会使用超过其配额的资源。
(3)Execution 过程
① Long-running,optimizer 持续运行,不断地负责下发下来的优化任务。
② Flink stream job/java app 等多种执行方式。
③ K8s/Yarn 等多种执行框架。
(4)Commit
① 由 Arctic service 统一提交。
② 如果出现提交失败,将重试。
6. Arctic 快速流程
(1)安装AMS。
(2)登记 Iceberg Catalog。
在注册页面,将 Iceberg Catalog 注册登录到 AMS 系统。
(3)启动 Optimizer
注册成功之后,在页面上面启动 optimizer,启动起来后 Arctic 即可自动开始数据优化。
(4)可以在 AMS Dashboard 页面上观察各个表的优化结果。
--
04/Arctic 未来规划
1. 增加 Overview 页面
Dashboard 增加 overview 页面,统计 Iceberg 表的信息,比如它的数据量、文件情况、optimizing 情况,帮助用户更加快速地了解 Arctic 系统现在的状况。
2. 异步数据全局排序
支持全局排序,提升查询性能。
3. 异步二级索引构建
Iceberg 在做二级索引,Arctic 也在跟进。二级索引是异步的过程做构建的,Arctic 非常适合完成这种任务。
4. Optimize 消耗与收益的平衡
Arctic 可以上报查询的信息,比如做了一个查询,查询花了多少时间、文件的信息等都会上报。通过分析上报信息,可以发现哪些表是查询频繁的,它的现在的治理情况是什么样的,它的小文件问题有点不严重,根据表的不同情况对资源做针对性倾斜。
--
05/问答环节
Q1:有同学问,什么时候可以支持 Hudi?
A1:Hudi 社区也在开始实现自己的治理服务,说明一套完善的治理服务对数据湖是非常必要的,Arctic 社区还在和 Hudi 社区尝试完成在治理服务上的合作,大家可以持续关注动态。
Q2:有同场景下之前链路和数据湖链路的资源使用对比情况吗,包括存储和计算?
A2:以网易云音乐的那个例子为例,使用 Iceberg 和 Hive 在任务资源投入上是一样的,但是 Iceberg 处理时间缩短了 40%。严选场景更复杂一点,因为新的架构节省了很多的系统,比如以前部署 kudu 资源就不用了,用 Iceberg 准实时的数据湖能力去替换,这样资源就得到了减少。
网易内部还有一些其他的用 Arctic 和 Iceberg 进行数据湖建设的业务,他们都通过湖仓一体的建设完成了降本增效的目标。
Q3:Optimizer 如何运行在 K8s 上?
A3:optimizer 可以使用一个 Flink streaming job 来启动,Flink job 有 Yarn 模式,也有 K8s 的模式,通过这种方式能最快地将 optimizer 部署在 k8s 上。
Q4:Minor optimizer 不是把 Equality Delete 转换成 Position Delete吗?
A4:是的,minor optimize 同时也会将 equality delete file 转换成 position delete file,相较于前者后者会有更好的读取性能。
Q5:Iceberg 相对于 Hudi 的优势和不足,可以介绍一下吗?
A5:Hudi 天生面向实时更新场景而生,Iceberg 则是面向大体量的数据分析场景。现阶段 Iceberg 在实时更新场景下表现还是会弱于 Hudi,Arctic 使用 mixed streaming format 补足了 Iceberg 在这个场景下的不足。
Q6:在 Iceberg 中业务上更新如何实现呢?
A6:如果要使用行级更新能力,需要使用 Iceberg format V2,它通过引入一种新的 delete file 用来记录需要被删除的数据。一条更新会拆分成 一个 delete 和 一个 insert 数据, 把老数据删掉,把新数据写进去即完成了更新。
Q7:何时可不用 Kafka, 支持事件触发实时处理?
A7:如果表中都是 append 数据,当前的 Iceberg 版本已经可以满足这个需求,如果是数据库 CDC 写入的场景,Iceberg 社区正在实现自己的 CDC 功能,可以支持关注社区的发展。
Q8:主键点查的性能优化能一秒内出结果吗?
A8:这主要还是取决于表的数据量大小,不过 Iceberg 本身还是面向大体量的数据分析场景下的结构,主键点查等特殊场景可以尝试通过引入二级索引的方式来加速。
Q9:AMS 可以替代 HMS 吗?AMS未来的规划是否是这个方向?
A9:是这个方向,用 HMS 去管理 Iceberg 表遇到一些问题,比如 Iceberg 表用 HMS ,需要用锁做并发的写入,就是冲突的检测。锁很不稳定,可能有超时的问题。AMS是非常稳定和高效的。所以现在主推的就是用 AMS 充当 metastore 的能力,替换掉 HMS。
Q10:是否可以考虑支持多区数据中心统一元数据管理呢?
A10:可以的。因为 Arctic 是面向多 catalog 的,每个 catalog 对应着一套集群,或者说多区数据中心,可以把这些数据中心全部作为 catalog 给注册上来应该就可以了。
Q11:二级索引何时能够支持哪些索引?
A11:二级索引预计要明年上半年,会优先支持 bloom filter,另外的就是 sketches,它是用于进行去重计算的一种数据结构。
Q12:Iceberg 支持的底层存储可以使用 mini IO 吗?
A12:可以的,直接使用 S3 FileIO 即可。
Q13:Arctic 在安全上有什么样的做法或者是规划?
A13:Arctic 还是会优先完成对 Ranger 的对接,完成对表的权限控制。
Q14:Arctic 支持 Watermark 吗?
A14:Arctic 在 0.4.0 版本新增了 watermark 相关功能,能够计算出表当前的水位信息,帮助用户了解表内数据的写入进度,进一步用来判断是否能够完成后续的计算。
今天的分享就到这里,谢谢大家。
▌2023数据智能创新与实践大会
- 4大体系,专业解构数据智能
- 16个主题论坛,覆盖当下热点与趋势
- 40+演讲,兼具创新与最佳实践
- 1000+专业观众,内行人的技术盛会
第四届DataFunCon数据智能创新与实践大会将于⏰ 7月21-22日在北京召开,会议主题为新基建·新征程,聚焦数据智能四大体系:数据架构 、数据效能 、算法创新 、智能应用 。在这里,你将领略到数据智能技术实践最前沿的景观。
欢迎大家点击下方链接获取大会门票~
DataFunCon2023(北京站):数据智能创新与实践大会