使用OmniAdvisor参数调优特性,推荐Spark任务的最佳运行参数,从而优化任务性能。
vi $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/config/spark/spark_config.yml
表1以spark.executor.memory参数为例,介绍参数配置项。
配置项名称 |
配置说明 |
---|---|
spark.executor.memory |
被调优的Spark配置参数名称。用于设置每个执行器(Executor)可以使用的内存量,这个参数直接影响到Spark作业的性能和稳定性。 |
choices |
参数值的选择范围。参数调优在进行推荐时,算法会从choices提供的选择范围中选择值。choices的范围通常以default_value作为中间值,按照实际可利用的资源进行上下范围的扩展。 |
default_value |
参数缺省值。可参考实际业务所使用的值配置。缺省值必须包含在choices所提供的范围中,无特殊需求一般设为choices序列的中间值。 |
type |
数据类型。支持类型有int、boolean、float。 |
unit |
单位。支持的单位有K、M、G分别代表KB、MB、GB,一般默认使用GB表示。 |
常用配置如下,配置值仅供用户参考,实际使用时可以根据实际业务场景和可利用资源对choices、default_value进行调整,或者增加/减少参与调优的参数。
spark.executor.memory: # 配置每个Spark Executor(任务执行器)的内存分配量 choices: [ 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36 ] default_value: 28 type: int unit: G spark.executor.cores: # 配置每个Spark Executor使用的CPU核心数量 choices: [ 6, 8, 10, 12, 14, 16, 18, 20 ] default_value: 8 type: int spark.executor.instances: # 配置了Executor的实例数量,即同时运行的Executor的个数 choices: [ 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48 ] default_value: 30 type: int spark.driver.cores: # 配置Spark Driver使用的CPU核心数量 choices: [ 4, 6, 8, 10, 12, 14, 16, 18, 20 ] default_value: 6 type: int spark.driver.memory: # 配置Spark Driver的内存分配量 choices: [ 12, 14, 16, 18, 20, 22, 24, 26, 28, 30 ] default_value: 16 type: int unit: G spark.memory.offHeap.enabled: # 配置Spark堆外内存是否启用,如果要配置spark.memory.offHeap.size,则该项必须开启 choices: [ "true" ] default_value: "true" type: boolean spark.memory.offHeap.size: # 配置Spark堆外内存,需要保证spark.memory.offHeap.enabled是开启的状态 choices: [ 2, 3, 4, 5, 6, 7, 8 ] default_value: 5 type: int unit: G spark.broadcast.blockSize: # 配置广播变量(broadcast variable)的块大小 choices: [ 2, 4, 6, 8, 10, 12, 14, 16, 18, 20 ] default_value: 2 type: int unit: M spark.sql.shuffle.partitions: # 配置了Spark SQL中用于执行Shuffle操作(如聚合和连接)时的分区数量 choices: [ 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300, 1400, 1500 ] default_value: 200 type: int spark.executor.memoryOverhead: # 配置了每个Executor的额外的内存开销,以供Spark管理和维护 choices: [ 2, 4, 6, 8, 10 ] default_value: 4 type: int unit: G spark.memory.fraction: # 配置了Spark内存管理器用于存储缓存的堆内存比例 choices: [ 0.4, 0.5, 0.6, 0.7, 0.8, 0.9 ] default_value: 0.6 type: float spark.memory.storageFraction: # 配置了堆内存中用于存储缓存的比例 choices: [ 0.4, 0.5, 0.6, 0.7, 0.8, 0.9 ] default_value: 0.5 type: float spark.sql.autoBroadcastJoinThreshold: # 配置了Spark SQL中自动广播连接(Broadcast Join)的阈值 choices: [ 60, 70, 80, 90, 100, 120, 150, 180, 200, 220, 250, 280, 300 ] default_value: 100 type: int unit: M spark.sql.join.preferSortMergeJoin: # 配置了Spark SQL是否首选排序合并连接(Sort-Merge Join)而不是哈希连接(Hash Join) choices: [ "true", "false" ] default_value: "true" type: boolean spark.sql.adaptive.enabled: # 配置了Spark SQL自适应查询执行是否启用, AQE默认开启 choices: [ "true" ] default_value: "true" type: boolean spark.sql.adaptive.coalescePartitions.enabled: # 在启用AQE特性后,会合并相邻Shuffle分区,避免太多小任务 choices: [ "true" ] default_value: "true" type: boolean spark.sql.adaptive.advisoryPartitionSizeInBytes: # 在启用AQE特性后Shuffle分区的建议大小,当Spark合并小的Shuffle分区或拆分偏斜的Shuffle分区时,它会生效 choices: [ 10, 30, 50, 100, 120, 150, 200, 300 ] default_value: 100 type: int unit: M spark.sql.adaptive.coalescePartitions.initialPartitionNum: # 在合并之前的Shuffle分区的初始数量,如果生效,优先级高于spark.sql.shuffle.partitions choices: [ 200, 400, 500, 800, 1000, 1200, 1400, 1500, 1600, 1800, 2000 ] default_value: 1000 type: int spark.sql.adaptive.skewJoin.enabled: # 配置了自适应查询执行中是否启用自适应倾斜连接优化 choices: [ "true", "false" ] default_value: "true" type: boolean spark.sql.adaptive.skewJoin.skewedPartitionFactor: # 计算是否存在数据倾斜的系数 choices: [ 2, 3, 4, 5, 6, 7, 8 ] default_value: 5 type: int spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes: # 如果分区的大小超过这个阈值,那么这个分区可能存在数据倾斜的问题 choices: [ 128, 256, 384, 512, 768, 896, 1024 ] default_value: 256 type: int unit: M
若用户使用的是Spark 3.3.0以上的版本,可以在配置文件中追加如下内容。
spark.sql.optimizer.runtime.bloomFilter.enabled: # 是否启用布隆过滤器的优化 choices: [ "true" ] default_value: "true" type: boolean spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold: # Bloom过滤器应用程序侧执行计划的聚合扫描大小的字节阈值。 choices: [ 10, 12, 15, 20, 40, 50 ] default_value: 10 type: int unit: G spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold: # Bloom过滤器创建侧执行计划的大小阈值 choices: [ 10, 20, 30, 40, 50, 80, 100 ] default_value: 10 type: int unit: M spark.sql.optimizer.runtimeFilter.number.threshold: # 注入的runtime filter的数量的阈值 choices: [ 10, 12, 15, 20 ] default_value: 10 type: int spark.sql.orc.aggregatePushdown: # 是否将Aggregate算子下推到orc数据格式中进行优化 choices: [ "true", "false" ] default_value: "true" type: boolean spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled: # 是否允许插入semijoin,用来减少Shuffle数据量 choices: [ "true", "false" ] default_value: "true" type: boolean spark.sql.shuffledHashJoinFactor: # 如果小表的数据大小乘以这个因子后仍然小于大表,则可以选择使用shuffle hash join choices: [ 2, 3, 4, 5, 6, 7, 8 ] default_value: 3 type: int
1
|
spark-submit --master yarn --deploy-mode cluster --conf -spark.driver.memory=40g --conf spark.executorEnv.OMNI_HOME=/opt/OmniRuntime --class org.example.MyAPP /path/to/MyApp.jar args1 args2 |
1
|
--conf -spark.driver.memory=40g |
1
|
--conf spark.executorEnv.OMNI_HOME=/opt/OmniRuntime |
vi $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/config/common_config.cfg
1 2 |
# spark默认参数,一般默认参数不参与参数采样过程 spark_default_config = --conf spark.sql.orc.impl=native --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.locality.wait=0 --conf spark.sql.broadcastTimeout=300 --conf spark.executorEnv.OMNI_HOME=/opt/OmniRuntime |
使用OmniAdvisor参数调优CLI选择Spark引擎。
python $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/main.pyc
vi $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/config/common_config.cfg
1 2 3 4 5 |
[spark] # Spark日志的起始时间 log_start_time = 2023-09-05 19:54:51 # Spark日志的终止时间 log_end_time = 2023-09-05 19:57:17 |
输入fetch_history_data或者按“Tab”查看提示选择fetch_history_data,按“Enter”执行。
历史任务信息解析成功后,将结果写入了history_config表和best_config表中。
使用OmniAdvisor参数调优CLI选择Spark引擎。
python $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/main.pyc
python $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/main.pyc -e spark -i parameter_recommend -c "spark-sql --master yarn --database tpcds_bin_partitioned_decimal_orc_2 -f q10.sql"