如何从0-1使用 Apache Arrow 构建新数据系统
导读 为了应对大规模数据处理中的性能和互操作性挑战。Arrow 致力于提供一种高效的跨平台数据交换机制,使数据能够在不同操作系统和编程语言之间迅速、一致地流动。其设计注重性能优化,并得到开源社区广泛的支持,成为许多数据处理项目的核心组件,推动了数据科学和分析领域的创新。本文将分享如何使用 Apache Arrow 来构建一个完整的数据系统。
本次介绍会围绕下面四点展开:
-
为什么要构建新的数据系统
-
什么是 Apache Arrow,为什么是 Apache Arrow
-
如何使用 Apache Arrow 构建数据系统
-
一些 Tips
分享嘉宾|李晨曦 上海炎凰数据科技有限公司 研发工程师
编辑整理 | 张阳
内容校对|李瑶
出品社区|DataFun
01为什么要构建新的数据系统
首先需要回答的问题就是为什么要构建一个新的数据系统。
1. One Size Fits All or Not?
据图灵奖得主 Stonebraker 在 2005 年的一篇论文中指出,"One Size Fits All" 的概念已经过时。这意味着使用一个通用的数据系统(如 Oracle 或 MySQL)无法完全满足所有问题的需求。因此,许多领域都需要特定的专用系统。一个明显的例子是交易场景中的 OLTP(联机事务处理)和分析场景中的 OLAP(联机分析处理)的分离。此外,还有众多由于数据处理模型的不同而产生的新系统,如适合实时计算的流处理和更适应云计算的 NoSQL 等等。然而有些人持不同意见,他们尝试在一个系统中解决所有问题。例如,试图同时处理 OLTP 和 OLAP 的 HTAP,还有流批一体、NewSQL 等新概念。无论是否支持"One Size Fits All"的观点,新的想法和概念总是不断涌现, 所以也需要不断构建新的数据系统。
2. 数据库的黄金时代
当前,我们正处于一个数据系统的黄金时代。有许多新的数据库正在不断出现。根据 DBDB(Database of Databases)的统计,自 2020 年以来已经出现了 124 个新的数据库。这意味着几乎每周都会有一个新的数据系统问世。
有了这么多数据系统,我们为什么还需要构造一个新的数据系统,这是否是重复造轮子呢?
3. 读时建模
我们面对的场景是通用的日志处理,这是一项颇具挑战的任务,因为不同厂商,甚至同一厂商不同应用的日志格式各不相同,而且一个尚处于开发中的系统的日志格式也会经常变动。常见的方法之一是写时建模,即通过在数据库中预先定义表结构或者模式来存储日志数据。然后,通过一些 ETL 工具,将数据转化为所需的表格类型。然而,当面对不同类型的数据时,例如日志来自 Nginx、Apache 和 Windows IIS 等不同系统,就需要维护多个 ETL 流程来处理它们。另一种解决方案是读时建模,即先将原始数据存储起来,不进行 ETL 处理,在实际使用时对字段进行抽取。例如图中查询 1,就是从这三类日志数据中抽取出各自的 method、time 和 client 字段,并实时构建一个新的结果表格。这种方法更适合处理多样化的、不固定的数据模式。有兴趣可以扫描上图中的二维码,它提供了一个有关读时建模更加详细的分享。
读时建模的关键是有一个支持动态数据表模式的数据查询引擎,才能够在没有预先定义数据模式的情况下查询各种类型的数据。
下面介绍一下动态数据表模式可能是什么样子的。在我们收集的数据中,可能有不同格式的日志。例如,有以键值对表示的日志,如绿色框所示;也可能有以 JSON 格式表示的日志,如红色框所示。在支持动态数据表模式的系统中,可以在同一数据集中同时查询到这两种类型的日志。
接下来,我们可以通过字段提取的方式对这两类数据进行格式化处理,从而得到一个合并在一起的二维表结构。
最后,可以使用一些函数对数据进行归一化和合并。这样做可以得到一个直观明确的数据表,便于后续机器学习系统和报表系统进行处理。在这个过程中,我们可能会遇到同一个字段有不同类型的情况。例如,某个同名字段在某类日志中可能以整数形式存储,而在另一个日志中可能以浮点数形式存储;或者在导入数据时没有进行适当处理,导致某些字段被导入为字符串类型。因此,同一个字段中可能同时包含多种不同类型的数据。
构建数据库是一项复杂的工程,其中包含许多子任务。可以选择改造一个开源项目,也可以从头开始构建。现有项目要么无法满足读时建模的需求,要么架构相对陈旧,不适合现代云计算服务的架构。而且改造现有系统,难度也比较大,因为具有固定的模式是很多系统的前置条件。因此,我们决定从头开始构建这个系统,第一步就是定义内存中的数据格式。
02什么是 Apache Arrow,为什么是 Apache Arrow
下面介绍一下什么是 Apache Arrow,我们为什么选择了 Apache Arrow,以及我们要用它来做一些什么事情。
1. 内存数据格式
内存中的数据可以根据分布的不同,分为行式存储和列式存储。在行式存储中,数据按照行的方式排列,类似于 C 或 C++ 中的二维数组,每一行连续存储,将相关的信息放在一起。而在列式存储中,同一列的数据放在一起,构成一个长数组。图中是包含三个属性(session_id,timestamp 和 ip)的二维表,左边是它在行式存储中的内存表示,右边是它在列式存储中的内存表示。
在事务处理的场景中,行式存储是非常自然的选择。因为大部分操作都是以一行为单位进行的,可以使用 session_id 构建 B 树或 B+ 树等索引,通过这个索引很快定位到 session_id 对应的具体的某一行,然后进行删除或修改操作。
但在分析型场景中,用户通常不会使用所有属性,而是只会对某些属性进行运算。例如,如果想知道表中有多少个不同的 IP 地址时,只需要关心 ip 这一列,而不关心 session_id 或 timestamp。在这种情况下,列式存储就更为适合,因为只需读取 ip 这一列数据,从而避免读取其他不必要数据的开销,减少了 IO 和内存的消耗。而且,列式存储的数据在磁盘或内存中都是连续存储的,可以更好保证数据的局部性,从而充分利用现代 CPU 的缓存和 SIMD(单指令多数据)向量化等运算机制。另外,相同属性的数据放在一起可以更好地进行压缩。例如,可以使用字典或其他高效的压缩算法将这些具有相同属性的数据放在一起进行压缩。对于日志处理这种分析型的应用来说,列存储是更合适的选择。
2. Apache Arrow 的优势
那么我们还要不要继续造轮子?还是定义一套自己的列式存储内存格式?
现在许多系统都定义了自己的内存数据格式,这带来了数据转换的问题。试想,如果我们想在 pandas 中调用 Spark 进行数据处理,那么要从 pandas 基于的 Python 环境转换到 Spark 基于的 Java 环境,其中需要经过 Py4J、JVM 和 Spark 三层数据转换。同样地,如果我们定义了自己的内存格式,也会面临类似的数据转换问题,特别是在需要与其他系统进行互操作时。幸运的是,借助 Apache Arrow,可以在 PySpark 启用 Arrow 格式,就可以与 pandas 直接共享内存,实现内存交换。
Arrow 到底是什么呢?Arrow 本身并不是一个数据存储或执行引擎,而是一种高性能、内存中的列式存储标准。它与具体的语言或应用程序无关,无论是用 C++、Python 还是 Rust 等语言,都可以进行跨语言跨系统的互操作。因为在任何环境中,Arrow 数据的内存表示是完全一致的,所以在进行跨系统传输时,不需要进行内存拷贝、序列化或反序列化等工作,实现了零拷贝。Arrow 没有发明新的数据存储方式,比如浮点数仍然按 IEEE 754 标准进行表示,但 Arrow 在标准化方面做了很多工作,例如如何表示空值 NULL、如何处理时间戳以及时区的表示等等。这些细节看起来很微小,但它们的重要性是在任何平台和任何语言下的标准化。因此,一个全新的数据引擎也无需重新发明这些内存格式。
使用 Arrow 后,可以实现在不同系统之间共享内存,从而实现零拷贝。这意味着我们不是避免了繁重的数据复制和转换,而是直接共享内存中的数据,使得数据处理过程更加高效。
Arrow 还有以下几个好处。
首先,Arrow 原生实现了七种程序语言,并在此基础上实现了更多语言的绑定,包括 Rust、C++、C、Python 等,基本覆盖了主流的程序语言。并且得到大量数据系统的支持,如 PyTorch、Spark、ClickHouse 和 DuckDB 等,在这些系统中,数据可以采用 Arrow 格式进行输出。
其次,Arrow 的性能表现不错。有一个 Benchmark 对比了基于 Arrow 的数据引擎 DataFusion、Polars 与 DuckDB 的性能,虽然前两者稍慢于 DuckDB,但仍然是可接受的成绩。尽管看起来 Arrow 在每个小功能点上没有什么创新,但综合起来,它提供了一个相对完整的解决方案,并且模块化做得非常好,API 对于系统的侵入性也较小。
此外,Arrow 的扩展性较强,比如可以扩展 Arrow 的类型,将机器学习中的一些类型如 tensor 在 Arrow 中实现;也可以使用 API 扩展自定义的计算函数。
总而言之,Arrow 的主要贡献在于为列式存储提供了一个标准和生态系统,因此对开发者和用户来说,它可以作为一个现代数据技术栈的标准和基础。
Arrow 拥有非常活跃的开源社区。除了 Arrow Rust 等相关项目外,截至 2023 年 10 月,Arrow 本身已经得到了超过一万两千个 GitHub 的 star。上图展示了 Arrow 在最近一个月的活跃程度,包括 PR、issue 以及贡献者等方面的数据,足以看到 Arrow 是一个非常活跃的项目,并不需要担心它的持续性和稳定性,可以预期 Arrow 能够长期存在并会持续完善。
Arrow 通过帮助标准化内存格式,为构建数据系统提供了一个起点。然而,这只是开始,还有许多其他任务有待完成。例如,需要聚合、排序等更多的算子;需要开发客户端 API 和数据交换功能;需要支持新的硬件,尤其是在信创领域,需要考虑对 ARM 指令集和国产硬件的兼容。
如此一来,构建一个数据系统变得非常复杂。上述工作也只能初步构建一个勉强满足小规模使用的数据系统,而完成这个阶段可能需要 10 年甚至更长时间。如果我们想要构建一个更大规模、分布式和高可用的数据系统,所需要的时间可能是前一阶段的几倍甚至更长。
按照人月神话的理论,投入更多的人力并不能线性地减少完成时间。因此,构建新的数据库是一个非常昂贵的事情,需要巨大的时间、人力和经济成本,这也是为什么新的数据库创业公司需要筹集大量资金和足够的时间。
虽然现在是数据库的黄金时代,但也是最具挑战性的时期。如果新的想法不能迅速实现,很难在市场上生存。好在有 Arrow。Arrow 不仅提供了基本的内存数据格式和模型,还提供了一些算子和计算功能,以及持久化、数据交换和跨平台执行等模块。通过使用 Arrow,能够大大节省构建数据系统的时间和开发成本。
03使用 Apache Arrow 构建数据系统
下面介绍 Arrow 如何助力数据系统的开发以及如何使用 Arrow 构建一个数据系统。
1. 数据系统执行流程
一个数据系统的执行流程通常包括以下几个步骤。
首先,当系统接收到用户的查询请求时,会利用存储和索引来获取相关资源。
接下来,系统会根据用户查询生成一个逻辑计划,该计划表示了执行查询所需的关系代数和操作的抽象。
然后,逻辑计划会在经过优化之后转为物理计划,即如何真正执行查询的计划。
之后,在执行引擎中,系统会执行具体的操作,如表达式执行、聚合、排序和物化视图等算子。
最后,系统将结果保存到用户指定的路径或传输到用户的客户端。
2. 数据存储
我们的数据存储模型是基于事件的,即基本的存储单元抽象成了事件,类似于日志中的每一条日志。每个事件都有时间戳、原始信息和其他基本属性,比如主机名、数据类型等。这些都是事件的元信息定义,我们将其抽象出来,并进行索引。
对于日志中的其他内容,我们将其作为原始数据存储。底层存储使用了 Parquet 这一列式持久化存储标准,其对 Arrow 有很好的支持。Parquet 还会存储一些元数据,比如每列的存储位置和一些统计信息,如最大值和最小值等。这样就可以支持一些查询的下推操作。如果数据中存储了多列,但只想访问某一列,可以直接定位到该列的存储位置,而不需要将整个文件都读入内存中。
然而,Parquet 需要预先给定数据的模式,即存储数据时需要先定义一个模式,无法直接支持动态模式或者无模式数据。为了支持动态模式的数据,在 Parquet 的基础上我们进行了一些扩展,这样就可以在 Arrow 和 Parquet 的基础上进行简单的修改,从而完成数据存储。
数据存储之后,需要读入到内存中。每个数据在内存中会以 Arrow 定义的 Record Batch 形式存在。这种表示方法用于描述一组数据,并由其 Schema 指定数据的模式。
例如,有一列包含 session_id 字段的 Int64 类型数据,一列包含 datetime 字段的String 类型数据,还有一列包含 source_ip 字段的 String 类型数据,Schema 中定义并存储了这些字段的类型,而具体的数据存储在 Arrow Array 中,不同 Record Batch 的 Schema 是可以变动的。例如,在下一个 Record Batch 中,session_id 字段可能变成 String 类型,而 time 字段可能变成 Timestamp 类型。
通过不同模式 Record Batch 的组合,就可以获得不同模式的数据。这样,就实现了从数据存储到内存表示的映射关系。
3. 索引/代码/硬件资源
Arrow 并不是一个完整的查询引擎。它缺少索引和用户自定义函数等功能的支持,在我们的系统中,我们使用了时间戳索引和倒排索引,这样用户可以通过关键字和时间来定位到日志的位置。至于用户自定义函数方面,我们向 Arrow 提交了一系列 PR,使其能够支持用户自定义函数。Arrow 在硬件资源方面有一些简单的实现,比如内存管理和线程池。但是,如果想要进行更细粒度的管理,例如限制每个查询的内存使用或设置不同查询任务的优先级,仍然需要自己开发。所以,从这个角度来看,Arrow 在这方面还有继续完善的空间。
4. SQL 解析/计划生成/执行与传输
Arrow 也没有提供将用户的 SQL 语句解析成抽象语法树的功能,但是我们可以使用一些开源工具,比如 ANTLR 和 Calcite,将 SQL 语句转换成抽象语法树。我们选择使用 ANTLR 而不是 Calcite,是因为 Calcite 过于复杂且基于固定数据模式的假设,在处理动态模式时不太适用。
之后可以将抽象语法树进一步转换成逻辑计划,逻辑计划描述了数据执行的具体操作。在进行查询优化时,我们可以调整逻辑计划来提高性能。
例如想要找到特定 ip 的最新访问时间,首先需要从数据集中读取相关数据。然后,根据指定的条件(这里是 ip 等于某个特定字符串)进行数据过滤,并将需要的数据筛选出来。接下来,对过滤后的数据进行聚合运算计算其时间的最大值。
在此过程中,可以进行一些优化,其中一个常见的优化是下推操作。通过下推,可以将读取 ip 和 _time 两个字段的操作下推到表扫描阶段,从而每次读取数据时都跳过其他不必要的字段。此外,我们还可以将条件表达式(例如,ip 等于特定字符串)嵌入到操作中,这样每次读取时只会读取与我们需要的 ip 相匹配的数据。
通过在表扫描阶段进行这些优化,可以节省大量的 IO 开销和内存资源,提高查询性能。
逻辑计划是一个抽象层,不包含在 Arrow 中,因此需要自己编写逻辑计划的代码。
相对而言,逻辑计划相对简单,因为大多数 SQL 查询语言及关系代数和逻辑计划可以相互对应。物理计划则相对复杂,因为它与底层机器有关,需要处理线程、并发和各种硬件。
最近,Arrow 提供了一个查询执行引擎------Acero,可以提供很大帮助。Acero 是一个基于推送(Push)的引擎,其最小执行单元是 execution node,它的代码非常清晰,并且具有清晰的 API 接口,包括如何处理其上游输入和下游输出,如何处理接收到的数据和停止接收数据,以及暂停和继续运行等功能。
如果需要扩展,只需按照 API 定义自己的节点,并在 Acero 中注册即可,就可以借助 Acero 进行计算的调度和执行,而不需要修改 Arrow 代码。
我们注意到 Arrow 在处理动态数据模式方面存在一些限制,因此对 Arrow 进行了一些扩展。例如,添加了支持动态模式的汇聚节点 Schemaless SinkNode,它可以消除数据模式方面的一些限制。通过使用这个节点,可以处理没有严格定义模式的数据。这允许我们更灵活地处理各种数据类型,而不仅仅限于特定的固定模式。
在这个过程中,我们得到了一个支持动态模式物理计划的执行节点。此外,Arrow 的另一个限制在于执行节点创建时就需要预先定义数据的输出模式。为了克服这个限制,我们进行了一些改造,将数据输出模式延迟到实际输出时动态生成。这样,就能更好地支持动态模式的数据引擎。另外,我们也对 Arrow 提供的一些聚合函数和标量函数进行了动态模式的扩展。
这样就可以使用 Arrow 来处理动态模式数据,并使用它执行并调度查询的。目前,Acero 还不支持物化视图,但对于大规模数据来说,物化视图非常重要。物化视图可以预先计算并且储存一些耗时或复杂场景的结果,在查询时可以快速访问和利用这些预先计算的结果。同样,我们对 Acero 进行了一些扩展,添加了中间状态的处理方式,以便在 Arrow 中实现物化视图,我们也计划将这些一系列扩展提交给 Arrow。
最后,当查询结束时,需要进行数据传输,可以是传输给用户的客户端,也可以是传输到用户前端进行显示。如果直接使用 ODBC 或者 JDBC,因为 ODBC 和 JDBC 本质上只能处理行式数据,行列的转换无法避免,我们可以使用 Arrow Flight 和 Arrow Flight SQL 来规避这个问题。
Arrow Flight 是 Arrow 提供的基于 gRPC 或者 REST 的列式数据交换框架,无需复杂的开发,直接使用其 API 即可实现列式数据传输,而避免了数据转换。在 Arrow Flight 之上得到了与 SQL 数据库交互的协议 Arrow Flight SQL。这样我们就可以利用与 SQL 兼容的现有客户端直接进行查询。
将来,Arrow 还将推出一个类似于 Arrow 自己的 JDBC 或者 ODBC 的工具,称为 Arrow ADBC。这样,原本与 ODBC 和 JDBC 兼容的数据库客户端将无需或只需极少修改代码,就可以直接与 Arrow 进行通信。
Arrow 帮助我们实现了数据存储、物理计划和传输这三个方面的重要功能。如果在自己的数据系统实现中不是动态模式的,而仅仅是针对特定领域开发固定模式的新系统,那么只需构建索引、解析用以查询的 SQL 或 Dataframe API,并转换成逻辑计划,然后使用 Calcite 的优化器将其转换为 Arrow 的物理计划,最后直接使用 Arrow 执行即可,需要构建的东西非常少。
04一些 Tips
我们在 Arrow 的使用中积累了一些经验和教训。作为一个新的数据产品或数据产品的底座,Arrow 还存在不少问题。
1. 踩过的一些坑
首先更新频繁是 Arrow 社区活跃的体现,意味着会有新的功能和改进,但同时它的接口还是不够完善,我们建议尽量少修改原始代码,而是向 Arrow 社区贡献改进并多做扩展。Arrow 代码库可以分为三个层次:
- Core 层:提供数据类型表示,这一层非常稳定,新版本可以完全保证和之前版本的兼容。
- Compute 层:提供计算算子,相对稳定但可能有一些 bug,当使用一些比较高级的指令集如 AVX512 指令集可能会有一些内存对齐的问题。
- Acero 层:是最新的执行引擎,不够稳定而更适合开发测试。
Arrow 对于复杂类型的处理还不够完备,比如 Union、List、JSON 等,需要额外的代码实现。另外,Arrow 始于 2016 年,仍需要时间和大规模数据的验证。各个相关项目(包括 DuckDB 等)主要使用的是 Core 部分,对于 Arrow 的 Compute 和 Acero 等部分,仍然需要在更大规模的数据上进行进一步的验证。我们在开发过程中遇到了一些问题已经修复并向 Arrow 社区提交了改进。目前看来,Arrow 处于相对稳定的状态。
2. DATA FUSION
最后,对于追求安全和现代化的考虑,我们建议使用 Arrow Rust 的实现。而且 Arrow 在 Rust 实现的基础上推出了一个完整的数据引擎 DataFusion,它提供了比 Arrow 更强大的功能。DataFusion 在 Arrow 内存格式的基础上提供了 SQL 解析和查询计划等功能,也支持子查询和其他高级函数。此外,DataFusion 也继承了 Arrow 出色的模块化和可扩展的代码风格,基于 DataFusion 构建新的数据引擎可以减少开发所需的时间,同时也能在开源社区获得更多支持。
以上就是本次分享的内容,谢谢大家。