Fork me on GitHub

Delta Lake在BI+AI产品中的实践

以下文章来源于 https://zhuanlan.zhihu.com/p/640436278

导读 本文根据观远数据研发主管李迪砺讲座整理,分享主题为《Delta Lake 在 BI+AI 产品中的实践》,主要从以下几个方面介绍:

  1. 观远数据分析产品简介

  2. Delta Lake的应用实践

  3. 总结和展望


分享嘉宾|李迪砺 观远数据 研发主管

编辑整理|杨康 南京大学

出品社区|DataFun


01/观远数据分析产品简介

观远数据成立于2016年,总部位于杭州,主要为企业提供一站式的数据分析与智能决策产品和解决方案,客户包括联合利华、招商银行、安踏、元气森林、小红书、B站等,分布在零售、消费、金融、互联网等各个领域。公司的愿景是------"让业务用起来,让决策更智能"。我们发现很多时候数据分析产品并没有很好地在企业内部被用起来,往往是业务提需求,让IT部门做一些数据处理和报表,这个周期可能会比较长,并不利于敏捷、及时的分析决策。

关于数据分析产品功能,观远已经做的比较完备,涵盖了数据接入、数据开发、数据分析、数据应用等各个环节。产品围绕着"让业务用起来",在易用性方面有一些比较有特色的功能,比如智能ETL,它也是最受客户欢迎的功能之一,使用门槛非常低,业务人员可以不用关心SQL,通过拖拉拽的方式进行一些数据算子的组合来进行数据开发。另外我们基于Delta lake和Python开发了数据解释功能,提供了对数据进行多维分析的能力,可以去探寻数据背后的根因,提升我们对数据的洞察能力。

这里举一个具体的客户案例,某头部银行,其BI平台月活达到4万以上,百分之九十的分析行为可以在3-5秒内完成。他们的计算引擎构建在18000核的超大集群上,每日完成超过50万的Spark任务。能够支撑这么多用户活跃地使用,背后是依托于Delta Lake和Spark的存储计算方案。接下来将介绍我们在这一领域的一些实践。

02/Delta Lake的应用实践

1. 数据湖架构介绍

Delta Lake是Databricks公司开源的数据湖存储方案,最初选择Delta Lake的一个原因也是因为是Databricks开源,和spark的结合应该会有比较好的性能。

这是一个比较典型的架构图,它依托于底层的 HDFS 或者是对象存储,又或者云上的一些存储方案,去支撑上层的BI、AI应用,对多应用的支持也是数据湖的一个重要的特性。数据入湖的方式包括批量和流式。在BI的分析场景,批量方式会多一些,实时能力也在逐渐兴起。

接下来我们看一下将 Delta应用在BI平台之后的整体架构。数据接入层负责将客户的数仓、业务系统、文件、API通过全量、增量、CDC的方式,接入到我们的平台中。数据存储和管理负责Delta Lake数据集、元数据管理,权限管理、血缘管理、数据质量检查以及一些优化策略等。

数据处理和调度部分,离线开发主要基于Spark进行,它作为核心计算引擎,同时也支持机器学习工具,如Pandas,delta-rs 等。我们也引入了 ClickHouse作为查询加速引擎,和Spark在某些场景形成互补。使用DolphinScheduler 作为任务编排和调度工具。数据分析和应用层包括BI平台、数据科学平台以及一些数据应用等。

2. Delta Lake的特性及应用

Delta Lake 的重要特性包括ACID事务的支持、全量/增量更新、Schema管理、对多引擎的支持(包括spark、机器学习框架等)、数据版本支持、分区、存算分离适配多种存储方案以及流批一体的能力。

首先我们来看一下Delta Lake的表结构,其中 delta_log目录用来记录对表的变更历史。每次commit都会生成一个JSON文件,每10次提交会生成一个 checkpoint文件。为什么会有checkpoint文件?它可以在Spark读取数据的时候提供一些性能优化。当通过Spark去访问时,可以基于某一个checkpoint文件以及之后的变更,不用去遍历以往的大量的JSON 文件,从而提高访问效率。如果设置分区字段,我们就会看到类似于 date=2019-01-01这样的文件夹,它表示在date字段上设置了分区,目录下的parquet文件就是分区中的数据。如果没有设置分区,这些parquet文件就会以平铺的方式进行组织。

接下来介绍一下ACID。首先,原子性方面,通过delta log来进行控制和管理。在一个事务中,数据文件会被写入到数据文件夹下。当事务完成时,会向delta log写入一条新的记录,其中包括在事务中所有被修改的文件路径,每一次提交都会增加表的版本号。操作过程中可能会发生异常。如果数据文件已经被写入到文件夹下,当事务失败时,这些文件将不会作为表的数据文件。一致性采用乐观并发控制的方式,会将写操作分为三阶段。首先是读取最新版本检查哪些文件需要修改,接着开始写入数据文件,最后是验证和提交。在这个阶段中,会检查所有将要提交的变更和其他并发的事务有没有冲突,如果没有冲突,就可以进行提交,生成一个新的版本,写操作完成。

Delta Lake默认隔离级别是写序列化,结合上面介绍的乐观并发控制策略,可以提供比较好的一个吞吐能力。最后,因为 Delta Lake表是存在 HDFS、S3 或者 NAS 这些存储方案上,这些存储服务本身也提供了高可用和持久化的能力,因此它的持久性是依托于底层的存储服务来实现的。

当我们对数据集进行并发更新的时候,尤其是并发修改可能会涉及到相同的数据文件时,仍然有可能会发生并发异常。BI业务的特点是平台上每天都会运行大量的任务,当出现异常的时候,不仅会影响当前的任务,也会影响后面的任务。我们可以基于业务特点,通过一些优化来避免这些影响。比如可以对每个表维护一个写操作的队列,去顺序执行。这里面的操作包括更新、小文件合并、版本清理等可能会出现并发异常的操作。小文件合并和版本清理也是性能优化的重要手段。

全量增量更新的能力,也是 BI业务中的基本能力。其中全量覆盖用在表初次加载或者重建的时候。增量更新也是一个非常重要的特性,当我们以 t +1 的方式从客户系统中取数时,可以使用基于时间戳的增量更新机制来提高加载效率。Delta Lake也支持追加新数据的方式,这时候不会对历史数据做修改。

数据入湖的方式,对于数据仓库或者业务库来说,通常可以采用JDBC 的方式从源端抽数生成一个临时文件,通过 Spark 把它转化为Delta Lake的数据集。这种方式对接方便,较为通用。针对抽取 Hive 表比较慢的情况,因为 Hive 表本身的数据文件以及Delta Lake的数据文件都会存在 HDFS 上,没有必要再去生成临时文件,而是可以通过 Spark 去直接加载 Hive 表并进行转换,这样就大大提高了加载效率。

衍生的一个问题是,客户希望可以将 Hive 表和Delta Lake表进行直接的ETL。我们的做法如下图所示,依然通过 Spark 来加载 Hive 表和 Delta Lake表,并进行处理。前文中介绍过智能 ETL 这一模块,我们也在不断将这些能力和ETL进行结合,提升数据开发的易用性。

Schema 管理也是数据开发中常被讨论的一个话题。默认情况下Delta Lake不允许追加 schema不匹配的数据。不过它也提供了一种机制来支持schema变化,比如通过 mergeSchema参数。举个例子,假设初始的源表有两个字段,分别是first_name和age,通过它去创建一个目标的Delta Lake表会和源表具有相同的 schema信息。当源表的Schema发生变更,比如age字段被删除,添加了一个新的字段,这时如果我们再向之前的那个表去追加数据时,会发生错误。我们可以使用 mergeSchema的机制来向目标表去追加新数据,这样目标表schema 也会反映源表的变更。

多引擎支持方面,Spark 是我们核心的处理引擎,它是和Delta Lake结合得最紧密,也是使用场景最广泛的计算引擎。我们的产品中使用 Spark 和Delta Lake来进行数据的接入、开发、数据分析等任务。Spark本身作为大数据技术中的明星,有很多优异的特性,包括对大规模集群的支持、高效任务处理、社区活跃以及性能表现优越等。Delta-rs是我们在算法实验中比较重要的一个工具,他是一个rust库,上层也实现了python API。引入delta-rs的主要原因是避免启动一个很重的spark应用,而是把大部分计算资源预留给E TL和交互式查询使用,而且算法任务中希望通过python直接消费数据,delta-rs刚好提供了这样的能力。相比spark,他有更好的读取性能。不过缺点是对写入支持不完善,存在bug,我们也对社区提了一些优化。

Standalone Reader 是一个java库,可以比较方便的读取数据、和schema。不过这个项目感觉缺乏维护,存在一些bug。也不支持sql查询和写入。使用场景比较有限,比如说数据集的简单预览等。我们也对这个项目做了一些修复。

时间旅行,也叫 time travel,指的是对数据多版本的支持。给大家介绍一个算法实验中的场景,比如我们的算法工程师在进行算法实验时,除了使用最新的数据外,也想使用历史版本的数据来进行实验效果的比对,时间旅行刚好可以很好地解决这些问题。

关于分区,也以一个算法实验的场景来说明如何通过分区来提高算法实验的效率。我们和客户合作的销量预测场景,业务上区分了不同的产品线,比如洗浴产品、食品等。每条产品线的业务形态不同,因此需要去分别构建模型。不过这些特征工程的逻辑比较接近,所以我们可以把这些都放在一个数据集里,通过分区来管理。因为不同分区的写入不会发生冲突,这些流程都可以并发运行,从而提升了的执行效率。对分区字段的选择,也是有一些要求的,比如一般是date类型,在进行正式的分区之前,可以做一些前置检查看字段是否适合分区,避免使用错误的情况;另外,分区适合对大表使用。

下图是一个典型的流式写入流程,通过一些实时同步工具,将源端数据同步到Kafka,再通过Spark Structured Streaming进行增量更新,同步到Delta Lake,供上层应用使用。

性能优化主要在以下四个方面:

  • 小文件合并(compaction):当持续更新数据集时,数据文件会不断增多,我们观察发现,当对一个数据集进行比较高频的更新,比如每 5 分钟做一次更新,在几个小时之内,文件数量就可能增加到数万甚至更多。大量的文件会严重影响 Spark 的查询性能。因此我们需要将大量的小文件压缩为少量大文件,去提高访问效率。
  • 版本清理(vacuum):在数据分析的很多场景我们只需要使用最新数据就可以了,Delta Lake提供了vacuum机制来进行版本清理。如果使用了时间旅行,需要根据情况来制定版本清理策略。关于小文件合并和历史版本清理的时机,我们一般通过定时任务来触发,我们也在客户环境观察到这样的现象,比如一天之内会有几个任务比较密集的时间段,文件和磁盘的增长都比较快,我们的定时策略可以灵活地调整,在这些任务完成之后进行及时地合并和清理。
  • 仅读取需要的列:算法中也有一些典型应用场景,比如可以把Delta Lake的表当成特征库来使用,构建包含数百列的大宽表,但每个产品线在建模时,可以只选择其中所需要的列,这也利用了列式存储的优势。
  • 持续升级版本

03/总结和展望

我们会持续地去升级版本,使用新的一些特性,比如Z-Order对查询性能的优化,DML增强等。同时,我们也会让我们的产品更加云原生,融合多引擎,包括 Databricks,ClickHouse等。也计划让Delta Lake更加开放,可以通过 SQL 的方式提供给其它工具使用。此外,我们计划可以基于数据集catalog做数据资产管理。我们也在持续的回馈社区,公司内有多位delta lake,spark,delta-rs的contributor。



本文地址:https://www.6aiq.com/article/1688028641961
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出