来也技术|云原生机器学习平台技术综述(编排调度篇)
来也科技2015年成立以来,深耕于AI行业,先后推出智能助理“助理来也”、对话机器人平台、智能文档理解平台等人工智能类产品,服务超过两百家海内外五百强客户,在OCR、NLP领域积累了大量的数据、代码和模型。
为了满足各行各业的需求,我们通常需要在新的数据集上优化模型效果。这个工作是重复性较强的,都是基于已有的模型,去执行数据获取、标注、处理、finetune、评测效果、发布模型的流程。我们希望通过机器学习平台将上述工作尽可能自动化,更敏捷的交付项目;进一步释放算法工程师的人力,投入到新的创新性工作中。
在当前大数据、大模型的背景下,深度学习基础设施平台的架构需要能够满足大规模训练所需的算力、存储和网络,具备强可扩展性的云原生架构是不二之选。本系列文章我们重点关注机器学习平台的技术难点及其在Kubernetes云原生底座之上的解决方案。覆盖编排、调度、存储、通信、推理等方方面面。本文是系列的第一篇:编排调度篇。
01 分布式训练编排调度痛点
在分布式训练编排和调度方面,存在以下技术问题需要解决:
- 模型训练的工作流程中,不同的环节所需要的资源类型和数量是不同的:数据处理使用CPU、模型训练需要GPU。调度系统需要给不同环节自动分配不同资源,分布式环境下的多个环节之间需要进行高效的数据流转。(流程编排)
- 基于大规模数据或大规模模型训练时,单台机器无法独立完成存储和计算任务,数据和模型需要被划分到多个节点上,节点之间要能够高效的传递信息,并保证算法的收敛性。(分布式机器学习理论)
- 同时调度多个分布式训练任务时,每个训练任务都由多个pod组成,k8s默认调度器是以pod为粒度调度,而不是以job为粒度调度,以pod为粒度的调度方式会在调度多任务时产生资源竞争,造成死锁。(gang-scheduling)
- 多团队共用同一个训练集群时,需要提供资源隔离机制;一个团队内多个成员共用,需要提供多优先级队列和抢占机制,并保证同一优先级下调度的公平性。(调度策略)
- 在公有云环境,当所有包年包月机器都被占用时,为训练任务自动开通按量付费实例;在训练过程中,为训练任务动态开通竞价实例(抢占实例)或使用潮汐资源,实现最大性价比。(elasticity)
- 在NUMA(非一致性内存访问)环境中,感知NUMA节点的拓扑结构,在调度时考虑亲和性。(topology awareness)
02 层次结构
基于Kubernetes体系结构,我们可以通过不同层级的组件,分别解决上述问题。
我们可以看到上图的组件数量比较多,使用分层的体系架构的优点在于,基于Kubernetes框架抽象出的每个组件各司其职,缺点是学习曲线比较陡峭。为了让读者对各个组件的协作关系先有一个大致的了解,本节会做一个简单介绍,在后续章节里还会再进一步介绍每个组件的作用和原理。
在图中这些组件由上至下,依次是:
- Pipeline Orchestrator(流程编排引擎):模型训练流程中的各个环节(component)可以被定义为一个DAG,DAG主要描述了每个component的输入输出和拓扑关系。Orchestrator会根据DAG定义的顺序依次执行各个component。在这个方向的开源工具有:Kubeflow Pipeline、Airflow、MLflow等。
- Distributed Training Framework(分布式训练框架):Pipeline Orchestrator不会提供分布式训练功能,分布式训练需要在训练框架层面实现并行算法、提供通信机制。Tensorflow, PyTorch, Horovod框架基于不同并行化方法提供了各自的分布式训练API。
- Training Job Operator:每个训练框架都有各自的operator,operator的主要作用是将一个包含多个worker的分布式训练作业拆分为不同的built-in workload,向pod中注入用来达成共识的参数,交给scheduler调度。Kubeflow社区提供了TFJob、PytorchJob、MPIJob等多种框架的operator。
- Batch Job Scheduler:scheduler监听集群内新的pod,为pod找到合适的node调度上去。深度学习训练场景的调度器应该支持批量调度、公平调度等高级调度策略。但kubernetes一开始主要管理在线服务,原生的scheduler无法满足批处理任务的调度需求。目前在模型训练场景业界常用的开源调度器是Volcano。
- Topology Manager:Topology Manager在v1.16作为alpha功能引入kubelet的组件,v1.18升级为beta版本。主要作用是在调度的最后阶段,当服务已经绑定node时,Topology Manager会从第三方Device Manager获取到亲和性调度建议,决定将服务绑定在哪个Numa节点上。
03 Kubernetes基本原理
Kubernetes是云计算时代的操作系统,也是新一代基础设施的接入层,它提供了极强的可扩展性,鼓励用户以插件的形式介入服务生命周期的每一个阶段。我们的机器学习平台将运行在这样的基础设施平台上。因此,我们必须先对Kubernetes架构,编排、调度的原理和设计模式有一定了解。
对Kubernetes比较熟悉的读者可以跳过这一章节。
3.1 架构
master上部署了控制平面的组件:
- etcd:兼顾一致性和可用性的kv数据库,是k8s的数据库。
- kube-apiserver:所有服务不会跟etcd直接建立连接,而是通过apiserver读写数据。
- kube-controller-manager:负责容器编排。监听api server变更,发现待编排对象期望状态和实际状态的差异,将实际状态调整为期望状态。
- kube-scheduler:负责调度。为新创建出来的pod寻找最合适的节点调度上去。
以下组件会在每个node节点上运行:
- CNI(container networking interface):管理容器网络。
- kubelet:接受scheduler调度过来的pod,保证pod中容器健康运行。
- CRI(container runtime interface):kubelet通过cri跟容器运行时(比如docker、containerd)交互。
- CSI(container storage interface):管理容器持久化存储。
- Device Plugin:管理硬件设备,向kubernetes上报设备信息,在调度时给容器绑定为其分配的设备。
3.2 编排原理
上文说道controller manager主要负责编排工作。编排是k8s的当家本领,也集合了k8s最精粹的设计理念,理解controller的工作原理对理解整个系统非常重要。
对于编排实现原理,两个设计模式不得不提:
3.2.1 声明式API
在向k8s部署服务时,我们一般会定义一个yaml文件,描述我们所期望的服务的最终状态,k8s接收到请求后自动执行一些操作,让系统达到期望状态。与声明式API相对的是命令式API,比如通过CURD直接操作集群内资源。声明式API屏蔽了服务部署的细节,把复杂留给自己,将易用性留给使用者。
以下文为例,定义了一个3个副本的nginx服务,部署服务时,只需要将这个yaml应用到k8s。
kind: Deployment
metadata:
name: nginx-deployment
labels:
app: nginx
spec:
replicas: 3
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.14.2
ports:
- containerPort: 80
3.2.2 控制器模式
在机器人设计和自动化领域,控制器模式是非常常见的设计模式。控制器会不断检测期望状态(spec)和实际状态(status)的一致性,并控制系统达到期望状态。
下图展示了k8s中的具体实现,以上文的nginx-deployment为例,informer组件会通过List&Watch监听到集群内出现了一个新的deployment,然后通过队列通知控制循环(也叫control loop、reconcile loop)处理这个请求。control loop接受到请求后执行具体的diff操作,发现集群内现在pod数为0,而期望启动3个。最后根据deployment的定义,向集群内创建3个1.14.2版本的nginx pod。而这些pod也是以yaml的方式创建的。
k8s原生的controller manager只能控制原生对象,比如deployment、statefulset、service、configmap等。但k8s也提供了很好的可扩展性,让用户也可以自己定义资源类型和控制器。现在大家一般会用CoreOS开源的operator框架进行自定义。
operator框架主要做两件事:
- 定义CRD(Custom Resource Definition),下面以TFJob为例,可以看到CRD主要描述了资源的名称、分组及其yaml的字段。
kind: CustomResourceDefinition
metadata:
name: tfjobs.kubeflow.org
spec:
# 资源分组
group: kubeflow.org
names:
plural: tfjobs
singular: tfjob
# 资源名称
kind: TFJob
listKind: TFJobList
scope: Namespaced
versions:
# 版本
- name: v1
served: true
storage: true
schema:
# yaml的字段
openAPIV3Schema:
description: TFJob represents a TFJob resource.
type: object
properties:
apiVersion:
...
kind:
...
- 实现自定义controller:自定义controller一般以deployment的形式启动。它会监听CRD类型的资源,运行你实现的control loop逻辑,将资源拆分成pod、service、configmap等细粒度资源,应用到k8s集群。下面是启动一个TFJob的yaml示例。
# 资源的分组+版本
apiVersion: kubeflow.org/v1
# 资源名称
kind: TFJob
metadata:
generateName: tfjob
namespace: your-user-namespace
spec:
tfReplicaSpecs:
# 启动1个parameter server
PS:
replicas: 1
restartPolicy: OnFailure
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
containers:
- name: tensorflow
image: gcr.io/your-project/your-image
command:
- python
- -m
- trainer.task
- --batch_size=32
- --training_steps=1000
# 启动三个worker
Worker:
replicas: 3
restartPolicy: OnFailure
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
containers:
- name: tensorflow
image: gcr.io/your-project/your-image
command:
- python
- -m
- trainer.task
- --batch_size=32
- --training_steps=1000
3.3 调度原理
其实,不只是controller运用了控制器模式,scheduler和kubelet也是以控制器模式工作的。
上节说到,当资源被controller处理后会被拆分为pod。这些新的pod被应用到k8s集群时,实际只是向etcd里保存了一个key value对。真正的调度工作是scheduler和kubelet做的。
3.3.1 scheduler
scheduler的工作职责是为pod找到最适合的node,然后将nodeName更新到pod的yaml上。
在scheduler的control loop中,scheduler会监听etcd的变更,拿到未被调度的pod,为pod选择node。选择node时会执行预选和优选两个策略:
- 预选:找到能够被调度的node。node需要满足一些必要条件:剩余资源大于pod所申请的资源;污点;亲和性等策略。
- 优选:在满足要求的node里找到最适合的node。优选会考虑:选择调度后最空闲的节点;选择调度后资源使用最平均的节点;亲和性等策略。统一打分,选择分数最高的节点。
k8s脱胎于google的Brog,一开始主要管理在线服务,原生的scheduler无法满足批处理任务的调度。深度学习训练场景的调度器应该支持批量调度、公平调度等高级调度策略。这要求我们自定义调度器。
为了满足自定义scheduler的需求,k8s先后提出了scheduler extender、multiple scheduler、scheduling framework 3种方案,最新的scheduling framework已经在v1.19达到stable状态。读者如果对3种方案的优缺点和演进历程感兴趣,可以移步阿里云ACK团队发表的文章:进击的Kubernetes调度系统(一):Scheduling Framework
3.3.2 kubelet
3.1我们提到每个node都会有一个kubelet。这个kubelet的control loop会监听包含当前节点nodeName字段,但还未被调度的pod。kubelet会再次判断当前节点的资源是否满足调度需求,如果满足,则调用容器运行时的接口创建容器,并指定资源申请相关的参数。
原生的k8s只提供CPU和内存资源,对于GPU、SR-IOV这类extended resource,设备厂商一般会提供device plugin,以deamonset的形式启动在每个node上,向kubelet上报设备的类型、数量甚至拓扑结构,在容器启动阶段负责设备的虚拟化和绑定。
至此,我们花了比较大的篇幅介绍kubernetes的架构和编排调度原理,特别指出了声明式API和控制器模式的重要性。理解这些内容对理解云原生机器学习平台技术非常重要,想深入学习Kubernetes的读者,可以购买张磊老师的 《深入剖析Kubernetes》 一书,或学习极客时间的同名课程,相信一定会有所收获。
04 流程编排
机器学习训练通常是一个任务流,其中每个组件(component)分别承担不同职责。以来也通用文字识别模型为例,下图展示了数据下载、数据处理、训练、测试、寻找最优模型、上传模型一系列组件,这些组件之间的关系可以被表示为一个DAG(有向无环图)。
流程引擎核心要解决的问题是控制流程中每个组件的执行顺序,负责组件之间的数据传递。
流程引擎作为发起模型训练的入口,选型是十分重要的。Kubeflow、Airflow、Mlflow都提供了流程编排的能力。Kubeflow作为k8s云原生的开源项目,拥有更开放的生态,提供了更多定制化的可能性。因此,本节会主要关注Kubeflow。
Kubeflow Pipeline底层其实是基于流程引擎Argo做了一层封装,所以我们会先介绍一下Argo的工作原理,再介绍Kubeflow Pipeline。
4.1 Argo
Argo本身是用于CI/CD的云原生流程引擎,是CNCF的孵化项目,后来被Kubeflow引入模型训练和数据处理场景。
如3.2.2中所描述的operator的工作原理,Argo其实就是一个用来做资源编排的operator,发起一个Argo任务也是向集群里应用一个yaml。下面定义了一个工作流,可以看到yaml里定义了2个component,一个用来生成字符串,一个接受字符串打印到控制台。通过这个简单的yaml,我们可以大致理解流程里每个组件是怎么定义的,他们之间是怎么通信的。
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: output-parameter-
spec:
entrypoint: output-parameter
templates:
# 定义2个component的关系
- name: output-parameter
steps:
- - name: generate-parameter
template: whalesay
- - name: consume-parameter
template: print-message
arguments:
parameters:
- name: message
# 使用第一个step的输出作为当前step的输入
value: "{{steps.generate-parameter.outputs.parameters.hello-param}}"
# 定义第一个component,用于生成字符串
- name: whalesay
container:
image: docker/whalesay:latest
command: [sh, -c]
# 打印字符串并写入文件
args: ["sleep 1; echo -n hello world > /tmp/hello_world.txt"]
outputs:
parameters:
# 将文件内作为当前步骤的参数输出
- name: hello-param
valueFrom:
default: "Foobar" # Default value to use if retrieving valueFrom fails. If not provided workflow will fail instead
path: /tmp/hello_world.txt
# 定义第二个component,用于打印字符串
- name: print-message
inputs:
parameters:
- name: message
container:
image: docker/whalesay:latest
# 接受一个参数并在控制台打印参数
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
在实际运行的时候,一个component就是一个pod。我们知道在kubernetes分布式环境下,不同的pod会被调度到不同的node上。在模型训练这个大数据量的场景里,跨服务器的数据通信的效率十分重要。因此我们有必要了解一点Argo数据传递的原理。
这里指的数据传递主要有两种情况:传递参数和传递文件。
4.1.1 传递参数
当pod之间需要传递参数时,operator会从前一个pod中查询到信息,在第二个pod启动时将参数注入到yaml的annotation字段里。
由于yaml是用k8s etcd作为存储,因此这种传输方式对数据大小有限制,每个参数要满足etcd max-request-bytes的限制,默认是1.5M,而etcd最大支持存储8G的数据。这就意味着我们不可能通过这种方式传太大的数据,只能用来传输一些超参数。
4.1.2 传递文件
当pod之间需要传输文件时,operator在启动component时不仅会启动用户容器,还会启动一个用来复制数据的sidecar容器。这个sidecar会在用户容器执行结束时从其中拷贝结果文件,通过s3协议上传到跟argo一起部署的一个minio里。在下一个component启动时,sidecar再从minio把文件下载下来。
这个方案的问题在于,在模型训练场景里,每个component的output非常大,以OCR训练来说,一个数据集有超kw数据,如果在每个component里都要上传下载,是很浪费时间的做法。
由于argo原生的两种数据传递方式对模型训练场景并不是很友好,目前也没有太好的解决方案,我们现在的做法是通过挂载一个ReadWriteMany的分布式文件存储系统来实现的。
文件存储是机器学习平台另一个重要话题,机器学习workload要求文件存储支持较大的存储量、优秀的性能和小文件随机读写能力,近些年云原生社区也涌现了一些数据编排方案,我们将在未来再发一篇博客详细讨论。感兴趣的读者可以关注《来也技术团队》公众号。
对于Argo数据传输相关更多实现原理,可以移步华为云的技术博客:KubeFlow-Pipeline及Argo实现原理速析(https://bbs.huaweicloud.com/blogs/172101)。
4.2 Kubeflow Pipeline(kfp)
既然Argo提供了流程编排能力,kfp又做了什么呢?我认为kfp主要在3个方面提供了重要价值。
- 提供了一个sdk,支持通过python代码的方式定义DAG,而不是让算法工程师手撕yaml。
- 提供了step cache能力,当一个component第二次执行时,kfp的operator检测到镜像、输入、输出等参数都没有发生变化,不会再重新执行,而是直接使用上一次的结果。这个特性在模型训练场景非常有用,很多时候我们只是改了超参数重新训练,没必要重新执行数据处理的component。
- 提供了一个可视化页面,可以在训练过程中收集metrics、tensorboard,展示在页面上。
那么现在还有一个问题没有讨论清楚,基于kfp,怎么能够用到分布式训练的controller和scheduler。实际上,kfp提供了3种定义component的方式:
- 直接给python方法加装饰器:Building Python Function-based Components (https://www.kubeflow.org/docs/components/pipelines/sdk/python-function-components/)
- 通过kfp提供的格式定义一个component yaml:Building Components (https://www.kubeflow.org/docs/components/pipelines/sdk/component-development/)
- 直接使用k8s resource作为component:Manipulate Kubernetes Resources as Part of a Pipeline (https://www.kubeflow.org/docs/components/pipelines/sdk/manipulate-resources/)
使用第三种方式,我们可以直接把自定义的CRD作为pipeline的一个component,自定义的controller和scheduler只需要监听特定的资源执行control loop即可。
05 分布式机器学习理论
在了解如何启动分布式训练任务之前,我们需要对分布式机器学习理论有基本了解。本章会分3个方面简单介绍分布式机器学习理论:
- 之所以需要使用分布式机器学习,主要是为了解决数据量大和模型规模大两方面问题。对于数据太多的情况,需要将数据划分到多个节点上训练;对于模型规模大的情形,则需要将模型分配到不同节点训练。数据和模型的划分方法是我们关注的首要问题。
- 完成模型和数据划分后,每个工作节点需要根据分配给自己的局部训练数据和子模型进行训练。而局部训练产生的参数需要跟全局共享,才能达到并行训练的效果。因此参数共享时涉及到的通信拓扑结构、步调和频率是我们关注的第二个问题。
- 最后,无论使用哪种拓扑结构,都需要解决如何聚合来自不同工作节点的模型参数和梯度的问题。
5.1 数据与模型划分模块
5.1.1 数据并行(Data Parallelism)
将大规模数据集划分到不同节点并行训练的方法,称之为“数据并行”。当模型参数单机能放得下时,通常采用数据并行,OCR、NLP模型训练一般用数据并行的方案。
数据划分主要有2种操作的角度:一是对训练样本进行划分,二是对特征维度进行划分。我们主要讨论前者。对训练样本划分有2种常见做法:
- 随机采样:采用有放回的方式随机采样,为每个节点分配训练样本,保证局部数据和全局数据的独立同分布。缺点是全局采样代价较高。
- 置乱切分:先分好再训练,周期性打乱重新分一次。这种方法的缺点是,不能保证数据独立同分布,接近于有放回的随机采样,影响收敛效率。
目前数据并行技术已经达到可用阶段,Tensorflow、PyTorch、Horovod都提供了数据并行API。
5.1.2 模型并行(Model Parallelism)
将大规模模型切分成若干子模型(包含一部分模型参数),将每个子模型放在一个工作节点上的方法,称之为“模型并行”。搜推⼴、大规模预训练模型训练会用到模型并行方案。
划分模型主要有横向划分和纵向划分两种方法:
- 逐层横向划分:也叫流水并行(Pipeline Parallelism)。优点是接口简单,缺点是处理不了单层参数单机放不下的情况;而且并行度较差。为了消除空闲时间(bubble),通常会并行处理多个microbatches,保证每个节点都在工作。
- 纵向划分:将单层参数划分到不同节点。这种方法的缺点是各个子模型之间依赖比较复杂,实现难度也更大。
一般来说,每层神经元数目很多但层数很少,可以考虑横向划分;反之如果每层神经元数目很多而层数较少,可以考虑纵向划分。
5.2 通信模块
通信分为两种方式,一是共享内存(share memory)、二是消息传递(message passing)。共享内存主要用在单机多卡场景,单机多卡不可避免会遇到内存墙的问题。多机多卡通信需要用message passing。所以这节我们主要讨论后者。
通信对于分布式训练至关重要,主要关注3个挑战:“怎样同步”、“何时同步”、“如何最小化通信的代价”,这3个挑战分别对应3个小节:“拓扑结构”、“步调”和“频率”。
5.2.1 拓扑结构
5.2.1.1 中心化
中心化的拓扑结构是指所有节点分为server和worker两种角色。worker用来做计算,server负责协调各个worker。
中心化最经典的编程范式是google的MapReduce。MapReduce将程序抽象成Map和Reduce两个操作,Map负责数据分发和并行处理,Reduce负责数据同步和规约。
MapReduce很适合大数据处理,但用来做并行计算并不高效。这是因为Reduce流程是同步的,训练速度会受系统中最慢节点的影响,5.2.2小节会介绍同步和异步两种通信步调。
Parameter Server(PS)是另一种常见的中心化异步编程模型。参数服务器中存储了模型的所有参数,worker可以去参数服务器中获取参数、更新参数。
Tensorflow最早发布分布式训练功能时就使用了PS架构。PS架构的主要问题在于多个worker共享同一套server,集群带宽不足时,server会产生网络拥塞,导致worker一直在数据同步环节阻塞,而GPU处于空转状态。
5.2.1.2 去中心化
跟中心化相对的是p2p的无共享拓扑结构,没有中心节点协调各个worker之间的通信,worker之间自行建立链接同步参数。拓扑结构的实现方式多种多样,星形、树形、蝶形、all-to-all理论上都可以,而拓扑结构影响了数据传输的数量和次数。每个节点之间一般是使用MPI的AllReduce接口实现通信,在gpu训练场景,一般使用Nvidia的NCCL通信库,它实现了部分AllReduce接口。
现在常用的拓扑结构是环形结构(Ring All-Reduce)。2017年百度首先实现Ring All-Reduce算法,如今,Ring All-Reduce已经成为主流的单机多卡和多机多卡训练方案。
与PS架构的server负责所有通信工作不同,Ring All-Reduce将每个节点的参数切分成不同分片,在参数同步时,每轮将一个分片同步到下一个节点。这种做法能够充分利用到集群内的带宽资源,避免PS架构的网络阻塞问题。理论上Ring All-Reduce的传输耗时只受模型规模和带宽影响,不受节点数量影响,因此可以达到线性加速比。
5.2.2 步调
通信还要考虑步调。同步通信是指一个节点完成本轮迭代后,等待其他节点都完成再统一更新参数。异步通信是指完成本轮迭代后,不等其他节点,直接更新自己的参数。
同步通信的问题在于整体速度受掉队者(straggler)的影响,导致计算资源浪费严重。这个问题一般叫做straggler problem。因此,同步通信也不适合用在异构计算场景。
异步通信虽然能够提升效率,但速度较慢的节点依然会影响整体效率,当速度较慢的节点将一个陈旧的局部模型同步到其他节点时,会导致收敛性变差。这要求模型训练采用更小的学习率,进而影响收敛速度。为了解决这一问题,学界在结合同步、异步通信方法上有进一步研究成果,本文不再详述。
5.2.3 频率
直观上看,通信频率越高,工作节点之间协调的越好,模型训练的效果应该越有保障。但通信频率越高,随之而来的通信代价就越高,在这一方向有人进行了一些研究,寻找计算代价和通信代价的最优匹配,从而实现最高的学习效率。
除了降低通信频率外,还可以使用过滤、量化、压缩等方式提高通信效率,本文不再详述。
5.3 数据与模型聚合模块
不同节点产生各自的模型或梯度之后,聚合模块负责将多个节点的参数聚合,更新模型。
- 常用的聚合方式主要是基于模型加和和投票的聚合方法,在异步通信的情况下,有一些部分聚合的优化方法。
- 另一种聚合方法是模型集成,为了避免模型集成带来的参数爆炸问题,在模型训练中也会利用知识蒸馏技术压缩模型。
本章主要提供了一个理论视角,用来看待分布式训练算法,实际上不同的分布式训练算法分别采用了不同的划分、聚合算法,选择了不同的通信拓扑结构、步调和频率。
在云原生背景下,从工程角度来看,了解分布式算法的重点在于理解通信的拓扑结构,因为通信的拓扑结构决定了部署服务的拓扑结构。
如果读者希望分布式机器学习有更直观的了解,推荐学习王树森博士的DeepLearning课程(https://github.com/wangshusen/DeepLearning)Parallel Computing一节。如果希望深入了解理论,推荐阅读微软亚洲研究院《分布式机器学习:算法、理论与实践》一书。
06 分布式训练框架及其operator
出于篇幅考虑,这节不会介绍分布式训练框架具体实现,而是会先从使用者的角度切入,了解当前常用的三种分布式训练框架Tensorflow、Pytorch、Horovod如何启动分布式训练任务,再介绍一下在云原生社区提供的分布式训练operator。
6.1 分布式训练任务Setup
6.1.1 Tensorflow
Tensorflow1.0使用parameter server进行分布式训练,下面这段代码可以直观了解到如何启动一个分布式训练任务。这段代码接受命令行传入的参数,参数列表指定了当前运行服务是parameter server还是worker,根据不同角色执行不同的初始化逻辑。
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
# Create a cluster from the parameter server and worker hosts.
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
# Create and start a server for the local task.
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
is_chief = (FLAGS.task_index == 0)
# Assigns ops to the local worker by default.
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
# Compute
下面这段脚本展示了如何在4台节点上启动2个ps节点和2个worker节点。
# On ps0.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=ps --task_index=0
# On ps1.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=ps --task_index=1
# On worker0.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=worker --task_index=0
# On worker1.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=worker --task_index=1
6.1.2 PyTorch
PyTorch的DDP使用了ring all-reduce进行分布式训练,DDP依赖c10d(collective communication)库,提供了gloo、MPI、NCCL等多种通信方式。在启动任务时也是向环境变量里注入了用于服务发现的master节点地址和当前节点标识符。
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# initialize the process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
6.1.3 Horovod
Horovod是uber开源的、兼容主流模型训练框架的分布式训练框架,也是目前最流行的第三方分布式训练框架。使用Horovod对代码有一定侵入性,下方的示例展示了Horovod在Tensorflow上的一个example。通过horovodrun命令启动训练任务时,需要通过命令行参数传入各节点的地址。
import horovod.tensorflow.keras as hvd
# Horovod: initialize Horovod.
# 初始化 Horovod,启动相关线程和MPI线程
hvd.init()
# Horovod: pin GPU to be used to process local rank (one GPU per process)
# 依据 local rank 为不同的进程分配不同的GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
(mnist_images, mnist_labels), _ = \
tf.keras.datasets.mnist.load_data(path='mnist-%d.npz' % hvd.rank())
# 切分数据
dataset = tf.data.Dataset.from_tensor_slices(
(tf.cast(mnist_images[..., tf.newaxis] / 255.0, tf.float32),
tf.cast(mnist_labels, tf.int64))
)
dataset = dataset.repeat().shuffle(10000).batch(128)
mnist_model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, [3, 3], activation='relu'),
......
tf.keras.layers.Dense(10, activation='softmax')
])
# Horovod: adjust learning rate based on number of GPUs.
# 根据Worker的数量增加学习率的大小
scaled_lr = 0.001 * hvd.size()
opt = tf.optimizers.Adam(scaled_lr)
# Horovod: add Horovod DistributedOptimizer.
# 把常规TensorFlow Optimizer通过Horovod包装起来,进而使用 ring-allreduce 来得到平均梯度
opt = hvd.DistributedOptimizer(
opt, backward_passes_per_step=1, average_aggregated_gradients=True)
# Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow
# uses hvd.DistributedOptimizer() to compute gradients.
mnist_model.compile(loss=tf.losses.SparseCategoricalCrossentropy(),
optimizer=opt, metrics=['accuracy'],
experimental_run_tf_function=False)
callbacks = [
# 广播初始化,将模型的参数从第一个设备传向其他设备,以保证初始化模型参数的一致性
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
hvd.callbacks.MetricAverageCallback(),
hvd.callbacks.LearningRateWarmupCallback(initial_lr=scaled_lr, warmup_epochs=3, verbose=1),
]
# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
# 只有设备0需要保存模型参数作为checkpoint
if hvd.rank() == 0:
callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))
# Horovod: write logs on worker 0.
verbose = 1 if hvd.rank() == 0 else 0
# Train the model.
# Horovod: adjust number of steps based on number of GPUs.
mnist_model.fit(dataset, steps_per_epoch=500 // hvd.size(), callbacks=callbacks, epochs=24, verbose=verbose)
6.2 Operators
从6.1示例中可以看到,setup一个分布式训练任务时,所有训练框架都要求使用者通过环境变量或命令行参数传入节点地址和标识符。这些信息会用来互相发现,建立拓扑关系,从而在各个节点内实现参数的传递和同步。
但是这种操作带来诸多不便,每次启动都要传很多参数,不同节点标识符也不一样,使用MPI通信的训练框架还需要配置节点间访问秘钥,如果同时用很多机器训练模型,启动过程会非常繁琐,而这个启动流程完全是可以被自动化的。
在3.2章节我们介绍了k8s的编排原理,重点说明如何在k8s自定义一个CRD,自己开发operator控制CRD的编排。kubeflow正是利用了k8s这一可扩展的特性,开发了一系列training operator,支持将不同框架实现的分布式训练任务的编排自动化。
这节我们以基于tensorflow的tfoperator为例,介绍training operator的实现原理。
tfoperator定义了一个名叫TFJob的CRD,先看一个使用TFJob启动的训练任务的示例。下面的yaml定义了2个ps节点、2个worker节点,将yaml apply到k8s里,operator会自动启动训练任务。
apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
generateName: tfjob
namespace: your-user-namespace
spec:
tfReplicaSpecs:
PS:
replicas: 2
restartPolicy: OnFailure
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
containers:
- name: tensorflow
image: gcr.io/your-project/your-image
command:
- python
- -m
- trainer.task
- --batch_size=32
- --training_steps=1000
Worker:
replicas: 2
restartPolicy: OnFailure
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
containers:
- name: tensorflow
image: gcr.io/your-project/your-image
command:
- python
- -m
- trainer.task
- --batch_size=32
- --training_steps=1000
tfoperator实现的具体原理是:当监听到集群中出现了TFJob类型的资源时,operator会解析tfReplicaSpecs并创建ps, worker, chief类型的pod。由于tensorflow启动分布式任务时需要设置TF_CONFIG的环境变量,因此在启动pod之前,operator也会生成TF_CONFIG并注入到环境变量中。
// TF_CONFIG
{
"cluster":{
"ps": [
"tfjob-ps-0.your-user-namespace.svc:2222",
"tfjob-ps-1.your-user-namespace.svc:2222"
],
"worker":[
"tfjob-worker-0.your-user-namespace.svc:2222",
"tfjob-worker-1.your-user-namespace.svc:2222"
]
},
"task":{
"type":"ps",
"index":0
}
}
在上文的TF_CONFIG示例中,可以看到每个ps和worker的pod都可以由一串dns地址表示,各个pod之间可以通过dns地址互相访问。使用dns地址而不是pod ip的好处是:如果运行过程中pod发生重启,dns地址不会变,其他pod也就不用感知到这次重启,更新本地的TF_CONFIG了。在k8s中,这种为每个pod绑定不变dns地址的需求可以通过创建headless service实现。
总而言之,tfoperator利用了控制器模式,监听TFJob对象当前状态和期望状态的差异,保证集群内启动了足够数量的ps、worker pod,并为每个pod创建一个headless service用于互相访问。
07 批处理任务调度
7.1 模型训练场景下的调度问题
Training operators解决了训练任务编排的问题,但也带来新的问题。一个训练任务(job)包含了多个pod,但k8s的默认scheduler是以pod为粒度调度的,不是以job维度调度。这种简单的调度策略在深度学习模型训练场景存在一些缺陷:
- 死锁:下图展示了一个死锁的示例。集群中有2个job正在调度,已经分别成功调度了3个pod,还剩1个pod未能调度,此时集群已经没有剩余资源,两个job又都不愿意释放资源,导致两个job都无法成功启动。
- 资源碎片:下图展示了一个资源碎片的示例。集群内3个节点一共有6张卡,每个节点已经被调度上了1卡的pod,集群内剩余3张卡。任务队列里还有一个需要单机2卡的任务正在等待,尽管集群内共剩余3卡,但由于没有一个node能够满足pod申请的资源,导致pod无法被成功调度。
- 调度的公平性:在下面左图的示例中我们能看到,当2个job包含的pod数存在明显差异时,如果我们对所有用一视同仁的优先级调度,就会出现pod越多,占用资源也越多的问题。
- 亲和性与反亲和性:对于ps架构来说,我们希望调度可以将不同的ps pod调度到不同的node上,将ps和worker尽量调度到同一个node上。如果没有使用亲和性调度策略,会造成网络通信的压力,进而影响计算资源的利用率。
7.2 Volcano
Volcano(以前叫kube-batch)是华为开源的、k8s生态中高性能计算场景最常用的调度器,能够解决上述调度问题。
7.2.1 在training operator中使用Volcano
kubeflow training operator支持将workload指定给volcano调度器调度。如官网(https://www.kubeflow.org/docs/components/training/job-scheduling/)所描述:在k8s中安装volcano调度器,并为training operator设置一个命令行参数即可。在集群中发起新的训练任务时会自动被volcano调度。
我们从实现的视角理解一下volcano调度器是如何被指定的。
首先,就像我们在6.2所描述的那样,kubeflow training operator会监听TFJob类型的CRD并创建pod。当设置了volcano调度器时,training operator还做了其他事情:
- 创建一个PodGroup类型的对象并应用到集群中。这个PodGroup CRD就是volcano调度的最小单位,它代表了一组强关联的pod集合。在PodGroup的yaml中,支持为其设置最小启动的pod数、最小资源申请量、关联使用的优先级队列。
- 将pod绑定podGroup、并将pod的schedulerName设置为volcano,保证volcano调度器的control loop可以发现并处理这个pod。
而volcano则负责给pod找到一个最合适的节点,修改pod的nodeName字段。
7.2.2 Volcano实现原理
Volcano scheduler的工作流程如下图所示:
- 周期性的开启session,一个调度周期开始。
- 将没有被调度的Job发送到会话的待调度队列中。
- 遍历所有的待调度Job,按照定义的次序依次执行enqueue(入队)、allocate(分配)、preempt(抢占)、reclaim(回收)、backfill(预留)等动作,为每个Job找到一个最合适的节点。将该Job 绑定到这个节点。action中执行的具体算法逻辑取决于注册的plugin中各函数的实现。
- 关闭本次会话。
回到7.1提出的问题,volcano通过提供各种plugin能够解决这些困境:
- Gang Plugin:解决死锁问题。对PodGroup使用“All or nothing”策略。
- Binpack Plugin:解决资源碎片问题。根据资源使用率计算节点权重,在优选阶段为pod选择打分最高的节点。
- Priority、DRF Plugin:解决Job调度的公平性问题,支持以fair-share的形式共享资源。
- Proportion Plugin:为不同团队划分资源使用比例。
- Task-topology Plugin:提供亲和性和反亲和性配置策略。
volcano的强可扩展性也提供给我们很大的自由度,去定制符合业务要求的调度策略。
08 总结
本文从分布式训练痛点出发,提出了云原生场景解决这些问题的层次架构,针对流程编排、operator和scheduler展开具体描述,介绍了常见的开源方案和实现原理。理解kubernetes原理和分布式机器学习理论对理解mlops系统至关重要,因此也在文中穿插对两者的介绍。希望能够对读者有所帮助。
由于篇幅原因,本文没有涉及弹性计算和拓扑感知相关技术,我们会在公众号上继续发布云原生机器学习平台系列文章,也欢迎大家关注 《来也技术团队》 公众号。