Fork me on GitHub

2021 年网易云音乐实时计算平台发展和挑战

以下文章来源于网易有数 ,作者大愚

摘要: 网易云音乐从 2018 年开始搭建实时计算平台,经过几年的发展已经渗透到云音乐的各个业务当中。本文是大愚老师的一篇实践分享,将从一个日常运维问题出发,带领大家了解云音乐实时计算平台的一些工作进展和未来规划。

网易云音乐实时数仓平台上线以后,经过一年半的发展,整体实时数仓已经初具规模,我们已有实时数仓表 300+,运行中的任务数有 1200+。其中 1000 左右的任务是 SQL 任务, Kafka 总出口流量达到到 18GB/S,总用户数达到了 200+。

数据量和用户的增长也给数据平台的易用性以及稳定性带来了了越来越多的挑战,包含 Kafka 的稳定性、集群的稳定性、运维工作的挑战以及很多早期的技术债;业务的增长,暴露出了基建的薄弱,也给我们积累了很多平台建设和运维的经验。

一、平台功能

我们平台整体的的功能大家可以参考《云音乐实时数仓技术改造以及未来的一些规划》,这里将主要介绍我们最新的一些工作:

“我的任务延迟了,怎么扩容都不行,这是为什么?”

在日常运维工作中这是我们经常遇到的问题,往往也是比较耗费时间的问题。导致这种这种问题的原因有很多,为了解决这个问题,我们做了一些工作来增强我们的运维能力。

1. IO 指标完善

IO 问题是导致以上问题经常出现的原因之一,包含消息读取效率、维表 JOIN 效率、SINK 效率等等,第三方存储的性能以及稳定性,直接影响实时任务的稳定性,为了快速定位相关问题,我们添加了很多 IO 相关 Metric 指标。

图片

■ 1.1 Kafka 消费侧的一些性能指标

图片

■ 1.2 读取反序列化指标

包含:

  • 反序列化的 RT;
  • 反序列化的错误比例。

在 Format 侧我们开发了一套 Format 代理,支持在不修改原有 Format 代码的情况下,上报相关 metirc 指标,忽略错误数据等功能。只要添加属性 format.proxy 指定代理类就可以支持不同方式的 Format 封装。

比如我们指定 format.proxy=magina,就可以支持上报上述的性能指标;指定 format.proxy=ds 就可以支持解析 ds 封装的日志格式,使用被代理的 Format 解析 DS 中的 Body 部分,不需要单独为 DS 封装的日志格式开发 Format,且同样会上报性能相关指标,支持忽略错误消息等功能。

■ 1.3 维表 JOIN 相关指标

在维表 JOIN 侧, 我们添加了:

  • 数据查询的响应时间;
  • 本地缓存的命中率;
  • 查询发生重试的比例;
  • 成功 JOIN 上的数据的比例等。

■ 1.4 数据写入的一些性能指标

  • 数据序列化的 RT;
  • 数据写入外部数据源的平均响应时间等。

整套 IO 相关指标的实现,我们全部是在 Flink Connector 的顶层接口做了一些公共的封装,重构了相关 Connector 的代码,只要按照我们自己的接口实现 Connector,无需关心细节指标的上报,这些指标都会自动化的上报出来。

2. Kafka 分区问题

Kafka 分区的限制也是经常导致我们程序性能无法扩展的原因,出于 Exactly Once 的实现、读取性能、以及读取稳定性的考虑,Flink 采用主动拉取的方式读取 Kafka 消息,这种方式限制了我们读取 Kafka 消息的任务数,大大限制我们任务性能的扩张能力,以下面这个 case 为例:

SET 'table.exec.state.ttl' = '1h';
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '10s';
SET 'table.exec.mini-batch.size' = '100000';
INSERT INTO music_kudu_online.music_kudu_internal.ads_ab_rtrs_user_metric_hour
SELECT 
from_unixtime(`timestamp`, 'yyyy-MM-dd') as dt,
from_unixtime(`timestamp`, 'HH')         as `hour`,
os, sceneid, parent_exp, `exp`, exp_type, userid,
count(1) pv
FROM iplay_ods.ods_rtrs_ab_log 
INNER JOIN abtest_online.abtest.abtest_sence_metric_relation
FOR SYSTEM_TIME AS OF user_metric.proctime
ON ods_rtrs_ab_log.sceneid = abtest_sence_metric_relation.sceneid 
GROUP BY from_unixtime(`timestamp`, 'yyyy-MM-dd'),  
         from_unixtime(`timestamp`, ‘HH’), 
         os, sceneid, parent_exp, `exp`, exp_type, userid

这是一个实时全聚合任务,在原始的 FLINK 中这段 SQL 执行的 DAG 大概是这样的:

图片

假如我们读取的流表 ods_rtrs_ab_log 有 5 个分区,我们的 SQL 任务有七个并发,因为受到 Kafka 分区数的影响,加上 FLINK 本身作业链的优化,我们的消息的读取、维表 JOIN、MINI BATCH 的操作全部受到了 Kafka 分区的影响,无法扩展,特别是对于维表 JOIN 这种 IO 操作来说,任务的并发度严重影响了整体程序的性能,这个时候我只能通过扩容 Kafka 的分区数来提升性能。

但是这种操作非常重,而且很有可能会影响其它读取这张流表的任务;为了解决这个问题,我们对 Kafka 的 Connector 做了一些改造,支持通过配置多添加一步 Shuffle 操作,比如在上面的配置当中我们添加了配置:

'connector.rebalance.keys' = 'sceneid,parent_exp,userid'

消息会在读取以后按照 sceneid,parent_exp,userid 等字段进行 hash 分片,这样大大提高了整体程序的性能扩展性,而且通过指定字段的 keyBy 操作,可以大大提高维表 JOIN 缓存的命中率,提高 MINI BATCH 的性能和效率。

图片

除了以上配置以外,我们还支持添加随机的 Rebalance 操作、Rescale 操作以及解析行为的拆解,来进一步提升整体程序性能的扩展,这里需要注意的是额外 Shuffle 操作,会带来更多线程和网络开销,在配置这些操作的同时需要同时关注机器的负载情况,添加额外的 Shuffle 操作虽然能提升程序的扩展性,但是由于额外网络和线程开销,如果机器本身性能不行的话,很有可能会适得其反,在相同的资源情况下性能变得更差,这点需要根据自己程序以及环境情况进行配置。

3. Kafka 使用优化

随着流量的飞速增长 Kafka 的稳定性也是我们面临的主要难题,包括 Kafka 的机柜带宽问题、跨机房带宽问题、Kafka 扩缩容的抖动问题、还有 Kafka 本身配置问题等等,基本上大家能遇到的问题我们都遇到了,为了解决以上问题我们做了以下工作:

■ 3.1 开发镜像服务,解决带宽问题,保障高优先级任务

图片

我们通过 FLINK 自己开发了一套镜像服务,在不同的机房模块间分别部署了一套 Kafka 集群,通过镜像服务同步两套 Kafak 集群的数据,主 Kafka 提供给比较重要 P0 级别的实时任务,其它不是特别重要的任务读取镜像集群的数据。

我们通过 Yarn Label 技术,通过不同队列的选择来控制任务所在的机房,来减少跨机房带宽的消耗,为了方便用户切换不同的 Kafka 集群,我们在 Flink 流表侧也做了一些改造,支持一张流表同时挂载多个 Kafka 集群,只要通过简单的配置就可以随意切换 Kafka 集群,经过一轮任务整理和切换,Kafka 带宽使用情况有了大大的改善:

图片

■ 3.2 Kafka 监控完善

在日常的工作中,我们发现很多开发对 Kafka 本身并不太了解,运维由于经验的不足在初期对整体 Kafka 的管控也不是那么的严格,导致在使用上有很多问题。所以我们整合了音乐内部的 Kafka 监控服务的数据,结合我们平台的任务血缘,开发了自己的一套 Kafka 监控服务。

目前这套系统整体还比较初级,除了关联了 Kafka、流表、和任务之间的关系以外,我们还对以下这几种情况做了主动监控:

  • Kafka Topic 的分区数的合理性,主要监控消息队列分区数过少或者过多的情况,主要是过少的情况,防止因为分区数过小,下游任务处理性能跟不上的问题;
  • Kafka 分区数据生产均衡问题:防止因为 Kafka 本身分区数据的不均衡导致下游任务处理性能不行的问题;
  • Kafka 分区数据消费均衡问题:防止因为 Kafka 本身分区发生变化,而下游任务因为没有开启分区感知,导致一些数据没有消费到等问题;
  • 流量激增和激降报警:关键队列流量报警,保障实时数据的质量。

Kafka 版本升级:为了解决本身 Kafka 扩容的稳定性问题、资源隔离问题,通过我们音乐公共技术团队,在 Kafka 2.X 版本基础上做了一些二次开发工作,将 Kafka 整个服务做了平台化的支持,支持了 Topic 的平滑扩所容,支持资源隔离。

类似 YARN 的 LAEBL 技术,支持针对不同的 TOPIC 划分不同 region 的机器,完善的消息镜像服务,且支持 offset 的复制;统一的 Kafka 运维监控平台,此部分内容后续文章会详细介绍。

■ 3.3 分区流表技术建设

实时数仓上线以后,我们发现以下几种情况非常影响程序的稳定性以及流表的易用性:

  • 很多时候我们只需要一张流表中 1% 的数据,但是因为没有办法按需读取,所以我们必须消耗大量的资源去解析读取另外 99% 的数据,导致了大量的资源带宽的消耗,浪费了大量的资源,而且本身 SQL 的开发方式本身没有办法按需解析日志,导致我们必须完整的解析出每一条消息,这就导致进一步的计算资源的消耗;
  • 当我们按照经验和业务,将大的 TOPIC 拆分成很多小的 TOPIC 时,一张表变成了很多小表,使用者又必须有很多的经验知识去了解这些 schema 完全相同的小表中分别包含了哪些消息,易用性很差,这样的设计也不符合数仓的整体设计逻辑,以后如果要做批流表统一元数据的时候,整体也变得不太可能。

在离线场景下我们很有很多手段来解决以上问题,减少不必要的 IO,如数据的分桶、存储有序的数据利用 Parquet 的下推查询的能力、做分区表等手段都可以解决以上问题。但是实时表的 Case 下在现有的公开的方案中好像并没有什么好的方法;所以为了解决以上问题,我们开发了流表的分区方案,整体和 HIVE 表的分区实现思想差不多:

图片

我们使用 Flink Table Souce 提供的 SupportsFilterPushDown 的接口实现了一套自己的实时流表分区方案,一个分区对应一个 topic,通过用户的查询条件下推过滤掉没有必要的分区,从而减少没有必要的数据的读取;目前已经上线了第一版,初步拆分了云音乐曝光日志,顺便还尝试使用AVRO的数据格式代替以前的 JSON 格式,实践下来优化效果明显:

  • 使用 AVRO 格式格式基本都能带来至少 30+% 的带宽优化,消息解析性能相对音乐的原始日志格式的解析性能提升一倍;
  • 使用分区流表,我们初步迁移了了 4 个曝光日志的消费任务,已经节省了 7 台物理机,平均节省计算和带宽资源 75% 以上。

图片

虽然这些都是比较极端的 Case,但是从这些例子我们可以预计分区流表技术全面铺开以后,使用得到的话,绝对是一个能带来质变的优化。

二、批流一体

数据实时化一直是我们云音乐数据平台团队数仓建设的一个比较大的目标,在这个目标的背后批流一体也是我们绕不开一个 “名词”、“概念”、“技术”、或者是个 “产品”。在正式开始分享我们的工作以前,首先分享下我有一次在电梯间遇到算法同学,然后和算法同学发生的对话:

算法:你们的批流一体什么时候上线?我们等着用呢?

我: 你们目前的诉求是什么呢?

算法:我们现在很多实时指标都是自己开发,没法在离线以后直接使用现成数仓数据。

从这段对话我们可以看出,算法同学并不是想要什么批流一体的技术,他们想要的是实时的现成的可用的数仓数据,来提升他们的开发效率,批流一体的背后,不同角色的业务方的诉求是什么呢?

对于运营、产品、老板、分析师们来说:

他们想要看到的是准确的实时的可分析的报表数据,关键点在于可分析上。当结果数据发生异常波动时,我们得有实时的明细数据提供分析查询,来调查发生异常波动的原因。当老板有一些新的想法,想对现成的报表做下二次分析时,我们得有能力提供明细的可分析的数据来做分析给出结果。

以实时日活统计来说,我们常用的手段是将用户 ID 存储的 Redis 这样 KV 存储当中来做去重,或者近似去重,然后计算得出实时的日活数据,但是当日活发生异常波动时,因为 Reids 的数据不是可分析的。所以我们很难快速给出原因,也没法在当天做分析,这种方案和结果显然是不合格的。

对于数仓开发来说:

  • 统一实时/离线数仓元数据管理、统一模型、统一存储,减少数仓运维建设成本,提升整体数仓的易用性;
  • 统一开发代码,统一一套 SQL 解决离线/实时开发问题,降低开发运维成本,彻底解决因为业务理解不同、逻辑不同导致的实时离线数据结果差异大的问题。

对于算法同学来说:

有实时/离线统一的数仓表可以可以用使用,统一模型,降低业务理解的门槛,提升整体数仓数据的易用性,方便好用的数仓元数据管理服务,方便算法同学进行二次的特征开发工作,提升模型的开发效率。提供准确实时可分析的算法模型效果数据,提升算法同学模型迭代的效率

整体总结下来批流一体的目标主要包含三个方面:

  • 统一代码: 一套 SQL 完成实时和离线的相关业务的开发需求;
  • 统一数仓元数据: 一张表可以同时提供离线读和实时读,统一模型的批流一体的数仓;
  • 实时的报表数据: 这与统一数仓元数据不同,产品报表数据需要提供秒级的实时的结果的查询能力,而统一数仓数据往往只需要实时的存储即可,对 OLAP 查询的效率,并没有报表数据并没有那么敏感。

1. 统一代码

由于实时 SQL 本身并没有特别的成熟,很多在离线场景下很容易实现的逻辑,在实时场景下要么是不能实现,要么是稳定性有问题。

目前业界都还在探索当中,阿里目前主要的方式的是使用 FLINK 一套引擎解决实时离线统一 SQL 的问题,但是目前也都是在实践,在上层 ADS 层业务逻辑实现上通过底层数仓的建设屏蔽掉一些实时 SQL 能力的问题,做到产品报表开发上统一一套 SQL。这也是我们未来可以尝试的方向,除了在上层报表开发上尝试统一 SQL 以外,我们在统一代码这一块也做了一些工作和规划:

  • 统一 UDF,集成升级平台框架到 FLINK1.12 新版本,统一离线实时统一套UDF;
  • 统一元数据管理:在 FlinkSQL 侧我们继承元数据中心服务,提供 catalog.db.table 这样的数据读取和写入方式,为了统一元数据,同样我们对 SparkSQL 做了二次的封装,同样和元数据中心做了集成,实现了以 catalog.db.table 这样形式的异构数据源之间的读取和写入。

图片

场景化的配置式的批流一体的统一实现,对于一些简单业务逻辑的场景,我们后续会开发场景化的批流一体的实现。如批流一体的索引任务、批流一体的 ETL 清洗平台等等,这块由于资源问题,目前还在规划中。

批流一体 SQL 统一的在目前的技术下,还有一个比较大的前提是本身日志的复杂程度,这个涉及到本身日志埋点规范性和完整性,实时计算不像离线,可以将大量归因逻辑, 关联逻辑放在数据侧进行处理,抛开合理性和成本问题,很多工作在离线场景下是可以做的。

但是在实时场景,本身对性能和稳定性都非常的敏感,如果将大量的逻辑都放在数据侧进行处理,本身就会带来很多不能实现的问题、实现起来成本高的问题、很多稳定性、以及数据延迟的问题。如果打点做不好,整个实时数仓建设都是问题,所以云音乐也启动了曙光打点项目和有数团队合作,彻底重构云音乐各个产品的打点的实现,提升和完善打点的规范性和准确性,降低实时数仓的开发成本问题。

2. 统一数仓元数据

目前业界主要有两类方案:

第一种是建设批流映射层的方案,目前阿里公开的方案的就是这种方案,比较适合已经有了实时数仓和离线数仓的老产品,在不改动原有数仓的情况下,构建统一映射层视图,通过视图的方式提供一体化的使用体验,整体的原理参考下图:

图片

第二种方案是构建一种新的元数据系统,一套 schema 下同时挂载多种存储,如 HDFS、Kafka 等,在写入数据时同时写入,在读取场景下时,根据读取方式的不同,选择相应的合适的存储,目前网易数帆有数产品团队开发的 Arctic 采用的就是这种方案:
图片

整体思路是封装 icberg 和 Kafka 以及 Hbase 等多种存储,在不同场景下使用不同的存储,另外 arctic 还在 iceberg 的基础上做了很多二次开发,来解决 DWS 数据的更新问题,提供类似 Hudi 的 CopyOnWrite 以及 MergeOnRead 等功能,用来解决 Flink 本身用来做全聚合的稳定性问题。目前云音乐已经在一些新的业务场景做了试用,已经上线了几十张的的批流一体表,大家如果想进一步了解 arctic 可以找网易数帆有数实时计算团队了解,在此不过多描述。

3. 实时的报表数据

提供实时的报表数据主要依赖 OLAP 引擎和存储,存储侧需要有需要有在提供实时的数据更新能力的同时,还需要有提供秒级别数据的查询能力,很多时候没有办法把将结果直接写到到存储中。因为数据报表本身很多灵活性的查询,如果直接将结果写到存储中, 就需要类似 Kylin 那种实时的 Cube 能力,这对开发以及 Flink 本身计算的压力太大,本身也会带来很多资源的和存储的浪费,稳定性问题以及开发工作量的问题也会很多,数据的二次分析能力也会很局限;

所以在这一层我们需要 OLAP 引擎提供至少百亿级别的数据的秒级延迟的查询的能力,目前我们主要的方案采用的存储有 Kudu 和 Clickhouse 两种,以我们老版本的 ABTest 为例,我们采用的方案如下:

图片

对于实时的最新的小时维度以及天维度的结果我们通过 Impala 及时读取 Kudu 数据关联出最新的结果;对于历史的一天以前天维度数据或者两个小时以前小时维度的数据我们采用 Spark 预计算好存储在结果表当中,两份数据 UNION 在一起提供给用户,保障数据结果的时效性,以及整体数据查询的用户体验。

三、未来规划

运维工具的完善

实时 SQL 的发展降低了实时数据统计的开发难度,大大降低了实时数据统计的门槛,一方面由于本身实时 SQL 的不成熟而且黑盒,另一方面很多同学带着离线 SQL 的开发经验或者 MYSQL 类数据库的 SQL 经验来开发实时任务,这给平台带来了很大的运维压力,所以运维工具相关的建设,任务实时指标的完善是我们未来主要思考的方向之一。

分区流表技术完善

分区流表技术是一个能给云音乐实时平台资源使用,Kafka 压力以及数仓建设带来质变的技术,目前我们只是完成了一个初版,未来我们会在分区的动态感知,分区的修改, schema 的修改,以及运维监控以及推广上继续完善。

场景化批流一体建设

如批流一体索引任务建设、批流一体 ETL 工具等, 统一日志清洗规则, 为批流一体数仓打好基础。

批流一体存储探索

  • 调研业界目前的方案, 结合音乐的业务场景, 提供整套解决方案, 降低实时报表的开发门槛, 提升实时报表的开发效率;
  • 批流一体逻辑层建设等。

最后附一张网易数帆有数团队的实时计算解决方案架构图,基于 Apache Flink 构建的高性能、一站式实时大数据处理方案,广泛适用于流式数据处理场景。

图片


本文地址:https://www.6aiq.com/article/1633992541754
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出