Fork me on GitHub

字节跳动湖平台在批计算和特征场景的实践

本文整理自火山引擎云原生计算研发工程师刘纬在 DataFunCon 2022 上的演讲。随着业务发展,字节跳动特征存储已到达 EB 级别,日均增量 PB 级别,每天训练资源量级为百万 Core。随之而来的是内部业务方对原始数据存储、特征回填需求、降低成本、提升速度等需求的期待。本次分享将围绕问题背景、选型& Iceberg 简介、基于 Iceberg 的实践及未来规划展开。

作者|火山引擎云原生计算研发工程师-刘纬
整理|王吉东、于惠
来源:字节技术

01 问题背景

用户使用流程

如我们所知,字节跳动是一家擅长做 A/B test 的公司。以特征工程调研场景为例,流程如下:

  • 首先由算法工程师进行在线特征抽取;

  • 将抽取到的特征,使用 Protobuf 的格式按行存至 HDFS;

    • 出于存储成本的考量,一般只存储抽取后的特征,而不存储原始特征
  • 将 HDFS 存储的特征交由字节自研的分布式框架( Primus )进行并发读取,并进行编码和解码操作,进而发送给训练器。

  • 由训练器对模型进行高效训练

    • 如果模型训练效果符合算法工程师的预期,说明该调研特征生效,进而算法工程师对调研特征进行回溯,通过 Spark 作业将特征回填到历史数据中,分享给其他算法工程师,进而迭代更多的优质模型
    • 如果模型训练效果不符合算法工程师的预期,则调研特征不对原有特征集合产生影响

图片

** 业务规模**

公司庞大的业务规模,带来了巨大的计算和存储体量:

  • 特征存储总量达 EB 级;
  • 单表特征最大可达百 PB 级(如广告业务);
  • 单日特征存储增量达 PB 级;
  • 单日训练资源开销达 PB 级。

图片

遇到的问题

当特征调研场景叠加巨大的数据体量,将会遇到以下困难:

  • 特征存储空间占用较大
  • 样本读放大,不能列裁剪,很难落特征进样本;
  • 样本写放大,COW 很难做特征回溯调研;
  • 不支持特征 Schema 校验;
  • 平台端到端体验差,用户使用成本高

02 选型& Iceberg 简介

在特征调研场景下,行存储是较为低效的存储方式;因此选择 Iceberg 存储方式来解决上述问题。

整体分层

图片

Apache Iceberg 是由 Netflix 公司推出的一种用于大型分析表的高性能通用表格式实现方案。

如上图所示,系统分成引擎层、表格式层、文件格式层、缓存加速层、对象存储层。图中可以看出,Iceberg 所处的层级和 Hudi,DeltaLake 等工具一样,都是表格式层:

  • 向上提供统一的操作 API

  • Iceberg 定义表元数据信息以及 API 接口,包括表字段信息、表文件组织形式、表索引信息、表统计信息以及上层查询引擎读取、表写入文件接口等,使得 Spark, Flink 等计算引擎能够同时高效使用相同的表。

  • 下层有 parquet、orc、avro 等文件格式可供选择

  • 下接缓存加速层,包括开源的 Alluxio、火山引擎自研的 CFS 等;

    • CFS 全称是Cloud File System, 是面向火山引擎和专有云场景下的大数据统一存储服务,支持高性能的缓存和带宽加速,提供兼容 HDFS API 的访问接口。
  • 最底层的实际物理存储,可以选择对象存储,比如 AWS S3,火山引擎的 TOS,或者可以直接使用 HDFS。

通过上图可以较为清晰地了解到,Iceberg 抽象层最大的优势在于:将底层文件的细节对用户屏蔽,将上层的计算与下层的存储进行分离,从而在存储和计算的选择上更为灵活,用户可以通过表的方式去访问,无需关心底层文件的信息。

Iceberg 简介

Iceberg 架构

图片

Iceberg 的本质是一种文件的组织形式。如上图所示,包括多级结构:

  • Iceberg Catalog:保存表和存储路径的映射关系,其核心信息是保存 Version 文件所在的目录。
    • Iceberg Catalog 共有8种实现方式,包括 HadoopCatalog,HiveCatalog,JDBCCatalog,RestCatalog 等
    • 不同的实现方式,其底层存储信息会略有不同;RestCatalog 方式无需对接任何一种具体的存储,而是通过提供 Restful API 接口,借助 Web 服务实现 Catalog,进一步实现了底层存储的解耦。
  • Metadata File:存储表的元数据信息,包括表的 Schema、分区信息、快照信息( Snapshot )等。
    • Snapshot 是快照信息,表示表在某一时刻的状态;用户每次对 Table 进行一次写操作,均会生成一个新的 SnapShot。
    • Manifestlist 是清单文件列表,用于存储单个快照的清单文件。
    • Manifestfile 是存储的每个数据文件对应的清单文件,用来追踪这个数据文件的位置、分区信息、列的最大最小值、是否存在 Null 值等统计信息。
  • Data File 是存储的数据,数据将以 Parquet、Orc、Avro 等文件格式进行存储。

Iceberg 特点

  • SchemaEvolution:Iceberg 表结构的更新,本质是内在元信息的更新,因此无需进行数据迁移或数据重写。Iceberg 保证模式的演化( Schema Evolution )是个独立的、没有副作用的操作流程,不会涉及到重写数据文件等操作;
  • Time travel:用户可任意读取历史时刻的相关数据,并使用完全相同的快照进行重复查询;
  • MVCC:Iceberg 通过 MVCC 来支持事务,解决读写冲突的问题;
  • 开放标准:Iceberg 不绑定任何计算引擎,拥有完全独立开放的标准,易于拓展。

Iceberg 读写流程和提交流程

图片

1.读写

  • Iceberg 的写操作,只有在发生 Commit 之后才可读;如有多个线程同时在读,一部分线程在写,就只有在 Commit 全部数据之后,对用户进行的读操作才能被用户的读线程所看到,从而实现读写分离;
  • 例如上图中,在对 S3 进行写操作的时候,S2、S1 的读操作是不受影响的;此时 S3 无法被读到,只有Commit 之后 S3 才会被读到。此时 Current Snapshot 会指向 S3;
  • Iceberg 默认从最新 Current Snapshot 读取数据;如果读更早的数据,可通过指定对应的 Snapshot ID ,实现数据回溯。

2.事务性提交

  • 写操作:记录当前元数据的版本——Base Version,创建新的元数据以及 Manifest 文件,原子性将 Base Version 替换为新的版本;
  • 原子性替换:原子性替换保证了线性历史,通过元数据管理器所提供的能力,以及 HDFS 或本地文件系统所提供的原子化 Rename 能力实现;
  • 冲突解决:基于乐观锁实现,每一个 Writer 假定当前没有其他的写操作,即对表的 Write 进行原子性的 Commit,若遇到冲突则基于当前最新的元数据进行重试。

分区裁剪

  • 直接定位到 Parquet 文件,无需调用文件系统的 List 操作;
  • Partition 的存储方式对用户透明,用户在修改 Partition 定义时,Iceberg 可以自动地修改存储布局,无需用户重复操作。

谓词下推

Iceberg 在两个层面实现谓词下推:

  • 在 Snapshot 层面,过滤掉不满足条件的 Data File;
  • 在 Data File 层面,过滤掉不满足条件的数据。

其中,Snapshot 层面的过滤操作为 Iceberg 所特有,正是利用到 Manifest 文件中的元数据信息,逐字段实现文件的筛选,大大地减少了文件的扫描量。而同为Table Format 产品、在字节其他业务产线已投入使用的 Hudi,虽然同样具备分区剪枝功能,但是尚不具备谓词下推功能。

03 基于 Iceberg 的实践

Hudi、Iceberg、DeltaLake 这三款 TableFormat 产品各有优劣,然而并没有任何一款产品能够直接满足我们的使用场景需求;考虑到 Iceberg 具备良好的 Schema Evolution 能力、支持下推,且无需绑定计算引擎等优点,因此字节选择使用 Iceberg 作为数据湖工具。

整体架构

图片

  • 在字节的整体架构中,最上层是业务层,包含抖音,头条,小说等字节大部分业务线,以及火山引擎云原生计算等相关 ToB 产品(如 Seveless Spark 等);
  • 在平台层,使用 Global Lake Service 给业务方提供简单易用的 UI 和访问控制等功能;
  • 在框架层,使用 Spark 作为特征处理框架(包含预处理和特征调研等),使用字节自研的 Primus 分布式框架作为训练框架,使用 Flink 实现流式训练;
  • 在格式层,选择 Parquet 作为文件格式,使用 Iceberg 作为表格式;
  • 最下层是调度器层和存储层。选择 Yarn 和 K8S 作为调度器;存储层一般选择 HDFS 进行存储,对于 ToB 产品,则使用 CFS 进行存储。

Data-Parquet

图片

结合上图可以看出, 列存储在特征调研场景存在以下优势

  • 可选择指定列进行读取:有效减少读放大问题,同时易于增列,即新增一列的时候,只需单独写入一列即可,元数据信息会记录每一列所在的磁盘位置;
  • 压缩:同一列的数据格式相同,因此具有更好的压缩比;同一列的数据名称相同,因此无需进行冗余字符串存储;
  • 谓词下推:对每一列数据记录相应的统计信息(如 Min,Max 等),因此可以实现部分的谓词下推。

为了解决业务方的痛点问题,改成使用 Parquet 列存储格式,以降低数据的存储成本;同时由于 Parquet 选列具备下推到存储层的特性,在训练时只需读取模型所需要的特征即可,从而降低训练时序列化、反序列化的成本,提升训练的速度。

然而使用 Parquet 列存储,带来优点的同时也相应地带来了一些问题:

  • 原来的行存储方式是基于 Protobuf 定义的半结构化数据,无需预先定义 Schema;然而使用 Parquet 之后,需要预先指定 Schema 才能进行数据的存取;这样在特征新增和淘汰的时候,Schema 的更新将会变成一个棘手的问题。
  • 此外,Parquet 不支持数据回填;如果需要要回填比较长的数据,就需要将数据全量读取,增加新列,再全量写回。这样一方面会造成大量计算资源的浪费,另一方面会带来 Overwrite 操作,导致正在进行训练的任务由于文件被替换而失败。

为了解决以上两个问题,通过引入 Iceberg 支持 SchemaEvolution,特征回填以及并发读写。

特征回填

COW

图片

从上图可以看出,使用 Iceberg COW( Copy on Write )方式进行特征回填,通过 BackFill 任务将原快照中的数据全部读出,然后添加新列写出到新的 Data File 中,并生成新的快照。这种方式的缺点在于,仅仅新增一列数据的写入,却需要整体数据全部读出后再全部写回,浪费了大量的计算资源和存储资源;因此,我们基于开源的 Iceberg 自研了一种 MOR( Merge on Read )的 BackFill 方案。

MOR

图片

从上图可以看出,在 MOR 方案中,我们仍然需要一个 BackFill 任务来读取原始的 Data File 文件;所不同的是,我们只需读取少数需要的字段。例如对 A 列通过一些计算逻辑生成 C 列,那么 BackFill 任务只需从 Snapshot1 中读取 A 列的数据,且只需将 C 列的 Update 文件写入 Snapshot2 即可。

随着新增列的增多,需要将 Update 文件合并到 Data File 里面;为此,可以进一步提供一种 Compaction 逻辑,即通过读取旧的 Data File 和 Update File,合并生成新的 Data File。实现细节如下:

  • 旧 Data File 和 Update File 增加一个主键,每个文件按照主键排序;
  • 读取旧 Data File 时根据用户选择的列,分析具体需要哪些 Update File 和 Data File;
  • 根据旧 Data File 中 Min-Max 值去选择对应的 Update File。

由此可以看出,MOR 的本质是对多个 Data File 文件和 Update File 文件进行多路归并,归并的顺序由 SEQ 决定,SEQ 大的数据(表明数据越新)会覆盖 SEQ 小的数据。

两种特征回填方式对比

  • COW:读写放大严重、存储空间浪费、读取逻辑简单、写入耗费更多资源、读取无需额外计算资源;
  • MOR:没有读写放大、节省存储空间、读取逻辑复杂、写入耗费较少资源、绝大多数场景,不需要额外资源;

相比于 COW 方式的全量读取和写入,MOR 的优势在于只读取需要的列,同样也只写入更新的列,因此避免了读写放大的问题,节省大量计算资源,并大大降低读写 I/O;相比 COW 方式每次 COW 翻倍的情况,MOR 只需存储新增列,大量节省了存储资源。

对于模型训练任务而言,大多数模型训练只需要用到少量的列,因此大量的线上模型都无需 MOR 操作,涉及开销可忽略不计;对于少数的特征调研模型,只需读取模型对应的 Update File 即可,因此带来的读取资源增加也非常有限。

平台化改造

图片

这里分享下字节内部实现的平台化工作。上图是批式特征存储的列表,借助站内实现的湖平台化工作,业务部门可以轻松实现特征的可视化操作,以及信息概览的获取。

下图是一张特征表样例,通过这张表可以直观地看到存储空间的使用、文件数的统计、记录数统计、特征统计等信息。

图片

其他

图片

除了上面提到的借助 Compaction 提高读性能以及分析特征删除场景外,我们还提供了以下几个服务:

  • Expiration

    • Snapshot Expiration: 用于处理过期的 Snapshots。过期 Snapshots 不及时清理,会导致元数据文件堆积,从而带来文件膨胀问题,会给算法工程师带来困扰,因此需要服务定期做一些清理。我们通过平台化改造实现 Snapshots 文件的统一维护和清理;
    • Data Expiration: 大部分数据是有新鲜度和时效性的,因此用户可设置数据保存多久后被清理。
  • CleanUp:由于一些事务的失败,或者一些快照的过期,导致文件在元数据文件中已经不再被引用,需要定期清理掉。

  • Roll-Back:对于一些在 Table 中非预期数据或者 Schema 变更,希望将其回滚到之前稳定的 Snapshot;结合平台的事件管理器,可以比较容易的实现这一功能。

  • Statistics: 用来实现一些湖平台可视化信息的展示,以及后端服务给业务带来的价值归纳。

04未来规划

规划重点

在未来规划中,计划逐步支持以下功能:

  • 湖冷热分层 :在成本优化方面,可以通过湖冷热分层实现。前文提到对于保存超过一定时间的数据,可以直接删除;然而在某些特定的场景下,这些数据还会被使用,只是访问频率较低;因此未来考虑增加数据湖冷热分层功能,帮助用户降低成本。
  • 物化视图 :在查询优化方面,通过物化视图提升查询性能。该功能是源于 ToB 客户的真实场景需求,目前这部分的优化工作正处于商业化交付流程中,大家可以后续在火山引擎官网相关的产品上进行体验。
  • Self-Optimize :在体验优化方面,实现 Self-Optimize,例如前文提到的一些数据维护的优化等。
  • 支持更多引擎 :为了增加生态的丰富度, Iceberg 在未来也会逐渐更多的引擎。

整体平台架构总览

图片

整体平台架构以计算引擎产品为核心,包含两部分服务:

  • 云原生管理控制 :Quota 服务、租户管理服务、运行时管理、生态整合服务、交付部署服务、网关服务;
  • 云原生运维平台 :组件服务生命周期管理、Helm Chart 管理、日志&审计、监控报警、容灾&高可用;

如前文所述,该平台不仅支持公司内部的业务,还会支持一定的 ToB 的业务,以上在字节内部实现的功能,以及未来规划的能力也会基于内外一致的思路进行演进;最终都会落地到上图中涉及到的几款云原生计算产品中,如流式计算 Flink 版,云原生消息引擎 BMQ,云搜索服务 OpenSearch,大数据文件存储 CloudFS 等。以上均为 Serverless 的全托管产品,让用户更聚焦于自己的业务逻辑,减少数据运维带来的困扰。


本文地址:https://www.6aiq.com/article/1677084749165
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出