网易 Arctic on Flink 流式特性核心原理解读
导读: 本次分享的内容是 Artic on Flink 流式特性解读。
本次分享会围绕下面四点展开:
-
Arctic 流式特性
-
核心原理解读
-
Benchmark
-
未来规划
分享嘉宾|叶贤勋 网易 资深开发工程师
编辑整理|夏明月 京东方
出品社区|DataFun
01/Arctic 流式特性
本部分主要讲解什么是 Arctic,什么是 Arctic Mixed Format,以及什么是 Arctic 流式特性。
1. Arctic 是什么
Arctic 的架构如上图所示 。本文将重点介绍 Streaming Lakehouse Service,可以在现有几大开源表 Format 平台基础上进行更好的服务化管理能力。目前已经开源的版本中,已经对 Iceberg 的表 Format 做了比较好的支持,未来会考虑去对接 Hudi、Delta 等其他表 Format。Arctic 主要的服务对象是 AMS,其定位与现有的 Metastore 类似,是 HMS 新一代服务。Optimizer 能力,主要包括底层文件结构处理、Upsert 数据合并等能力。
2. Arctic Mixed Format
Arctic 为什么会有 Mixed Format?Arctic 在表 Format 基础上提供了流批一体存储能力,它有自己的 Logstore 和 Filestore,其中 Logstore 可以提供秒级、毫秒级的实时能力。此外,Artic 具有对 Hive 兼容的特性。
3. Arctic 流式特性
Mixed Format 的流式特性可以归结为如下 4 点:
(1)毫秒级数据处理
使用低延迟的 Logstore 作为数据接到消息队列中,提供类似于流批一体的消息处理。针对 Flink CDC 采集的数据也可以对接到 Arctic。现有 Arctic 开源版本支持多种 Broker 对接,目前的 Broker 对接了 Kafka 和 Pulsar,在 Arcitc-0.4.1 版本开始接入了 Pulsar。
(2)基于主键流式更新
这是 Arctic 作为主键表的重要特性。
通过多种引擎对 Arctic 表进行管理,比如 Flink 、Trino、Spark 等。在流式更新时,使用 Flink 摄取经常遇到一些问题,比如流量突然变大过程中需要调整高并发进行流量削峰。Arctic 表中 HashBucket 特性可以很好的处理这种高效数据摄取情况,即在 Flink 摄取任务时对 HashBucket 进行调整,使数据进行自动分桶。
流式表结构优化主要是 **optimizer 服务使用 Flink 任务流式优化 Arctic 下面各个表,作为 Flink 流式任务跑在容器上。**其中 optimizer source 可以不断地请求 optimizer schedule 服务,将它要优化 task 进行文件合并,或者做 upsert、merge 等动作。
(3)分钟级别 OLAP 时效性
目前 Arctic 可以对接 Spark、Trino 这种分钟级延迟数据。支持 Merge On Read(MOR) 的功能。可以使用 Trino 进行查询,分别查询 Base 表和 Change 表,因为在底层的 Base 表跟 Change 表其实就是 Iceberg 表,所以也可以直接查询 Arctic 表字段下面 Base 跟 Change。目前 Flink 基于 FLIP27 支持 Hybrid Source 读取 Base 表跟 Change 表,比如使用 Flink 读取一张 Artic 全量表,可以直接用 Flink 查询 Arctic 表即可。
(4)其他
比如高效维表 Join、多并发事物冲突管控机制等。
--
02/核心原理解读
本部分主要对 Arctic 核心原理进行解读。
1. 基于主键 Arctic 表结构
一张 Arctic 的主键表的表结构主要分为三个部分,即:Change Store、Base Store和 Log Store,如上图所示。
最上面一层是 Change Store 接入Changelog 的数据,比如 Flink 接入的四种 RowKind(Update_BEFORE,Update_AFTER,Delete and Insert)数据,都会写入到 Change Store 中。由于 Change Store 里只有 Insert File 跟 Equal Delete File 两种格式,因此读取 CDC 数据时需要把这 4 种 RowKind 的数据回放出来。关于如何回放部分,在后面会有分享。
最底下一层是 Base Store,可以理解为最原始的数据,历史存量数据都会存放至 Base store 里面。
右边是一个 Log Store,作为毫秒级和秒级消息存放的介质。
2. 流式更新案例
创建一张 keyed 主键表,是带主键、带分区的一张 Arctic 表。分区的规则是用 opt 日期字段作为分区时间,Base Store 与 ChangeStore 其 HashBucket 分别都为 4。
在流式更新时,从数据流转上面这张图可以看出,在对 Arctic 表操作时可以通过 Spark 或者 Flink 把更新的数据插入到 Arctic 中,后续查询可以通过 Trino 或者 Spark 的 Merge On Read(MOR) 批处理方式查询出一张 Arctic 表静态数据,也可以使用 Flink CDC Incremental pull 的方式将变更的明细数据查询出来。
具体的一个案例:
在第一步会插入(Insert)一条 ID = 4、name = Sam、day = 2022-01-01 的分区数据;然后插入 ID =5,分区为第二天的数据;后面会紧跟着就会把这条数据给删掉;再插入了一条 ID =5、name=Linda 的一个数据。如上图中间所示,使用 Trino 或者 Spark 将静态数据通过 Merge On Read(MOR) 的方式查询出来。第五步进行 Update 操作将 ID = 5 、name = Sona 的数据进行 Merge On Read(MOR) ,也会把最新的数据查出来。
用 Flink 时能够获取历史数据,第一步 SQL 是 Insert overwrite 的语句,相当于将 Arctic 表里面 Base Store 数据按天分区下面的数据做删除重写;此时用 Flink 能将Base Store 里面的数据给查询出来;第二步一直到第五步做了 Insert、Delete、Update 操作,Flink 会去查询 Change Store 里面变更数据是怎么插入,怎么回放的。
3. 流式更新文件组织形式
接下来分享有主键表的 Arctic 表,它在我们内部文件组织形式是怎样的?**如上图所示,**最头上部分跟 Hive 的结构是类似,前面会有一个类似 Catalog 的 Warehouse 的地址,后面会跟上 Database,再后面是表的名称,表下面分为 base 跟 change 两个目录,base 下面又会有 data 的一个目录(上图没有展现出来),data 后面会有一个具体的分区,在分区里面有具体的 data file 用于接收 Iceberg 的 Metadata 管理机制进行文件管理。从文件名中看出如下规则:Base Store 的 file 会用 B-1 表示,Change Store 里面 file 会用 I 或者 ED 表示 Insert 文件或者是 Equal Delete 文件。
4. 流式更新 Update 数据如何回放
在流式更新中 Update 语句是怎么回放?使用 Flink 查寻数据时,有 Update before 和 Update after 两种不同表达方式。在真正写入 Change Store 时,首先将 Update before 中的数据写入到 Equal Delete 的 file 里面;然后 Update after 将 Insert 数据写入到 Insert parquet 文件里面。在每个文件里面的尾端都会加一个 _file_offset 字段用于表示数据插入顺序。如果这两个文件的 offset 是一样的,则标出这条数据 Update 语句,在 Delete 里面是 _U(Update before) 的数据,在 Insert 文件里面是 +U( Update after) 的数据。
5. 分钟级别 OLAP 时效性
**在使用 Arctic 管理文件时,用到了文件分桶管理。**如上图所示,在这棵树里面有不同的节点,不同节点存放不同的数据 data file。其中,黄色表示 Change Store 的 data file;蓝色表示 Base Store 的 data file。当我们去做 Merge On Read(MOR) 时对这棵树进行切分,比如 Merge On Read(MOR) 进行多并发查询时(如上图第一个虚线框),只需要查询指定子树下面所有的文件,然后根据 Node 进行切分即可获得结果,因为 mask=1 index=0 节点的数据是跟下面两个节点 mask=3 index=0、mask=3 index=2 是相关的,所以我们会把这里面的文件放到两个不同的 task 中去做一个 Merge On Read(MOR) 读取,以提高 OLAP 性能。
6. 分钟级别 Hybrid Query:Base + Change 数据
**针对分钟级别的 Hybrid Query,可以使用 Flink 的最新的 Source API 查询 Arctic 表里面最新数据的变化。**它的时效延迟依赖于上游任务写进来的 Checkpoint 的时长,如果这个时长是分钟级别的,则 Hybrid Query 的延迟就是分钟级别的。使用时通过 ('scan.startup-mode'='earliest') 的这种配置来指定它是读 earliest 还是 latest,earliest 相当于读取全量的数据,latest 相当于读取当前最新的 Change Store 里面的数据。目前我们现在这一套 API 已经实现了 Base Store 和 Change Store 混合读取。
7. 毫秒级 Data Pipeline
如何实现双写 Logstore 与 Filestore 方案的,**如上图所示。**前面 Operator 接收到数据后,通过 Customer Shuffle 规则保证相同主键数据能进入到相同 Writer 里面,并将数据按顺序去写入到该 Writer,此外 Writer 具有多并发多路数据摄取功能。Logstore Write 可以将数据写入到 Kafka 或者是其他的消息里面,Filestore Write 会写入到 File Store 里面。通过 GlobalAggregateManage 主插件与 JobMaster 进行通信来保证任务过程中数据一致性,比如 Failover 的情况下写入到下游的数据与写入到 File Store 里面数据是一致的。简单而言,任务在 Failover 时,刚开始它会通过 Manager 发一个消息给 Logstore,由 Logstore 下游任务去做脏数据过滤及脏数据回撤。
8. 毫秒级 Data Pipeline:全量入湖场景
**全量入库的场景。**源头数据可能是来自于 Database 里面,我们可以通过 Flink CDC Source 消费数据库里面的数据,把它全量数据通过 Flink CDC 增量一体化组的功能摄取到 Arctic 数据湖里面,这个过程在 Source 端能做到一个任务将历史增量数据同步进来。此时用户下游如果需要做一些离线计算,则可能需要查询一些存量的数据。
有些特征分析可能只需关注最新的一些数据,消费 Logstore 里数据,并不需要将历史的数据写进来,这个时候可以通过 AutomaticLogWriter 来保证数据已经写到了最新数据时,然后再把这些新的数据自动切换到 Logstore 里面,使用方式只需要在入湖任务里面配置一个 EventTime 字段,为其配置 Watermark,在插入到 Arctic 表时配置 Watermark 延迟时间,在这个时间范围与当前时间做对比,如果发现数据已经是最新的则会把数据写入到 Logstore 中。
9. 高效维表 Join 技术选型?
关于 Arctic 做维表 Join 方案的思考。我们之前也做过一些尝试,比如有些业务很多维表数据都是在 Hive 里面,把 Hive 加载到内存里面的方式,特别是在大表场景下很难达到生产可用。如果是想把 Hive 里面的数据同步到 HBase 里面去,则开发成本和运维成本都会不断提高,因为它过度赖于外部 KV 存储系统。我们也考虑过把 KV 的能力使用 AMS Server 做实现,但是开发成本也是有所增加的。
10. 高效维表 Join 原理
我们最终方案选择是 Flink Temporal Join+ RocksDB。**其主要优势是维护成本是比较低,只需要关注 Flink 任务和 RocksDB 使用情况做大维表关联。**使用 put 方式把每个 Rocks DB 的实例所处理的数据进行切分,通过调整并发来增大吞吐量。相对于外部 K-V 存储系统,也存在一些劣势,比如在初始化阶段需要先把维表数据给加载进来,会把实时事实表数据先消费并加载到 RocksDB,但不会进行 Join,此时启动方式会让任务刚启动一段时间内无法关联数据,后续会针对这个问题进行优化。
Flink Temple Join 需要定义 Watermark。在 Watermark 推进的过程中,在 Join 算子里面匹配到对应的维表数据进行下发,整个过程实际上通过 Event Time 的 Watermark 数据进行匹配、下发、推进。在数据刚开始加载阶段发送 Minimal Watermark 值至 Join Operator,进而保证 Join Operator 收到 Watermark 永远都不会一直在推进,永远都是 Minimal 状态。
初始化完成后,将维表的增量数据发一个 Maximal watermark 值,真正起作用的是 Watermark 推进过程依赖于事实表 Watermark 值,因为 Join 算子要取两个进来的 Watermark 的一个最小值去作为当前 Watermark,此时如果事实表有做推进的话,则 Watermark 匹配会正常进行,匹配的结果正常下发给下游。
高效维表的 Join 实现主要在原有的 Hybrid Query (基于 Flip 27 的 Source) 基础上,实现了一套维表 Join 功能。通过自定义 Watermark 的发送策略,在 ArcticReadOutput 这个工具类里面处理 Watermark。
11. 高效维表 JoinDemo
上图是关于 Arctic 维表关联的 Demo。在事实表方面,用 LocalTimeStamp 作为这张事实表的 Event Time,定义它为 Watermark;在维表方面,用 create 表 like 的这种语法,把真实的 Arctic 表 user 表作为维表复制出来,添加一个 opt 的 Watermark,其中 opt 的 name 可以自定义的。真正的维表关联的 Join 跟 Flink 的语义是一样的。在 use_dim 这张维表上,通过配置开启维表的功能。
--
03/Benchmark
本部分主要是对数据湖静态数据和动态数据性能测试验证。
1. 数据湖 OLAP Benchmark -- 静态数据
静态数据解析是用 TBCC 加载测试数据到 MySQL 中,通过数据同步工具 Flink CDC 将 MySQL 数据同步到数据湖(例如:Iceberg、Arctic)里面,并对 100 个 Warehouse 数据做静态分析。从上张图可以看出,Arctic 与 Iceberg 性能对比上大部分性能相差无几,部分场景有些许出入。
2. 数据湖 OLAP Benchmark -- 动态数据
动态数据是指持续对 MySQL 数据进行更新操作,这些数据会同步到数据湖,Arctic 带有 optimizer 功能。其持续优化能力是很强,在后面的 Merge On Read(MOR) 查询性能对比情况下,Arctic 的优势较为明显。
3. Benchamark 测试代码
Benchmark 的测试代码及使用流程都在这两个 GitHub 上面公布出来,如果大家有对这一块感兴趣可以自己来尝试使用一下。
4. Arctic Hive 维表对比
上图是 Hive 的维表 Join 与 Arctic 维表 Join 的功能对比,有两个测试场景。
**测试一:**600 多万的数据,Hive 做最小资源的测试。对比发现要把 600 多万数据加载进来,加载到 Flink 里面,需要 32G 内存全量存储在 Flink Job 里面。Flink Arctic 由于使用 Rocks DB,使得内存开销变小,能处理 1000 QPS 吞吐,在内存上面有很大优势。
**测试二:**维表数据变大的情况下,2000 多万数据。Hive 加载数据内存消耗是很大,在加载过程中时长消耗是非常长。在 Failover 情况发生时由于需要重新把数据加载到内存一次,该时长是不可控的,因此难达到生产可用的状态。Arctic 可以调整并发,调整到 16 个并发时,把 2000 多万数据按 16 个并发去切分,可以保证 10 万的 QPS。
Arctic 对于 Hive 还会有一些优势,比如通过提高并发方式减少任务初始化耗时时长进而提高整体 Join 的性能。
--
04/未来规划
本部分主要分享 Arctic 在流式方面未来的工作规划,整体分为以下三块内容。
(1)Partial Fields Upsert
我们会去做一些部分列的更新,这块的 proposal 已经在社区里面发出来了,正在做一些讨论。
(2)Logstore、Filestroe 一体化读
后面会做 Logstore 与 Filestore 的一体化读。现在的 Logstore 实现还是基于 Flink 老一套 Source API 实现的,后面也会考虑使用 FLIP-27 新一套 API 实现 Logstore 读取,从而与 Filestore 里面的 Base Store 和 Change Store 的整体数据进行一体化读取。该功能实现后,整个 Arctic 维表 Join 的时效延迟会从分钟级别降低到秒级或者毫秒级。
(3)维表优化
维表 Join 在前面提到了在初始化阶段,通过 Watermark 的策略保证数据匹配是一致性的。初始化阶段,数据加载进来时可以缓存在 State 里,等初始化完了之后再去加载到数据流中。但是该过程还有优化的空间,未来会在这一方面做一些规划,去优化初始化加载时间。
--
05/问答环节
Q1:Arctic 支持明细模型、聚合模型、主键模型、更新模型物化视图吗?
A1:现在的 Arctic 表的模式是支持模型主键的这种方式去定义的。它的更新方式也可以根据主键的这种方式去更新的。后面提到的物化视图社区有做一些讨论,我们会以怎么样的方式去做这种物化视图,还在讨论的一个阶段。
Q2:历史存储数据传输至 Base Log Store 使用的是什么技术框架?
A2:历史的存储数据同步到 Arctic 表里面。在内部,我们有一个数据同步的工具,我们会把历史的所有数据同步到 Arctic 里面来;外部的话,现在用的比较多的是 Flink CDC 的技术。Flink CDC 可以去做一些增量一体化的同步,比如把存量的数据库里面的数据给读出来,增量的也会读出来,写入到 Base Store 和 Log Store 里面去,主要通过 Flink 的方式去写。
Q3:Arctic tree 是二叉树还是 B+ 树?
A3:Arctic tree 主要是二叉树,主要功能是针对数据 data file 管控作用,提供一些索引的能力。
Q4:高效 Join 会做数据倾斜场景的优化吗?
A4:这块后面考虑范围内,数据倾斜情况不同的。比如我们前面有读的过程当中有一些 shuffle 的机制,我们有可以根据主键的这种方式 shuffle,也可以有根据分区的这种方式去 shuffle 。如果再往后面去设想,我们可能会或者是跟大家一起讨论一下,有没有一种方式可以去做这种动态 shuffle 的一些优化。比如能自动地去做这种切换哪一个分区、哪一个数据,或者是哈希之后哪一个领域、哪一块区域的数据是比较多等做这种的倾斜优化场景进行优化。
Q5:Flink 与 Trino 在 Arctic 的查询上有什么区别?各有什么优劣?
A5:先说 Trino 吧。Trino 它其实更多的场景是这种做 OLAP 即系查询,做分析的查询会比较多。Flink 可能更多的是偏向于这种计算类的一些查询,比如我们去查询它的一个历史数据,现在 Flink 也正在往流批一体的这种计算引擎去靠拢,它也可以用 Flink 去查询批处理任务。但是客观来讲,用 Flink 去查询批处理的这种业务场景,现在可能还不是特别多。比如现在用 Flink 去做 Merge On Read(MOR) 的功能,在我们无主键表上面已经实现了,但是主键表上面的 Merge On Read(MOR) 现在是还没有实现的。具体的一个进展,可能后面会跟着社群的诉求,或者是根据大家讨论的情况,可能会做一些工作安排。
Q6:Arctic 流批一体指的是存储层面的统一吗?
A6:对,可以这么理解。Arctic 流批一体,首先它是能对流的数据和批的数据做了 Schema 的 Metadata 的统一了,写入到 Logstore,写入到 Filestore 里面的 Schema 全都是统一的。存储层的话,比如我们在 Iceberg 基础之上会有自己的 Log Format,会把数据分一部分到消息队列里面,去保证实时场景的业务诉求。
Q7:计算层面的统一,未来会考虑吗?
A7:计算层更多考虑站在计算引擎侧角度。我个人理解 Flink 引擎社区一直有在考虑可以用 Flink 去更好地支持批的场景,我觉得在未来可能还是有希望用一个 Flink 就能覆盖所有的批处理诉求。实际上,有很多公司也在做这些尝试,据我所知华为内部也在尝试,他们内部在用 Flink 去做这种事情。
Q8:Trino 目前只支持读 Arctic,后面会支持写 Arctic 吗?
A8:据我了解,现在好像没有提出过这种规划。如果感兴趣可以在我们 Arctic GitHub 上面讨论,或者可以分享一下你们的一些场景。
Q9:Arctic 里面单独有一份和 Hive 一样的数据吗?
A9:不是的,刚开始把 Hive 表升级上来的时候,Arctic Base 表会索引 Hive 表相同的数据,随着后面升级上来之后,会有一些数据是通过 Arctic 里面写进去,但是通过 Optimizer 合并文件后,它也是能兼容 Hive 去读去查询这张表的。
关于 Arctic 的更多资讯可查看:
【GitHub】地址:https://github.com/NetEase/arctic
【Arctic】文档地址:https://arctic.netease.com/ch/
今天的分享就到这里,谢谢大家。
▌2023亚马逊云科技中国峰会
2023年6月27-28日9:00-17:00,2023亚马逊云科技中国峰会将在上海世博中心举办。
本次峰会将会分享数百个技术话题与最佳实践,覆盖汽车、制造、金融、医疗与生命科学、电商、游戏、泛娱乐、电信、教育、数字化营销等领域。
下面给大家预告一些精彩议题 报名参会,请点击"下方链接"。
2023年亚马逊云科技中国峰会 - 因构建_而可见
|---------------------|-------------------------------|
| 大数据方向议题 | 算法方向议题 |
| 下一代"智能湖仓"架构演进 | 玩转Stable Diffusion模型的微调与提示词工程 |
| 数据合规与云上安全架构构建实践 | 智能搜索技术在金融行业的应用 |
| 敏捷数据分析架构详解 | 基于开源LLM模型如何快速构建类ChatGPT应用? |
| 云原生数据库最佳实践 | 大语言模型(LLM)驱动的AIGC应用架构解密 |
| 智慧医疗: 本地化与全球化精选案例合集 | 生成式AI在游戏行业的应用 |
| 技术人员如何抓住风口获取成功? | AIGC在互联网行业与传统行业的应用与创新案例 |