FlinkSQL的字段血缘解决方案
导读: 随着大数据的进一步发展,对数据血缘解析有着很大需求,数据血缘(data lineage)是数据治理(data governance)的重要组成部分,也是元数据管理、数据质量管理的有力工具。通俗地讲,数据血缘就是数据在产生、加工、流转到最终消费过程中形成的有层次的、可溯源的联系。成熟的数据血缘系统可以帮助开发者快速定位问题,以及追踪数据的更改,确定上下游的影响等等。
因此,数据血缘是组织内使数据发挥价值的重要基础能力,也是数据资产的重要组成部分。当前随着以 Flink 为代表的实时数仓的兴起,迫切需要一种解决 FlinkSQL 字段级别血缘的方法。本文就来简要介绍一种在实时数仓中基于 Apache Calcite 解析 FlinkSQL 字段级血缘的方法。
文章将围绕下面七个部分展开:
-
Apache Calcite 简介
-
FlinkSQL 执行流程
-
FlinkSQL 字段血缘解析思路
-
核心源码阐述
-
Insert、Join 解析案例
-
扩展 Calcite 支持 Lookup Join、UDTF 解析案例
-
解析血缘字段的转换关系
分享嘉宾|白松(笔名:HamaWhite) 数澜科技 研发中心副总经理,数据中台产品研发负责人
编辑整理|曹文武 中科云谷
出品社区|DataFun
01/Apache Calcite 简介
首先来简单介绍下 Apache Calcite。
1. Apache Calcite 简介
Apache Calcite 是一款开源的动态数据管理框架,它提供了标准的 SQL 语言、多种查询优化和连接各种数据源的能力,但不包括数据存储、处理数据的算法和存储元数据的存储库。
Calcite 采用的是业界大数据查询框架的一种通用思路,它的目标是"one size fits all",希望能为不同计算平台和数据源提供统一的查询引擎。Calcite 作为一个强大的 SQL 计算引擎,Flink 内部的 SQL 引擎模块也是基于 Calcite。
Calcite 工作流程如下图所示,一般分为 Parser、Validator、Converter 和 Optimizer 四个阶段。
第一阶段是 Parser,Parser 之后会生成 SqlNode,经过 Validator 和 Converter 生成 RelNode,再经过优化器生成计划。
2. Calcite RelNode 介绍
在 CalciteSQL 解析中,Parser 解析后生成的 SqlNode 语法树,经过 Validator 校验后在 Converter 阶段会把 SqlNode 抽象成语法树(AST)转为关系运算符树(RelNode Tree)。
上图示例的 SQL 来自官网,两张表进行 join 会生成 SqlNode 语法树,经过Converter把 SqlNode 转为关系运算符树。本文的血缘解析会基于 RelNode 来进行。
3. 组件版本
--
02/FlinkSQL 执行流程
根据源码整理出 FlinkSQL 的执行流程主要分为五个阶段:
1. Parse 阶段(语法分析),使用 JavaCC 把 SQL 转换成抽象语法树(AST),在 Calcite 中用 SqlNode 来表示。
2. Validate 阶段(语法校验),根据元数据信息进行语法验证,例如查询的表、字段、函数是否存在,会分别对 from、where、group by、having、select、order by 等子句进行 validate,验证后还是SqlNode构成的语法树 AST。
3. Convert 阶段(语义分析),根据 SqlNode 和元数据信息构成关系表达式RelNode树,也就是最初版本的逻辑计划。
4. Optimize 阶段(逻辑计划优化),优化器会基于规则进行等价变换,例如谓词下推、列裁剪等,最终得到最优的查询计划。
**5. Execute 阶段,**把逻辑查询计划翻译成物理执行计划,依次生成 StreamGraph、JobGraph,最终提交运行。
上图简述了从 FlinkSQL 到 Flink Job 是如何执行的。比如输入一条 Flink SQL,先由 CalciteParser 生成 SqlNode,这就是前文中提到的语法树。后面通过 validator 校验完成之后还是 SqlNode 类型,在这一步校验的时会去连 CatalogManager 做一些校验。本次实验用的是 Hive 的 MetaStore,当然也可以用 Memory 或者 JDBC 类型的 Catalog。生成 SqlNode 之后,会经过 Convert 阶段,这一部分就是生成RelNode 关系型表达式。
优化是在 FlinkChainedProgram 里面进行,包括 11 个步骤,比如子查询的重写,谓词下推或者重写逻辑计划等。这一步生成的是 Optimized Physical RelNode,即作优化后的物理计划。物理计划经过 Planner 的 translateToExecNodeGraph 翻译成 ExecNodeGraph,再经过后面 Transformation 等步骤,依次生成 StreamGraph 、JobGraph。最终通过 submit job,提交到 Flink 单机运行,或者是提交到集群去运行。
以上就是 FlinkSQL 的一个完整的执行流程。
--
03/FlinkSQL 字段血缘解析思路
FlinkSQL 字段血缘解析分为三个阶段:
-
对输入 SQL 进行 Parse、 Validate、Convert 生成关系表达式 RelNode 树。
-
在优化阶段,只生成到 Optimized Logical Plan,而非原本的 Optimized Physical Plan ,要修正 FlinkSQL 的执行流程。
-
针对上步骤优化生成的逻辑 RelNode,调用 RelMetaDataQuery 的 getColumnOrigins(RelNode rel,int column)查询原始字段信息。最后构造血缘关系,并返回结果。
这里再说明下第二阶段,前面 3 步跟 FlinkSQL 的执行流程是一样的,最后生成原始的 RelNode,本文在第四步进行裁剪,只要解析血缘到逻辑计划就停止,而没必要去生成物理计划,所以这里把后面三个步骤裁剪掉(上图中红色字体标注的)。这就是本文的核心设计思路。
**上图中对比原始的逻辑计划和优化后的逻辑计划。**例如有 ods_mysql_users 这和维表 dim_mysql_company 进行关联,原始的逻辑计划是先进行全表扫描,然后 join 之后再进行投影。经过优化之后,会把把查询下推,在 select 的阶段只要 查询相关的字段就好。
--
04/核心源码阐述
核心源码是根据前文讲的三个步骤,此处定义一个 parseFieldLineage 方法,输入是一段 sql,会输出字段血缘的解析结果。如果要把血缘集成到其他系统里,只要去调用这个方法就可以获取到字段的血缘关系。此方法主要包含三个阶段:
1.生成原始的 RelNode Tree,调用 parsestatement sql。
2.对原始的 RelNode 去生成优化后的逻辑计划。
3.调用 Calcite 提供的能力,获取它的血缘关系。
下面详细介绍这三个步骤。
(1)根据 SQL 生成 RelNode 树
第一步是 Parser 阶段,调用 tableEnv.getParser().parse(sql) 方法生成 operations,这里能获取到 RelNode。后面代码限制只能支持 insert 的血缘关系,后续会支持 CTAS 等语法。
(2)生成 Optimized Logical Plan
第二步是生成优化后的逻辑计划,根据Flink的源码可知共有 12 个阶段。根据上面讲的,不要最后三个步骤,只要优化后生成逻辑计划就好。
(3)查询原始字段并构造血缘
第三步,查询目的表(arget table)的所有字段的信息,对目的表的每个字段调用RelMetadataQuery.getColumnOrigins()方法,把优化后的 optRelNode 传给它,再把目的表当前字段的索引传给它,这样 Calcite 就能返回目的表这个字段对应的原始表字段信息。
通过上述介绍,大家会发现这种方法获取血缘关系是非常简单的,我们不用去解析用户输入的各种 SQL,核心的血缘关系会由 Calcite 自动获取。
--
05/Insert、Join解析案例
1. 新建测试表
这里新建了三张测试表,一个是 ods_mysql_users 这张表,connector 是映射为 mysql-cdc,包含 id、name、birthday、ts 和 proc_time 共五个字段;维表dim_mysql_company,包含user_id 和company_name两个字段;目的表dwd_hudi_users,是插入到 Hudi 里面,包含 id、name、company_name、 birthday、ts 和 partition 共六个字段。
2. 测试 Insert-Select
把 MySQL 的这张表直接插入到 Hudi 这张表,生成的血缘关系如上图右表所示。
3. 测试 Insert-Join
MySQL 的这张表和 MySQL 维表去做一个 join,分别插入 Hudi 表。可以看到source 表有两个,一个是 users 表,一个是 company 表。比如concat,是把 a 表的 name 字段和 b 表的 company_name 字段插入到目的表 name 字段。在血缘关系中可以看到准确的字段对应关系。
4.进一步改造
近期,github 项目上多位用户反馈,上述操作完成后不支持 lookup join、watermark、UDTF,以及 CEP 的血缘关系。这里举一个例子,如上图所示把 join 换成 lookup join, 即 join 的时候换成了 FOR SYSTEM_TIME AS OF a.proc_time AS b。会发现生成的血缘关系中没有维表 dim_mysql_company 的字段血缘关。
因此需要进一步改造。这里的改造不是对上述整个思路或者思想方法去做改造,而只是在上述的思路或者方法基础上,做一些补充,就能获取到这些血缘信息。
--
06/扩展 Calcite 支持 Lookup Join、UDTF 解析案例
1. Lookup join
先来看一下 lookup join。
针对于 lookup join,Parser 会把 SQL 语句'FOR SYSTEM_TIME AS OF',解析成SqlSnapshot(SqlNode),在 validate 阶段将其转换为 LocalSnapshot。
从上图中可以看到,先是对这张表做一个scan再转换为snapshot。那为什么没有解析血缘关系呢?是因为 calcite-core中RelMdColumnOrigins 这个 Handler 类里并没有处理 snapshot 类型的 RelNode 导致返回空,继而丢失 lookup join 字段的血缘关系。
我们要做的就是在 RelMdColumnOrigins 增加一个处理 snapshot 的 getColumnOrigins(Snapshot rel, RelMetadataQuery mq, int iOutputColumn) 方法。
这样就可以获取到 lookup join 的血缘关系。
2. UDTF
首先新建建 UDTF,继承 Flink 的 TableFunction 类,复写 eval 这个方法。我们这里加了一个注解,输出两个字段: word 和 length。比如某个表,经过 UDTF 处理,会得到一个类似于临时表,其中包含 word 和 length 两个字段。然后新建 my_split_udtf 函数。
定义好之后,测试 UDTF SQL。
这里用 UDTF 作用于 ods_mysql_users 表的 name 字段,生成临时表,和 ods_mysql_users 表去做 join,注意 length 和 word 这两个字段就来源于 UDTF 对应的临时表。
按照前面讲的血缘分析方法,拿到Relnode后先去获取它的血缘,与 lookup join 类似的思路,我们把优化后的逻辑细化打印出来,发现在 org.apache.calcite.rel.metadata.RelMdColumnOrigin 的 getColumnOrigins() 方法中,没有 Correlate 作为参数的方法,因此无法解析出 UDTF 字段的血缘关系。
由于比较复杂,这里就不列出具体代码。其核心思路是因为 LATERAL TABLE(my_split_udtf(name)) 生成的两个临时表的两个字段 word 和 length,本质上是来自于 ods_mysql_users 这张表的 name 字段,因此针对右边的 LATERAL TABLE 获取 UDTF 中的字段,再根据字段名获取左表信息和索引,最终获取的是左表的字段血缘关系。
3. 扩展支持其它语法
后续如果 Flink 新版本增加新的语法,只要通过自定义 RelMdColumnOrigins 类中的方法便能准确解析出字段血缘关系。
上图的表格中列出了一些 FlinkSQL,经过分析其优化后的逻辑计划,可以找到其 RelNode 子类型,只要针对这一子类型定义一个方法,返回此类型对应的源表和源字段即可。
--
07/解析血缘字段的转换关系
前文中已经解析出字段血缘关系,但只是生成了字段间的映射关系,但是不知道字段间的转换关系,比如两个字段经过怎样的加工生成了目标表的字段。解决这一问题的思路也是修改 Calcite 的源代码,继续修改 RelMdColumnOrigins 在获取血缘关系的时候,记录字段间的转换关系。下面通过三个例子来展示。
从上图的例子中可以看到,birthday 字段经过了 date_format 函数的处理,在血缘关系里面可以看到,在源表、字段、目的表、字段后面多了一个转换关系。
再展示一个例子,是自定义的一个函数 my_suffix_udf,可以看到血缘关系中就包括这一加工映射。
最后来看一个稍微复杂的例子。这里涉及到子查询,字段经过了多个步骤的加工,比如 ods_mysql_users.id 字段,第一步在子查询先经过 sum(id) 处理生成 sum_id,再经过 ABS(sum_id) 处理插入到 dwd_hudi_users.id 字段。此时会记录血缘上整个完整的转换关系 ABS(SUM(id))。
以上就是本次分享的内容,谢谢大家。