Fork me on GitHub

分布式机器学习平台架构设计

图片

文章作者:赵喜生 机器学习平台架构师

导读:用户数据大规模积累、用户体验需求升级、算力革新和计算模型的演进,这三大核心要素及其相互作用成为现在和未来基于网络互联用户活动的主要组成部分。机器学习平台作为使数据、计算和用户体验三者相互作用的关键基础设施发挥着作用。

▶ 数据

用户在APP上每个点击,划过,从一个APP切换到另一APP,在每个页面的停留,甚至浏览的注意力和速度;屏幕、电池、GPS定位、运动传感器......等设备的运行数据。人们的各种数据无时不刻的被记录、存储和计算。

数据的用途也从事务交易、统计分析到体验优化进行了一个完整的演进迭代。

▶ 算力&计算模型

从单核CPU到HPC,再到基于微内核的GPU,外存和内存容量和速度的不断增长,万兆网络、InfiniBand、RDMA网络技术的发展和应用。我们走过的短短数十年中,算力的革新已经是有很大的变化,同时适配于计算硬件的新的计算模型的发展:从分布式的RPC,到基于低I/O成本的MapReduce,发展到GPU的矩阵运算等,计算模型也在影响着软件开发的变革。

图片

▶ 用户体验

最开始用户是在基于命令行/桌面进行指令性交互,随后门户时代是基于类目组织的信息浏览,搜索引擎给我们带来的是用户主动信息检索体验,而今各种Feed大行其道。用户在“懒”的道路上一路向前,用户参与信息交互更广、更深,信息获取更加简单高效。

用户、数据和计算正在以前所未有的程度参与并加速重构人类生活方式。

01 全栈机器学习平台

一个良好的机器学习平台产品需要满足整个数据科学的完整生命周期的各个环节。在产品设计方面需要考虑到:

▶ 全栈用户体验

用户数据科学的所有活动需要在一个产品中无缝且无差别体验地完成,这是解放数据科学家生产力,服务数据科学家聚焦于其核心工作的基础

▶ 工程能力和数据科学解耦

完成一个完整的数据接入、特征处理、模型训练优化及模型上线服务的整体工作,尤其在面向大规模/超大规模数据和模型的情况下,数据存储格式,多机多卡协同计算,资源调度管理,模型弹性伸缩等工程化工作是必不可少的。让用户对无感知或者提供易用的自定义完成对复杂工程的应用

▶ 开放性

通常一个团队中的数据科学具有不同的技术背景,不同的计算框架有其特定,通用的机器学习平台面向的场景和任务具有多样性;平台要对变化开发,以高效的方式支持不同用户的需求

图片

02 关键架构因素

构建机器学习平台需要的技术包括了存储、通信、计算、分布式、资源管理等计算机体系中的很多技术,在项目周期及成本条件下设计一个面向业务且具有生命力的机器学习平台架构,需要考虑诸多因素:

▶ 遗留系统集成

通常在组织中已经存在数据存储、资源调度和任务调度系统,架构设计需要拥抱这些遗留系统。对遗留系统良好的集成不仅可以提升用户体验,降低系统复杂程度,还可以缩短实现周期。

▶ 数据和模型规模

数据量、模型的复杂程度和规模以及用户对模型训练周期的容忍度决定了技术选型和核心能力的设计方式。

*▶ 资源管理调度

依托于Kubernetes或者YARN设计资源队列,进行异构资源管理调度,确保资源的合理高效利用。

▶ Pipeline

一方面对数据接入、特征工程、训练及上线等环节通过Pipeline组织,驱动数据在不同计算组件中运转起来,合理利用内存共享,避免低效I/O,从而系统性降低数据处理周期。

▶ Serving系统

Serving是一个相对独立的子系统,需要考虑E2E预测延时,弹性伸缩能力,模型版本管理,监控,A/B分流等能力,一般Serving都是无状态的,适合用微服务的架构来实现。

▶ 数据存储和表示

在数据源,特征工程及模型训练阶段数据可能以不同形式存储在不同的介质上;同时数据有Dense或者Sparse不同特点,以及数据可能存储在单机或者分布式多节点上。需要考虑数据存储格式,存储介质,分布式表示方式。

▶ 并行化

并行化不仅仅存在于训练阶段,同样存在于数据访问,特征处理甚至在Serving阶段。

▶ 计算加速

除了基于GPU的CUDA加速,我们同样可以在Intel CPU上用合适的计算库来实现加速。

▶ 网络通信

尤其在训练复杂网络的情况下,通信可能是主要的性能瓶颈;合理的网络拓扑和连接方式,以及采用高效的通信协议会降低计算等待。

03 架构设计

此处我们以面向:存储在HDFS上TB级用户和物品数据训练面向CTR场景的Embedding+MLP结构的模型,提供在线低延时且可弹性伸缩的排序服务,这样一个具有通用场景展开机器学习平台的架构设计。

▶ Hadoop集成

数据的传输和存储成本很高,利用已有Hadoop体系的完成机器学习平台中合适的组件将有效降存储和传输成本,同时也在一定程度上让平台变的轻量。利用Hadoop Kerberos安全机制可以将数据和资源调度基于租户隔离开。

在计算方面可以用Data Locality特性将计算和数据调度到同一结点上,业界的TensorFlowOnSparkAngel都用这个思路实现Hadoop体系和机器学习的结合。简单来说该方案是:

图片

  • 通过YARN调度提交一个Spark作业
  • 在Spark中调用foreachPartition,这样每个结点上将会在内存中持有RDD的一个分区
  • 在每个Worker中通过JNI调用或者本地进程的方式调用Tensorflow或者Torch进行模型计算

这样相当于Spark变成Container进行计算资源的调度,而充分利用了已有的大数据体系的数据和计算特性。

▶ 稀疏特征压缩&向量/矩阵分布式表示

在分布式机器学习计算中对分布式向量和矩阵的分布式表示以及对稀疏特征的压缩处理可以提高数据的并行计算度和存储压缩比,Spark中会采用RowMatrix, CoordinateMatrix和BlockMatrix等存储来对不同形式的数据类型进行矩阵进表示。

业界也有很多标准的稀疏矩阵和分布式矩阵表示方法。

图片

▶ 特征处理Pipeline

在特征处理过程中,需要对离散和连续的特征进行分桶标准化Onehot编码等特征处理。Berkeley Data Analytics Stack中的Spark是非常适用于特征处理,采用Spark进行特征处理可以帮我们解决三个主要的问题:

1. 内存计算,加速特征处理

由于特征处理过程中大部分情况不需要进行数据Shuffle,这样我们可以很好的利用Spark内存计算,将多个特征处理用Spark ML Pipeline串联起来,这样可以在分布式的将多个特征处理环节在一个Job中完成。

图片

2. Feature Map一致性

Pipeline中的每个算子都是一个Estimator,我们在fit阶段计算特征的Feature Map(例如Onehot编码中离散值对应Index),然后在模型save阶段对Feature Map进行序列化存储,在transform阶段对特征进行编码。fit时可以尽可能加载全量数据,这样在特征处理时就降低没有出现过样本值出现的机率,对于Feature Map的更新可以根据实时性要求进行Stream更新或者Batch全量更新。

特征处理阶段生成的Feature Map也会在Oneline阶段被加载。

3. 特征格式

在进行模型训练阶段,可能CSR, TFRecordn或者其他数据格式,可以实现自定义数据类型,让Spark来读写自定义的数据格式。

首先实现自定义FileReaderOutputWriter

在自定义数据源中用自定义FileReaderOutputWriter实现readwrite

def write(kryo: Kryo, out: Output): Unit = {}def read(kryo: Kryo, in: Input): Unit = {}

首先在org.apache.spark.sql.sources.DataSourceRegister中添注册自定义数据源类型

然后就可以像处理CSV格式数据一样spark.read.format("csv"),处理自定义数据格式

▶ 并行计算

在大规模数据和模型训练的场景中,由于训练样本规模大或者网络参数大,通常单个节点不能完成对模型的训练,这时就需要多节点协同的方式完成模型的训练,采用分而治之的思想。一般会有数据并行和模型并行两种思路来实现任务的分解和并行训练。

1. 数据并行

为不同的计算节点保留同一个模型的副本,每个节点分配到不同的数据,每个节点在本地计算持有数据的模型参数,然后将所有计算节点的计算结果按照某种方式合并生成最终的模型。

在这个过程中数据拆分的方式可以是随机的方式,也可以采用shuffle机制保证数据样本的均衡;而参数同步和合并方式主流的有Parameter ServerRing All-Reduce方式。

图片

Parameter Server:

在Parameter Server架构中,集群中的节点被分为两类:Parameter Server和Worker。其中Parameter Server存放模型的参数,而Worker负责计算参数的梯度。在每个迭代过程,Worker从Parameter Sever中获得参数,然后将计算的梯度返回给Parameter Server,Parameter Server聚合从Worker传回的梯度,然后更新参数,并将新的参数广播给Worker。

其中参数在Parameter Server和Worker之间的同步既可以是同步的 ( Synchronous ),也可以是异步的 ( Asynchronous )。

图片

Ring AllReduce:

在Ring-Allreduce架构中,各个节点都是Worker,没有中心节点来聚合所有Worker计算的梯度。在一个迭代过程,每个Worker完成自己的mini-batch训练,计算出梯度,并将梯度传递给环中的下一个Worker,同时它也接收从上一个Worker的梯度。对于一个包含N个Worker的环,各个Worker需要收到其它N-1个worker的梯度后就可以更新模型参数。

图片

2. 模型并行

如果训练模型的规模很大,不能在每个计算节点的本地内存中完全存储,那么就可以对模型进行划分,然后每个计算节点负责对本地局部模型的参数进行更新。通常对线性可分的模型和非线性模型(神经网络),模型并行的方法也会有所不同。

线性模型:

把模型和数据按照特征维度进行划分,分配到不同的计算节点上,在每个计算节点上采用梯度下降优化算法进行优化,局部的模型参数计算不依赖于其他维度的特征,相对独立,那么就不需要与其他节点进行参数交换。

在Angel的实现中就主要使用模型并行的方法使用ModelPartitioner实现模型分布式计算。

图片

神经网络:

神经网络中模型具有很强的非线性性,参数之间有较强的关联依赖,通常可以横向按层划分或纵向跨层划分进行网络划分。每个计算节点计算局部参数然后通过RPC将参数传递到其他节点上进行参数的合并,复杂的神经网络需要较高的网络带宽来完成节点之间的通信。

▶ CTR模型并行训练

通常典型深度CTR模型是Embedding Layer + MLP结构;对于10亿特征,Embedding Size为16的CTR模型来说Embedding模型的大小为10^9 * 16 * 4B ≈ 60GB,而MLP的大小只有几个MB。训练Embedding网络的效率将会是CTR模型训练的瓶颈所在。如果在网络中传输Embedding模型参数,整个时延和成本将是不可接受的。如何解决模型的存储及减少网路传输是关键,华为Mindspore和NVIDIAHugeCTR分别给出Host-Device和Embedding Hashtable的方案。

Host-Device:

图片

根据模型大小,选择将模型放在Device或者Host内存中,Device计算梯度后更新Embedding模型,如果模型在Host内存中,只需要在Device和Host之间进行内存拷贝,这个速度是远远大于网络传输的。

Embedding Hash Table:

图片

HugeCTR介绍中详细描述其设计方案:通过open addressing hash算法将所有的特征平均地分在所有Device上,每个Device内存中存储Embedding的一部分,通过实现reduce_scatter算子实现模型传输,all_gather进行模型合并。

▶ 在线Serving

在实际场景中,业务对模型的要求是高复杂,低延时,大批量,高吞吐量和弹性伸缩。

对于高吞吐量和弹性伸缩的处理我们可以交给kubernetes来实现,在容器基础设施具备的前提下并不会成为瓶颈,而对于复杂模型在大batch的请求和低延时的矛盾中还是需要通过设计和优化来实现。

而在更进一步的模型在线优化中可以考虑通过模型量化和硬件加速的策略提升模型Serving性能。

合理使用缓存和多级缓存结合的设计方法将有效降低高延时请求处理,从而提升系统的整体延时表现。缓存的设计主要考虑缓存介质的成本、容量、读写速度以及缓存更新策略 。

参考资料:

Hidden Technical Debt in Machine Learning Systems https://papers.nips.cc/paper/2015/file/86df7dcfd896fcaf2674f757a2463eba-Paper.pdf

Data locality in Hadoop: The Most Comprehensive Guide

https://data-flair.training/blogs/data-locality-in-hadoop-mapreduce

TensorFlowOnSpark

https://github.com/yahoo/TensorFlowOnSpark

Angel

https://github.com/Angel-ML/angel

Mindspore

https://zhuanlan.zhihu.com/p/164683221

HugeCTR

https://www.nvidia.cn/content/dam/en-zz/zh_cn/assets/webinars/nov19/HugeCTR_Webinar_1.pdf

Spark Data Types - RDD-based API

https://spark.apache.org/docs/latest/mllib-data-types.html

Spark ML Pipelines

https://spark.apache.org/docs/latest/ml-pipeline.html

PERFORMANCE COMPARISON OF STORAGE FORMATS FOR SPARSE MATRICES

http://facta.junis.ni.ac.rs/mai/mai24/fumi-24_39_51.pdf

BDAS, the Berkeley Data Analytics Stack

https://amplab.cs.berkeley.edu/software


本文地址:https://www.6aiq.com/article/1612618273866
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出