知乎 Flink 数据集成平台建设实践
分享嘉宾:孙晓光 知乎 技术平台负责人
内容来源:Flink中文社区
摘要: 本文由知乎技术平台负责人孙晓光分享,主要介绍知乎 Flink 数据集成平台建设实践。内容如下:
- 业务场景
- 历史设计
- 全面转向 Flink 后的设计
- 未来 Flink 应用场景的规划
01 业务场景
很高兴和大家分享近期知乎以 Flink 为基础,重构上一代数据集成平台 过程中的一些收获。数据集成平台作为连接各种异构数据的纽带,需要连接多种多样的存储系统。而不同的技术栈和不同的业务场景会对数据集成系统提出不同的设计要求。
我们首先来看一下在知乎内部数据集成的业务场景。同许多互联网公司相似,过去知乎的在线存储系统主要以 MySQL 和 Redis 为主,同时对于部分数据量级较大的业务也使用了 HBase。近年来随着技术的演进,我们开始了从 MySQL 向 TiDB 的迁移。与此类似,我们也开始将 HBase 向基于 TiKV 技术栈研发的 Zetta 演进。在离线存储方面绝大多数的场景则是以 Hive 表来支撑的。
从在线存储到离线存储,期间有着非常强的数据同步需求。除此以外也存在着大量的流式数据,比如消息系统中的数据,我们也希望它能够同各种在线或离线存储系统打通。过去知乎主要使用 Kafka 支撑流式数据,近期也开始引入 Pulsar。这两套消息系统同存储系统之间的数据交换存在着较强的需求。
在知乎的业务场景和当前发展状态下,数据集成工作在技术和流程管理上都存在着一些挑战。
- 首先从技术角度看,数据源多样化会对数据集成系统的连接扩展能力提出较高的要求。而且下一代的存储系统在给业务带来更强能力的同时也释放了业务的压力,进而促使了数据量的加速膨胀。数据量级上的快速增长对数据集成平台的吞吐和实时性都提出了更高的要求。当然作为数据相关的基础系统,数据准确性则是最基础的要求,这块我们也必须把它做好。
- 另外从流程管理角度看,我们需要理解并整合散落在不同业务团队的数据,做好管理并确保数据访问的安全,所以整个数据整合的流程是相对复杂的。虽然平台化能够将复杂的流程自动化起来,但数据集成工作所固有的高成本并不能完全以平台化的方式消除。因此尽最大的可能提升流程的可复用性和可管理性也是数据集成系统需要持续应对的挑战。
基于这两个方向上的挑战,我们对数据集成平台的设计目标 进行了规划。
- 从技术方向看,我们需要支持知乎已经投入使用和将来要推广使用的多种存储系统,具备将这些系统中多样化的数据进行集成的能力。此外我们还需要在满足高吞吐,低调度时延的前提下保障数据集成的可靠性和准确性。
- 从流程方面看,可以通过整合各种内部存储系统的元数据以及调度系统,复用现有系统基础设施的能力,达到简化数据接入流程,降低用户接入成本的目的。我们还希望能够以平台化的方式为用户提供自助满足数据需求的手段,从而提升数据集成工作的整体效率。
- 从提升任务可管理性的角度看,我们还要维护好数据的血缘关系。让业务更好的去度量数据产出之间的关系,更有效的评估数据产出的业务价值,避免低质量和重复性的数据集成工作。最后我们需要对所有任务提供系统化的监控和报警能力来保障数据产出的稳定性。
02 历史设计
在知乎的第一代数据集成平台成型前,大量的任务散落在各个业务方自己维护的 crontab 或者自行搭建的各种调度系统中。在这样的无管理状态下,各项集成任务的可靠性和数据质量都很难得到有效的保障。因此在这个阶段我们要最迫切解决的是管理上的问题,让数据集成的流程可管理可监控。
因此,我们整合了各种存储系统的元数据系统,让大家可以在统一的地方看到公司所有的数据资产。然后在调度中心统一管理这些数据的同步任务,由调度中心负责任务的依赖管理。同时调度中心对任务的关键指标进行监控并提供异常告警能力。在这个阶段我们沿用了从前大家广泛使用的 Sqoop 来实现 MySQL 和 Hive 之间数据的同步。且在平台建设后期,随着流数据同步需求的出现,我们又引入了 Flink 来同步 Kafka 数据到 HDFS。
在建设初代集成平台时我们做过一次技术选型的选择,是继续使用已经得到广泛验证的 Sqoop 还是迁移到其它可选的技术方案。同 Sqoop 相比,阿里开源的 DataX 是这个领域一个非常有竞争力的对手。如果把这两个产品进行横向对比,可以发现他们在不同的方面互相有对方所不具备的优势。
- 比如 Sqoop 在系统规模上具备 MapReduce 级别的扩展性和原生的 Hive 支持。但 Sqoop 又有数据源支持不丰富,缺乏一些重要功能特性的缺点。
- 而 DataX 提供了非常丰富的数据源支持,内置了数据集成系统非常重要的限速能力,还有它的良好设计所带来的易于定制和扩展的能力。但它也存在无集群资源管理支持和欠缺 Hive Catalog 原生支持的缺陷。
在当时的状态下这两个产品相互比较起来没有一款产品具有绝对的优势。所以我们选择了继续使用 Sqoop,而维持使用 Sqoop 在验证环节上也为我们节约了许多投入,所以第一代的数据集成平台在非常短的时间内就完成了开发和验证并完成上线。
随着初代数据集成平台的上线和成熟,它很好的支撑了公司的数据集成业务需求并获得了显著的收益。到目前为止平台上一共有大约 4000 个任务,每天运行超过 6000 个任务实例,同步大约 82 亿条共计 124TB 的数据。
在平台的帮助下,数据接入流程得到了极大的简化,为用户提供了自助解决数据集成需求的能力。并且,平台在关键的流程节点上能够辅以必要的规范约束和安全审查,在提升了管理水平的同时,整体的安全性和数据质量也得到了显著的提升。
借助于 Yarn 和 K8s 的弹性能力,集成任务的规模扩展能力也有了很大的提升。当然,作为解决从 0 到 1 问题的第一代系统,也必然会伴随着一系列问题。比如:
- Sqoop 的 MapReduce 模式所固有的高调度时延问题
- 业务数据分布不均所导致的数据倾斜问题
- 社区不活跃导致部分 Issue 长期无法得到解决的问题
- Sqoop 代码设计不理想导致的可扩展性和可管理性弱的问题。
03 转向 Flink
与 Sqoop 相对的,是用于支持 Kafka 消息到 HDFS 数据集成任务的 Flink,它以优秀的可靠性和灵活的可定制性获得了大家更多的信任。基于流式数据集成任务为 Flink 建立的信心,我们开始尝试全面转向 Flink 来建设下一代的数据集成平台。
虽然 Flink 是本次平台演进中的最佳候选,我们还是基于当时的情况对市面上可选的技术方案再次进行了调研。这次我们将 Apache NIFI 项目和 Flink 进行了多方面的比较,从功能角度看:
- Apache NIFI 非常强大且完全覆盖了我们当前的数据集成需求。但是恰恰因为它功能过于强大并且自成体系,所以也带来了较高的整合门槛。而且,无法利用现有 Yarn 和 K8s 资源池也会带来额外的资源池建设和维护的成本。
- 相比之下, Flink 具有一个非常活跃和开放的社区,在立项时刻就已经具备了非常丰富的数据源支持,可以预期在未来它的数据源覆盖一定会更加全面。而且 Flink 作为一个通用计算引擎有着强大易用的 API 设计,在这个基础上进行二次开发非常容易,所以它在可扩展性方面的优势也非常突出。
最后基于我们对批流一体目标的认同,未来在知乎完成大数据计算引擎技术栈的统一 也是一个极具吸引力的目标。
基于这些考量,在本轮迭代中我们选择了全面使用 Flink 替代 Sqoop,基于 Flink 完整实现了之前 Sqoop 的功能并重新建设了全新的集成平台。
如下图所示,橙色部分是本轮迭代中发生了变化的部分。除了作为主角出现的 Flink 之外,在本轮迭代的过程中我们还开发了 TiDB、Redis 和 Zetta 三种存储系统的数据集成功能。在消息系统这边则直接从社区获得了 Pulsar 的支持。在我们开始开发工作的时候,Flink 已经演进到了比较成熟的阶段,对 Hive 内建了原生的支持,整个迁移过程没有遇到过多的技术困难,非常顺畅。
Flink 的迁移为我们带来了许多收益。
1. 首先从可维护性上看, 相比 Sqoop 有了非常显著的改善。如下图所示,左边是过去使用 Sqoop 时的任务定义,这里是一大堆非结构化的容易出错的原始命令。而 Flink 则只需使用 SQL 定义一个源表和一个目标表再配合写入命令来定义任务。任务的可理解性、可调试性远好于从前,变成最终用户也能够理解的模式。很多问题不再需要平台开发者配合排查,用户就能够自助的解决许多常见的任务异常。
2. 在性能角度方面, 我们也有针对性的做了许多优化。
2.1 调度策略
首先是调度策略上的优化,在第一代集成平台中我们只使用 Flink 同步流式数据,所以任务调度完全使用 Per Job。现在平台同时支持了 Session 和 Per Job 的混合调度模式,于是,对于从消息系统接入数据的流式任务 会继续使用 Per-Job 模式运行,而批同步的任务 则采用 Session 模式复用集群从而避免集群启动的耗时提升同步效率。
当然,在这样的场景中使用 Session 集群也存在着一系列的挑战,比如工作负载随着任务提交不停变化而带来的资源需求变化问题。所以我们建设了自动的扩缩容机制来帮助 Session 集群应对变化的负载。除此以外,为了简化计费机制和隔离风险,我们还为不同的业务线创建了私有 Session 集群用于服务对应业务线的数据集成任务。
2.2 数据库
在关系数据库方面,我们采用了常见的 JDBC 方式对 MySQL 进行数据同步,但这种方式也会存在一些固有难以解决的问题。
- 比如因业务数据在主键维度上空间分布不均导致的数据倾斜问题。
- 再比如为了隔离在线离线工作负载所建设的专用同步从库,所产生的资源浪费和管理成本。
- 并且由于 MySQL 实例众多规格不一,合理协调多个并发任务的实例和实例所在的主机,进行合理的速度控制也非常困难。
相比之下,考虑到正在全面将数据从 MySQL 迁移到 TiDB 这一趋势。我们开发了原生 TiDB 的 Flink connector 来充分利用 TiDB 架构上的优势。
- 首先 region 级别的负载均衡策略能够确保对于任何表结构和任何的数据分布,同步任务都能够以 region 为颗粒度进行拆分避免数据倾斜问题。
- 其次通过设定副本放置策略,可以在离线数据中心对数据统一放置一个 Follower 副本。进而在保持原有目标副本数量不变,无需额外资源成本的情况下,利用 Follower read 的能力隔离在线交易和数据抽取的负载。
- 最后我们还引入了分布式的数据提交方式提升了数据写入的吞吐能力。
3. 最后是为知乎内部广泛使用的 Redis 提供数据集成的能力。 Flink 社区已经有一个 Redis connector,但它目前只具备写入能力并且难以灵活定制写入时所使用的 key。所以我们基于自身需求重新开发了一个 Redis connector,同时支持以 Redis 作为 Source 和 Sink。
同样为了避免数据抽取过程影响在线交易,在数据读取路径上我们采用了 Redis 原生的 master/slave 机制获取并解析 RDB 文件抽取数据,获得了单实例约 150MB 每秒的数据抽取吞吐。而且得益于打通内部存储系统的元数据,我们不但能够支持分片模式 Redis 集群的数据抽取,还可以只选择每个分片的 slave 节点作为数据抽取源头,避免抽取对 master 节点产生压力。
这次全面转向 Flink 的演进,解决了很多上一代数据集成平台的问题,获得了非常显著的收益。
- 从吞吐角度看, 以 Flink 替代 MR 模式将整个调度的时延从分钟级降低到了 10 秒左右。并且在同样的数据量和同样的 Flink 资源量情况下,TiDB 原生 connector 能够比 JDBC 提升 4 倍的吞吐。
- 从功能角度看, 新平台不但能够原生支持分库分表的数据集成任务,还能够以业务无关的方式避免数据倾斜的问题。
- 在数据源支持能力上,我们以非常低的成本获得了 TiDB、Zetta、Redis 和 Pulsar 的支持。而且,随着 Flink 的生态越来越完善,未来一定会有更多的开箱即用的 connector 供我们使用。
- 从成本上看,最后下线 MySQL 离线节点和统一使用 K8s 资源池所带来的资源效率提升,在成本和管理角度看都使我们获得了显著的收益。
04 Flink 即未来
回过头看,本次全面 Flink 化的演进投入产出比非常高,这也进一步增强了我们对 “Flink 即未来” 的的信心。目前在知乎内部除了数据集成场景,Flink 在搜索 Query 的时效性分析、商业广告点击数据处理和关键业务指标的实时数仓上也都有所应用。
在未来我们希望能够进一步扩展 Flink 在知乎的使用场景,建设更加全面的实时数仓、系统化的在线机器学习平台。我们更希望批流一体的落地,让报表类和 ETL 类的大型批任务也能够在 Flink 平台上落地。
基于知乎大数据系统建设的模式和总体资源投入的情况,未来将技术栈向 Flink 收拢是一个非常适合知乎的选择。作为用户我们非常期待能够一同见证未来 Flink 批流一体目标的达成。同时作为社区的成员,我们也希望能够用自己的方式为这一目标的达成贡献一份力量。