Fork me on GitHub

Flink 实时计算平台在知乎的演进

知乎@lfyzjck|文章来源


导读: Flink 因为其可靠性和易用性,已经成为当前最流行的流处理框架之一,在流计算领域占据了主导地位。

知乎早在 18 年就引入了 Flink,Flink 组件的版本也从早期的 1.6.x 一路升级到如今的 1.13.x,发展到现在,Flink 已经成为知乎内部最重要的组件之一,每天处理 PB 级的数据。

基础情况:

700+ 实时作业,500+批作业

50TB 内存

1.3w Core

峰值流量 17GB/s

发展历程:

2018 年开始知乎就开始引入 Flink 作为公司级别的实时计算引擎,并配合业务构建了第一版的实时计算平台 Skytree。支持业务的日志的落地,实时数仓,广告实时特征等需求。

到了 2021 年,为了进一步推广实时计算平台,我们对实时计算平台进行了一次大的升级,引入了 Flink SQL 并上线了 Mipha 。Mipha 基于 Flink SQL Gateway 构建,创新性的引入了 Mipha MetaStore 作为统一的元数据管理组件。

01

实时计算平台 1.0 -- Skytree

知乎在 2016 年就完成了所有业务的容器化改造,在 Kubernetes 方面有比较多的积累。由于实时任务不同于离线批作业,对延时和稳定性非常敏感。on YARN 部署需要通过 label 隔离单独的资源池出来,当时知乎采用的 YARN的公平调度器,并不支持 label 特性。因此在实时计算平台的最初期,我们就决定将基于 Kubernetes 来构建整个实时计算平台,完成了 Flink 平台从 0 到 1 的跨越,是一个相当重要的里程碑。

图片

开发第一版的实时计算平台,我们确定了项目的初期目标:

  • 支持 Flink Jar 任务;
  • 部署方式为 k8s 的 session mode,流任务为单集群单 job,批任务支持单集群多 job;
  • 有相对完善的监控和报警机制,在生产环境遇到问题时可以快速定位;
  • 支持模版,以支持一些通用任务比如 kafak2hive/pulsar2hive;

skytree 平台一经上线,就受到了业务用户的一致好评。但是我们也发现整个平台遇到了一些问题:

  • Jar 开发、调试复杂,有一定门槛。经常需要依赖冲突问题;
  • 平台管理成本高,Jar 包基本是黑盒子,再遇到平台和用户的边界非常模糊;
  • 平台使用 Python 开发,缺乏调用 Flink 原生 API 的能力,限制了平台能做的事情;
  • 在 Flink 支持 Native Kubernetes 之后,平台的很多设计变成了技术债务,维护成本高;
  • 管理问题:早期平台的任务是通过用户上传 Jar 包实现的,在人员交接后出现源代码丢失的情况;

因为上述原因,我们决定对实时计算平台进行迭代,面向 SQL 重新设计整个平台,也就是 Mipha 。

02

实时计算平台 2.0 -- Mipha

1. 架构设计

图片

Mipha 平台主要分为 3 个组件:

  • mipha-web-api 负责面向用户,提供监控报警、作业管理、UDF 管理、版本管理等功能
  • Flink SQL Gatway。基于开源版本修改定制后的服务,支持 SQL 和 Jar 任务的提交,部署方式支持 on YARN 和 on Kubernetes
  • mipha-metastore。提供统一的异构元数据管理存储/代理能力

2. Flink SQL 方案

为了让平台支持 Flink SQL,我们需要解决以下问题:

  • 构建一个 SQL 编译服务,负责将用户输入的 SQL 解析为 jobgraph,并提交到集群运行
  • 提供关于各种存储元数据的描述,官方推荐为 Hive MetaStore
  • 方便进行扩展,满足各种 UDF/Connector 的需求

最终我们选择了基于 ververica 的 SQL Gateway(https://github.com/ververica/flink-sql-gateway)做二次开发,作为统一的任务提交入口。除了供实时计算平台使用,还可以开放为了一个基础组件扩展到其他的场景。但是光有 SQL Gateway 本身已经不在维护了,还有很多问题需要解决:

  • SQL Gateway 对 Flink 版本的支持在 1.12 左右,我们目标的版本是 1.13,为了需要兼容和适配。
  • Flink 原生提供的支持 Catalog 只有 Memory 和 Hive MetaStore,Hive MetaStore 会污染 Hive 元数据,增加 Hive 的治理成本。因此我们需要一个更加独立的元数据服务。
  • 支持 UDF 和自定义的 Connector,扩展 Flink SQL 场景和 Kubernetes 的集成,需要调整大量的配置。

3. 统一元数据管理

因为 SQL Gateway 是以 Session 模式来管理用户的连接,当 Session 关闭时,用户在这个 Session 内的所有信息将会丢失,包括已经创建的数据源和映射表等。如果不将数据源与映射表持久化,用户每次在提交任务的时候都需要重新提交建表语句,这样就不能保证易用性了。

Flink 官方目前推荐的做法是将数据源和映射表等信息存储到 Hive Metastore,但是这样会有以下问题:

  • Flink过多的视图表会污染 Hive Metastore 内的数据,Hive Metastore 里面会存在大量的空表,如果表过多,也会对 Metastore 的元数据库造成比较大的压力;
  • Hive Metastore 不好进行权限管理,Flink 创建的数据源和表一般带有连接串和用户名密码,容易泄露。

基于以上原因,我们实现了自己的元数据管理服务——mipha metastore。mipha metastore 实现了以下功能:

  • 在用户建表时,mipha metastore 会验证用户的建表语句,验证通过后,会将用户的建表语句解析成结构化数据,持久化到数据库里。这里我们没有选择直接存储用户的建表语句,而是选择解析成结构化数据(类似 Hive Metastore 存储元数据的方式),看起来是变得更复杂了,但是对修改表结构和属性的帮助巨大,另外也有利于我们导出后进行表和字段血缘分析。
  • 利用 Casbin 实现了表粒度的权限,可以对单个用户或组授予数据源或表的权限,避免了底层数据源连接串泄露的问题。后续我们计划将权限信息下沉到 Ranger 中。
  • 提供部分存储的元数据代理能力,将本身就有 schema 的 Table 直接映射为 Flink Table。比如 MySQL/TiDB/Clickhouse 等存储。

4. 元数据代理

Mipha SQL Gateway 是完全基于 Flink Catalog 来实现数据源管理。这里的 Catalog 主要分为以下两大类:

  • 具备数据结构的数据源:比如 Hive Catalog,JDBC Catalog,这类数据源本身就是关系型数据库,他们的数据都是以表的形式组织起来的,这里 Mipha SQL Gateway 会只存储其 Catalog 的连接串,表结构会直接穿透到数据源,以数据源本身的表结构为主,避免出现元数据一致性的问题。
  • 不具备数据结构的数据源:比如 Redis Catalog,Kafka Catalog,这类数据源本身是以消息或者 k-v 形式存储数据,数据的结构依赖外部的描述,这里我们选择让用户自己建映射表来描述数据结构,存储到 mtastore 里,方便用户随时查看和取用。

在用户建立会话的时候,SQL Gateway会将这个用户所有的 Catalog 都从 mipha metastore 取出,注册进当前会话的 Flink TableEnvironment 中,来保证用户在提交 SQL 任务的时候,一定能获取到自己想要读写的表。但是 TableEnvironment 会在 Catalog 进行注册的时候,调用 Catalog 的 open 方法,而有些 Catalog 的 open 方法实现比较重,会去拿底层数据源的连接,从而导致用户与 SQL Gateway 建立会话很慢,而用户往往一条 SQL 就只读一张表,写一张表,没有必要初始化所有的 Catalog,这也会导致 SQL Gateway 拿到过多的连接,影响 SQL Gateway 的性能。我们选择了用动态代理将 Catalog 代理,在 Catalog 调用 open 方法时,不做任何操作,只有在真正读写的时候,才会调用 open 方法进行 Catalog 的初始化,实现了懒加载。

另外就是关于数据湖的 Catalog 代理,知乎的数据湖实现选择了 Iceberg,为了方便管理 Iceberg 表,我们使用了 Iceberg 的 Hive Catalog。这里存在一个问题,用户在Hive 中读写 Iceberg 表时,其实就将 Iceberg 当成了格式比较特殊的 Hive 外部表,但是在Flink 里读写Iceberg 表时,需要区分 Hive Catalog 和 Iceberg Catalog,使用起来特别不友好。因此我们将 Hive Catalog 进行了一些改造,在 Hive Catalog里兼容了 Iceberg Catalog,用户在读 Hive 表时,走 Hive Catalog 逻辑,在读 Iceberg 表时,走 Iceberg Catalog逻辑,这样用户不管是读写 Hive 表还是 Iceberg 表,都可以直接使用 Hive Catalog。

  1. 集群部署与任务提交

Mipha SQL Gateway 支持了多种集群部署模式,包括 Flink Session on kubernetes,Flink Application Mode,Flink Per-job on Yarn,Flink Session on Yarn 在内的几乎所有 Flink 官方支持的部署模式。

我们最常使用的两种 Flink 部署模式和使用场景如下:

  • Flink Per-job on Yarn:这种部署模式我们主要用于跑数据集成的批任务,比如 MySQL2Hive 的数据同步任务,这类任务的特点是任务数量较多,运行时间短,无状态,但是跑的过程中需要稳定,不能被其他任务影响,而 per-job 模式是一个集群运行一个 Flink 任务,保证了任务的隔离性,并且能够在任务跑完后自动清理集群,及时释放资源,因此我们选择了这种模式作为跑批的数据集成任务。值得一提的是,我们在 SQL Gateway 那一层实现了 hadoop 用户伪装,这可以使得 SQL Gateway 能够以指定的 hadoop 用户和 Yarn 队列提交 Flink 任务。
  • Flink Application Mode:这个模式可以简单理解为 k8s 上的 Per-job 模式,它的实现方式是在启动用户 Jar 包程序之前,先启动 JobManager,然后由 Job Manager 向 k8s 集群申请资源,启动 TaskManager,这种模式的好处在于 Flink 可以根据自身的需要来弹性伸缩资源,这个功能在我们当前的版本(1.13.x)仅仅只是实现了部分,Flink 社区有计划将在后续的版本进行完整的实现,所以我们将这个模式作为了我们 Flink SQL 流任务集群的部署方式,为后续 Flink 动态资源调整做准备。

在任务提交时,如何设置集群的配置和任务的配置,比如 cluster id 属于集群配置,checkpoint 属于任务配置,我们采用 SET key=value 的语法来设置集群与任务的配置,并且在 Flink 原生配置的前面添加不同的前缀来区分配置是传递给集群还是任务。我们在集群的配置前加上 flink.cluster. 的前缀表示配置是传递到集群的,在任务的配置前加上 flink.job. 的前缀表示配置是传递到任务的。这两种配置在真正传递到集群或者任务的时候,会去掉前缀,设置成 Flink 原生的属性。

6. 开发流程改进——持续集成/发布

早期平台通过上传 Jar 包的方式执行作业,产生以下问题:

  • 难以追踪代码变更,出现问题不能快速判断用户是否有代码变更
  • 排查排查需要找用户沟通要代码,效率低
  • 员工离职交接流程问题,导致很多线上任务失传,无法维护

借鉴持续集成的思想,将 CI/CD 引入到 Flink 平台,打通 Git 和 Jar 包版本,后续可以使用该方案管理 UDF/Connector 的开发和维护。重新梳理后实时作业开发流程:

  • 用户新建仓库或者选择已有仓库,在平台进行绑定
  • 接入 CI 系统并声明构建/测试/打包过程
  • 在平台创建任务并依次选择 仓库 -> Jar 路径
  • 启动/回滚作业

架构设计:

为了实现该目标,我们和公司的 CI 团队合作,扩展了 CI 系统的能力。并基于这个系统开发了一个新的包管理服务 Kosmos。Kosmos 整体的架构如下:

图片

用户只需要将代码 push 到 git 仓库就可以完成后面一系列的过程,可以方便的更新/回滚自己的代码。

任务创建:

每个任务强制需要指定 Git 仓库地址,并选择打包后的 Jar 包。

图片

版本管理:

图片

动态 Jar 包加载:

(1)需求

Flink 基础镜像保持稳定,不希望维护太多的镜像版本

Connector/Plugins/Hotfix 可以在不更新镜像的前提下动态升级,增强平台可维护性

(2)方案

  • 在镜像的 entrypoint.sh 处增加 hook,在运行时动态下载 jar 包
  • 通过环境变量注入作业所需的依赖
  • 提供一个支持高并发/高吞吐/HTTP Protocol based 的存储服务,我们选择了 JuiceFS 进行测试

图片

关于这部分详细内容,已经有过对应的分享了,这里不再赘述。

请参考:知乎 x JuiceFS:利用 JuiceFS 给 Flink 容器启动加速

https://juicefs.com/zh-cn/blog/user-stories/zhihu-flink-with-juicefs

7. UDF 支持

Flink SQL 的函数完善度还比较低,需要大量的扩展才能满足业务需求,业务会有大量的调用 RPC 的需求,不支持 UDF 会大大限制 Flink SQL 的使用场景。

我们希望实现:

  • 所有人都可以开发上传,项目组级别共享
  • 运行时按需加载,避免潜在的依赖冲突
  • 和 Git 进行集成,对 UDF 的版本进行管理

方案:

  • UDF的代码管理复用了 Jar 任务的模式,基于 Kosmos 和 CI 服务构建,支持基于 Git Commit进行发布或者回滚。
  • 扩展 Flink SQL 语法,支持 <span>CREATE FUNCTION [IF NOT EXISTS] <function_name> AS <function_class_name></span> ,将函数信息注册到 <span>TableEnvironment</span> 中。
  • UDF 对应的 Jar 通过动态 Jar 加载注入到作业的执行容器中。
  1. Protobuf Format 支持

业务方出于性能考虑,在 Kafka 消息中大量使用了 Protobuf 格式,在算法领域尤其明显。基于 Jar 包开发时,我们可以将 Protobuf的 Message 通过依赖的方式加载进来,并自己处理消息的反序列化。对于 Flink SQL 而言,缺少原生 Protobuf format 的支持会使得 SQL 的使用场景受到限制。我们希望实现一个标准的 Protobuf Format,不限制特定的 Schema 或者特定 Connector,可以在 Flink SQL 中自由使用。

方案:

不同于 Avro 序列化协议,Protobuf 需要知道字段的顺序和类型,因此无法完全动态构造。我们有 3 中思路来构造该 Format:

  • 方案一:用户提供 Protobuf IDL 文件,基于 Protobuf 的 DynamicMessage 进行的数据的序列化和反序列化。DynamicMessage 的问题是太慢了,性能损失极大。
  • 方案二:用户提供 Protobuf IDL 文件,对 IDL 进行实时编译,并将编译后的 Java Class 分发到 Flink 集群中。该方案的问题解决了 DynamicMessage 的性能问题,但是 IDL 的编译对环境要求高,无法下沉到引擎,需要平台层 cover 很多工作,可维护性不好。
  • 方案三:直接加载用户编译好的 Protobuf Java Class。该方案没有前面方案的很多缺点,但是需要用户提前编译好自己 IDL 文件并发布到统一的 Maven 仓库。

由于我们缺乏统一的注册中心来管理 Protobuf 的 IDL 文件,并且用户对自己编译 IDL 并发布到 Maven 这件事情接受程度比较高,我们最终采用了方案三来实现 Protobuf Format。

以 Kakfa Connector 为例,我们希望从 Kafka 从按照 Protobuf 协议对消息进行反序列化,用户可以按照以下方式定义 Table 并进行计算:

CREATE TABLE `kafka`.`region1`.`test_table` (
  `member_id` BIGINT,
  `scene_code` STRING,
  `server_timestamp` BIGINT,
  `response_timestamp` BIGINT,
  `param` MAP<STRING, STRING>,
  `items` ARRAY<ROW<`content_id` BIGINT, `recall` ROW<`source_type` STRING>>>
  `ts` AS CURRENT_TIMESTAMP,
  WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE
)
COMMENT ''
WITH (
  'properties.bootstrap.servers' = 'xxx',
  'connector' = 'kafka',
  'format' = 'protobufV1',
  'topic' = 'xxxx',
  'protobufV1.class-name' = 'com.zhihu.ts.proto.Feed$Feed'
);


ADD JAR 'protobuf-java-3.19.1.jar';
ADD JAR 'ts-protos-0.0.6.jar';
SELECT
  *
FROM
  `kafka`.`region1`.`test_table`;

Protobuf 辅助建表工具:

由于 Protobuf 经常用于嵌套的数据架构,用户在创建 Protobuf 的 Flink Table 时,需要声明复杂的嵌套结构,这个在结构复杂时尤其明显。为了解决这个问题我们开发了一个工具,传入 proto 文件自动生成对应的 Kafka/Hive 表,简化用户的工作。

关于 Protobuf Format 的详细介绍,已经有文章分享过。想进一步了解细节可以参考:Protobuf 在知乎大数据场景的应用(https://zhuanlan.zhihu.com/p/586120009)。

9. Flink CDC 平台化

实时数据同步/分析一直是业务永恒的诉求,知乎早期尝试过 Kudu + Maxwell + Spark Streaming 构建部分业务的实时 ODS 数据。但是一直面临以下问题:

  • 实时数据同步需要增量+全量两个阶段实现,分别采用了不同的技术和代码,维护成本极高,不能大规模应用于生产环境。
  • 数据一致性难以保障,整个链路非常长,所有环节都可能出错而且真的都出过错,数据不一致是常态。重建表刷数据变成了经常性的工作。

FlinkCDC 将全量数据采集和增量订阅很好的结合在了一起,极大降低了链路的复杂度,减少了出现问题的概率。但是:

  • 一个Flink CDC 实例本质上是一个 MySQL 从库,当一个实例有众多表需要 CDC 数据时,会有 binlog 流量放大的问题,增加对 MySQL 的负担;
  • CDC 需要多个下游进行消费,启动多个 CDC 实例会比较浪费。

我们倾向于抽象一个中间层来缓存 CDC 产生的 Changelog 流并供下游复用,比如同时用于实时特征计算以及落地到 Hive 表中。

图片

我们采用了一个 MySQL 实例对应一个 Flink CDC,下游Sink时不同表会分发到不同的 Kafka topic中。用户可以灵活的使用自己需要的表数据,减少冗余数据的读取。

并且我们选择使用 upsert-kafka 可以很好的对Changelog的数据流进行压缩,并对 Kafka 开启了永久保存+定时 compaction,相同 key 的数据只保留最新的就可以了,随着时间的推移数据不会无限膨胀导致 Kafka 集群的存储压力增加。

另外对于 TiDB CDC,我们针对超大型的 TiDB 表,实现了 TiDB HybirdSource CDC,其主要思想是利用 TiKV 的快照机制,从 TiKV 里以批的方式读取 TiDB 的全量数据,然后在 Kafka 里以流的方式读取 TiDB CDC 的增量数据,最后全量加增量合成一张实时的流表。

区别于 Flink 官方提供的 TiDB CDC(https://ververica.github.io/flink-cdc-connectors/master/content/connectors/tidb-cdc.html),这种流批一体的方式更适用于大型 TiDB 表(这里指 1T 以上的表)。这是因为 Flink TiDB CDC 的实现基于 tikv-java-client(https://github.com/tikv/client-java)的 CDC 客户端,这个客户端目前还处于快速迭代阶段,相比于 TiDB golang 版本的 TiCDC 完善程度会低一些,可能在采集超大型 TiDB 表的 CDC 时,不能满足一些性能上的需求。而 TIDB HybirdSource CDC不直接从 TiDB 内读 CDC 数据,而是使用 Kafka 内由 TiCDC 采集的数据,性能完全由 TiCDC 保证。当然,如果 TiDB 表的大小没有超过 1T,我们还是直接使用 Flink 官方提供的 TiDB CDC,毕竟简单粗暴。

03

运维优化

1. 资源超买

对于 Application Mode/Per Job 的部署模式来说,可以极大程度避免 slot 浪费的情况,并且提供非常好的作业之间的隔离机制。但是同时会带来整个集群有非常多的 JobManager 的情况,当集群有数千个作业时,其资源开销不可忽略。

基于 Kubernets 部署的方案下,我们发现整个集群的资源瓶颈在 CPU 而不在内存上。对于大部分的作业来说,JobManager 仅在作业启动时成本比较重的职能,在作业运行稳定后,仅仅负责管理 TM 的心跳和 Checkpoint 触发。对于这种情况,平台默认为 JobManager 配置了非常小的 cpu request,但同时拥有非常大的 cpu limit 以在需要的时候获取到足够的 CPU 调度机会。通过这种方式,每个作业至少会节省出 0.9 CPU。

2. 日志采集和持久化

Flink 在 1.11 版本之后,在 Flink UI 上提供了非常好的日志查看工具,但是这仅限于作业运行时。对于已完成/失败/取消的作业,我们需要通过 HistoryServer 查看归档的日志,但是 HistoryServer 仅能提供异常日志的查看,这对我们定位问题是远远不够的。

基于 Kubernetes 我们构建了自己的日志采集方案,并将日志进行了持久化到 Hive/ElasticSearch 在平台上开放查询。在作业遇到问题时,可以优先考虑恢复作业,而不用担心详细日志随着作业重启而丢失。

图片

3. 动态 Checkpoint 配置

流量高峰或者环境不稳定时希望动态关闭 Checkpoint 来较少对作业的影响。

(1)方案

通过 JobManager 的 Restful 接口控制Checkpoint调度。

Flink源码中 CheckpointCoordinator 组件提供了Checkpoint 的调度逻辑。所以我们需要打通 RESTful 接口调用到CheckpointCoordinator 的调用通路。

图片

JobManager 提供的 RESTful 接口调用逻辑从 RestEndpoint 进入,经过 Dispatcher 最终会调用 JobMaster 中的逻辑。

JobMaster 中组合了负责实际 TM 和 CP 调度的 Scheduler 组件,Scheduler 中调用了 CheckpointCoordinator。

沿着上面的调用通路,提供 <span>startChecpointScheduler</span><span>stopCheckpointScheduler</span><span>updateCheckpointConfig</span> 接口即可。

(2)自动监控和报警

用户在使用 Flink 作业时,对 Flink 的指标并不熟悉。为了确保每个作业在遇到问题时能被及时发现,平台预设了一些常见的报警项目,和报警平台打通,这部分无需用户手动配置既可以使用。对于业务自己的逻辑,可以自定打点或者配置自定义的报警项。

图片

4. 逻辑资源隔离和计费

由于知乎的 Flink 作业都采用单作业单集群的部署模式,所以并不存在 Flink Session 集群 slots 空跑的情况。但是相对应的我们需要解决资源隔离的问题,避免部分大作业占用了集群太多的资源,导致短时间内其他业务无资源可用问题。

我们做法是为每个业务引入“资源池” 的概念。每个资源池是这个业务能使用的 Flink 算力的逻辑边界,所有运行中作业占用的资源总量不能超过资源池的总大小。用户可以调整资源池大小但是需要平台审核,所有资源资源池的大小总和就是集群的总资源量。

我们会按照每个业务申请的资源池子大小进行成本的分摊,以推动资源利用率得到更好的优化。

04

未来规划和展望

后续平台将着重提升平台的易用性和稳定性,并逐渐和数据湖技术进行融合。

  • 增强作业调试的能力,方便对 SQL 作业进行快速的验证
  • 改进 Flink SQL 在作业恢复是鲁棒性,兼容部分的 State Schema Evolution
  • 提供服务化的特征计算服务,降低算法业务使用的门槛

本文地址:https://www.6aiq.com/article/1672735455961
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出