小米 OLAP 引擎在 Trino 的应用实践
导读 本文将分享 Trino 在小米的应用实践。
主要包括以下五部分内容:
-
架构定位
-
主要工作
-
应用场景
-
未来规划
-
问答环节
分享嘉宾|周渝豪 小米 大数据软件开发工程师
编辑整理|李欣卫
内容校对|李瑶
出品社区|DataFun
01架构定位
1. Trino 的历史
Trino 是一个以惊人速度运行的查询引擎,用于大数据分析的、快速的、分布式的SQL 查询引擎,帮助用户探索数据。Trino 是由 Presto 分裂出来的,2013 年 Facebook 开源 Presto,2019 年初 Presto 的创始人团队独立出来创建了 Starburst 公司来维护 PrestoSQL,因为 Facebook 继续维护着 PrestoDB,后来 Facebook 申请 Presto 的商标,因此,PrestoSQL 就只能改名为 Trino。
在 Facebook 时期,Presto 主要是用来解决 Hive 查询低效的问题,如今 Trino 的功能已不止于此,其在各种数据湖产品上提供优异的查询性能,比如 Iceberg、DataLake 和 Hudi 等等,为湖上的 OLAP 数据分析提供了有力的支持。
2. Trino 架构
Trino 采用 master-slave 架构,其中 Coordinator 负责任务的生成和调度,Worker 负责任务的执行。SQL 可以通过多种客户端提交,如 Trino 自带的 CLI、JDBC 和 HTTP。Coordinator 对提交的查询进行解析和优化,例如进行元数据分析,然后生成执行计划并下发到 Worker 上执行。Worker 在执行过程中会对接各种存储系统来读取数据,包括传统的 HDFS、云上存储以及关系型数据库等。
Trino 能够连接到各种数据源,这种连接器模式使得它能够在不同数据源之间进行联邦查询,简化了数据分析过程。此外,Trino 并不依赖于 Hadoop 生态系统,这使其具有更高的灵活性和可扩展性。
3. Trino 性能
作为 OLAP 引擎,查询性能是最重要的。如上图所示,是对 Trino 和 Spark 的性能对比测试,使用标准的 TPC-DS 数据集,1TB 的数据量,选取三次的平均执行时间。图中绿色的线是 Trino,可以看到其在大部分场景下都是明显优于 SparkSQL 的,大约有 5 倍以上的性能差距。但在成功率上,Trino 有 15 个查询失败,原因是节点的内存不足。
4. Trino 优缺点
优点:
- 架构清晰:Trino 使用 Master-Slave 架构,存算分离,没有额外的系统依赖,使得架构清晰简洁;
- 速度运行快:Trino 查询速度快得益于以下几个原因,一是完全内存运算,二是内部基于流水线的模式,三是其动态代码生成技术比较先进。
- 扩展性强:Trino 可以连接各种数据源,支持跨源查询。
缺点:
- 内存要求高:Trino 对内存的需求较高,通常要求每个节点至少有 32GB 以上的内存,这可能导致在内存资源有限的环境中出现故障。
- 容错能力较低:由于依赖内存计算,Trino 在遇到节点故障时的容错能力相对较弱。
- 受限的并发能力:采用 master-slave 架构,Trino 系统中只有一个 Coordinator 负责协调查询,这限制了系统的并发处理能力,使其依赖于单个 Coordinator 的性能。
5. 小米的 OLAP 架构
小米的 OLAP 架构以 Kyuubi 作为代理层,底层是 Spark 和 Trino 引擎。Kyuubi 是一个 SQL 的代理层,实现代理 SparkSQL 和 Trino 的接入,提供一个常驻服务的能力。Trino 在引擎层和Spark 并列在一起,上层通过一个自动路由的服务来在 Spark 和 Trino 引擎之间进行选择,当用户查询过来之后,会选择使用 Trino 或者Spark 引擎来进行执行。除此之外,外部还配合有权限认证以及元数据管理的服务。
然而,这里存在的一个问题是,Spark 和 Trino 的语法差异较大,一个语句不能同时在两者之上执行,所以需要具有语法兼容性。
6. Trino 在小米的定位
统一使用 Spark SQL 语法:由于用户更擅长使用 Hive 和 Spark,而且 Kyuubi 也是使用 Spark 的语法解析,所以对于 Trino,要尽可能向 SparkSQL 兼容。
Trino 只用于查询操作:数仓写入依然使用 Hive 和 Spark,Trino 只专注于数仓查询。因为写入方面,Trino 并没有特别大的优势。
Kyuubi 负责接入和权限控制:Kyuubi 负责 Trino 的接入和权限管理,Trino 就只为了查询更快。
Metacat 统一元数据管理:内部服务 Metacat,负责统一的元数据管理。
7. 小米 Trino 目标
小米 Trino 的目标是让大数据更快到达用户眼前。其中包含三个重点:
- 大数据:不仅数据量大,数据来源也会很丰富,Trino 能够对接各种不同的数据源,比如 Hive 数仓、Kudu 存储引擎、数据湖、关系型数据库等等。
- 更快:Trino 拥有更快的速度,它能够更快地接入新的数据源,更快地分析处理。
- 眼前:小米 Trino 主要还是被用来提升用户的可见性部分的性能,比如数据预览、即席查询、统计报表等等,而数据 ETL 处理还是交由 Spark 来负责完成。
8. 小米 Trino 发展
小米是 2021 年 Q4 初步引入 Trino,基于 Trino 社区的 352 版本,进行了一些内部的特性适配。到 2022 年 Q3,进行了一次大版本的升级,到了社区最新的 386 版本,针对内部应用场景进行了一些优化。2023 年刚刚升级到最新的 421 版本,各种核心能力都更加成熟。小米不仅紧跟社区的发展,每年至少进行一次主要的版本升级,以确保技术与社区保持一致,还积极参与社区共建,贡献代码和功能改进。
02主要工作
Trino 在小米的实践工作可以总结为三个方面:核心能力、扩展能力和运维能力。
- 核心能力:主要集中于确保与 SparkSQL 的兼容性,以便为使用 Spark 的用户群体提供无缝过渡的体验。此外,团队在数据湖 Iceberg 上进行了一系列的优化实践,以提高查询性能和数据管理效率。
- 功能扩展:对 Trino 增加了诸如支持动态加载 Catalog 和用户自定义函数(UDF)的功能,从而增强了其灵活性和适应不同业务需求的能力。
- 运维能力:强化了审计日志和历史服务,这对于跟踪查询性能、确保安全性以及满足合规要求至关重要。实现了集成测试的自动化发布,这有助于提高软件交付的速度和质量,并确保新版本的稳定部署。
接下来将逐一展开介绍。
1. 核心能力
(1)兼容 Spark SQL
因为 Trino 是一个后来者,大家对 HiveSQL 和 SparkSQL 更为熟悉,为了减少用户的学习成本和迁移障碍,需要实现 Trino 查询语言(TrinoSQL)与 SparkSQL 之间的兼容性。
具体到语法层面,团队注意到两者之间存在一些差异。例如,SparkSQL 中变量的引用通常使用单引号或双引号,而标识符(如列名或表名)则使用反引号。这种语法上的小差别需要被认真对待,以确保查询可以在 Trino 中无缝执行而无需重大修改。
为了解决 Trino 与 SparkSQL 之间语法和语义差异的问题,主要进行了两方面的工作:SQL 重写和隐式类型转换。
- SQL 重写
当用户提交 SQL 查询时,系统首先使用 Spark 的 SQL 解析器将查询解析成标准的语法树(AST)。然后,基于语法树上的节点进行重写操作,把 SparkSQL 的语法结构转换为 Trino 的语法结构。
这一转换过程有可能失败,例如遇到 Spark 和 Trino 之间不兼容的语法或者是用户直接使用了 Trino 特有的语法。在这些情况下,原始的查询会保持原样并继续执行,即使可能会失败。
这种基于语法树的重写方法能够解决超过 80% 的语法兼容问题。Trino 的执行日志会记录重写过程,展示原始的执行语句和重写后的新语句。
- 隐式类型转换
隐式转换主要有两种类型:加宽类型的转换和翻译类型的转换。加宽类型的转换:主要是数据范围的变化,例如将 int 类型转换为 bigint 类型。这种转换不会丢失数据信息,只是范围变大。翻译类型的转换:涉及到不兼容的数据类型之间的转换,例如将字符串类型转换为 int 类型。这种转换可能会导致数据丢失或错误。在 Trino 中,只支持加宽类型的转换,不支持翻译类型的转换。为了提高兼容性,小米内部实现了Trino 的隐式类型转换功能,与 Spark 一样支持。举个例子,当使用 Trino 查询 1÷2 时,如果 2 是字符串类型,会报错,因为不能将整数除以字符串。但是,如果开启了隐式类型转换支持,就可以正常工作。这个参数可以设置为 Spark session 级别和控制级别的配置,以便全局配置。这两个配置可以开启重写和隐式转换,确保与 Spark 的兼容性。
除了以上两点,还有其它一些工作,使得整体的 Spark 的兼容率已经达到 99.6%。比如 Spark 支持 ANTI 和 SEMI JOIN,在 Trino 里面没有对应的语法,改写较为复杂,建议用户使用 IN 或者 EXIST 的语法来进行替代;第二个是 Hints 语法,Spark 中的 Hints 语法可以用来指导查询优化器做出特定的决策,例如数据的重分区,由于这类语法通常不影响查询结果,Trino 暂时没有实现这些功能;第三个是Table-valued Functions,在 Spark 中,TVFs 允许基于行生成表的模式,这是 Spark 的一个特殊功能,Trino 目前还没有支持;还有一些小米内部用户很少使用的 Spark 函数,也没有兼容。
(2)优化 Iceberg 使用
Iceberg 是一个大型分析数据集的开放表格式,它在传统的 Hive 表之上,因为 Hive 表不支持事务,Iceberg 就是为了兼容和提高大数据场景下事务的支持才引入的。Iceberg 支持事务性、模式演进、隐式分区类型以及行级更新等,这是其最主要的4 个特性。
Iceberg 数据湖支持与多种云存储服务和 HDFS 的集成,是小米内部当前重点推荐的数据湖解决方案。它鼓励用户将现有的基于 Hive 的数据仓库迁移到以 Iceberg 为基础的数据湖架构上,以利用其更高效的存储和优化的查询性能。
引入 Iceberg 后,构建了近实时数据处理能力,主要分为两种方式:
- 日志收集与实时处理
- 通过消息队列收集系统或应用程序生成的普通日志数据。使用 Apache Flink 实时消费这些日志数据,并将其写入 Iceberg 表。与传统数仓 T+1(次日生效)的处理时间相比,这种方式可以实现至少小时级别的数据生效时间。数据写入 Iceberg 后,可以使用 Spark 或 Trino 进行查询,实现分钟级别的数据时效性,通常在 30 分钟到 5 分钟内。
- 从关系型数据库实时同步
- 直接从传统关系型数据库(如 MySQL)实时同步业务数据到数据仓库。利用Flink Change Data Capture (CDC)技术导入数据到 Iceberg 的 V2 表中。V2 表支持行级更新,这使得对 MySQL 表中的数据执行 upsert(更新或插入)操作成为可能。这种同步方式也基本能够达到分钟级别的数据时效性,确保数据变化能够实时反映在 Iceberg 表中。
Iceberg 在 HDFS 上实现行级更新,主要依赖于其"Merge On Read"模式。在传统的 Hadoop 文件系统中,一旦文件被写入,通常是不可变的。但在 Iceberg 中,即使基础数据文件是不可变的,也可以实现行级更新。这是因为它在读取数据时会动态地应用一系列的删除文件(delta files),这些删除文件记录了需要从底层数据文件中删除或更新的数据行信息。
行级更新支持两种删除模式:
- 位置删除(Position Delete): 这种模式下,指定要删除的行的位置,例如删除文件二中的第 0 行。当执行读取操作时,Iceberg 会应用删除文件,过滤掉那些被标记为删除的行,从而实现更新效果。
- 相等删除(Value Delete): 这种模式下,通过指定列的值来删除行,比如删除所有第一列值为 1 且第二列值为 a 的行。这种删除操作会被应用到所有相关的数据文件上,确保满足条件的行被删除。
这两种方式都允许 Iceberg 在读取时模拟行级更新,因为实际的数据只有在被查询时才根据删除文件进行"修正"。然而,这种方法的一个缺点是随着更新频率的增加,删除文件的数量也随之增长,这可能会导致读取性能下降,因为系统必须在读取期间处理更多的删除文件。为了解决这个问题,Iceberg 会定期执行后台的清理和合并任务,将删除的信息合并到数据文件中,从而减少读取时需要处理的删除文件数量,保持查询效率。
最开始引入 352 版本的时候,社区上其实没有支持 V2 表的这种行级更新能力,小米独立实现了更新表的能力。在 379 版本,社区支持了读取这种行级删除表,升级时也选用了社区的实现方式,但当删除文件特别多时,其效率非常低,所以对其进行了优化,使效率提升了 10 倍以上,对于这种 V2 表的更新,Trino 的修改已经提交到社区,经过一段时间的 Review,社区已将代码合并。
Trino 在与 Iceberg 集成时确实面临一些挑战,其中包括内存占用高导致的 Out of Memory(OOM)问题,以及对 Timestamp 类型处理的不一致性。此外还有表读取的正确性和性能问题。
为了解决这些问题,小米内部进行了以下工作:
- 降低元数据内存需求:通过优化读取元数据的过程,减少了内存消耗,从而降低了 OOM 的风险。
- 明确内存统计:在计算过程中对内存使用进行了更细致的监控和管理,以确保内存使用更加合理。
- Session-based Timestamp 读取:支持按会话(session)读取 Timestamp 数据类型,并保持时区设置与 Spark 一致,以解决 Trino 和 Spark 在 Timestamp 处理上的不一致性问题。
- 高效读取行级更新表:支持高效地读取 Iceberg V2 的行级更新表,这对提高大数据场景下的查询性能尤其重要。
- 修复列读取错误:解决了一系列读取的错误,这些错误可能导致查询结果不正确。
这些改进和优化都已经提交到了 Trino 社区,并且随着最新版本的发布,用户在使用更新版本的 Trino 时即可体验到性能和正确率的提升。
2. 扩展能力
(1)动态 Catalog 加载
Trino 目前只支持静态配置文件读取。但在实际情况中可能有很多种 JDBC 类型的数据源,需要实时变更,比如用户新增或删除数据源。小米内部实现了这种功能,能够动态地从元数据服务来获取它的 catalog 并实时生效,这样用户在使用的时候就不会因为新增一个数据源而重启 Trino 集群。
社区最新版本也有类似的动态实现,但并不是动态地直接获取注册进去,而是它的Worker 会动态地从 Coordinator 拿取,只需要配置 Catalog,不需要做 Worker上的配置,并且还不支持动态注册的 Catalog。还有一个问题是,Worker 从 Coordinator 去拿配置,负载较高的时候会拿取失败。
小米内部的实现是,所有节点都会从元数据中心统一拿取 Catalog 并注册,现在已经能够支持并发加载数千个 Catalog。
为了保证启动的时候,用户能够直接使用加载好的数据源,需要在启动的时候加载数据源,但又因为内部的数据源太多,会使得加载时间随 Catalog 数线性增加。所以内部现在也在计划考虑使用 lazy 的模式,也就是当用户使用到某一个具体数据源的时候,再把它注册进去。
(2)动态 UDF 加载
在处理用户自定义函数(UDF)的兼容性问题时,小米内部采取了一个相对直接的方法,即通过创建一个单独的项目来兼容 Spark 的所有函数。这个项目被设计为可以动态加载并生效,使得 Trino 能够支持 Spark 的函数库以及新增的自定义函数。
- 函数打包和推送:当有新的函数需要添加时,相关的函数项目会被打包并推送到对象存储中;
- 自动拉取和加载:Trino 引擎会定时从对象存储中拉取更新后的函数项目,并自动加载这些函数,无需重启集群,实现实时生效;
- 插件模式优势:这种插件式的模式使得函数项目与 Trino 代码保持解耦,便于独立开发和维护;
- 版本管理: 使用 git 进行版本管理,如果需要回退到旧版本,可以直接基于 git 操作来实现。
3. 运维能力
(1)审计日志和历史服务
审计日志和历史服务是一个固定的流程,一是 Trino 会把审计日志和历史查询的记录推送到消息中间件,然后会有一个 Flink 流任务实时地将其写入到 Iceberg 表里面。如果是历史服务,Trino 服务还会直接从 Iceberg 表中将数据读出并展示。上图中右上展示的就是一个用报表工具制作的统计信息。右下是一个历史服务,直接在切入的 web UI 上进行了二次开发,支持直接从页面上进行历史服务的查询。同时它也要与 Trino 展示效果保持一致,避免单独去实现,这与 Spark 的 history 类似。
(2)集成测试和自动发布
小米内部开发了单独的测试项目来进行集成测试,用来测试语法语义的正确性,这个项目可以直连 Trino 或者直连 Kyuubi 进行查询,它可以对比 Spark 和 Trino 的查询性能,同时支持从 SQL 文件、审计日志表中直接获取查询语句执行查询。
我们实现了一个有趣的功能,使用 Spark 的 Hash 算法对每行数据进行哈希运算并求和。这个哈希函数用于确保语法兼容性,因为 Trino 本身没有这个功能。这样,每个表都能生成一个校验集,我们可以将其与 Spark 的运行结果或历史版本进行对比,以确保数据一致性。
自动发布使用 git 流程,通过一个流水线实现代码自动部署到容器集群。
4. 其他工作
除了上述三大方面的能力建设之外,还有其它一些工作,比如提升了元数据Metastore 的访问稳定性和缓存性能等等。
03应用场景
1. 多集群模式
在多集群模式下,Trino 的单点故障问题通常通过增加额外的 Coordinator 来避免,例如部署一个 Secondary Coordinator 作为冗余。另一种方法是使用代理服务器来同时管理多个 Trino 集群,从而提供高可用性。
多 Coordinator 模式解决了单点故障问题,能支持大型集群和复杂查询,但实现复杂、内存要求高且资源隔离困难。
社区推荐的多集群模式实现简单,便于资源隔离和定制化需求,但存在失败重试、运维复杂、长时间查询成功率低和无法运行复杂查询的缺点。
小米内部采用多集群管理模式,并建立了全自动化的管理流程,代码与配置分离以保证自动发布。使用镜像存储平台存打包镜像,安装包存储平台用于物理机发布,配置存储在对象存储中,启动集群时实时加载。这实现了配置与服务的解耦,容器和物理平台的独立发布,整个流程自动化。
多集群管理模式有利于业务,允许根据不同业务需求配置和使用不同的集群。通过调整路由策略,比如基于用户来源,可以决定查询使用的特定集群。在单个集群内部实行资源限制,避免资源恶性竞争,确保集群稳定性。未来将引入多级资源管理架构,进一步保障用户查询稳定性。
2. 即席查询
基于 Trino 的即席查询场景,追求秒级的极致速度,一般情况下,限制在 5 分钟以内,如果超时即失败,就自动转到 Spark 上去执行。采用独立的集群,集群内严格限制大查询。
3. BI 分析
在 BI 分析场景中,小米内部的数鲸产品和微软的 PowerBI 要求快速生成报表,查询时间通常限制在 10 分钟内。随着报表数量增加,集群规模扩大。由于报表数量多,并发查询量高,需要额外的负载均衡策略和 Trino 的容错支持来确保成功率。分析业务直接使用 Trino 服务提交查询并生成数据,根据业务规模不同,复杂度较高时会配置物理机以满足业务需求。
04未来规划
在未来规划方面,我们目前正在进行存储上云的工作,旨在降低成本并提高效率。推动存储上云的过程中,不可避免地希望通过缓存加速来实现两个目标:一是降低云端 API 调用的消耗;二是加速云端查询。这将需要与 ALLUXIO 的结合。
05问答环节
Q1:SQL 是如何选择 Trino 或 Doris 引擎的,是用户指定,还是通过某些规则?
A1:我们是提供了手动指定的方式,用户可以指定使用哪个引擎。同时也有自动路由规则,比如用户只查询单个 Doris 引擎的话,会直接使用 Doris 引擎本身去执行;如果要同时查 Hive 和 Doris,或者是两个不同的 Doris,那么就会优先使用 Trino 来进行执行。
Q2:SparkSQL to TrinoSQL 这部分有开源吗?
A2:这部分还没有开源,只是小米内部在使用。这部分主要是语法树的改写,其中还有一个问题尚未解决,语法树在改写的时候,会无法拿到一些字段类型的信息,这部分类型信息还是在 Trino 内部实现的。在下一个版本中可能会迭代,支持这种获取到元数据类型信息的改写。之后也可能考虑贡献给开源社区,因为还是有很多用户希望 Trino 能够直接使用 Spark 的语法来提交到 Trino 上面。我们目前也在逐渐将这部分代码剥离出来,成为一个单独的模块之后再考虑开源。
以上就是本次分享的内容,谢谢大家。