流批一体在 AI 核心电商领域的探索与实践
作者: 祝海峰@阿里
摘要: 本文整理自阿里巴巴高级技术专家祝海峰,在 Flink Forward Asia 2022 流批一体专场的分享。本篇内容主要分为四个部分:
- 背景及平台历程
- 平台架构及实践
- 平台优化
- 未来规划
01 背景及平台历程
搜索、推荐、广告是电商领域非常重要的导购场景,而这些引擎的原始数据,比如商品、卖家的事务数据、算法数据,用户点击日志等都分布在不同的存储系统里,这就需要有一个离线系统将这些数据聚合到一张大宽表上,再提供给引擎使用。
这样的离线系统,它有两个典型的特征:
• 所有的历史数据都需要经过批处理导入到引擎里。
• 实时数据需要更快的速度流入引擎里。
我们以前开发这样的离线系统,会面临以下若干痛点。总的来说,流批两次开发和数据口径问题较难解决。需要了解计算引擎和存储系统,运维复杂、性能调优的门槛较高,需要有专业的大数据团队进行业务开发。
为了降低离线开发和运维的门槛,减少业务接入的成本和提高业务迭代的效率,我们从 2016 年开始研发和建设搜索、广告、推荐离线平台,简称 SARO 平台。
这个平台是从开发到运维的一站式平台,用户可以通过拖拉拽 UI 方式开发,没有大数据背景的人也能使用。平台屏蔽背后的大数据技术,进一步降低用户运维的门槛。在开发上,从数据源到引擎,一个 ETL 流程,一次开发流批一体,平台管理背后的作业依赖和存储的对接。
上图列举了平台上支持的部分业务。到目前为止,平台拥有千级应用规模,日管理万级作业量,PB 级日处理数据量,百万级增量 TPS,秒级增量延时,连续六年成功支持双十一。
这里是平台的一个发展历程,从 2016 年开始到现在一共经历了五个发展阶段。
简单来说,从最初的 MR、自研的 iStream 流式框架多套引擎,之后逐步通过 Blink 替换里面的部分作业,到现在已经全面统一在一套 Flink 引擎上。在流批开发的 API 上,也紧跟 Blink 的发展步伐,从最初批流作业都跑在 DataStream 上,到后来的 Table API,再到现在全部统一到了 SQL 上。统一的计算引擎和开发 API 可以降低平台自身的开发和运维成本。
而存储的发展相对简单,最初是使用 HBase,现在我们使用的是阿里云的 Hologres 实时数仓。
02 平台架构及实践
上图中间框里的是平台的技术架构,之上是平台承载的业务,下面是平台的技术底座。平台通过 ETL 开发 Console 前端,提供用户数据源开发、发布管理、部署管理、运维管理等开发运维管理能力。而这些能力则通过 Console 下面的 ETL Manager Service 来提供。
ETL Manager Service 主要负责 ETL 的生命周期管理。在它的下面一共有两个部分:
• ETL Executor,它主要负责 ETL 的具体执行。在我们的实践中,我们通过 Airflow 调度和 Flink 来共同完成整个 ETL 的过程。
• Catalog,它主要负责维表存储、其他存储的创建、Meta 管理。
此外我们还有两个模块,分别是用户自定义插件代码的编译管理模块和数据血缘模块。
下面以最简单的一个用户开发案例,来给大家讲一讲平台在流批一体上的具体实践。上图是用户开发的 ETL 处理流程图,它由两部分组成,分别是商品维度的数据和卖家维度的数据。
商品维度的数据,主要有商品表、商品图片表、算法模型数据,这三张表通过字段改名等业务处理,最终 Merge 到商品维表上。需要额外说明的是,一个商品有多张图片,所以我们需要有个 Udaf 把它聚合到商品维度上。
卖家维度的数据,主要有卖家表和对应的算法模型数据表,同样会 Merge 到卖家维表上。之后商品维表和卖家维表之间会进行 DimJoin,再经过 Udtf 插件处理,最终输出给引擎。
我们把用户画的这个图叫做 Business Graph,图上的所有输入和输出都是存储表,中间是业务处理算子。我们把业务处理算子分成两部分,一部分是数据聚合的处理算子,这里重点列举了 Merge 维表聚合、DimJoin 维表 Join,一对多的 Udaf。另外一部分是业务逻辑处理算子,这里重点列举了 Udtf 插件。
前面讲过所有的输入输出都是存储表,我们进一步对存储表抽象成一个动态表。动态表由一个全量的批表和一个增量的流表组成,它们共用一份 Schema 语义。前面用户画的图就可以既表达批处理过程,也可以表达流处理过程,一次开发流批一体。
用户画的那个图,在平台背后是如何变成数据处理过程的呢?
首先有一组批流任务,将商品维度的表同步到商品维表上。同样有一组批流任务,把卖家维度的表同步到卖家维表上。在两张维表都同步完成后,我们会有一组批流任务,将这两个维表进行 DimJoin,并产出给引擎。
此外,我们还有个流任务,它会去查询卖家到商品的索引表,然后将卖家的变更触发到商品上,这样卖家的变化就能实时反映到引擎里。
总的来说可以分为三个阶段,分别是同步、Join、索引表查询触发,前面两个阶段里都有批流任务,究竟是先跑批任务再起流任务,还是批流一直起,这取决于维表存储的设计和选型。
前面提到了数据处理过程,其实维表存储的设计也至关重要。我们以商品维表为例,商品表和商品图片表有数据库的 Binlog 可以保持实时同步,但算法模型数据是 T+1 的,每天都需要变更。所以我们根据这个特性就把这两类表分成了两张表,此外我们还需要一张索引表,将卖家的变更转换到商品上。如此一来,一个维表上其实有三类表。
对于维表的读写特性,批次写入的时候我们希望不要对实时链路产生影响,因而希望它具备 Bulkload 的能力。批次读的时候,我们提到里面有多张表,那么就需要在 scan 多张表的同时有更高效的 Join 能力。
对于流的写入,增量在维表上大部分情况是部分 Schema 字段写入,因而需要 Upsert 的能力。对于维表的 Join,我们需要更高效的 KV 点查能力以及索引查询能力。
前面提过增量总是更新维表上面的部分字段,其他字段的补全目前我们是在计算阶段去完成。假如维表存储有 CDC 的能力,我们可以直接消费维表的 Binlog 而不用做额外补全计算。此外我们还可以基于 CDC 做更多的优化,后面会有介绍我们基于 CDC 上做的优化。
一般的数据处理过程,首先会起一个批任务,在批任务完成之后,再起流任务去回追。如果批任务的时间较长,会对下游增量回追造成较大的压力。假如维表存储有 MVCC 特性,批流任务可以同时启动,没有任何回追,对下游引擎来说也更友好。
此外,我们会经常加减字段,所以维表存储也需要有 Alter Schema 的能力。
上图是用户图变成具体的数据处理过程的步骤,最左边是用户画的图,经过校验、解析、优化等步骤得到优化后的图。其中一步优化是分析维表究竟有几张表,有没有索引表,索引字段是谁。
优化后的图经过 Flow Generate,这个阶段主要根据前面说的数据处理模型的几个阶段里的批流任务的依赖关系,绘制出的一个依赖关系图 Flow Graph。Flow Graph 再经过 Flow Code Translate 翻译成具体调度的代码。在我们实践中,我们使用的是 Airflow 调度,所以我们会翻译成 Airflow Python Code,之后由 Flow Runner 把调度给启动起来。在 Airflow 的节点里可能是一个作业节点,也可能是一些其他的预处理节点。
上图是 Airflow 上一个作业节点 Python 代码的案例,我们主要看黄色高亮的部分。
首先这是一个增量同步的 Executor,这个 Executor 主要负责拉起增量同步作业。下面 snapshot 是指用户画的图的具体协议,再下面是这个增量同步作业有关的子图。这个 Executor 拿到用户画的图的协议之后,再根据子图就可以启动对应的作业。
上图是 Executor 拉起一个作业的具体步骤。最左边的仍然是用户化的图,和前面一样,它经过一系列的优化,得到了优化后的图。
然后再经过 Job Generate,这个过程主要是根据子图的信息,还原出子图,最后转换出 Job Graph。这个 Job Graph 经过 Job Code Translate 会翻译成具体作业的代码。在我们的实践中,我们主要翻译成 Flink SQL。之后我们会将这个 SQL 提交给 Job Runner,让它运行起来。值得一提的是,Job Code Translate 和 Job Runner 都是插件化定制的,这么做是为了方便我们这么多年来计算引擎的持续升级和我们核心模型的解耦。
上图是一组批流 SQL 的案例,我们只看黄色高亮的部分。左边是 MySQL,右边是 drc,它是阿里数据库 Binlog 的中间件。中间批流处理的过程基本是一致的,字段也是一致的,到了最后写出的时候,他们会写到同一张维表上,有所不同的是流式任务还会写到消息队列上。
03 平台优化
还是以商品维表为例,这里的 V1、V2 代表的是全量版本,以 V1 为今天的全量,V2 为明天的全量为例。前面提到商品有多张表,一张是保持数据库实时同步的表,还有一张是 T+1 表。那么我们基于这个特性把作业也分成两部分,第一部分是左上角的商品表、商品图像表的批流任务,第二部分是上面中间框里的算法模型数据的批任务,左下角还有一组批流任务,完成这两张表的聚合。
当开始明天 V2 版本全量的时候,因为商品表和商品图像表通过 Binlog 一直保持同步,所以这部分的全量同步我们可以优化掉。只需要对明天的算法模型数据,再进行一个 V2 版本的批任务同步,写到明天 V2 版本的 T+1 表里。同样,我们在明天起一组批流任务去消费 V1 版本的数据库实时同步表和 V2 版本的 T+1 表,提供给引擎使用。
这样的优化对用户是无感知的,我们会去比较 V1 和 V2 两个版本用户图的变化。如果数据库部分没有任何变化,我们会自动进行优化执行。此外,V2 版本的数据在引擎服务上线以后,我们才会将 V1 版本的 T+1 数据和 V1 版的部分增量作业进行下线。对用户来说,版本的切换他是感知不到的,他感觉到的就是 24 小时不间断的增量。
上图主要是针对大型业务的优化。在这个例子中,业务的全量流程到了 T2 时刻完成,引擎才开始索引构建和服务上线。但留给引擎的时间只有 T2 到 24 点之间,数据量非常大的时候,引擎也有可能完成不了索引构建。而且在晚高峰时,在线任务可能会对离线任务进行压制,更容易出现长尾。一旦出现失败的情况,当天可能就无法完成在线引擎的新索引上线。
我们对数据进行了分析之后,发现少量的数据在搜索引擎里占据着 80%以上的曝光,大量的数据只占据 20%不到的曝光,符合一个二八原则的规律。所以我们据此将所有的数据分成冷热两部分。我们先起一个批任务,将热数据进行处理。在这个例子中,到 T1 时刻热数据处理完成,那么引擎就可以从原来的 T2 提前到 T1 时刻将热数据和昨天的冷数据合并在一起,开始做索引的构建和后续流程。在 T1 时刻之后,我们还会起一个有限流任务,继续处理剩余的冷数据。同时我们还会起一个增量任务,将所有的热数据和冷数据的实时变更同步到引擎里。
这样一个优化上线以后,我们有大型业务的全量时间从七小时缩短到一小时,大大提前了索引切换上线的时间。以前会出现的 24 小时内无法完成索引构建的情况不再出现,而且在失败之后,离线和引擎都有足够的时间去再次重跑。
一条增量消息来的时候,并不是所有字段都有变化,也就意味着部分计算是可以被裁剪的。我们基于 CDC 做了一个计算剪枝的优化,下面我以一个最简单的作业为例,来给大家讲讲这个优化。
首先右边框里的这个作业有一张 a、b 两个字段的源表,经过一个 UDTF1 处理,新增了一个 d 字段,再经过一个 DimJoin 读进一个 c 字段,再经过一个 UDTF2,新增一个 e 字段,这样就有 a、b、c、d、e 五个字段写出到结果表里。
用户 UDTF 的开发是基于平台提供的插件框架来进行,用户可以在插件框架之上开发多字段进和多字段出的逻辑处理单元,我们叫做 Processer。一个 UDTF 里有多个 Processer,它们之间通过字段依赖形成字段和逻辑处理的血缘关系图。
为了简化案例,在我这个例子中,UDTF1 和 UDTF2 只有一个 Processer。那么每一个 Processer 如何判断自己是否可以被裁剪呢?这就需要有一个运行时的 Plan。左边就是这个 Plan 生成的过程,最左边是一个 JobGraph,就是我刚刚说的这个作业。
这个 JobGraph 首先经过 Transform,这个阶段主要是去解析用户插件代码,提取出字段和逻辑处理的血缘图,之后再经过一个 Weld,它主要是将这些小图焊接成大图之后再经过一个编译,得到一个裸的 Plan。这个 Plan 经过编码表的组装和编码过程,最终得到一份编码后的 Plan。编码的 Plan 里主要以 bitset 的方式组织,这么做是为了在运行时有更好的查询性能。每一个 Processor 都去加载自己所需要的编码后的 Plan,当 CDC 消息来的时候,这个 Processor 就可以查询这个 Plan,来决定自己是否要被裁剪掉。
来看一个例子,假如有一条 CDC 消息变更字段是 a,那么 P1 和 P2 会被执行,J1 会被裁剪掉。假如这个时候是 b 字段的变更,P1、P2 和 J1 都会被执行,没有任何人被裁剪。如果是 c 的变更,P2 则会被裁剪。假如还有一条消息 f 字段变更,它并不是我们选择的字段。当我们收到 CDC 消息时,我们在源头上就会去查 Plan,发现 f 并不是我们 Plan 里的字段,那么这条消息就会被丢弃掉,从而裁剪此类的消息。
这个优化上线以后,我们双十一大促淘宝部分表 DimJoin 有 60%的裁剪率,日常裁剪率在 30%,节省资源 40%。
前面主要跟大家介绍的是系统层面的优化,下面我选择一部分作业层面的优化给大家介绍一下。
• 第一个是预测执行。在离线混布环境中,因为不同机型的差异以及在线对离线任务的压制,离线任务经常会出现长尾或者跑不完的情况。所以我们在去年 7 月份的时候,在 Blink 上开发了预测执行的功能,在 75%的 Task 完成之后,我们会去找出长尾的 Task,再开启一个 Task 进行双跑,最后选择先跑完的数据作为最终的数据。
这个优化上线以后,我们有业务全量作业时长缩短了 50%,以前不太能跑的出来的作业,现在也能稳定产出。并且在去年 12 月份,我们团队的同学也共同参与了把预测执行功能回馈社区的开发。
• 第二个是维表内多张表的 Join。前面提到一个维表是由多张表组成的,这些表的数据其实是根据 Key 经过排序的。我们基于这个特性,在 Scan Connector 里做了一个优化,边读取这两张表的数据并进行 Local Join。
• 第三个是异步化。我们在维表读取和写入的 Connector 内部做了一些异步读写的优化。
• 第四个是引擎消息去重。我们对发往引擎的消息做了一个 Udaf 处理,比较之前的消息,如果和之前的消息没有任何变化,我们就会不发往引擎,如果有变化,我们就计算出变化的字段,以提升引擎索引的效率。这个优化上线以后,我们有的业务消息去重率达到 50%以上。
在资源类型上,我们的 JobManager 和 Stream 作业都跑在在线资源上,批作业跑在离线资源上。
在离线混部环境里面,不同机型有较大的性能差异,且整体集群规模较大,调度压力也较大,此外还有一些单机的问题,因而大型任务 Pod 比较多的情况下,它的启动时间会很久,所以我们在上面做了一些优化,调整到合适的 numberOfSlot、使用 Chain 等,使用大规格的 Pod,可以减少 Pod 的申请量,来实现启动提速。此外,我们对重要业务进行的资源预留,以保证它更高效的启动速度。
在 FO 代价上,我们对资源释放保留一段时间,这样可以在 FO 时使用原有的资源,实现更高效的 FO。对于不同优先级的作业,我们会分不同的 fo-cost,这样在集群的碎片整理和作业驱逐的时候,就可以更有针对性。
在机器热点上,解决长尾问题的预测执行前面已经讲过,这里就不再重复了。
04 未来规划
目前我们平台上面有大量增量极低的长尾业务,我们希望以多租户的方式支持这部分业务,所以后面会在多租户方面做一些探索,Flink Session Mode 可能是技术候选之一。另一方面,平台管理着万级作业,为了进一步提升整体的资源使用效率,我们还会在弹性上做一些探索。此外在恶劣的在离线混部环境稳定的运行我们的 Flink 作业也是将来的方向之一。