Apache Kyuubi 在爱奇艺的实践
导读 本文将分享 Apache Kyuubi 在爱奇艺的一些实践和落地。
文章将围绕下面三点展开:
-
爱奇艺 Spark Thrift Server 服务演进
-
Spark SQL 平台化适配
-
Spark SQL 服务优化
分享嘉宾|王震 爱奇艺 高级研发工程师
编辑整理|马慧 汇丰达
出品社区|DataFun
01/爱奇艺 Spark Thrift Server 服务演进
1. 爱奇艺统一 SQL 网关 Pilot 服务架构
首先分享爱奇艺的统一 SQL 网关 Pilot 服务,Pilot 作为统一的 SQL 服务,上层是通过 JDBC 的协议对接了爱奇艺的数据开发平台、即席查询的平台以及业务的取数服务。服务端通过 SQL 解析的模块对 SQL 进行分析,对异常 SQL 进行拦截,并且智能的选择合适的执行策略和路由策略。结合服务发现提供高可用。在配置中心可以配置标签化的配置,并基于运行历史提供统计和审计视图。底层支持了 Trino、Spark、Hive 还有 Clickhouse 等引擎。其中我们使用 Kyuubi 来作为 Spark Thrift Server 服务。
2. 爱奇艺 Spark Thrift Server 服务演进
在爱奇艺,Spark Thrift Server 服务经历了三个阶段:
(1)最开始我们使用的是 Spark 原生的 Thrift Server 服务,支持简单的 Ad-hoc 查询。
(2)后来我们引入了 Kyuubi 0.7 的版本,支持 Ad-hoc 查询以及少量的 ETL 任务。
(3)Kyuubi 在 1.X 的版本中做了一些架构优化,我们是在 Apache Kyuubi 进入到 Apache 之后,也升级到了 1.4 的版本,支持了 Ad-hoc 查询,慢慢有大量的 ETL 任务。并且使用 Spark SQL 代替了 Hive,成为主要的离线处理引擎,同时也支持了离线数据湖的分析操作。
下面通过对比这 3 个服务,分享一下 Spark Thrift Server 服务的演进过程。
第一阶段:Spark 原生的 Thrift Server 服务的架构
这是 Spark 原生的 Thrift Server 服务的架构,通过启动常驻的 Spark 服务 ,在 Driver 端去暴露 Thrift Server 的服务接口来接收并执行 SQL 请求,是基于一个 SparkContext 的多线程的应用场景, 它的服务特性:**基于 HiveServer2 协议,可以很好地兼容 Hive 相关的生态。因为是一个常驻的服务,可以有效减少 Spark 启动的开销,**可以快速响应请求。
不过由于使用的是同一个 SparkContext,所以不支持多租户,资源不隔离,多个任务之间会相互影响,抢占资源之类的。 并且服务启动后配置是固定的,在启动后资源的配置都不可以修改。
第二阶段:引入 Kyuubi 的 0.7 的版本
后面我们引入了 Kyuubi 的 0.7 的版本,对 Thrift Server 进行了增强,允许不同的连接使用不同的 SparkContext ,同时也支持同一个用户级别的 SparkContext 共享。
也是基于 HiveServer2 协议的,支持了多租户。 同时在服务启动后可以动态传入一些资源配置, 调整 SparkContext 的资源,并且不同的链接可以使用不同 SparkContext 进行 Executor 层面的资源隔离。
不过由于它是 Yarn-Client 模式运行 的,所有 **SparkContext 其实是共享在一个 Spark Driver 的进程之内的,**所以对 Driver 端的压力还是挺大的,也容易达到性能的瓶颈。
第三阶段:升级 Kyuubi 1.X 的版本
Kyuubi 1.X 的版本对架构进行了优化,主要是对 Server 端和引擎端进行了解耦操作。
**Server 端主要是做引擎的启动以及 SQL 转发,同时支持引擎的共享策略,**比如支持用户级别共享,允许同一个用户的不同连接使用同一个 Kyuubi Engine,从而实现上下文共享,并且节省资源。
服务特点:低耦合,Kyuubi 和 Engine 进行解耦不同的任务使用不同的引擎,这样就可以达到完全的资源隔离。
同时支持各种共享策略:User 级别、Group 级别、还有 Server 级别的。使用共享引擎主要能够节省资源,避免反复启停引擎,可以快速响应请求。
三个服务的一个简单的对比
可以看到 Kyuubi 1.X 版本中支持多租户、有很好的并发支持。
**资源隔离性方面:**Spark Thrift Server 主要依赖于 Spark 原生的 Pools。Kyuubi 0.X 共用同一个 Driver 维护多个 SparkContext。Kyuubi 1.X 支持独立和复用引擎。
资源的使用方面: Spark Thrift Server 是常驻的 SparkContext 资源。Kyuubi 0.X 版本是可以使用独立的 SparkContext,也可以复用。Kyuubi 1.X 版本也可以支持独立的引擎以及各种共享引擎,它的独立引擎是在执行完成之后释放资源的,共享引擎是在超时之后释放。
02 /Spark SQL 平台化适配
接下来分享 Spark SQL 平台化适配,将从四个方面进行介绍:
① 标签化配置;
② SQL 事件审计;
③ Spark SQL血缘采集;
④ 服务监控。
1. 标签化配置
我们在使用中发现有很多任务有共同的特性。
**① ETL 任务:**它的任务特点是运行时间比较长,数据量比较大,并且因为运行长需要稳定运行。
**② 即席查询的任务:**快速响应、数据量比较小,同时稳定性要求没有那么高。
**③ 对于同一个用户和业务内部:**又有一些相似性,数据量级比较接近、用户开发习惯比较接近。
所以我们**对这些任务进行一些标签化的配置。**比如离线 ETL,我们配置一个 Connection 级别的独立引擎,即席查询使用 User 级别的引擎。不同的用户和业务根据不同的需求配置一些资源,另外用户开发习惯不同,可能有一些 Hive 兼容性的配置等。
用户任务在请求的时候只需要带上一些标签,比如带上一个 ETL 任务,这样就会使用独立的引擎,或者是一个即席查询任务,就会用 User 引擎级别。
Kyuubi 中是提供了一个 SessionConfAdvisor 接口,允许用户对 Session 注入一些配置,进而可以用来实现上述的标签化配置的功能。
2. SQL 事件审计
SQL 事件审计是基于 Kyuubi 的 Event 体系进行实现的。
**Kyuubi 在 Server 和 Engine 中都暴露了很多事件:**在 Server 中暴露的 Server 的启停事件、Session 的打开\关闭事件,Operator 的执行事件。Engine 中也记录了 Session、Operation、还有血缘等事件。
Kyuubi 中通过 Event Handler 对这些事件进行处理。在 Server 端,我们实现了一个 ES 的 Event Handler, 将 Server 的事件直接吐到 ES 中。
引擎端是用 Kyuubi 的 SparkHistory Logging EventHandler, 将 Event 记录到了 Spark History 的 HDFS 文件里面。
这样我们就可以对 SQL 执行事件进行分析统计,比如可以统计出来 SQL 执行的总量、失败量,还有 SQL 的资源使用情况。 SQL 的执行事件里面还包括了一些执行的错误信息,可以使用正则匹配规则建立错误信息的规则库, 来进行错误提示实现辅助运维。
我们还可以基于 SQL 的一些运行历史,对任务进行 HBO 的优化,推断出更合适的配置。
同时 Spark UI 上面其实 Kyuubi 也通过吐出的事件渲染出了 Kyuubi 的 Tab 页,方便了 Kyuubi 任务的分析和排障。
Kyuubi 中是可以实现自定义的 Event Handler 对事件进行处理。
3. Spark 血缘采集
这个是血缘采集的过程:
先通过 Spark Catalyst 对 SQL 解析成执行计划。
用 SparkAtlas Connector 的插件, 通过 QueryExecutionListener 拦截到执行计划,再把它解析出血缘信息,再通过 Atlas 的客户端给它发送到 Atlas 里面。
**Kyuubi 中也带了 Spark 的血缘插件,也支持列级别的血缘,**大家有兴趣也可以去了解一下。
4. 服务监控
**Kyuubi 服务也暴露了很多的监控指标,**服务端暴露了:GC、Memory、 Connection、Operation 等监控的指标,内部会有很多 Reporter 发送到不同的服务里面。
我们通过自定义的 Reporter Service,把这些指标直接吐到内部的监控平台上,再配置一些告警和视图。
同时也开发了拨测任务,使用 JDBC 连接提交一些测试的任务到各个 Kyuubi Server 上执行,判断任务能否正常执行。另外,定时调用 Kyuubi Server 的 Rest API 进行健康检查。
03 /Spark SQL 服务优化
1. 小文件优化
下面分享一下 Spark SQL 的服务优化,先看一下 Spark SQL 的小文件优化。
为什么产生小文件?
左边是读一个表,直接写入到另外一个表里面的操作,右边是两个表 Join 写入到另外一个表中。看一下左边,假如它读取的分区数是 100 个,写入也是 100 个分区,最终将会写入 100 个文件。
右边假设两个表的并行度也是 100,经过 Join 之后会产生 Shuffle 操作变成 200 个分区,可能就写入 200 个文件。所以最终写入的文件数是跟最终写入 Stage 的 Task 数是相关的。
如何解决小文件?
我们怎么解决小文件?需要在写入操作之前进行一个 Repartition 的操作,来控制写入的 Stage 的分区数,进而控制写入的小文件数。
左边示例,我们插入了一个 Repartition 10 的算子,最终写入的 Stage 的 Partition 数就是 10,最终可能也就生成 10 个文件。
右边示例也是类似的一个操作。
Repartition 的两个问题
① 第一个:Repartition 这个数我们怎么确定?
② 第二个:就是动态分区写入的一个情况下,假设某一些分区的数据量比较大,又比较容易导致分区倾斜的情况。
Repartition 数量如何确定?AQE 动态合并小分区
下面看一下具体怎么解决这两个问题。第一个 Repartition 数量如何确定?
首先我们可以把 Repartition 数设置为一个较大的值,并且因此触发了 Shuffle 操作。再借助于 Spark 3 的 AQE 动态合并小分区的功能,自动的根据配置的大小将小分区进行合并,分区数控制在合理的范围。这样就不需要特意的去配置 Repartition 的数量。
动态分区写入导致超大文件-添加随机数
动态分区如果有一些分区的数量比较大,进行 Repartition 操作之后,会导致数据倾斜。
对于这种情况,我们通过添加随机数,将分区进行拆分。示例中我们加入了值为 4 的随机数,相当于把每个分区拆成了 4 份,再对分区以及随机数字段进行 Repartition 操作。这样可以看到分区倾斜有一个很好的效果。
不过这样也会存在一个问题,如果随机数控制的不合理,它又容易再次导致小文件的问题。
Rebalance 平衡分区( Spark3.2+ )
在 Spark 3.2 里面,引入了 **Rebalance 的特性,可以自动平衡分区,**分区过大的时候会进行拆分,分区过小的时候进行合并,这样就可以有效地控制小文件和数据倾斜。
图中上面两个大分区,在进行 Rebalance 之后,会自动地分成两个分区,小分区又会自动合并成大分区,这样整体的效果就比较平衡。
右边是经过 Rebalance 的写入的执行计划,上面是合并了小分区,下面是拆了一下数据倾斜的分区,写入都是控制在了比较合适的范围,基本上解决了小文件的问题。
Kyuubi 的相关特性是在 Kyuubi Extension 的插件里面,使用的时候只需要把这个插件放到 Spark_HOME 的 Jars 里面,同时配上 KyuubiSparkSQLExtension ,然后再配置一下 AQE 的一些配置,基本上就可以自动地优化小文件。
2. Z-order 优化简介
下面分享 Kyuubi 的另一个比较优秀的特性,就是 Z-order 优化。
简单介绍一下 Z-order 优化,Z-order 排序实际上是将多维数据映射到一维上的排序算法。
这个图片是来自 Databricks 的技术博客,例子可以看到,假设这有一个二维数据 X、Y,左边是一个线性排序:X = 0,Y = 1、2、3、4 进行排序,并且 4 个数据点存放在一个存储单元里面。Parquet 这一类的文件格式在查询的时候会根据文件的元信息,做 Data Skipping 操作,比如文件块里面的最大最小值来做过滤。左边我们要执行上面 SQL,过滤 X = 2 或者 Y = 2 的数据,第一个数据块 X = 0,Y 的最大最小值就是 0 ~ 3 之间,所以它会被选中,会被扫描,第二个文件块,用 X = 0,Y 的最小值是 3,会直接跳过。如此以来就会扫描到 9 个文件块,并且查询了 21 个无效数据点。
右边这个是 Z-order 的排序,可以看到它就类似一个 z 型曲线排序。同样也是 4 个点放在一个文件块里面。第一个文件块,如果我们也执行上面查询 X = 2 或者 Y = 2,第一个文件块 X、Y 最大最小值就是 0 ~ 1 之间,所以会被过滤掉。第二个文件块 X,Y 的最大最小值是 2~3 之间,所以它就会被选中,第三个会被跳过。如此以来它会扫描到 7 个文件块,13 个无效的数据点,跟左边线性对比可以看到它的扫描的文件块数其实是减小了,并且无效的数据会少很多。
这样经过 Z-order 排序之后,会提高加速查询的速度。 并且由于它多维映射到一维上,相当于多维数据上更加临近了,相似性更高一些,所以它的压缩效果会提高很多。
这边是 Z-order 的一个实现,最关键是将多维数据计算出 Z-value 值,再通过 Z-value 值进行排序。
这是维基百科上面的两张图介绍。通过一个二进制位交叉的算法,计算 Z-value 值,比如 X、Y 都变成了二进制的数,进行位交错计算出来 Z-value 的值,通过这个值进行排序。右边就是经过算法之后的排序效果。可以看到第一个点 000000,第二个点000001,第三个点再到 000010,再到 000011。是一个 Z 型的曲线。
这个特性在 Kyuubi 中也是放在 KyuubiExtensionSpark 的插件里面,我们只需要把它复制到 Spark Jars 里面,并且在 SparkConf 里面配置 Kyuubi Spark SQL Extension。
**对存量的数据只需执行一下 Optimize 的命令,**就可以对存量数据进行重排序然后再写入的操作。
**对于增量数据,我们可以在表里面加上 kyuubi.zorder.enabled、kyuubi.zorder.cols 两个Properties,**在增量写入的时候,它就会自动进行 Z-order 的优化写入。
04 /问答环节
Q1:Kyuubi 怎么实现持久化的 Spark Context?
A1:Kyuubi 中会启动一个独立的引擎,并且支持一些共享策略。比如像 User 级别共享引擎,很多人使用同一个 Hadoop 用户进行提交,将会使用同一个引擎,引擎具有一个超时时间,在空闲一段时间后才会退出。比如你提交一个任务,执行完之后连接会断掉,但是引擎是没有退出的,还常驻在那里,其他的人一提交就会马上连到引擎上面。
Q2:什么叫拨测?
A2:拨测就是对服务的健康检测。比如每 5 分钟提交一个 JDBC 请求,保证服务可用。每 1 分钟去调一下它的 Rest API,确定服务是存活的,连续失败了几次就可以确定服务不可用,自动地对它进行一个重启。
今天的分享就到这里,谢谢大家。