京东 Flink 优化与技术实践
分享嘉宾:付海涛 京东 高级技术专家
编辑整理:刘明明
出品平台:DataFunTalk
导读: Flink是目前流式处理领域的热门引擎,具备高吞吐、低延迟的特点,在实时数仓、实时风控、实时推荐等多个场景有着广泛的应用。京东于2018年开始基于Flink+K8s深入打造高性能、稳定、可靠、易用的实时计算平台,支撑了京东内部多条业务线平稳度过618、双11多次大促。本次讲演将分享京东Flink计算平台在容器化实践过程中遇到的问题和方案,在性能、稳定性、易用性等方面对社区版Flink所做的深入的定制和优化,以及未来的展望和规划。主要内容包括:
- 实时计算演进
- Flink容器化实践
- Flink优化改进
- 未来规划
01 实时计算引进
1.发展历程
最初大数据的模式基本都是T+1,但是随着业务发展,对数据实时性的要求越来越高,比如对于一个数据,希望能够在分钟级甚至秒级得到计算结果。京东是在2014年开始基于Storm打造第一代流式计算平台,并在Storm的基础上,做了很多优化改进,比如基于cgroup实现对worker使用资源的隔离、网络传输压缩优化、引入任务粒度toplogy master分担zk压力等。到2016年,Storm已经成为京东内部流式处理的最主要的计算引擎,服务于各个业务线,可以达到比较高的实时性。
随着业务规模的不断扩大,Storm也暴露出许多问题,特别是对于吞吐量巨大、但是对于延迟不是那么敏感的业务场景显得力不从心。于是,京东在2017年引入了Spark Streaming流式计算引擎,用于满足此类场景业务需要。
随着业务的发展,不光是对于数据的延迟有很高要求,同时对于数据的吞吐处理能力也有很高的要求,所以迫切需要一个兼具低延迟和高吞吐能力的计算框架,于是在2018年我们引入了Flink。在Flink社区版的基础上,我们从性能、稳定性、易用性还有功能等方面,都做了一些深入的定制和优化。同时我们基于k8s实现了实时计算全面的容器化,因为容器化有很多的优点,它可以做到很好的资源隔离,同时它有一个很强的自愈能力,另外它很容易实现资源的弹性调度。同时我们基于Flink打造了全新的SQL平台,降低用户开发实时计算应用的门槛。
到2020年,基于Flink和k8s实时计算平台已经做的比较完善了。过去流式处理是我们关注的重点,今年我们也开始逐渐的支持批处理,朝着批流一体的方向演进。另外AI是目前比较火的一个方向,对于AI来说,它的实时化也是一个重要的研究方向。所以我们的实时计算平台将会朝着批流一体和AI的方向进行发展。
2.平台架构
上面是京东实时计算平台JRC的整体架构,整个架构以定制化改造后的Flink为核心,Flink运行在K8S上,状态存储在HDFS集群上,通过Zookeeper保证集群的高可用。支持流式源JDQ(京东基于Kafka深入定制实现的实时数据总线)和Hive,数据主要写入JimDB(京东内存数据库)、ES、Hbase和京东OLAP。计算平台支持SQL和普通JAR包两种方式的作业,具有配置、部署、调试、监控、和日志处理等功能。
3. 业务场景
京东Flink服务于京东内部非常多的业务线,有70多个一级部门在使用,主要应用场景包括实时数仓,实时大屏,实时推荐,实时报表,实时风控和实时监控,当然还有其他一些应用场景。对数据计算实时性有一定要求的场景,一般都会使用Flink进行开发。
4. 业务规模
京东Flink集群目前由5000多台物理机组成,它服务了京东内部70多个一级业务部门,目前线上的流计算任务大概有3000多个,数据的处理能力可以达到每分钟数十亿甚至更高。
02 Flink容器化实践
1.容器化历程
京东从2018年开始进行计算引擎的容器化改造,2019年初已经实现计算单元全部容器化,2020年进行了容器化方案升级,使用native k8s实现计算资源的弹性扩容。容器化改造的好处是提升了资源使用率,提高了研发效率,增强了业务稳定性,减少了运维部署成本。
2.容器化方案
旧的容器化方案是基于k8s Deployment部署的Standalone Session集群,它需要事先预估出集群所需资源,比如需要的JobManager和TaskManager的资源规格和个数。然后JRC平台通过K8S客户端向K8S Master提出请求,创建JobManager的Deployment和TaskManager的Deployment。其中使用ZK保证高可用,使用Hdfs实现状态存储,使用Prometheus实现监控指标的上传,结合Grafana实现指标的直观展示。集群使用ES存储日志,方便日志的查询。
3.容器化遇到的问题&对策
容器化过程中可能遇到很多问题:
① JM/TM故障自动恢复
应用部署在容器中,当应用出现异常时,如何发现应用或者异常的情况呢?比如可以使用存活探针,编写检测脚本定期读取应用的心跳信息。当检测到Pod处于不健康状态时,可以采用k8s的重启机制来重启不健康的容器。
② 减少Pod异常对业务影响
在k8s中由于硬件异常、资源过载、Pod不健康等问题会导致Pod被驱逐或自动重启,Pod重启时势必会影响到该Pod上分布计算任务的正常运行。这个时候可以考虑采用适当的重启策略、改造内核等方案来减少对任务影响。比如京东实现了JM Failover优化,当Pod异常引起JM Failover时采用的是任务不恢复、重建任务状态恢复的方式,可以一定程度上减少Pod重启对业务带来的影响。
③ 性能问题
在容器环境下,JVM对cpu和内存的感知会有一定的问题,在Java8版本中,一些参数就要进行显式的设置。对于机器性能差异或热点等问题导致部分Pod计算慢的问题,可以考虑进行针对性优化(比如实现基于负载的数据分发)或处理(比如检测到计算慢的Pod将其驱逐到负载较低的机器)。此外,对于使用容器网络的情况下,可能会带来一定的网络性能损耗,此时可以根据情况选择使用主机网络避免网络虚拟化带来的开销,或者选择更高性能的网络插件。
④ 重要业务稳定性
如何保证业务的稳定性是一个需要重点考虑的问题。除了保证系统各个环节的高可用外,还可以根据业务情况考虑使用其它合理的方案,例如业务分级管理,独立资源池,多机房互备等。
4.容器化方案升级(Native k8s)
原有容器化方案存在一定的问题:
- 资源需要提前分配
- 无法实现资源弹性伸缩
- 极端场景下Pod不能正常拉起,影响任务恢复
- 重要业务稳定性
容器化升级的解决方案是采用Native K8s的方式。由JRC平台先向K8S Master发出请求,创建JobManager的Deployment;然后在用户通过Rest服务提交任务后,由JobMaster通过JDResourceManager 向JRC平台发出请求,然后JRC平台向 K8s Master 动态申请资源去创建运行TaskManager 的Pod。
此处,通过引入JRC平台与K8s交互,屏蔽了不同容器平台的差异,解耦了镜像与平台集群配置&逻辑变化。另外,为了兼容原有Slot分配策略,在提交任务时会预估出任务所需资源并一次性申请,之后采用等待一定时间后进行slot分配的方式达到兼容目的。
03 Flink优化改进
主要做了以下四个方面的优化:
- 性能
- 稳定性
- 易用性
- 功能扩展
下边分几个重要的点进行讲解:
1.预览拓扑
预览拓扑主要是为了解决业务的一些痛点:比如任务调优繁琐、SQL任务无法指定并行度、任务需要的额Slot数不清楚、并行度调整后网络buffer不足等。在Flink任务调试阶段,对任务并行度、Slot分组、Chaining策略的调整是个反复的过程,如果把参数写到命令行就太繁琐了。而基于预览拓扑就可以很方便地对这些参数进行配置。
预览拓扑基本的实现方案如上图:用户提交JAR包后可根据JAR包生成对应的拓扑图,之后用户根据拓扑图可以进行在线调整,最后自动将修改后的配置和原来的JAR包一起进行任务提交。
预览拓扑机制使得不修改程序多次提交任务调优成为可能,但是如何保证前后两次提交生成算子稳定的对应关系呢?解决方案的关键是保证算子有稳定的唯一身份标识,具体算法是:如果算子指定了uidHash就用uidHash,如果算子指定了uid就使用uid,否则就从source开始广度优先遍历,利用算子在graph中的位置生成一个稳定hash值。
2.背压量化
第二个重要的优化是背压量化。
在Flink开发的时候,主要有两种方式:
① 通过Flink UI背压面板观察是否背压。使用这种方式在某些场景比较方便,但是它存在几个问题:
- 在有些场景下采集不到背压
- 对于历史背压情况无法跟踪
- 背压影响不直观
- 大并行度时背压采集压力
② 通过任务背压相关指标进行观察和分析,通过将指标定期采集并存储起来,可以进行实时或历史的背压分析。但是它也有一些不足的地方:
- 不同Flink版本中指标含义有一定差异
- 分析背压有一定门槛,需要对于指标含义有深入理解,联合进行分析
- 背压未量化,对业务影响程度不够直观
京东的解决方案是采集背压发生的位置、时间和次数指标,并对这些指标进行上报存储。同时对量化的背压指标结合运行时拓扑,可以精确反映发生背压现场的情况。
3.HDFS优化
随着业务数量的增多,HDFS集群的压力就会变得很大。这会直接导致RPC响应时间变慢,造成请求堆积,同时大量小文件也会对NN内存造成很大压力。对此京东尝试的解决方案有4方面:限制checkpoint最小间隔,时间最小设置在1min左右可以满足大部分业务需求;进行小文件合并;降低cp创建和删除时的hdfs rpc请求;HDFS集群多ns分散均衡压力。
4.网络分发优化
在实践过程中我们发现,即使业务使用了rebalance并且对任务进行了打散分布,但是由于机器处理能力和负载的差异,会导致任务各个并行度不同程序的背压表现,严重影响了任务的性能。为此,我们开发了基于负载的动态rebalance,在数据进行分发时优先选择下游负载最小的channel进行分发。
经测试,在特定场景下性能能够提升近一倍。
5.ZK防抖
目前一般都是使用ZK集群实现Flink集群的高可用,但是当网络抖动、机器繁忙、ZK集群暂时无响应或运维机器的时候,都可能会导致任务重启。
任务重启的原因是由于在这些场景发生时,Curator会将状态设置为suspended,并且Curator认为suspended为Error状态,从而会释放leader,Flink发现notleader后会revokeLeadership,从而造成任务重启。
一个可行的解决办法是升级Curator的版本,同时将connectionStateErrorPolicy设置为SessionConnetionStateErrorPolicy。
6.日志分离
目前我们一个集群是支持跑多个任务的,这时日志会出现的问题是:任务的日志和集群Framework日志混在一起,同时集群的多个任务日志也是混在一起的,不太方便用户查看日志,快速定位问题。
为了解决这个问题,首先要弄清楚目前Flink加载日志框架的基本机制:为了避免跟业务Job中可能包含的日志框架的依赖、配置文件产生冲突,Flink日志相关类的加载都代理给TaskManager框架的类加载器,也就是Parent Classloader,而框架加载的这些类都是从Flink安装包的lib目录下加载的。对于日志配置文件,Flink通过 JVM 启动参数来指定配置日志配置文件路径。
日志分离的解决方案是:将日志相关jar包加入到各个task自己classloader(user classloader)的类路径中;同时确保使用user classloader加载日志类和加载自己的日志配置;
另外对于使用了Flink框架的类(比如PrintSinkFunction),日志不能做到很好的分离,可以考虑使用logback MDC机制。
04 未来规划
未来规划主要包括四个方面:
① 统一计算引擎
引擎Storm全部升级为Flink,这样可以减少平台的运维成本,同时可以提高作业性能(目前已经接近完成)。
② 更多SQL作业
持续完善SQL平台,降低用户的使用门槛,推动用户更多使用SQL开发作业。
③ 智能运维
使用智能诊断,自适应调整运行参数,提升任务的鲁棒性
④ 批流一体
深度打造批流一体实时计算平台,兼具低延迟的流处理和高性能的批处理能力。另外统一架构,实现代码复用,降低用户的使用成本。