Presto+Alluxio 加速 Iceberg 数据湖访问
转载地址
**导读:**本文将分享 Alluxio 社区和 Presto 社区在数据湖方面的一些工作,主要聚焦 Iceberg。
文章包括以下几个部分:
-
Presto & Alluxio
-
Alluxio & Iceberg
-
最佳实践
-
未来的工作
分享嘉宾|王北南博士 Alluxio 软件工程师
编辑整理|唐洪超 敏捷云
出品社区|DataFun
01/Presto & Alluxio
1. Presto Overview
Presto 是一个里程碑式的产品,它能够让我们很简单的不需要数据的导入和导出,就可以使用标准的 SQL 来查询数据湖仓上的数据。早先是数据仓库 data warehouse 即 Hive 数据仓库,之后出现了 Hudi 和 Iceberg,有一些公司用 Presto 查询 Kafka ,还有 Druid 等等。Druid 很快,但是可能对 Join 支持不好,可以用 Presto 直接查询 Druid 一步到位,然后通过一些计算的 pushdown,能够让 Druid 中有些跑得比较困难的任务得到很好的运行。
Presto 中有一个概念叫做交互式的查询 ,即在几秒种最多几分钟返回一个结果。现实中很多人用 Presto 来做秒级查询,即 subsecond 的查询,一秒钟返回结果,得出一些很快很高效的 dashboard。也有人用 Presto 来处理一些几小时的 job,甚至用 Presto 来部分取代 ETL,通过 SQL 语句就能直接处理数据,简单易用。Presto 处理的数据量为 PB 级,在日常的使用中,一般一个 Presto 集群,一天处理几十个 PB 的数据,还是很容易的。当然,集群越多,处理的数据量也越大。
目前 Presto 有两个开源的社区,一个是 prestodb,此社区主要是由 Facebook 领导的社区,包括 uber、Twitter,以及国内公司 TikTok,腾讯都有参与。
另一个社区是trinodb,prestodb 分出去之后,新建的开源社区更加的活跃。此社区背后的商用公司叫 starburst,这个社区更加活跃,用户会更多一些,但是 prestodb 背后的大厂多一些。
Presto 目前的使用场景有很多,很多数据科学家和数据工程师通过 SQL 查询 想要的数据;一些公司决策使用的 BI 工具,比如 tableau 和 zeppelin;公司决策需要报表和 dashboard,这些 query 可能需要在几秒钟快速地完成,将数据展示出来,比如广告的转化率和活跃用户,这些数据需要实时或准实时的反馈出来;还有一个场景就是A/B testing ,因为它的好处就是很快,结果能够很快的反馈回来;最后一个是ETL,ETL 是很多公司的数据仓库或者数据平台的基石,非常重要,但是 Presto 并不是特别适合在这个领域,虽然很多人使用 Presto 来处理一些 ETL 的 job,但是 Presto 并不是一个很容错的系统,如果计算过程中间坏掉,整个查询可能就要从头开始了。
下图展示了 Presto 发展的历史。
2. Presto 主体架构
上图是 Presto 的主体架构,coordinator 如同一个 master,负责调度,当有一个查询进来时,把 SQL 解析生成查询的 plan,然后根据 plan 分发给若干个 worker 执行。根据不同的运算性质,每个 worker 去查对应的数据源,数据源可能是 Hive 数仓,也可能是数据湖 Iceberg 或者 Hudi,不同的数据源对应不同的 connector。connector 在使用的时候,其实在 Presto 里就像一个 catalog 一个 namespace。比如在 SQL 中查询 Hive 数据仓库中的部门表,通过 hive.ADS.tablename 就可以把这个 table 找到。
由于 Presto 有着多个 connector 和 catalog,天生能够提供数据的 federation,即联合。可以在 Presto 中联合不同的数据源,可以来自 Hive 、Iceberg 、Kafka 、Druid、mysql 等各式各样的数据源,并把来自多个数据源的数据 join 到一起。Presto很灵活,如很多人还把 Hive 的表跟 Google 的 spreadsheet 表格 join 到一起。
目前 presto 主要的数据来源可能 95% 甚至 99% 是来自 Hive 。当然现在也有些变化了,由于数据湖的崛起,可能越来越多流量会转向数据湖 Iceberg 和 Hudi。
3. Presto + Alluxio Overview
Presto 访问数据源就是通过直连的方式,比如要访问 HDFS 就连到 HDFS 上。有的公司可能数据源太多,可能有十几个 HDFS 的集群,这时候 presto 需要一个统一的命名空间,此时 Presto 可以提供一个联合,在物理的数据层上面提供一个抽象层,看起来就像是一个 cluster,然后在 Presto 中呈现出来的就是一个统一的命名空间,这个功能还是挺方便的。
4. Presto 与 Alluxio 结合
Presto 查数据并不是把数据给吃进来,而是访问数据的原始的存储,数据存储在 HDFS 就访问 HDFS,当 SQL 查询进来后翻译完,去到这个 Hive Metastore 中拿到元数据,通过元数据找到表数据存储在哪个目录中,将该目录分开,然后让每个 worker 读取若干的文件去计算结果。在结合 Alluxio 的工作时,改变了缓存路径。
其实在商用版本有更好的一个功能。可以不改变这个路径,还是这个 S3 路径,但它其实使用了本地的 Alluxio,当然这在我们数据库中遇到一些麻烦,因为数据库中 expert 文件里边是 hard code 而不是死的路径,为缓存带来了一些麻烦,我们通过转换,让本来是访问原始数据的存储,通过 election 变成访问本地的数据源,得到提速的效果。
5. Co-located deployment
我们提出提供了另外一种部署的方式。我们把 Presto worker 和 Alluxio worker 部署在同一台物理机上。这样保证了数据的本地性。确保数据加载到了 Presto worker 的本地。这里 Presto DB 上有更精简的实现方式 ,在 to local cache 项目中,有 local cache 实现数据的本地化,通过数据本地化省掉网络传输。对于 Alluxio 就是 Co-located 的部署方式。它跟 HDFS 相比也省掉了一次网络的传输。
6. Disaggregated deployment
国内很多公司使用数据一体机,将 Presto、Spark、HDFS、 ClickHouse 等都放到一起。针对这种情况,推荐的实现就是用 in memory 的 Lark show 的 local cache,会有非常好的提速,即 local cache 结合 Alluxio worker ,能有百分之四五十的提速。缺点在于这种实现需要使用很多的内存,数据缓存在内存中,通过 SSD 或者内存来给 HDD 或者慢速的 SSD 做一个提速。这种方式即 Alluxio worker 跟 Presto worker 捆绑到了一起,200 个 Presto worker节点,就需要 200 个 Alluxio worker,这种方式会导致拓展的时候可能出现问题。
所以当数据量特别巨大,且跨数据中心访问的时候,更推荐分离式 disaggregated 的部署方式。
--
02/Alluxio & Iceberg
Hive 数据仓库 已经有十几年的历史了,但是一直存在着一些问题,对于一个表的 Schema 经常有多人的改动,且改动往往不按规律改,原来是简单类型,改成了复杂类型,导致无法保证数据的一致性,如果一条 SQL 查询两年的数据,这个表很可能两年中改了好几次,可能很多列的类型也改了,名字也改了,甚至可能删掉或者又加回来,这就会导致 Presto 报错,即使 Spark 也很难在数据 Schema 修改的过程中做到完全兼容。这是各个计算引擎的通病。
其实最早我们讨论 Iceberg 这个方案的时候,最想解决的就是 Schema 的升级变化问题,另外想解决的就是数据版本的一致性问题。众所周知,数据可能中间会出错,此时需要数据回滚从而查看上一个版本的数据,也可能要做一些 time travel 查指定时间版本的数据。有些数据是追加的,可以通过 partition 按时间来分区,通过 partition 查询指定时间分区数据。有的数据集是快照数据集,数据后一天覆盖前一天,历史数据无法保留,而 Iceberg 能解决这个问题。
其实 Iceberg 并没有提供一个新的数据存储,它更多的是提供一个数据的组织方式。数据的存储还是像 Hive 的数仓一样,存在 parquet 或者 ORC 中,Iceberg 支持这两种数据格式。
当然很多时候为了能使用 export table,我们会把一些原始的数据 CSV 或者其他格式导进来变成一个 expert table,根据分区重新组织写入 parquet 或者 ORC 文件。
关于 Schema 的 evolution 是一个痛点,Presto 支持读和写,但是目前用 Presto 写 Iceberg 的不多,主要还是用 Presto 读,用 Spark 来写,这给我们的 Alluxio to Iceberg 结合造成了一定的麻烦。
1. Alluxio + Iceberg Architecture 方案
- 方案一:
所有的操作都通过 Alluxio 写,Spark 和 Presto 将 Alluxio 作为一个底层存储,从而充分保证数据的一致性。
弊端是,实施该方案的公司稍微大了之后,数据直接往 S3 或 HDFS 写,不通过 Alluxio。
- 方案二:
读写都通过 Alluxio,通过自动同步元数据,保证拿到最新数据,此方案基本可用,不过还需 Spark 社区、Iceberg 社区以及 Presto 社区继续合作来把数据一致性做得更好。
--
03/最佳实践
1. Iceberg Native Catalog
目前,与 cache 结合比较好的是使用 Iceberg native catalog,在 Iceberg 叫 Hadoop catalog,在 Presto 中叫 native catalog,如果使用最原始的 Hive catalog,则 table 的元数据,即 table 位置的数据是放在 Hive-Metastore 中,Presto 或者 Spark 访问表的时候先去查询 Hive-Metastore 获取表的存储路径,然后通过 Iceberg 将数据文件加载进来,但是实际上,table 会有变更,此时需要将 Hive-Metastore 上锁,这种方案在只有一个 Hive-Metastore 的时候才有效,如果面临多个 Hive-Metastore 会出现锁失效的问题。
**更好的一个方案是 Iceberg native catalog,即完全抛弃 Hive-Metastore,使用一个目录来存储这个 table 的列表,这个目录可以在 HDFS 上或者 S3 上,我们更加推荐 HDFS,因为 HDFS 效果好一些,一致性也强一些。**这一方案避免了 Hive-Metastore service 本身的很多问题,如 scalability 、延时。此方案对 cache 也比较友好,不需要做一个 metadata 的 cache,而是直接 cache 存放 metadata 的目录。
2. Iceberg Local Cache
Local Cache 的实现是 Presto DB 的 RaptorX 项目,是给 Hive connector 做 Local Cache,很容易就可以给 Iceberg connector 也来打开这个 Local Cache。相当于是 cache 了 parquet 的文件到 local 的 SSD 上,Prestoworker,worker 上的 SSD 其实本来是闲置的,通过它来缓存数据效果还是挺好的。它可以提速,但我们目前还没有特别好的官方 benchmark。
目前只是对 worker 进行 cache,metadata coordinator 是不开的,打开的话可能会有数据一致性的问题。
3. 数据加密
早先 parquet 文件是不加密的,cache 了 parquet 文件,虽然不是明文,但只要你知道怎么读取这个 parquet 文件格式就能把所有数据读取出来。其 magic number 原来是 pare 1 就代表第一个版本,现在增加了一个 magic number 即 pare 加密的版本,这个加密版本把一些加密的信和 metadata 存在 footer 里边,它可以选择对一些 column 和配置进行加密。加密好后,数据便不再是明文的了,如果没有对应的 key,就无法读取出数据。
通过对 parquet 加密,我们不再需要第三方的加密,也不需要对整个文件加密,可以只对需要加密的一些数据进行加密,这个方案也解决了另外一个重要的问题,就是有的公司其实是整个文件来加密存放在 HDFS,然后 Presto 读之前把它解密好,很多文件存储系统就是存的时候是加密的。读取的时候确实拿到的解密好的数据,当 Presto 再通过 Local Cache 缓存数据的时候,cache 里存储还是明文数据,这破坏了数据加密的管理。但是采用 parquet 内部加密,local cache 就可以满足数据加密的要求了。
4. 谓词下推
Iceberg 通过谓词下推(Predicate Pushdown)可以减少查询的数据量。
原来 Presto 的暴力查询,根据条件把符合条件的一条条数据挑出来,但是中间有优化。其实很多查询条件可以直接 push 到 Iceberg,Iceberg 读取文件的范围就小了。
下面是一个 benchmark,可以看到没有谓词下推前扫到了 200 万条记录,CPU time 是 62 毫秒。谓词下推后,扫到了一条记录,查询时间极大的缩短,这也是对缓存的一个优化。开谓词下推(Predicate Pushdown)功能后,我们发现,缓存层次够用,扫的文件少了很多,这意味着我们都可以缓存的下了,命中率有一个提高。
--
04/未来的工作
在前面的工作中我们发现系统的瓶颈在 CPU。此瓶体现在很多地方,其中很大一部分是对 parquet 文件的解析,parquet 文件解析任务太重了。由于 parquet 很节约资源,很难将 parquet 转换为更好的格式。此时,一种解决方案是将数据分为冷热数据,将较热的数据转换为更加轻量,序列化低的格式存到缓存中,通过实验,将 parquet 文件反序列好的数据直接放到内存中,效率提升 8% 到 10% 。
但这有一个问题,此方案对 Java 的 GC 压力非常大,因为缓存长时间存在。我们发现此方案并不是那么好实施,所以我们更加想用 off heap 的方式,将数据存在 heap 之外。此时不能 cache object 本身,需要 cache Arrow 或者 flat buffer 格式,这两种格式反序列成本极低,又是二进制的流存在内存中,通过 off heap 把它装进来,然后在 Java 中再反序列化,这样可以达到一个很好的提速效果。
另外我们也可以把一些算子 pushdown 到 native 实现存储。比如说 Alluxio 再增加一些实现 native 的 worker 和客户端的 cache 实现,我们将算子直接 pushdown 过去,就像前面 Iceberg pushdown 一样,有些计算 push 到存储,存储返回来的结果特别少,它帮你计算,而且格式更好,它是 Arrow 并可以有 native 的实现,也可以向量化的计算。
Java 也能向量化计算。但问题在于 Java 的版本要求比较高,需要 Java16 或 17,而现在 Presto DB 还在 Java 11,trainer 倒是可以了,但是这个效果也不是特别好,因为 Presto 和 trainer 内存中的格式对性能化计算不友好,而且这个格式基本上是不能动的,如果要动,基本上全都要重新实现,这也是为什么会有这个 vlogs 在那里的原因。
可能这个 Presto 以后会有格式转换,但是不在眼前,但是我们可以 off heap 的缓存,可以把这个 Arrow 缓存到 off heap 上,然后在那里边需要的时候把它拿出来。然后反序列化成 page,然后给 Presto 进行进一步的计算。这个开发正在进行,可能在将来会给大家展现一部分的工作。其实就是为了降低 CPU 的使用和系统的延时,降低 GC 的开销,让系统变得更加的稳定。
今天的分享就到这里,谢谢大家。
分享嘉宾
**