Gluten+OmniRuntime性能调优十板斧
发表于 2025/08/18
0
作者 | 周思佩
摘要
Gluten作为一个开源的向量化执行加速框架,旨在为上层大数据计算引擎(如Apache Spark)提供高性能的向量化执行支持,通过整合原生执行库(如Velox、ClickHouse等)的优势,解决JVM环境下传统行式执行的性能瓶颈,显著提升大数据处理效率。同时,OmniRuntime也是一个鲲鹏自研的Native引擎加速库,尤其针对ARM平台有很好的Spark加速效果,目前正在积极推动合入开源Gluten社区。本文介绍了Gluten集成OmniRuntime在Spark执行上的一些性能调优的实践,总结了从通用的Spark参数调优到OmniRuntime独有的特性优化,再到常用的第三方组件或环境优化,帮助用户更好地发挥OmniRuntime的Native向量化执行优势,提升Spark整体作业性能。
1 Executor参数调优
如果想充分榨干服务器的计算资源,那么负责Spark计算任务实际执行的executor就需要充分调优,它的数量以及每个executor的内存、线程数等参数的设置影响整体的执行效率。尤其针对Gluten这种通过Native加速引擎使能Spark的情况,堆外内存的设置对Native加速引擎至关重要。
Spark官网针对Executor相关的核心参数,建议的默认配置如下:
属性名称 |
默认值 |
含义 |
spark.executor.memory |
1g |
每个executor进程使用的内存量,格式与JVM内存字符串相同,带有大小单位后缀(“k”、“m”、“g”或“t”)(例如 512m、2g)。 |
spark.executor.memoryOverhead |
executorMemory * spark.executor.memoryOverheadFactor,最小值为 spark.executor.minMemoryOverhead |
为每个执行器进程分配的额外内存量,除非另有说明,否则以MB为单位。此内存用于虚拟机开销、interned字符串、其他本机开销等。它倾向于随着执行器大小的增加而增加(通常为6-10%)。此选项目前在 YARN和Kubernetes上受支持。注意:额外内存包括PySpark执行器内存(当未配置spark.executor.pyspark.memory 时)和同一容器中运行的其他非执行器进程使用的内存。运行执行器的容器的最大内存大小由 spark.executor.memoryOverhead、spark.executor.memory、spark.memory.offHeap.size和 spark.executor.pyspark.memory的总和决定。 |
spark.executor.minMemoryOverhead |
384m |
在集群模式下为每个执行器进程分配的非堆内存的最小量,此选项目前在YARN和Kubernetes上受支持。 |
|
false |
如果为true,Spark 将尝试将堆外内存用于某些操作。如果启用堆外内存使用,则 |
|
0 |
可用于堆外分配的绝对内存量,除非另有说明,否则以字节为单位。此设置对堆内存使用没有影响,因此如果您的执行器的总内存消耗必须在某个硬限制内,请务必相应地缩小您的 JVM 堆大小。当 |
spark.executor.cores |
YARN 模式下为 1,Standalone 模式下为 worker 上所有可用核心。 |
每个executor使用的核心数量。 |
可见,Spark的默认参数并不适合大部分的实际应用场景,需要根据实际情况进行调优。下面针对大数据量的作业场景,讲解通常的参数调优的过程。
1. 在实际的性能业务中,Executor的堆内内存不能太小(易OOM),也不能太大(导致 GC停顿时间长)。通常建议每个Executor的内存设置在32GB以内(经验值,JVM GC在大堆上效率会下降)。常见范围是16GB - 32GB。
2. 在初步确定了堆内内存后,再设置Native引擎所需的堆外内存,建议堆外内存从35g起步,可用内存越大,越减少算子执行过程中数据溢盘的代价。memoryOverhead可根据情况设为默认值或者5g。最终ExecutorTotalMem = spark.executor.memory + spark.executor.memoryOverhead + spark.memory.offHeap.size
3. 确定executor内存后,再设置每个executor的核数大小,通常云服务器的CPU与内存的核存比大概是1:3-1:4范围,所以executor内的核存比也可以设置为这个比值,或者根据实际的资源比例设置。
4. 最后,计算最终节点启动的Executor数量:
1) 基于CPU:maxExecutorsCPU = floor(节点可用核心数 / spark.executor.cores)
2) 基于内存:maxExecutorsMem = floor(节点可用内存 / ExecutorTotalMem)
3) 取两者中的较小值:节点Executor数 = min(maxExecutorsCPU, maxExecutorsMem)
提醒一点,一般情况下我们需要预留资源,比如管理器本身(如YARN的NodeManager, K8s的Kubelet)和Spark的Driver也需要资源,计算节点可用资源时要扣除。如部署其他任务也要减去对应的资源,不要超过实际可用资源,避免线程争抢和OOM等问题。特殊情况下,可以通过超配让spark/YARN自行调度,能够提升部分性能。
以一台鲲鹏服务器环境为例,总核数为320,Spark可用内存为1T,在运行1TB的TPCDS任务,参数可以设置如下:
spark-sql--master yarn--driver-cores 8 --driver-memory 20g--num-executors 18 --executor-cores 16 --executor-memory 20g--conf spark.memory.offHeap.enabled=true--conf spark.memory.offHeap.size=35g
此外,还可以启动动态分配特性,开启spark.dynamicAllocation.enabled=true可以让 Spark根据负载自动增减Executor数量,这通常比静态配置更优,尤其是在共享集群上。但初始的spark.executor.cores和spark.executor.memory仍需按上述流程配置。
2 Shuffle Task并行度调优
在Spark SQL的任务执行过程中,Task的运行数量也同样重要。如果Task的个数设置过小,可能会导致每个Task处理的数据量过大,整体耗时较长,计算资源利用不够充分,还可能存在严重数据倾斜;如果Task的个数设置过大,也有可能导致每个Task处理的数据量较小,从而耗时都浪费在Task的分发和任务解析上,并且上下游的shuffle开销也会倍级膨胀。
一般需要设置的shuffle分区关键参数的默认配置,如下:
参数 |
默认值 |
描述 |
spark.sql.shuffle.partitions |
200 |
建议设置为num-executors*executor-cores的2~3倍。但是在大数据的情况也不宜过大,否则会造成shuffle io瓶颈。 |
spark.sql.adaptive.coalescePartitions.initialPartitionNum |
/ |
合并之前的初始shuffle分区数。如果未设置,则等于spark.sql.shuffle.partitions。此配置仅在 'spark.sql.adaptive.enabled' 和 'spark.sql.adaptive.coalescePartitions.enabled' 都为 true 时才有效。避免开启AQE后,把Task并行度降低的过小。设置起始分区数量,提高shuffle阶段的资源利用率。 |
spark.sql.adaptive.advisoryPartitionSizeInBytes |
64 MB |
开启AQE后shuffle分区的建议大小(以字节为单位)。当Spark合并小的shuffle分区或拆分倾斜的shuffle分区时生效。可以根据资源是否为瓶颈进行调节,增大或者减少Task个数。 |
spark.sql.adaptive.coalescePartitions.minPartitionSize |
1MB |
设置合并后shuffle分区的最小大小。当分区合并期间自适应计算的目标大小过小时,通过该参数调节,可以和前面的参数结合使用。 |
spark.sql.adaptive.coalescePartitions.parallelismFirst |
tue |
当为true时,Spark在合并连续的shuffle分区时不会遵守 'spark.sql.adaptive.advisoryPartitionSizeInBytes'(默认64MB)指定的目标大小,而是根据Spark集群的默认并行度自适应地计算目标大小。计算出的尺寸通常小于配置的目标尺寸。这是为了在启用AQE化并行度并避免性能退化。建议在繁忙的集群上将此配置设置为false,以使资源利用更高效(任务数量不多的小任务)。 |
spark.sql.adaptive.coalescePartitions.minPartitionNum |
/ |
用于控制动态合并小分区时的最小分区数量,虽然该参数在Spark代码中不建议使用,标识为废弃。但在一些实际场景中还是可以有效地避免过小分区带来的资源利用率过低问题。 |
3 SQL表元数据查询优化
Spark读取数据冷启动时,会从文件系统中获取文件的一些元数据信息(location/size等)用于优化,在后续的查询中,表的partition就可以在内存中进行下推,得以快速的查询。如果一个目录下的文件过多,就会比较耗时,可能达到数十分钟,该逻辑在InMemoryFieIndex中实现。
涉及的参数如下:
1. spark.sql.sources.parallelPartitionDiscovery.threshold:在driver端允许列出文件的最大路径数。如果在分区发现期间检测到的路径数超过此值,它将尝试使用另一个Spark分布式作业列出文件。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。默认值是32。
2. spark.sql.sources.parallelPartitionDiscovery.parallelism:设置分区查询时Spark Job的最大并行数,默认是10000。
整个作业流程解释如下:读取表元数据前会先判断分区的数量,如果分区数量不超过32,则使用driver循环读取文件元数据,否则会启动一个spark job,分布式地处理元数据信息,即每个分区下的文件生成一个task进行处理。但是,分区数量很多意味着“Listing leaf files task”的任务会很多,最大的并发数由spark.sql.sources.parallelPartitionDiscovery.parallelism控制。Web UI上执行的Job如下图所示。

一般情况下,如果分区里的文件数量不多的话,那么每个task的负载较低,大部分的时间都花在了任务的序列化和反序列化上,在这种情况需要降低最大并行参数进行调优。
4 Scan并行度参数调优
同样的,Spark中Scan阶段切分为多少个Task也是由多个关键参数决定的,如下:
1. spark.sql.files.maxPartitionBytes:读取文件时要打包到单个分区中的最大字节数。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。默认值是128MB。
2. spark.sql.files.openCostInBytes:打开文件的估计成本,通过同时可扫描的字节数来衡量。这用于将多个文件放入一个分区时。官网建议高估,这样小文件分区将比大文件分区更快。默认值是4MB。
3. spark.sql.files.minPartitionNum:建议的最小拆分文件分区数,Spark并不能保证运行时不小于该值。
通过调节上述参数,优化scan job的并行度。
5 Join调优
Join算子是大数据算子中较为复杂和耗时的一个,它基本上可以分为三类:BroadcastHashJoin、ShuffledHashJoin以及SortMergeJoin,性能也逐步降低。因此在性能要求较高的场景,尽量使能BroadcastHashJoin和ShuffledHashJoin。
其中,走到BroadcastHashJoin需要满足广播表的大小不超过spark.sql.autoBroadcastJoinThreshold,默认值为10MB,正常情况下可以调大一些。若设置为-1,则禁用广播Join功能。
针对ShuffledHashJoin,Gluten提供了参数spark.gluten.sql.columnar.forceShuffledHashJoin,若设置为true,可强制将SortMergeJoin转为ShuffledHashJoin,减少SQL运行时间。
此外,若存在多个连续的Join,列式计算可能不如原生Spark的Codegen执行效果好。在性能不佳的情况下,可以回退为原生的执行,Gluten提供了回退参数,如下:
1. spark.gluten.sql.columnar.physicalJoinOptimizeEnable:优化连续join场景,超过阈值则回退原生执行,默认为false;2. spark.gluten.sql.columnar.logicalJoinOptimizationLevel:上述参数的阈值,默认值为12。
6 OmniRowShuffle特性使能
不同于其他列式加速引擎(Velox\Clickhouse),OmniRuntime既支持列式shuffle,也支持行式shuffle。因为在某些多列shuffle的场景下,行式shuffle能够有效地降低数据split时的频繁搬移操作。Gluten+OmniRuntime下,可以通过如下参数控制行式shuffle:
1. spark.gluten.sql.columnar.backend.omni.rowShuffle.enabled:是否开启行式shuffle特性。默认为true。
2. spark.gluten.sql.columnar.backend.omni.rowShuffle.columnsThreshold:参与shuffle的总列数超过该阈值走行式shuffle,否则走列式shuffle。默认值是10。建议在varchar列比较多的场景,可以适当降低参数。
7 OmniRuntime执行计划优化
OmniRuntime作为Gluten的Backend之一,还实现了一些独有的SQL执行计划优化的Rule,包含了逻辑执行计划以及物理执行计划阶段的优化。部分优化已内嵌到代码中,部分优化通过开关使能,方便用户进行调优。相关Rule的参数详细如下:
参数 |
默认值 |
描述 |
spark.gluten.sql.columnar.backend.omni.topNSort |
true |
将TopN操作下推到Window中,减少Window处理的数据量 |
spark.gluten.sql.columnar.backend.omni.topN.threshold |
100 |
topNsort可以下推的最高阈值 |
spark.gluten.sql.columnar.backend.omni.joinReorderEnhance |
true |
Join顺序重排序 |
spark.gluten.sql.columnar.backend.omni.dedupLeftSemiJoin |
false |
对leftsemi join右表去重,减少join数据量 |
spark.gluten.sql.columnar.backend.omni.dedupLeftSemiJoinThreshold |
3 |
超过阈值才使能rule优化 |
spark.gluten.sql.columnar.backend.omni.pushOrderedLimitThroughAggEnable |
false |
满足特定条件下,将TopNSort(或者sort+limit)下推到部分聚合后,减少下游算子数据处理量 |
spark.gluten.sql.columnar.backend.omni.adaptivePartialAggregation |
false |
针对部分聚合效果较差的情况下,跳过该聚合过程 |
spark.gluten.sql.columnar.backend.omni.filterMerge |
false |
将subquery中的filters进行合并 |
spark.gluten.sql.columnar.backend.omni.filterMerge.maxCost |
100.0 |
计算合并filter的代价,超过该阈值则不使能 |
spark.gluten.sql.columnar.backend.omni.rewriteSelfJoinInInPredicate |
false |
in表达式中的self join转换为hashagg,删除没用到的列,减少处理数据量 |
spark.gluten.sql.columnar.backend.omni.combineJoinedAggregates |
false |
通过合并相同Scan的子查询语句,减少重复的读表操作 |
spark.gluten.sql.columnar.backend.omni.vec.predicate.enabled |
false |
将谓词过滤下推到scan阶段执行,基于向量化比较进行过滤 |
spark.gluten.sql.columnar.backend.omni.enableDelayCartesianProduct.enabled |
true |
将所有的笛卡尔积操作移动到查询计划的根部,以简化执行计划 |
spark.gluten.sql.columnar.backend.omni.preferShuffledHashJoin |
false |
使能后会优化SHJ的BuildSide |
spark.gluten.sql.columnar.backend.omni.rollupOptimization.enabled |
true |
优化rollup算子,拆解为多步执行,减少数据膨胀 |
用户在使用Gluten+OmniRuntime时,可根据实际情况使能这些rule。
8 列式Batch大小优化
类似于velox,OmniRuntime也是列式处理引擎,算子之间传递和处理的单位是OmniVecBatch,其由多列OmniVector组成。OmniVector的容量大小,关系到内存的申请和释放,循环处理的次数,对性能有重要的影响。以下参数调节了各阶段产生的vector容量,可根据实际场景增减设置值进行调优:
参数 |
默认值 |
描述 |
spark.sql.orc.columnarReaderBatchSize/spark.sql.parquet.columnarReaderBatchSize |
4096 |
设置orc/parquet读取过程中,返回的batch行数 |
spark.gluten.sql.columnar.maxBatchSize |
4096 |
控制RowToOmniColumnar时,生成的batch最大行数 |
spark.gluten.shuffleWriter.bufferSize |
默认使用spark.gluten.sql.columnar.maxBatchSize参数,且不能设置超过该阈值 |
控制shuffle split时生成的每个Batch的最大行数,超出该设置则拆分到新的Batch中 |
spark.gluten.sql.columnar.backend.omni.sql.shuffle.merge |
true |
控制AQE开启后,合并shuffle read之后的小Batch,降低Batch数量 |
spark.gluten.sql.columnar.backend.omni.maxBatchSizeInBytes |
2MB |
合并后的Batch最大size |
spark.gluten.sql.columnar.backend.omni.maxRowCount |
20000 |
合并后的Batch最大行数 |
spark.gluten.sql.columnar.backend.omni.mergedBatchThreshold |
100 |
合并时,单个Batch最大的合并上游Batch数量,超过该阈值会新启一个Batch |
9 Native库加速
1、Hadoop Native库加速
在ARM机器上运行Spark经常会出现如下告警提示:
该告警是提示没有加载native hadoop库。原因是Hadoop社区默认提供的是x86版本的libhadoop.so,如下:

如果需要在arm上使用native库,需要手动编译生成或者下载第三方提供的库,如下:

在Spark中可通过如下参数加载native hadoop库:
--conf spark.executor.extraJavaOptions=`-Djava.library.path=/usr/local/hadoop/lib/native/`--conf spark.driver.extraJavaOptions=`-Djava.library.path=/usr/local/hadoop/lib/native/`
加载后可以加速很多处理流程。比如,在拉取数据时,会校验数据的crc一致性。如果走java的逻辑,火焰图如下:

若成功加载libhadoop.so后,可以获得数倍的性能提升,火焰图如下:

2、Kunpeng KAE加速库使能
KAE(Kunpeng Accelerator Engine,鲲鹏加速引擎)是基于鲲鹏920系列处理器提供的硬件加速解决方案,可以加速Spark依赖的常见压缩库,如zlib、gzip、zstd、lz4等,对shuffle、scan阶段有比较好的加速作用。部署和使能文档参考如下链接:
https://gitee.com/kunpengcompute/KAE
10 运行环境调优
运行环境调优主要是针对操作系统进行调优,包含以下两个手段:
1、numa亲和
通过numa绑核,减少跨numa/跨P的访问,提升整体业务作业性能。开启方式可以通过Linux命令numactl使能,也可以通过开启Gluten中配置spark.gluten.sql.columnar.numaBinding=true使能。
此外,在openEuler系统中,针对业务不绑核的场景,还可以通过开启动态感知系统负载特性,在低负载时选用物理核,提高业务性能。部署和使用参考链接https://gitee.com/openeuler/kernel/issues/IAJEHU
2、大页优化
操作系统中内存页的大小对系统性能有影响。内存页设得太小,内存页会很多,导致TLB miss增加,因此需要开启HugePage优化。openEuler 20.03(内核版本4.19)/openEuler22.03(内核版本5.10),在使能鲲鹏64K大页后,可以给部分Spark场景带来6%的性能提升。在更高的版本openEuler 24.03(内核版本6.6)上,支持动态复合页,使能更多粒度大页,Spark场景性能提升9%以上。参考链接https://www.openeuler.org/zh/news/20240719-dt/20240719-dt.html
结语
性能调优从无 “银弹”,唯有秉持朴素理念进行极致优化。我们在 15 台鲲鹏集群上运行 10TB TPCDS 的 Spark 测试实验时,启用了 Gluten+OmniRuntime,通过上述调优方法,将开始未调优状态下的 8826 秒最终缩短至 4572 秒,整体性能实现了 93% 的大幅提升。用户可借鉴本文提供的经验,并将其应用于 Spark 业务中,从而助力在 ARM 平台上启用 OmniRuntime 后获得更优性能,实现降本增效。