Fork me on GitHub

数禾科技|杨涵冰:特征平台在数禾的建设与应用

图片

分享嘉宾:杨涵冰 数禾科技 数据开发专家
编辑整理:曹睿 中国人寿研发中心
出品平台:DataFunTalk

导读: 本次分享的主题是特征平台在数禾的建设与应用,主要从以下四个方面介绍:

  • 特征平台概览
  • 特征存储服务
  • 流批一体方案
  • 模型策略调用方案

01 特征平台概览

图片

首先是特征平台的概览, 整个特征平台分成四层,分别是数据服务、存储服务、计算引擎、原始存储 。数据服务层提供向外的服务,主要包括四种,一是传统的API点查,二是圈选查询,三是事件消息,四是同步调用计算。其中同步调用计算服务是即时计算的,相当于现场进行策略运算,而API点查服务是预先计算并存储的。为了提供数据服务,提供特征行存和特征列存两种服务方式,分别支撑API点查和圈选查询。计算引擎有两个,分别是离线运算引擎和流批一体运算引擎。特征平台的最底层是原始存储,原始存储是为了支持离线运算功能,而事件存储是为了支持流批一体运算。

图片

下面以MySQL为例介绍简化的特征平台数据流转过程。

首先是离线部分,通过Sqoop或者其他的抽取工具将MySQL数库的数据抽取到EMR,然后经过Hive运算,把最终的运算结果存到HBase和ClickHouse中,分别对应特征行存和特征列存,以提供API点查和圈选查询服务。同时MySQL的Binlog会实时写入Kafka,然后Kafka的数据会被消费进入Flink流批一体运算引擎,同时Kafka的数据也会被消费进入到事件存储HBase,事件存储HBase的数据也会提供给Flink流批一体运算引擎。经过引擎计算以后,数据被写入HBase和ClickHouse中,此外还会发事件消息。中转到事件存储HBase的数据可以提供实时调用服务。

02 特征存储服务

接下来介绍特征存储服务。

我们将特征分为四类,分别是:

  • 同步特征 :实时写入、离线修正、流批一体。
  • 即时计算特征 :API调用时运算、线下批量计算,逻辑一致。
  • 实时特征 :传统实时链路,实现复杂实时逻辑,一般可使用流批一体代替。
  • 离线特征 :传统离线链路,实现复杂离线逻辑。

为什么一定要有离线的链路,原因有以下几点:

一是实时链路是一个纯增量的链路,它的链路会很长,并且在任意的环节都有可能发生问题,一旦出错,数据将会在很长的一段时间都无法被自动修正。

二是实时链路对时效性有要求,特别是涉及到多流join的时候,一旦有延迟,需要尽快返回一个降级结果。为了控制实时特征的最终错误率,并且将错误限制在一个较小的时间段内,需要进行离线链路修正。特征存储服务会用两种方式来进行修正,一种就是同步特征,其本身自带修正,是流批一体的链路;其他特征一般是通过实时+离线+即时计算的组合方式。

图片

下面以MySQL为例,介绍下存储服务的整体数据流。离线部分,通过Sqoop抽取工具将MySQL数库的数据抽取到EMR,然后经过Hive运算,把最终的运算结果存到HBase和ClickHouse中。同时Binlog会实时写入Kafka,然后Kafka的数据会被消费进入Flink流批一体运算引擎,经过引擎计算以后,数据被写入HBase和ClickHouse中。HBase和ClickHouse提供API点查和圈选查询服务。

1. 实时特征数据流

图片

在实时特征的数据流中,MySQL通过Binlog写入Kafka,以及其他埋点类的Kafka数据,经过运算以后将结果写入另外一个Kafka,最后消费数据写入HBase和ClickHouse。

2. 离线特征数据流

图片

在离线特征数据流中,MySQL通过Sqoop,OSS通过Spark或者其他方式抽取,Kafka通过Flume抽取进入EMR,然后用Hive或者Spark运算,同时写进HBase和ClickHouse。

3. 同步特征数据流

图片

在同步特征数据流中,MySQL的Binlog会写进实时的Kafka,然后Kafka的数据会被实时写入事件存储,同时MySQL也会离线修正和初始化。Flink 同时做流处理和批处理,写进HBase和ClickHouse。

4. 即时计算特征数据流

图片

在即时计算特征数据流中,依托于HBase和ClickHouse的数据,提供API点查和圈选查询服务。

图片

以上就是整个存储服务的介绍,该部分内容涉及到特征存储服务的大部分,如图中橙色部分所示。

03 流批一体方案

在只提供了特征存储服务的时间里,我们发现了很多问题以及一些业务诉求。首先是一些 问题

  • 在对现有模型策略精耕细作之前,还有没有什么数据没有被使用?比如说MySQL里面状态变化的时间点数据。
  • 输入项离线逻辑是否已经足够完整,为什么实时输入项需要重新梳理与补充逻辑?离线输入项想要编程实时的,需要重新梳理逻辑,有些甚至过于复杂,以至于用传统的方式无法完成实时转换。
  • 不确定使用场景,无法区分点查和跑批,能不能同时覆盖?对于很多业务人员来说,并不知道想要的模型和策略最终需要用跑批还是点查,有没有什么办法能同时满足这两种需求。
  • 流式处理逻辑难以理解,为什么要流Join,不能直接“取数”吗?对于模型开发人员,他们不了解流处理过程,因此实时特征的制作难以下沉到模型开发人员。
  • 实时模型策略空跑测试需要很长时间,能不能缩短?
  • 模型策略开发训练很快,上线开发实时输入项却需要很久,能不能加速?

对于这些问题,我们提出了一些 方案

  • 【数据】 存储状态变化数据,支持还原任意时刻的数据切片状态。这样做还有一个额外的好处,通过流批一体方案进行模型训练的时候不会有特征穿越的问题,因为没有办法拿到未来的数。
  • 【逻辑】 流批一体,以流为主,逻辑一致,无需验证口径。训练的时候用这份数据作为训练,上线及回测时也是用相同的数据,可以保证最终的结果一致。
  • 【执 行】 流、批、调用一体化,自适应不同场景。
  • 【开发】 使用“取数”而不是流合并,封装实时流特有概念,降低实时开发门槛。
  • 【测试】 支持任意时间段回溯测试,增加实时开发测试速度。
  • 【上线】 自助式的流批一体模型开发上线,减少沟通环节,增加上线效率。

图片

传统的实时流方案有Lambda和Kappa两种

Lambda提供了实时和离线的两套逻辑,最终在数据库中将两者合并起来。Lambda的优点是架构简单,很好地结合了离线批处理和实时流处理的优点,稳定且实时计算成本可控,并且离线数据易于订正;缺点是实时、离线数据很难保持⼀致结果,并且需要维护两套系统。

1. 流批一体方案

Kappa则是全部都用实时的逻辑,将历史的数据存下来,每次得到一个切片数据,最后合并起来。Kappa的优点是只需要维护实时处理模块,可以通过消息重放,无需离线实时数据合并;缺点是强依赖消息中间件缓存能力,实时数据处理时存在丢失数据,这个缺点在金融领域是不能容忍的。

图片

由于Kappa在抛弃了离线处理模块的同时也抛弃了离线计算更加可靠稳定的特点,而Lambda虽然保证了离线计算的稳定,但是双系统的维护成本非常高,并且两套代码的运维非常复杂。

因此, 我们提出了Lambda+Kappa的流批一体方案 。如图所示,数据流转的前半部分是Lambda架构,其中心是一个HBase的事件存储;后半部分是Kappa架构,供用户完成流处理和批处理。

图片

上图以MySQL为例展示了整体流批一体方案。 首先是MySQL的Binlog进入Kafka,同时通过离线修正以及切片把数据送到事件中心,同时用相同的Kafka完成实时流的触发,然后事件中心会提供数据获取及离线跑批服务。最后由元数据中心统一管理数据,统一维护数据,以避免同步的问题。Flink 提供整个逻辑服务。

2. 事件中心

图中的事件中心使用Lambda架构存储所有变化数据,每日修正,通过冷热混存与重加热机制追求最佳性价比。此外,我们参考Flink增加水印机制,确保当前值同步完成。最后,事件中心提供消息的转发机制以及异步转同步的的机制,以“取数”代替流Join,消息转发机制,异步转同步。支持触发——消息接收及触发——轮询式调用,并同时赋予该接口回溯的能力。

图片

下面介绍事件中心的村塾数据流,如图所示,MySQL、Kafka等多个数据源通过不同的路径转发到Kafka,然后Flink直接消费Kafka,并会实时的写入HBase热存。此外,离线修正的数据通过EMR也写入HBase热存。另有一套Replica机制完成HBase热存和HBase冷存之间的复制。HBase冷存的数据也会通过重新加热进入到HBase热存中。

图片

整个事件中心的存储结构如图所示,冷存里面只放主体数据,热存里面除了主题数据以外,还有三个表用来做不同的index工作热存一般TTL为32天,有特殊的情况也可以调整。

图片

事件中心的读取数据流中,实时触发是走是Kafka,回溯和取数都是走HBase热存,内部的重加热机制完成HBase冷存到HBase热存数据的更新,这部分逻辑对于开发人员是透明的,开发人员不需要关注数据来自于哪里。

下面介绍事件中心的水印机制与流Join。

假设我们要对两个流进行Join,也可以简单理解为有两张表,通过某外键进行关联。当任何一张表发生变更时我们都需要至少触发一次最终的完整的Join 后的记录。我们将两个流分别记录为A和B,并且假设A流先到。那么在打开事件中心水印机制的情况下,A流触发时,A 流的当前事件已经被记录在事件中心中。此时分为两种情况:

①在事件中心中可以取到B流的相关数据,那么说明在A流当前事件记录进事件中心到运行至读取B流相关数据的时间段内,B流已经完成了事件中心的记录,此时的数据已经完整。

②在事件中心中无法取到B流的相关数据,那么由于事件中心水印机制,说明此时B流相关事件尚未触发。而由于A流当前事件已经被写入事件中心,那么当B流相关事件被触发时,一定能获得A流的当前事件数据,此时数据也是完整的。由此,通过事件中心水印机制,即可确保用“取数”取代流Join后至少会有一次拥有完整数据的计算。

图片

触发消息接收通过消息转发完成。当外部系统发起请求后,会去转发Kafka,然后Kafka的数据同时会进入事件中心,接下来触发相应的计算,最后去用消息队列发送计算结果,外部系统接收这个消息的结果。

图片

同时也提供轮询式的服务,同样也是消息转发,前面与消息接收机制都一样,只是多了一个事件中心,重新存储计算结果,然后提供服务。

3. 取数一致

在取数的时候还有另外一个问题,那就是不一定能拿到最新的数据,除非直接从元数据库获取数据,但这种操作一般是被禁止的,因为会给主库带来压力。为了保证数据的一致性,我们采取了一些措施。首先我们将取数一致性分为四个级别,分别是:

图片

①最终一致 :经过一段时间后能访问到更新的数据,整个流批一体方案默认保证最终一致。

图片

②触发流强一致(可延迟) :保障触发流中的当前数据及早于当前数据的数据在对触发流的取数过程中能获取到。使用水印方案,水印不满足时进行延迟。

图片

③取数强一致(可延迟) :保障取数时早于用户提出的时间要求的数据均能获取到。使用水印方案,水印不满足时进行延迟。

图片

④取数强一致(无延迟) :保障取数时早于用户提出的时间要求的数据均能获取到。当水印不满足时直接从数据源增量补足,增量取数会对数据源带来压力。

4. 流批一体作业

我们使用PyFlink实现流批一体作业,使用python是因为模型和策略开发人员更加熟悉Python语言,而Flink保证了逻辑一致性。基于PyFlink,我们封装了复杂的触发逻辑、复杂的取数逻辑,并能够复用代码片段。

图片

PyFlink的代码组织结构如图所示,包含出发、主逻辑、输出三部分。这三部分可以不用自己实现,只需要选择已经封装好的输出。

图片

Flink整体数据流也简单,最上面是触发逻辑,然后触发主逻辑,主逻辑里面会有取数逻辑去完成取数,最后是输出的逻辑。在这里,触发逻辑、取数逻辑和输出逻辑的底层封装是随流批变化自适应的,所以可以同时确保输入和输出不变,逻辑本身在绝大多数情况下是不需要考虑流批环境变化。

下面介绍一个PyFlink典型的使用流程,首先选择触发流,编写取数及预处理逻辑,可引入已发布的取数或处理逻辑代码,设置取样逻辑并试运行,获取试运行结果,在分析平台中进一步分析与训练。训练结束想要发布模型时,可在作业中选择训练完成的模型,如有需要可以设置初始化相关参数。最后是模型发布上线。

04 模型策略调用方案

我们提供了四种调用方案:

①特征存储服务方案 。Flink作业进行预运算,将运算结果写入特征存储服务平台,通过该数据服务平台对外服务。

②接口触发——轮询方案 。调用并轮询事件中心消息转发接口,直到Flink作业返回运算结果。

③接口触发——消息接收方案 。调用事件中心消息转发接口触发Flink作业运算,接收Flink作业返回的运算结果消息。

④直接消息接收方案 。直接接收Flink作业返回的运算结果消息。

1. 特征存储服务方案

图片

特征存储服务分为三种情况,分别是实时、离线修正和离线初始化。当有新的变量上线或者老的变量发生逻辑变化,需要对全量的数据进行一次刷新,这时候需要离线初始化。实时流是实时触发的,离线修正和离线初始化都是批量触发。如果有取数逻辑则从HBase里面取数,当然取数的过程中实时和离线的作业肯定不一样,但是开发人员不用关注,因为已经封装好了。实时的Flink作业结果会发到Kafka,离线修正和离线初始化的结果都会进EMR,最后写进特征存储,也就是HBase和Clickhouse。

图片

上图展示了特征存储服务方案的时序,Kafka触发了Flink,然后Flink运算完写如特征存储。那么在刚触发的时候,如果有外部调用,是无法获取到最新的数据的,必须要等到运算完成写入存储以后才能获取到更新的数据。

2. 接口触发——轮询方案

图片

在接口触发——轮询方案中,触发调用会触发到消息转发,转发给Kafka,然后Flink将运算的结果吐入Kafka。如果这个时候没有超过单次请求的时间,就会直接返回,这个时候触发轮询就退化成单词调用了。反之,则会继续进入事件存储HBase,通过轮询调用获取结果。

图片

接口触发——轮询方案的时序图如上图所示,当有一个外部调用触发以后,会有一个消息转发触发了Flink运算。在Flink运算及写入数据库的过程中,会有多次轮询,如果在固定的时间还没有办法获取到,则会提示超时;如果下一次轮询的时候,数据已经写入了,则获取成功。

3. 接口触发——消息接收方案

图片

接口触发——消息接收方案是对轮训的简化。如果业务系统支持支持消息接收,那么整个链路变得比较简单,只需要通过消息转发服务触发计算,然后监听结果消息就可以了。

图片

接口触发——消息接收方案的时序是串行的。触发了以后,进行Flink运算,运算完成以后把结果数据通过消息接收机制传输给调用方。

4. 直接消息接收方案

图片

直接消息接收方案就是纯流式的,通过Kafka触发Flink计算,计算完数据传入消息队列,然后等对方订阅接收就可以了。整个时序也是非常的简单,如下图所示。

图片

图片

我们把数据的使用情况分成三种,分别是即时调用、实时流、离线批数据,他们的时效性是依次递减的。我们通过事件中心把这三种情况注册到一起,最后只要通过事件中心作为中转给Flink提供相关的数据,对于Flink来说,不用关心到底是通过哪种方式来调用。

最后,总结下流、批、调用一体化的四种方案:

①特征存储服务方案 :通过特征存储服务,提供持久化的特征存储。提供API点查与特征圈选服务。

②接口触发——轮询方案 :通过事件中心的消息转发与消息查询服务,提供同步调用计算服务。

③接口触发——消息接收方案 :通过事件中心的消息转发服务,提供事件消息服务。

④直接消息接收方案: 支持复杂事件触发,提供事件消息服务。

分享嘉宾

图片


本文地址:https://www.6aiq.com/article/1658931359284
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出