Flink 在众安保险金融业务的应用
郭育波@众安保险 稿
一、 整体概况
上图是我们的实时计算整体架构图,最下层是数据源层,包括了来自于应用系统的业务数据、应用系统的消息数据、用户行为埋点数据以及应用日志数据,这些数据都会经过 Flink 进入实时数仓。
实时数仓分为三层:
- 第一层是 ODS 层,数据经过 Flink 到 ODS 层后会关联一张原始表,这个表是和数据源一一对应的,然后会有一个视图表对原始数据进行简单的清洗加工;
- 之后,数据经过 Flink 下发到 DWD 层,DWD 层是基于主题域进行划分的,我们现在划分为用户数据域、营销数据域、信贷数据域和保险数据域等;
- 另外还有一部分是 DIM 层,包含用户相关、产品相关和渠道相关等维表数据,DIM 层的数据会保存到 HBase 中。
经过 DWD 层的数据清洗之后,数据下发到 DWS 层,DWS 层会对数据进行整合汇总,一般会有指标宽表和多维明细宽表。之后这些数据会进入 ADS 层,这一层包含多样的 OLAP 数据存储引擎。我们现在主要使用 ClickHouse 作为大盘实时报表的存储引擎,还有 HBase 和阿里云的 TableStore 为用户标签和特征工程提供数据存储服务,还有 ES 主要用于实时监控场景。
上图是我们的实时计算平台架构图,整个实时计算平台可以分为三个部分。第一部分是任务管理后台,在任务管理模块里面编辑和提交任务,任务编辑器同时支持 Flink SQL 和 Flink JAR 任务,提供了比较便利的 Flink SQL 编辑功能和调试功能,也支持多种任务启动策略,比如基于 checkpoint、offset、时间点和最早位置等,还支持定时和即时生成 checkpoint 功能。任务提交之后,会通过 Flink 客户端将它提交到我们自建的 CDH 集群里。任务管理服务也会定时从 Yarn 获取任务的实时状态。
监控方面,Flink 会把指标日志数据推送到 PushGateway,Prometheus 获取 PushGateway 这些指标之后会在 Grafana 进行数据的可视化展示。除了对任务异常的状态监控之外,我们还会对资源使用率、消息积压等多种情况进行实时告警。此外 Flink 还支持了比较多的 connector,比如阿里云的 ODPS、TableStore 和 Hologres,也内置了丰富的 UDF 并且支持用户自定义 UDF。
上图是我们实时计算平台的任务编辑器,可以看到它支持 Flink SQL 和 Flink JAR 任务,SQL 任务支持 DML 和 DDL,它们可以一起在编辑器里面进行整体任务提交,任务管理这块同时也支持每一次变更的版本管理。此外,还支持比较多的高级任务配置功能,有 checkpoint 配置、消息 Kafka 的并行度和状态管理等。
二、智能营销应用
接下来,重点介绍一下 Flink 在智能营销应用场景的使用情况。
营销平台的架构图的最下层也是数据源层,包括金融业务数据、保险业务数据、用户行为数据、第三方平台的数据和运营结果数据。离线数据通过 ETL 的方式进入离线数仓,实时数据通过 Flink 的方式进入实时数仓。
实时离线数仓之上是标签服务层,平台有对离线/实时的标签管理功能,同时我们也会对这些标签进行治理管控,比如数据权限的管控,此外,还有标签数据的监控,能够及时发现标签数据的异常,准确掌握标签使用情况的分析统计。
标签层之上是标签应用层,我们有营销 AB 实验室和流量 AB 实验室,它们之间的差异在于,营销 AB 主要居于客群进行营销,无论是基于规则进行客群圈选的静态客群还是通过 Flink 接入的实时客群,都会对这些客群进行流程化的营销和智能的触达。而流量 AB 实验室也是基于标签的数据服务能力,用于 APP 端千人千面的个性化推荐。平台还提供了客群画像的分析功能,可以快速找到相似客群和客群的历史营销的数据效果情况,能够更好地协助运营对于客群的甄选和营销。
通过营销 AB 和流量 AB 实验之后,会有一个效果分析服务来进行实时效果回收,通过效果分析可以及时辅助运营团队进行快速的策略调整。
目前,标签总数已经达到 500 个以上,营销任务执行数量每天会有 200 万左右,流量 AB 每天会有 2000 万以上的调用量,主要是给前端提供了资源位的个性化显示和千人千面的业务场景。
上图是智能营销平台的数据流图,左边是数据源,有来自于业务系统的业务数据,也有埋点、事件数据,这些数据经过 Kafka 到达实时数仓,经过实时数仓的加工后,一部分会变成实时标签,被保存在阿里云的 TableStore,还有一部分会被加工成实时客群,同样也会发到 Kafka,然后由营销 AB 实验室对这些实时客群进行智能的营销触达。
另一部分离线数仓加工出来的标签数据,我们使用 DataX 作为 ETL 的工具,将它们同步到 Hologres,Hologres 能够无缝对接 ODPS,并利用其和 ODPS 关联外表的加速能力,实现了百万级别每秒的数据同步。运营人员可以在营销平台自助的进行客群圈选,利用 Hologres 的交互分析能力,可以支持复杂的客群的秒级生成。
整个营销平台的特征可以总结为三点:
- 第一,实时画像。通过定制标准化的实时事件、数据结构,利用 Flink 实时计算的能力,实现自动化的实时标签接入;
- 第二,支持比较智能的营销策略。可以让用户直接在营销平台上进行组件化的营销流程的配置,提供丰富的时间策略,还有各种智能的营销通道,同时也支持灵活的、多分支的业务流转,使用一致性哈希分流算法进行用户的 AB 实验;
- 第三,实时分析。对营销成效进行实时分析,我们也是使用 Flink 实现实时效果回收。通过漏斗的分析和业务指标的成效分析能力,能够更好地赋能给营销业务。
三、实时特征应用
特征工程主要服务于金融风控场景,比如决策引擎、反欺诈、风控模型服务等。特征工程主要的目的是将原始数据转换为更好的表述问题本质的过程。使用这些特征可以提高我们对一些不可见事物预测的精度,金融业务场景就是使用这个特征来提高对用户风险的识别能力。
特征工程是整个数据挖掘模型里最耗时也最重要的一步,它为金融业务全流程的风控提供了核心的数据支撑,主要分为三个部分:
- 首先是特征挖掘,主要由风控策略和模型开发的团队来完成,他们会根据业务指标进行数据的分析处理,然后再提取出有效的合规的特征;
- 当特征挖掘出来之后会给到开发团队,特征开发团队根据这个特征的来源会对接不同的数据源,有些是来自三方的,有些是离线加工出来的,还有实时加工的,当然还有一些机器学习模型进行再次加工计算出来的特征;
- 开发好的特征会通过特征中台提供给线上的业务使用,同时也要保障整个特征链路的稳定性。
特征工程目前使用的 Flink 实时任务有一百个以上,产生了一万个以上的特征数量,每天会有 3000 万以上的特征调用。
金融风控特征的核心指标,最重要的是合规。所有的特征都是居于合规之上,之外还需要保证特征加工的准确性、特征数字的实时性、特征计算的快速响应,还有整个平台运行的高可用和稳定性。
基于这样的指标要求,我们采用了 Flink 作为实时计算引擎,使用 HBase 和阿里云的 TableStore 作为高性能的存储引擎,然后通过微服务化的架构实现整体的服务化和平台化。
特征平台的架构图总体可以分为 5 大部分。上游系统有前台系统、决策系统和保护系统。业务方所有的请求都会经过特征网关,特征网关会根据特征的源数据进行链路编排,有些要调用三方数据,人行征信数据,还有一些来自数据集市的数据。数据接入之后就会进入特征数据的加工层,里面有对三方数据的特征加工服务,也有对金融实时特征数据的计算;还有一些反欺诈的特征计算服务,其中包含关系图谱以及一些名单特征的服务。
有些基础的特征通过这一层加工之后,就可以提供给上游的业务系统使用了,还有一些需要经过特征组合服务进行再次加工。我们通过一个低代码编辑器来实现特征的组合服务和风控模型服务,通过机器学习平台来进行特征的重新加工。
基础服务层主要是做特征的后台管理和实时监控。实时特征需要依赖实时计算平台,离线特征依赖离线调度平台。
总结来说,特征平台是以微服务化构建的一个特征服务体系,通过接入三方数据、征信数据、内部数据、实时数据、离线数据进行特征加工和服务,组合成的一套特征计算的风控数据产品。
上图可以很清晰地看到实时金融特征数据的流向。数据主要来源于业务数据库,有前台、中台等各个业务系统,通过 binlog 的方式发送到 Kafka,数据中间件 blcs 能够把 binlog 转换到 Kafka。用户行为的数据直接发送到 Kafka,经过 Flink 进入到实时数仓,经过实时数仓的数据计算之后,会把多维明细数据写入到 TableStore。
最早我们使用的是 HBase,后面出于稳定性的考虑,我们使用了 TableStore 进行了一个技术升级。最后考虑到特征服务对稳定性的要求比较高,我们还是保留了两个存储,HBase 作为降级存储来进行使用。
因为金融特征是要求能够描述用户全生命周期的数据服务,所以不单要求实时的数据,还要求全量的离线数据。离线数据是通过 DataX 回流到 HDFS,再使用 Spark 的离线计算能力回流到在线存储引擎 TableStore 里。
现在,风控对于特征的加工越来越要求精细化了,比如支用金额这样简单的一个特征计算,就可能会要求包含半个小时、近 3 个小时、近 6 个小时、近一天、7 天、15 天、30 天等各种业务的窗口。如果使用实时计算会产生非常多的窗口,而且全量数据的计算也会造成Flink吞吐能力下降。所以我们的实时任务主要是做数据的清洗和简单的整合,之后还是会把这些明细数据回流到存储引擎,然后通过应用系统的特征计算引擎进行配置化的特征加工。
风控特征的场景还是比较固定的,基本上是从用户身份证、用户ID或者用户手机号等维度来进行计算,所以我们就抽象了一套用户实体关系关联表,包含身份证、手机号、设备指纹等用户 ID 的 mapping 关系表,业务数据使用 userID 进行维表关联存储,通过实体关系加业务数据两个维度来进行用户明细数据的查询。得益于 TableStore 提供的高性能点查能力,我们可以处理高并发的特征计算。有些特征不单使用到了实时数据,还会调用业务系统的接口来获取数据,需要实时数据,接口数据进行聚合计算来完成,这样导致无法在 Flink 中完成所有的特征计算。所以 Flink 只是进行明细数据的加工和聚合,再由特征计算引擎来实现特征结果的计算。
现在我们的实时特征计算主要是居于实时数仓 DWD 的数据结合特征计算引擎实现的,DWD 的数据会回流到阿里云的 Tablestore,然后通过配置化的方式实现特征的加工计算。为了节约查询成本,我们的计算粒度都是居于特征组的维度,一个特征组只会查询一次数据源,特征组和特征是一对多的关系。
这里简单描述下特征的计算过程:首先会根据特征的查询条件把相关的明细数据都扫描出来,然后根据特征组下的具体的特征配置比如时间粒度,维度使用自定义的统计函数进行特征计算,而如果是多个数据源需要 join 来计算的话,先把依赖的特征因子计算完成后,再完成下一步的特征计算。此外如果我们的自定义函数不能满足计算的需求,系统也提供了居于 Groovy 脚本进行特征加工的方式。另外还有部分的特征来源源是来自业务系统的接口,这样只需要把第一步数据获取从查询 Tablestore 切换到调用接口即可,如果有其他的特征数据源也可以通过实现标准数据接口就可以完成,特征的计算引擎不需要做任何调整。
四、反欺诈应用
上图是实时反欺诈特征应用的数据流图,它和金融实时特征服务的数据流图有些类似的一面,但也存在一些差异。这里的数据源除了会使用业务数据外,更关注的是用户行为数据和用户设备的数据。当然这些设备数据和行为数据都是在用户许可的前提下进行采集。这些数据经过 Kafka之后,也会进入 Flink 进行处理。反欺诈的数据主要是用一个图数据库来存储用户关系数据,对于需要历史数据的复杂特征计算,我们会在 Flink 里面用 bitmap 作为状态存储,结合 timerService 进行数据清理,使用 Redis 进行特征计算结果存储。
GPS 的反欺诈特征是使用 TableStore 的多元索引和 lbs 函数的能力来进行位置识别的特征计算。反欺诈的关系图谱和关系社群会通过数据可视化的能力来提供给反欺诈人员进行个案调查。
我们把反欺诈特征归为 4 大类:
- 第一类是位置识别类型,主要是基于用户的位置信息,加上 GeoHash 的算法,实现位置集聚特征的数据计算。举个例子,我们通过位置集聚特征,发现了一些可疑用户,然后再通过反欺诈调查查看这些用户的人脸识别的照片,发现了他们的背景很相似,都是在同一家公司进行业务申请。所有我们就可以结合位置类的特征,加上图像识别的 AI 能力来更精准地定位类似的欺诈行为;
- 第二类是设备关联类,主要是通过关系图谱来实现。通过获取同一个设备的关联用户的情况,可以比较快速地定位到一些羊毛党和简单的欺诈行为;
- 第三类是图谱关系,比如用户的登录、注册、自用、授信等场景,我们会实时抓取用户在这些场景的一些设备指纹、手机号、联系人等信息,来构造关系图谱的邻边关系。然后通过这样的邻边关系和用户关联的节点度数判断是否关联到一些黑灰名单用户来进行风险的识别;
- 第四类是基于社群发现算法实现的统计类的社群特征,通过判断社群的大小、社群里面这用户行为的表现,来提炼统计类的规则特征。
上文提到比较多的关系图谱特征都是用图计算引擎 (NebulaGraph) 来进行存储的。我们测试过比较常用的是 janusgraph 和 orientdb,但是当数据量达到了一定数量级以上之后,就会出现一些不稳定的因素和情况,所以我们尝试使用了图计算引擎 ,发现它的稳定性相对来说比较高,因为它采用的是 shard-nothing 的分布式引擎存储,能够支持万亿级别的大规模的图的计算。它主要分为三大部分来进行组合服务:
- graph 服务,主要负责图实时计算;
- meta 服务,主要负责数据管理、schema 的操作和用户权限等等;
- storage 服务,主要负责数据存储。
同时 Nubula 还采用了计算存储分离的架构,无论是计算层还是存储层都可以独立进行克隆,同时它还支持传递计算,减少了数据的搬迁。无论是 meta 层还是 storage 层,都是通过 raft 协议来实现数据的最终一致性。
另外 NebulaGraph 也提供了比较丰富的客户端的接入方式,支持 Java\Go\Python 等客户端,同时也提供了 Flink connector 和 Spark connector,能够很容易地和现在主流的大计算引擎集成。
关系图谱的实现路径分为 4 部分:首先是图的数据源。要想构建比较有价值的关系图谱,一定要找到准确丰富的数据进行图建模。我们的数据源主要来自用户数据,比如手机号、身份证、设备信息、联系人等相关数据都会同步到关系图谱里面,除了实时数据,这里也会通过离线 Spark 任务清洗历史数据。NebulaGraph 提供的查询语言支持了丰富的图函数,比如相邻边、最大路径、最短路径等。社群发现我们是通过 Spark Graph-X 来实现的,最后通过 API 的方式提供数据服务,进行图数据库的应用,我们现在有直接提供给决策引擎来进行图数据特征的服务,也有对反欺诈的一些数据服务,甚至之后可以考虑用于营销的基于社群的推荐算法。
五、后期规划
未来,首先我们要夯实我们的实时计算平台,实现实时数据的血缘关系的管理,还有尝试 Flink + K8s 的方式实现资源的动态扩缩容。
其次,我们希望能够基于 Flink + NubelaGraph 进行图谱平台化的建设,目前实时计算和离线计算是 Lambda 架构实现的,所以我们想借 Flink + Hologres 实现流批一体来尝试解决这个问题。
最后,我们会尝试在风控的反欺诈业务场景使用 Flink ML 来实现在线机器学习,提升模型开发效率,快速的实现模型的迭代,赋能智能实时风控。