调优Spark任务

使用OmniAdvisor参数调优特性,推荐Spark任务的最佳运行参数,从而优化任务性能。

  1. 在管理节点,修改需要调优的参数列表、参数默认值和参数范围配置,修改默认参数配置。

    1. 打开“$OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/config/spark/spark_config.yml”文件配置项。
      vi $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/config/spark/spark_config.yml
    2. “i”进入编辑模式,根据实际情况新增、删除、保留调优的参数,同时需要指定参数名称、参数选择范围区间、参数缺省值、数据类型和单位。

      表1以spark.executor.memory参数为例,介绍参数配置项。

      表1 spark_config.yml参数配置项说明

      配置项名称

      配置说明

      spark.executor.memory

      被调优的Spark配置参数名称。用于设置每个执行器(Executor)可以使用的内存量,这个参数直接影响到Spark作业的性能和稳定性。

      choices

      参数值的选择范围。参数调优在进行推荐时,算法会从choices提供的选择范围中选择值。choices的范围通常以default_value作为中间值,按照实际可利用的资源进行上下范围的扩展。

      default_value

      参数缺省值。可参考实际业务所使用的值配置。缺省值必须包含在choices所提供的范围中,无特殊需求一般设为choices序列的中间值。

      type

      数据类型。支持类型有int、boolean、float。

      unit

      单位。支持的单位有K、M、G分别代表KB、MB、GB,一般默认使用GB表示。

      常用配置如下,配置值仅供用户参考,实际使用时可以根据实际业务场景和可利用资源对choices、default_value进行调整,或者增加/减少参与调优的参数。

      spark.executor.memory: # 配置每个Spark Executor(任务执行器)的内存分配量
        choices: [ 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36 ]
        default_value: 28
        type: int
        unit: G
      
      spark.executor.cores: # 配置每个Spark Executor使用的CPU核心数量
        choices: [ 6, 8, 10, 12, 14, 16, 18, 20 ]
        default_value: 8
        type: int
      
      spark.executor.instances: # 配置了Executor的实例数量,即同时运行的Executor的个数
        choices: [ 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48 ]
        default_value: 30
        type: int
      
      spark.driver.cores: # 配置Spark Driver使用的CPU核心数量
        choices: [ 4, 6, 8, 10, 12, 14, 16, 18, 20 ]
        default_value: 6
        type: int
      
      spark.driver.memory: # 配置Spark Driver的内存分配量
        choices: [ 12, 14, 16, 18, 20, 22, 24, 26, 28, 30 ]
        default_value: 16
        type: int
        unit: G
      
      spark.memory.offHeap.enabled: # 配置Spark堆外内存是否启用,如果要配置spark.memory.offHeap.size,则该项必须开启
        choices: [ "true" ]
        default_value: "true"
        type: boolean
      
      spark.memory.offHeap.size: # 配置Spark堆外内存,需要保证spark.memory.offHeap.enabled是开启的状态
        choices: [ 2, 3, 4, 5, 6, 7, 8 ]
        default_value: 5
        type: int
        unit: G
      
      spark.broadcast.blockSize: # 配置广播变量(broadcast variable)的块大小
        choices: [ 2, 4, 6, 8, 10, 12, 14, 16, 18, 20 ]
        default_value: 2
        type: int
        unit: M
      
      spark.sql.shuffle.partitions: # 配置了Spark SQL中用于执行Shuffle操作(如聚合和连接)时的分区数量
        choices: [ 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300, 1400, 1500 ]
        default_value: 200
        type: int
      
      spark.executor.memoryOverhead: # 配置了每个Executor的额外的内存开销,以供Spark管理和维护
        choices: [ 2, 4, 6, 8, 10 ]
        default_value: 4
        type: int
        unit: G
      
      spark.memory.fraction: # 配置了Spark内存管理器用于存储缓存的堆内存比例
        choices: [ 0.4, 0.5, 0.6, 0.7, 0.8, 0.9 ]
        default_value: 0.6
        type: float
      
      spark.memory.storageFraction: # 配置了堆内存中用于存储缓存的比例
        choices: [ 0.4, 0.5, 0.6, 0.7, 0.8, 0.9 ]
        default_value: 0.5
        type: float
      
      spark.sql.autoBroadcastJoinThreshold: # 配置了Spark SQL中自动广播连接(Broadcast Join)的阈值
        choices: [ 60, 70, 80, 90, 100, 120, 150, 180, 200, 220, 250, 280, 300 ]
        default_value: 100
        type: int
        unit: M
      
      spark.sql.join.preferSortMergeJoin: # 配置了Spark SQL是否首选排序合并连接(Sort-Merge Join)而不是哈希连接(Hash Join)
        choices: [ "true", "false" ]
        default_value: "true"
        type: boolean
      
      spark.sql.adaptive.enabled: # 配置了Spark SQL自适应查询执行是否启用, AQE默认开启
        choices: [ "true" ]
        default_value: "true"
        type: boolean
      
      spark.sql.adaptive.coalescePartitions.enabled: # 在启用AQE特性后,会合并相邻Shuffle分区,避免太多小任务
        choices: [ "true" ]
        default_value: "true"
        type: boolean
      
      spark.sql.adaptive.advisoryPartitionSizeInBytes: # 在启用AQE特性后Shuffle分区的建议大小,当Spark合并小的Shuffle分区或拆分偏斜的Shuffle分区时,它会生效
        choices: [ 10, 30, 50, 100, 120, 150, 200, 300 ]
        default_value: 100
        type: int
        unit: M
      
      spark.sql.adaptive.coalescePartitions.initialPartitionNum: # 在合并之前的Shuffle分区的初始数量,如果生效,优先级高于spark.sql.shuffle.partitions
        choices: [ 200, 400, 500, 800, 1000, 1200, 1400, 1500, 1600, 1800, 2000 ]
        default_value: 1000
        type: int
      
      spark.sql.adaptive.skewJoin.enabled: # 配置了自适应查询执行中是否启用自适应倾斜连接优化
        choices: [ "true", "false" ]
        default_value: "true"
        type: boolean
      
      spark.sql.adaptive.skewJoin.skewedPartitionFactor: # 计算是否存在数据倾斜的系数
        choices: [ 2, 3, 4, 5, 6, 7, 8 ]
        default_value: 5
        type: int
      
      spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes: # 如果分区的大小超过这个阈值,那么这个分区可能存在数据倾斜的问题
        choices: [ 128, 256, 384, 512, 768, 896, 1024 ]
        default_value: 256
        type: int
        unit: M

      若用户使用的是Spark 3.3.0以上的版本,可以在配置文件中追加如下内容。

      spark.sql.optimizer.runtime.bloomFilter.enabled: # 是否启用布隆过滤器的优化
        choices: [ "true" ]
        default_value: "true"
        type: boolean
      
      spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold: # Bloom过滤器应用程序侧执行计划的聚合扫描大小的字节阈值。
        choices: [ 10, 12, 15, 20, 40, 50 ]
        default_value: 10
        type: int
        unit: G
      
      spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold: # Bloom过滤器创建侧执行计划的大小阈值
        choices: [ 10, 20, 30, 40, 50, 80, 100 ]
        default_value: 10
        type: int
        unit: M
      
      spark.sql.optimizer.runtimeFilter.number.threshold: # 注入的runtime filter的数量的阈值
        choices: [ 10, 12, 15, 20 ]
        default_value: 10
        type: int
      
      spark.sql.orc.aggregatePushdown: # 是否将Aggregate算子下推到orc数据格式中进行优化
        choices: [ "true", "false" ]
        default_value: "true"
        type: boolean
      
      spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled: # 是否允许插入semijoin,用来减少Shuffle数据量
        choices: [ "true", "false" ]
        default_value: "true"
        type: boolean
      
      spark.sql.shuffledHashJoinFactor: # 如果小表的数据大小乘以这个因子后仍然小于大表,则可以选择使用shuffle hash join
        choices: [ 2, 3, 4, 5, 6, 7, 8 ]
        default_value: 3
        type: int
    3. “Esc”键,输入:wq!,按“Enter”保存并退出编辑。

  2. 可选:当对Spark APP任务调优,且采用Yarn Cluster部署模式时,修改配置中的spark_default_config字段来配置默认conf。

    1. 以spark-submit MyApp.jar为例:
      1
      spark-submit --master yarn --deploy-mode cluster --conf -spark.driver.memory=40g --conf spark.executorEnv.OMNI_HOME=/opt/OmniRuntime --class org.example.MyAPP /path/to/MyApp.jar args1 args2
      
    2. 对Cluster部署模式的任务,将相关配置分为调优参数和非调优参数。其中调优参数指用户选择的需要进行调优的参数,非调优参数包括不可调优的参数以及用户不需要进行调优的参数。
      • 以该任务为例,调优参数为:
        1
        --conf -spark.driver.memory=40g
        
      • 非调优参数为:
        1
        --conf spark.executorEnv.OMNI_HOME=/opt/OmniRuntime
        
    3. 将非调优参数添加到“$OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/config/common_config.cfg”文件配置项中。
      1. 打开“$OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/config/common_config.cfg”文件配置项。
        vi $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/config/common_config.cfg
      2. “i”进入编辑模式,在spark_default_config字段追加非调优参数配置。
        1
        2
        # spark默认参数,一般默认参数不参与参数采样过程
        spark_default_config = --conf spark.sql.orc.impl=native --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.locality.wait=0 --conf spark.sql.broadcastTimeout=300 --conf spark.executorEnv.OMNI_HOME=/opt/OmniRuntime
        
      3. “Esc”键,输入:wq!,按“Enter”保存并退出编辑。

  3. 在管理节点初始化数据库及同步参数配置到日志解析模块。

    1. 使用OmniAdvisor参数调优CLI选择Spark引擎。

      python $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/main.pyc

    2. 输入init_environment或者按“Tab”查看提示选择init_environment,按“Enter”执行。

      • 执行成功后,会在test_advisor数据库中创建history_config表、best_config表。
      • 该步骤同时会将spark_config.yml中的调优参数同步至日志解析模块的配置中。当Spark调优参数有调整时,需要重新执行init_environment将参数配置同步到日志解析模块。

  4. 调用日志解析模块,将解析的数据写入数据库。

    1. 打开“$OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/config/common_config.cfg”配置文件。
      vi $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/config/common_config.cfg
    2. “i”进入编辑模式,修改日志起止时间。common_config.cfg配置文件参数说明请参见common_config.cfg
      1
      2
      3
      4
      5
      [spark]
      # Spark日志的起始时间
      log_start_time = 2023-09-05 19:54:51
      # Spark日志的终止时间
      log_end_time = 2023-09-05 19:57:17
      
    3. “Esc”键,输入:wq!,按“Enter”保存并退出编辑。
    4. 执行采集命令。
      使用OmniAdvisor参数调优CLI选择Spark引擎。
      python $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/main.pyc

      输入fetch_history_data或者按“Tab”查看提示选择fetch_history_data,按“Enter”执行。

      历史任务信息解析成功后,将结果写入了history_config表和best_config表中。

  5. 采样历史任务的参数。

    1. 使用OmniAdvisor CLI选择Spark引擎。

      python $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/main.pyc

    2. 输入parameter_sampling或者按“Tab”查看提示选择parameter_sampling,按“Enter”执行。

    3. 输入数字指定参数采样的轮数。

  6. 进行参数调优。

    • 输入“yes”对数据库中所有可调优的任务进行参数采样n轮,等待采样结束即可。

    • 输入“no”则对列举的任务进行筛选,输入需要采样调优的任务的identification,多个任务用“,”隔开,回车即可对指定的任务进行参数采样n轮,等待采样结束即可。

    • 每次参数采样执行完成,会调用日志解析模块对该参数运行的任务状态、任务运行时间等任务信息进行解析保存到history_config表中,同时刷新best_config表中的最优配置等信息。
    • 参数采样完成之后,才能对该任务进行参数推荐。

  7. 推荐采样中运行最优的参数来执行任务。

    1. 使用OmniAdvisor参数调优CLI选择Spark引擎。

      python $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/main.pyc

    2. 输入parameter_recommend或者按“Tab”查看提示选择parameter_recommend。

    3. 输入要调优的Spark任务提交指令,并通过OmniAdvisor参数调优来提交Spark任务。
      • SQL场景以q10.sql为例。

      • APP场景以HiBench WordCount为例。

    4. 可选:快速采用OmniAdvisor参数调优进行参数推荐和提交。
      python $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/main.pyc -e spark -i parameter_recommend -c "spark-sql --master yarn --database tpcds_bin_partitioned_decimal_orc_2 -f q10.sql"
    • 当进行参数推荐时,会根据配置中的identification_type中的类型计算需要推荐的任务的identification值,并匹配best_config中的最优参数,替换原始参数提交到Spark执行。
    • 当未在best_config表中成功匹配,或者匹配到的参数执行失败,则回退到原始提交的参数执行。