Apache Flink 在蔚来汽车的应用
摘要:本文整理自蔚来汽车大数据专家,架构师吴江在 Flink Forward Asia 2021 行业实践专场的演讲。主要内容包括:
一、 实时计算在蔚来的发展历程
- 18 年 5 月份左右,我们开始接触实时计算的概念,最初是用 Spark Streaming 做一些简单的流式计算数据的处理;
- 19 年 9 月份我们引入了 Flink,通过命令行的方式进行提交,包括管理整个作业的生命周期;
- 到了 21 年 1 月份,我们上线了实时计算平台 1.0,目前正在进行 2.0 版本的开发。
二、实时计算平台
在实时计算平台 1.0,我们是通过将代码进行编译,然后上传 jar 包到一个服务器上,以命令行的方式进行提交。这个过程中存在很多问题:
- 首先,所有流程都是手动的,非常繁琐而且容易出错;
- 其次,缺乏监控,Flink 本身内置了很多监控,但是没有一个自动的方式将它们加上去,还是需要手动地去做配置;
- 此外,任务的维护也非常麻烦,一些不太熟悉的开发人员进行操作很容易出现问题,而且出现问题之后也难以排查。
实时计算平台 1.0 的生命周期如上图。任务写完之后打成 jar 包进行上传提交,后续的开启任务、停止、恢复和监控都能够自动进行。
作业管理主要负责作业的创建、运行、停止、恢复和更新。日志主要记录 Flink 任务提交时的一些日志,如果是运行时的日志还是要通过 Yarn 集群里的 log 来查看,稍微有点麻烦。关于监控和告警模块,首先 metrics 监控主要是利用 Flink 内置的指标上传到 Prometheus,然后配置各种监控的界面,告警也是利用 Prometheus 的一些指标进行规则的设置,然后进行告警的设置。Yarn 负责整体集群资源的管理。
上图是实时计算平台 1.0 的界面,整体功能比较简单。
上图是实时计算平台 2.0。相对于 1.0,最大的区别是蓝色的部分。对于实时计算平台的形态,可能并没有一个统一的标准,它与每个公司本身的情况息息相关,比如公司本身的体量和规模、公司对实时计算平台的资源投入等,最终还是应该以适用于公司本身的现状为最佳标准。
2.0 版本我们增加从开发到测试两个阶段功能的支持。简单介绍一下它们的具体功能:
- FlinkSQL:它是很多公司的实时计算平台都支持的功能,它的优点在于可以降低使用成本,也比较简单易用。
- 空间管理:不同的部门和不同的组可以在自己的空间里进行作业的创建、管理。有了空间的概念之后,我们可以利用它做一些权限的控制,比如只能在自己有权限的空间里进行一些操作。
- UDF 管理:使用了 FlinkSQL 的前提下,就可以基于 SQL 的语义用 UDF 的方式扩充功能。此外,UDF 还能用于 Java 和 Schema 任务,可以把一些公用的功能包装成 UDF,降低开发成本。它还有一个很重要的功能就是调试,可以简化原有的调试流程,做到用户无感知。
实时计算平台 2.0 的实现,带给我们最大的影响就是减轻了数据团队的负担。在我们原先的开发流程里,经常需要数据团队的介入,但实际上其中的很大一部分工作都是比较简单的,比如数据同步或数据的简单处理,这类工作并不一定需要数据团队去介入。
我们只需要把实时计算平台做得更完善、易用和简单,其他的团队就可以使用 FlinkSQL 去做上述简单的工作,理想的情况下他们甚至不需要知道 Flink 的相关概念就可以做一些 Flink 的开发。比如后台人员做业务侧开发的时候,对于一些比较简单的场景就不需要依赖数据团队,大大降低沟通成本,进度会更快。这样在部门内有一个闭环会更好一点。而且以这样的方式,各个角色其实都会觉得比较开心。产品经理的工作也会变得更轻松,在需求的阶段不需要引入太多的团队,工作量也会变少。
所以,这是一个以技术的方式来优化组织流程的很好的例子。
三、实时看板
实时看板是一个比较常见的功能,在我们的具体实现中,主要发现了以下几个难点:
- 第一,数据延迟上报。比如业务数据库发生问题后,进行 CDC 接入的时候就需要中断,包括后续写到 Kafka,如果 Kafka 集群负载很高或 Kafka 发生问题,也会中断一段时间,这些都会造成数据的延迟。上述延迟在理论上可以避免,但实际上很难完全避免。此外还有一些理论上就不能完全避免的延迟,比如用户的流量或信号有问题导致操作日志无法实时上传。
- 第二,流批一体。主要在于历史数据和实时数据能否统一。
- 第三,维度的实时选择。实时看板可能需要灵活选择多个维度值,比如想先看北京的活跃用户数,再看上海的活跃用户数,最后看北京 + 上海的活跃用户数,这个维度是根据需要可以灵活选择的。
- 第四,指标的验证。指标的验证在离线的情况下,相对来说比较简单一些,比如说可以做一些数据分布,看看每个分布的大概情况,也可以通过 ODS 层数据的计算与中间表进行比对,做交叉验证。但是在实时的情况下就比较麻烦,因为实时处理是一直在进行的,有些情况很难去复现,此外也很难进行指标范围或分布的验证。
实时看板一般存在两个方面的需求:
- 首先是时延方面,不同的场景对时延的要求是不同的,比如有些场景下能够接受数据延迟 1-2 分钟到达,但有的场景下只允许延迟几秒钟。不同场景下实践的技术方案复杂度不一样。
- 其次,需要兼顾实时与历史看板的功能。有些场景下,除了需要看实时的数据变化,还需要对比着历史数据来一起分析。
实时与历史数据应该进行统一的存储,否则可能会存在很多问题。首先,实现的时候表结构比较复杂,查询的时候可能需要判断哪段时间是历史数据,哪段时间是实时数据,然后对它们进行拼接,会导致查询的实现成本过高。其次,在历史数据进行切换的时候也很容易出现问题,比如每天凌晨定时刷新历史数据,此时如果历史任务发生延迟或错误,很容易导致查出来的数据是错误的。
我们内部对实时看板的延时性要求比较高,一般要求在秒级以内,因为我们希望大屏幕上的数字是时刻在跳动和变化的。传统的方案一般是采用拉的方式,比如说每秒查一次数据库,实现的难度比较大,因为一个页面会包含很多指标,需要同时发送很多接口去查询数据,想让所有数据都在一秒钟之内返回是不太可能的。另外,如果很多用户同时进行查询,会导致负载很高,时效性更难以保证。
所以我们采取了推的方式,上图是具体实现的架构图,主要分为三层。第一层是数据层即 Kafka 的实时数仓,通过 Flink 对这些数据进行处理后将它们实时地推到后台,后台再实时地把它们推到前端。后台与前端的交互是通过 web socket 来实现的,这样就可以做到所有的数据都是实时推送。
在这个需求场景下,有一些功能会比较复杂。
举个简单例子,比如统计实时去重人数 UV,其中一个维度是城市,一个用户可能对应多个城市,选择上海和北京两个城市的 UV 数,就意味着要把上海和北京的人放到一起进行去重,算出来去重的实时 UV 数据,这是一件比较麻烦的事情。从离线的角度来看,选多个维度是非常简单的,把维度选好之后直接取出数据进行聚合即可。但是在实时场景下,要在哪些维度进行聚合是提前指定好的。
第一个方案是,在 Flink 状态中存储所有 user ID 和出现过的维度,并直接计算所有可能的维度组合 UV,然后将更新过的 UV 推送给前端。
但这种方式会增加很多计算成本,而且会导致维度爆炸,从而导致存储成本也急剧增加。
第二种方案的架构图如上。我们把 sink 作为一个流式的核心,把端到端整体作为一个流式应用,比如把数据的接入、在 Flink 中数据的处理计算、再到后台、通过 web socket 推给前端这一整体作为一个应用来考虑。
我们会在 Flink 里面存储每个用户所有的维度值,后台的 Flink 推送的用户具体情况也会存在每个城市下 user ID 的 list 里。如果用户已经出现过,那么在 Flink 阶段就不会把变更推送到前端和后台;如果用户没出现过,或者用户出现过但城市没出现过,那就会把用户与城市的组合推送给后台,保证后台可以拿到每个城市下用户 ID 去重的 list。
前端选择维度之后,可以对后台不同维度的 user ID 进行增量的订阅。这里有两个点需要注意:
- 第一是在前端刚打开在选择纬度的时候,有一个初始化的过程,它会从后台读取所选维度的全量用户 ID 来做一个合集,然后计算 UV 人数。
- 在第二个阶段新的用户 ID 到达之后,会通过 Flink 推送给后台,而后台只会推送增量 ID 给前端,然后前端因为已经保存了之前的合集,对于增量的 ID,它就可以直接用 O(1) 的时间去算出新的合集,并且计算出它的 UV 人数。
可能有人会问,在这个方案下,用户太多怎么办?前端会不会占用太多的资源?
首先,从目前我们的实际使用场景来看,这个方案是够用的,如果以后 ID 数激增,用 bitmap 也是一种选择,但只用 bitmap 也不足以解决问题。因为不同公司用户 ID 的生成规则不一样,有些是自增 ID,有些是非自增 ID 或者甚至都不是一个数值,那就需要做映射,如果是一个离散的数值也需要额外做一些处理。
第一种方案把 ID 从 1 开始重新编码,使它变得比较小且连续。目前大部分场景下大家可能都是用 RoaringBitMap,它的特点是如果 ID 非常稀疏,它在实际存储的时候会使用一个 list 来存,而不是用 bitmap 来存,也就无法达到减少占用内存的目的。所以要尽量让 ID 的空间变小,让 ID 的值比较连续。
但这样还不够,如果 ID 是之前没出现过的,就需要给它重新分配一个 ID,但是处理这些数据的时候,Flink task 的并行度可能大于 1,这个时候多个节点同时消费数据的话,它们可能都会遇到同样的新 ID,如何给这个 ID 分配对应的新的映射的小 ID?
举个例子,一个节点查询之后需要生成一个新 ID,同时又要保证其他节点不会再生成相同的 ID,可以通过在新 ID 上做唯一索引来保证,把索引创建成功就生成了新 ID,失败的节点可以进行重试操作,去取现在的 ID mapping,因为刚才已经有其他节点生成这个 ID 了,所以它在重试取 mapping 阶段一定会成功。
除此之外,还需要考虑一种场景,比如用户注册完成后,马上产生一些行为,而用户注册与一些业务模块的行为表可能是由不同业务部门开发,也可能会存在不同的数据库、不同的表里面,甚至是不同类型的数据库,上述情况的接入方式也会不一样,可能会导致虽然是先注册,但是注册数据流可能会稍微晚于行为数据流到达,这会不会导致出现什么问题?
目前看来是不会的,只需要行为数据流与新用户注册数据流共享一个 ID mapping 即可。
综上,一个好的架构,即使面对数据量激增的情况,也是不需要在架构层面进行大改的,只需要在细节上进行重新设计。
第二个问题是前端会不会有很大的计算负载?
答案是:不会。虽然人数的去重是由前端来做,但只有前端第一次加载的时候才需要将用户全量拉取,之后的增量 user ID 都会直接用 O(1) 的方式加入到目前的集合里,所以前端的计算负担是很低的,整个过程完全是流式的。
第三个问题是实时报表同时访问的用户数很多怎么办?
从目前的架构上来看,对 Flink 和后台侧基本没有影响,唯一的影响就是如果有很多用户同时访问,他们的页面需要同时与后台建立 web socket 连接。但是因为实时报表主要还是内部使用,不会对外,所以同时的访问量不会太多。
而且我们把数据 ID 去重的一部分职责放在前端,即使有多个用户同时访问,计算职责也会分摊到不同的用户浏览器里面去,实际上也不会有过多负载。
四、CDP
CDP 是一个运营平台,负责偏后台的工作。我们的 CDP 需要存储一些数据,比如属性的数据存在 ES 里、行为的明细数据包括统计数据存在 Doris 里、任务执行情况存在 TiDB。也存在一些实时场景的应用。
第一个是属性需要实时更新,否则可能造成运营效果不佳。第二个是行为的聚合数据有时候也需要实时更新。
五、实时数仓
实时数仓重点考量点有以下几个:
- 元信息管理,包括 Catalog 的管理。
- 分层,如何进行合理的分层。
- 建模,实时数仓应该如何建模,它与离线数仓的建模方式有什么区别?
- 时效性,时延越低越好,链路越短越好。
上图是我们目前的实时数仓架构图。它整体上与离线数仓非常相似,也是有一个原始层、DWD 层、DWS 层和 Application 层。
不同之处在于它有一个维度层 (DIM 层),里面有很多不同的存储介质,维度信息可以放在 TiDB,并通过 AIO 的方式访问维度表;也可以放在 Hive,用 Temporal Join 的方式去进行关联;有一些数据是一直在变化的,或者需要做一些基于时间的关联,可以把数据放到 Kafka 里,然后用 Broadcast 或者 Temporal Join 去进行关联。
左侧是我们正在规划中的能力。
- 第一个是血缘关系,它对于问题的溯源,以及对改动的影响的评估是有帮助的;
- 第二个是元信息管理,我们希望把所有数据都表化,在进行数据处理的时候可以直接用 SQL 搞定;
- 第三个是权限管理,对于不同的数据源、不同的表,都是需要做权限管理的;
- 第四个是数据质量,如何进行数据质量的保证。
下面是对这些未来规划的具体阐述。
第一,Catalog 管理,这个功能目前暂未开发。我们希望为所有数据源创建一个表,不管里面的数据是维表还是其他表,是存在 MySQL 还是存在 Kafka,创建表之后都可以将这些细节屏蔽,通过 SQL 的方式就能轻松使用它。
第二,合理的分层。分层会对实时数仓造成多方面的影响。
- 首先,分层越多,时延越大。实时数仓是否需要这么多分层,值得深思。
- 其次,实时数据的质量监控会比离线数据更复杂,因为它是在不停地进行处理,分层越多,越难以发现问题、定位问题并进行回溯或复现,包括数据集成的分布也不易监控。
- 最后,如何进行合理的分层。肯定需要尽可能减少层数,并且进行合理的业务功能垂直划分,如果不同业务之间的交集很少,就尽量在各自业务领域内建立自己单独的分层。
第三,建模。这是离线数仓非常重要的部分,因为离线数仓非常大的一部分用户是分析师,他们日常工作就是用 SQL 进行数据的查询和分析,这个时候就必须要考虑到易用性,比如大家都喜欢大宽表,所有相关字段都放到一个表里。所以在离线数仓建模和设计表结构的时候,就需要尽量把一些可能用到的维度都加上。
而实时数仓面对的更多的是开发者,所以更强调实用性。因为在实时数仓的需求下,宽表里每增加一个字段都会增加时延,特别是维度的增加。所以说实时数仓的场景维表和建模更适合按实际需求来做。
第四,时效性。实时数仓本身还是需要有 raw 层,但是时效性比较高的场景,比如要同步一些线上的数据,这个数据最后同步快充也是线上的业务使用,要尽量减少链路,减少时延。比如可以用一些 Flink CDC 的方式减少中间层,这样不单减少了整体的链路和时延,链路节点减少也意味着问题发生的概率变小。对于时延要求没有那么高的内部分析场景,尽量选择使用实时数仓,可以减少数据的冗余。
六、其他应用场景
其他的使用场景还包括 CQRS 类应用。比如业务部门的功能更多的是考虑增删改查或者是传统数据库操作,但后续还是会存在数据分析的场景,这个时候用业务库去做分析是一个不太正确的方法,因为业务库的设计本来就没有考虑分析,更适合使用分析的 OLAP 引擎来做这项工作。这样也就把业务部门要负责的工作和数据部门负责的工作分割开来,大家各司其职。
此外还有指标的监控和异常检测。比如对各种指标通过 Flink 进行实时的检测,它会 load 一个机器学习模型,然后实时检测指标的变化是否符合预期,和预期的差距有多大,还可以设置一个区域值来进行指标的异常检测。
实时数据的场景越来越多,大家对实时数据的需求也越来越多,所以未来我们会继续进行实时数据方面的探索。我们在流批一体的实时和离线存储统一上已经有了一些产出,我们也会在这方面投入更多精力,包括 Flink CDC 是否真的可以减少链路,提高响应效率,也是我们会去考虑的问题。