Tuning a Spark Task
Use the OmniAdvisor feature to recommend the optimal running parameters of Spark tasks and optimize task performance.
- On the management node, modify the list of parameters to be tuned, their default values, and their value ranges, and set the default Spark configuration.
- Open the $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/config/spark/spark_config.yml file.
vi $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/config/spark/spark_config.yml
- Press i to enter the insert mode. Add, delete, or retain the parameters to be tuned as needed. In addition, set the parameter name, value range, default value, data type, and unit.
As an example, Table 1 describes the configuration items of the spark.executor.memory parameter.
Table 1 Configuration items in spark_config.yml

| Configuration Item | Description |
| --- | --- |
| spark.executor.memory | Name of the Spark configuration parameter to be tuned. It specifies the memory space that can be used by each executor. It affects the performance and stability of Spark jobs. |
| choices | Parameter value range. During parameter tuning, the algorithm selects a value from the value range defined by choices. The value range usually uses default_value as the median value and stretches based on the available resources. |
| default_value | Default parameter value. You can set this configuration item based on your actual service requirements. The default parameter value must be included in the value range defined by choices. Generally it is set to the median value of choices. |
| type | Data type, which is int, boolean, or float. |
| unit | Can be K, M, and G, which indicate KB, MB, and GB respectively. Generally, GB is used by default. |
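The rules in Table 1 can be checked before a tuning run. The following is a minimal sketch, not part of OmniAdvisor: the validate_entry function and the bad entry are hypothetical, and each entry is written as a Python dictionary instead of being loaded from spark_config.yml.

```python
VALID_TYPES = {"int", "boolean", "float"}
VALID_UNITS = {"K", "M", "G"}


def validate_entry(name, entry):
    """Return a list of problems found in one spark_config.yml entry."""
    problems = []
    choices = entry.get("choices", [])
    if not choices:
        problems.append(f"{name}: choices is empty")
    # Table 1: default_value must be included in choices.
    if entry.get("default_value") not in choices:
        problems.append(f"{name}: default_value is not in choices")
    # Table 1: type is int, boolean, or float.
    if entry.get("type") not in VALID_TYPES:
        problems.append(f"{name}: type must be int, boolean, or float")
    # Table 1: unit, when present, is K, M, or G.
    if "unit" in entry and entry["unit"] not in VALID_UNITS:
        problems.append(f"{name}: unit must be K, M, or G")
    return problems


good = {
    "choices": [14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36],
    "default_value": 28,
    "type": "int",
    "unit": "G",
}
# Hypothetical broken entry: 5 is not one of the listed choices.
bad = {"choices": [2, 4, 6], "default_value": 5, "type": "int", "unit": "G"}

print(validate_entry("spark.executor.memory", good))
print(validate_entry("spark.executor.memoryOverhead", bad))
```

An empty list means that the entry satisfies the rules described in the table.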
The common configuration items are as follows. The parameter values are for reference only. You can adjust the values of choices and default_value or add or delete parameters involved in the tuning process based on your service scenario and available resources.
spark.executor.memory:
  # Configure the memory space allocated to each Spark executor (task executor).
  choices: [ 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36 ]
  default_value: 28
  type: int
  unit: G
spark.executor.cores:
  # Configure the number of CPU cores used by each Spark executor.
  choices: [ 6, 8, 10, 12, 14, 16, 18, 20 ]
  default_value: 8
  type: int
spark.executor.instances:
  # Configure the number of executor instances, that is, the number of executors that run concurrently.
  choices: [ 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48 ]
  default_value: 30
  type: int
spark.driver.cores:
  # Configure the number of CPU cores used by the Spark driver.
  choices: [ 4, 6, 8, 10, 12, 14, 16, 18, 20 ]
  default_value: 6
  type: int
spark.driver.memory:
  # Configure the memory space allocated to the Spark driver.
  choices: [ 12, 14, 16, 18, 20, 22, 24, 26, 28, 30 ]
  default_value: 16
  type: int
  unit: G
spark.memory.offHeap.enabled:
  # Set whether to enable the Spark off-heap memory. Enable this option if you want to configure spark.memory.offHeap.size.
  choices: [ "true" ]
  default_value: "true"
  type: boolean
spark.memory.offHeap.size:
  # Configure the Spark off-heap memory size. Ensure that spark.memory.offHeap.enabled has been enabled.
  choices: [ 2, 3, 4, 5, 6, 7, 8 ]
  default_value: 5
  type: int
  unit: G
spark.broadcast.blockSize:
  # Configure the block size of the broadcast variable.
  choices: [ 2, 4, 6, 8, 10, 12, 14, 16, 18, 20 ]
  default_value: 2
  type: int
  unit: M
spark.sql.shuffle.partitions:
  # Configure the number of partitions used for shuffle operations (such as aggregation and join) in Spark SQL.
  choices: [ 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300, 1400, 1500 ]
  default_value: 200
  type: int
spark.executor.memoryOverhead:
  # Configure the extra memory overhead of each executor for Spark management and maintenance.
  choices: [ 2, 4, 6, 8, 10 ]
  default_value: 4
  type: int
  unit: G
spark.memory.fraction:
  # Configure the proportion of heap memory used for execution and storage.
  choices: [ 0.4, 0.5, 0.6, 0.7, 0.8, 0.9 ]
  default_value: 0.6
  type: float
spark.memory.storageFraction:
  # Configure the proportion of the spark.memory.fraction region that is reserved for storing caches and is immune to eviction.
  choices: [ 0.4, 0.5, 0.6, 0.7, 0.8, 0.9 ]
  default_value: 0.5
  type: float
spark.sql.autoBroadcastJoinThreshold:
  # Configure the threshold of automatic broadcast join in Spark SQL.
  choices: [ 60, 70, 80, 90, 100, 120, 150, 180, 200, 220, 250, 280, 300 ]
  default_value: 100
  type: int
  unit: M
spark.sql.join.preferSortMergeJoin:
  # Set whether Spark SQL prefers Sort-Merge Join instead of Hash Join.
  choices: [ "true", "false" ]
  default_value: "true"
  type: boolean
spark.sql.adaptive.enabled:
  # Set whether to enable the Adaptive Query Execution (AQE) feature in Spark SQL. AQE is enabled by default.
  choices: [ "true" ]
  default_value: "true"
  type: boolean
spark.sql.adaptive.coalescePartitions.enabled:
  # When AQE is enabled, Spark merges contiguous shuffle partitions to avoid too many small tasks.
  choices: [ "true" ]
  default_value: "true"
  type: boolean
spark.sql.adaptive.advisoryPartitionSizeInBytes:
  # Configure the advisory size of a shuffle partition when AQE is enabled. This configuration item takes effect when Spark merges small shuffle partitions or splits skewed shuffle partitions.
  choices: [ 10, 30, 50, 100, 120, 150, 200, 300 ]
  default_value: 100
  type: int
  unit: M
spark.sql.adaptive.coalescePartitions.initialPartitionNum:
  # Configure the initial number of shuffle partitions before merging. This configuration item takes priority over spark.sql.shuffle.partitions.
  choices: [ 200, 400, 500, 800, 1000, 1200, 1400, 1500, 1600, 1800, 2000 ]
  default_value: 1000
  type: int
spark.sql.adaptive.skewJoin.enabled:
  # Set whether to enable adaptive optimization of skewed joins for AQE.
  choices: [ "true", "false" ]
  default_value: "true"
  type: boolean
spark.sql.adaptive.skewJoin.skewedPartitionFactor:
  # Configure the multiplier used to determine whether a partition is a data skew partition.
  choices: [ 2, 3, 4, 5, 6, 7, 8 ]
  default_value: 5
  type: int
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes:
  # If the partition size is greater than this threshold, data skew may occur in the partition.
  choices: [ 128, 256, 384, 512, 768, 896, 1024 ]
  default_value: 256
  type: int
  unit: M
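Each parameter added to the file multiplies the number of possible configurations, which is one reason to keep the tuned list focused. As a rough illustration (the search_space_size helper is hypothetical, and only three of the parameters above are included), the number of combinations can be estimated as follows:

```python
from math import prod

# choices copied from three of the entries listed above
choices = {
    "spark.executor.memory": [14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36],
    "spark.executor.cores": [6, 8, 10, 12, 14, 16, 18, 20],
    "spark.executor.instances": [28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48],
}


def search_space_size(choices):
    """Number of distinct configurations: the product of the choices list lengths."""
    return prod(len(values) for values in choices.values())


print(search_space_size(choices))  # 12 * 8 * 11 = 1056
```

Three parameters alone already yield more than a thousand combinations, so running every combination is impractical and the tool samples a limited number of rounds instead.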
If you are using Spark 3.3.0 or later, you can add the following content to the configuration file:
spark.sql.optimizer.runtime.bloomFilter.enabled:
  # Set whether to enable Bloom filter optimization.
  choices: [ "true" ]
  default_value: "true"
  type: boolean
spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold:
  # Configure the byte size threshold of the Bloom filter application side plan's aggregated scan size.
  choices: [ 10, 12, 15, 20, 40, 50 ]
  default_value: 10
  type: int
  unit: G
spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold:
  # Configure the size threshold of the Bloom filter creation side plan.
  choices: [ 10, 20, 30, 40, 50, 80, 100 ]
  default_value: 10
  type: int
  unit: M
spark.sql.optimizer.runtimeFilter.number.threshold:
  # Configure the maximum number of injected runtime filters.
  choices: [ 10, 12, 15, 20 ]
  default_value: 10
  type: int
spark.sql.orc.aggregatePushdown:
  # Set whether to push aggregate operators down to the ORC data format for optimization.
  choices: [ "true", "false" ]
  default_value: "true"
  type: boolean
spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled:
  # Set whether to insert a semi join to reduce the amount of shuffle data.
  choices: [ "true", "false" ]
  default_value: "true"
  type: boolean
spark.sql.shuffledHashJoinFactor:
  # The shuffle hash join can be selected if the data size of the small table multiplied by this factor is still smaller than that of the large table.
  choices: [ 2, 3, 4, 5, 6, 7, 8 ]
  default_value: 3
  type: int
- Press Esc, type :wq!, and press Enter to save the file and exit.
- Optional: To tune Spark application tasks that run in Yarn cluster deployment mode, modify the spark_default_config field in the configuration file to set the default configuration.
- Take spark-submit MyApp.jar as an example:
spark-submit --master yarn --deploy-mode cluster --conf spark.driver.memory=40g --conf spark.executorEnv.OMNI_HOME=/opt/OmniRuntime --class org.example.MyAPP /path/to/MyApp.jar args1 args2
- For tasks that run in cluster deployment mode, the configuration includes tuned parameters and non-tuned parameters. Tuned parameters refer to the parameters that need to be tuned. Non-tuned parameters include the parameters that cannot be or do not need to be tuned.
- For the preceding task, the tuned parameter is:
--conf spark.driver.memory=40g
- The non-tuned parameter is:
--conf spark.executorEnv.OMNI_HOME=/opt/OmniRuntime
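To sort the options of a longer command into the two groups, a small script can help. The sketch below is illustrative only: split_conf is a hypothetical helper, and tuned_keys stands in for the parameter names listed in spark_config.yml.

```python
import shlex


def split_conf(command, tuned_keys):
    """Split the --conf options of a spark-submit command into tuned and non-tuned."""
    tokens = shlex.split(command)
    tuned, non_tuned = [], []
    i = 0
    while i < len(tokens):
        if tokens[i] == "--conf" and i + 1 < len(tokens):
            # The key is everything before the first "=".
            key = tokens[i + 1].split("=", 1)[0]
            (tuned if key in tuned_keys else non_tuned).append(tokens[i + 1])
            i += 2
        else:
            i += 1
    return tuned, non_tuned


command = (
    "spark-submit --master yarn --deploy-mode cluster "
    "--conf spark.driver.memory=40g "
    "--conf spark.executorEnv.OMNI_HOME=/opt/OmniRuntime "
    "--class org.example.MyAPP /path/to/MyApp.jar args1 args2"
)
# Assumed subset of the parameter names configured in spark_config.yml.
tuned_keys = {"spark.driver.memory", "spark.executor.memory"}

tuned, non_tuned = split_conf(command, tuned_keys)
print("tuned:", tuned)
print("non-tuned:", non_tuned)
```

The entries reported as non-tuned are the ones to add to the spark_default_config field.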
- Add the non-tuned parameter to the $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/config/common_config.cfg file.
- Open the $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/config/common_config.cfg file.
vi $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/config/common_config.cfg
- Press i to enter the insert mode and add non-tuned parameter configuration to the spark_default_config field.
# Default Spark parameter. Generally, the default parameter is not involved in parameter sampling.
spark_default_config = --conf spark.sql.orc.impl=native --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.locality.wait=0 --conf spark.sql.broadcastTimeout=300 --conf spark.executorEnv.OMNI_HOME=/opt/OmniRuntime
- Press Esc, type :wq!, and press Enter to save the file and exit.
- On the management node, initialize the database and synchronize the parameter configuration to the log parsing module.
- Use the OmniAdvisor CLI to choose the Spark engine.
python $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/main.pyc

- Input init_environment or press Tab to select init_environment, and then press Enter.

- After the command is run, the history_config and best_config tables are created in the test_advisor database.
- This step also synchronizes the tuned parameters in spark_config.yml to the configuration of the log parsing module. If you change the tuned Spark parameters later, run the init_environment command again to synchronize the new settings to the log parsing module.
- Invoke the log parsing module to write the parsed data into the database.
- Open the $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/config/common_config.cfg configuration file.
vi $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/config/common_config.cfg
- Press i to enter the insert mode and modify the start time and end time of the log. For details about the common_config.cfg file, see common_config.cfg.
[spark]
# Start time of Spark logs.
log_start_time = 2023-09-05 19:54:51
# End time of Spark logs.
log_end_time = 2023-09-05 19:57:17
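A mistyped time window is easy to miss until the collection command returns no tasks. The following sketch checks the two values with the Python standard library before you run the collection. The read_log_window helper is hypothetical, and the time format is inferred from the sample values above rather than taken from an OmniAdvisor specification.

```python
import configparser
from datetime import datetime

# Format inferred from the sample values (assumption).
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

SAMPLE = """
[spark]
# Start time of Spark logs.
log_start_time = 2023-09-05 19:54:51
# End time of Spark logs.
log_end_time = 2023-09-05 19:57:17
"""


def read_log_window(text):
    """Parse the [spark] log window and check that it is a valid interval."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    start = datetime.strptime(parser["spark"]["log_start_time"], TIME_FORMAT)
    end = datetime.strptime(parser["spark"]["log_end_time"], TIME_FORMAT)
    if start >= end:
        raise ValueError("log_start_time must be earlier than log_end_time")
    return start, end


start, end = read_log_window(SAMPLE)
print((end - start).total_seconds())  # 146.0
```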
- Press Esc, type :wq!, and press Enter to save the file and exit.
- Run the collection command.
Use the OmniAdvisor CLI to choose the Spark engine.
python $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/main.pyc

Input fetch_history_data or press Tab to select fetch_history_data, and then press Enter.
After the historical task information is parsed, the result is written into the history_config and best_config tables.
- Sample parameters of historical tasks.
- Tune parameters.
- Input yes to sample parameters of all tunable tasks in the database for n rounds. Wait until the sampling is complete.

- If you input no, the listed tasks are filtered. Input the identification values of the tasks to be tuned, separating multiple values with commas (,). Press Enter to sample task parameters for n rounds. Wait until the sampling is complete.

- Each time a task parameter is sampled, the program invokes the log parsing module to parse task information such as the task status and task running time, saves the information to the history_config table, and updates the optimal configuration in the best_config table.
- You can recommend parameters for a task only after parameter sampling is complete.
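The record-and-update cycle described above can be pictured with a short sketch. It is not OmniAdvisor's implementation: plain random sampling stands in for the tool's sampling algorithm, two Python objects stand in for the history_config and best_config tables, and sample_rounds and fake_run_task are hypothetical names.

```python
import random


def sample_rounds(choices, run_task, rounds, seed=0):
    """Sample `rounds` configurations, record each run, and track the fastest one."""
    rng = random.Random(seed)
    history = []  # stand-in for the history_config table
    best = None   # stand-in for the best_config table entry of this task
    for _ in range(rounds):
        # Pick one value per parameter from its choices list.
        config = {name: rng.choice(values) for name, values in choices.items()}
        succeeded, runtime = run_task(config)
        record = {"config": config, "succeeded": succeeded, "runtime": runtime}
        history.append(record)
        # Only successful runs can become the optimal configuration.
        if succeeded and (best is None or runtime < best["runtime"]):
            best = record
    return history, best


def fake_run_task(config):
    """Hypothetical stand-in for submitting a Spark task and parsing its log."""
    runtime = (1000.0 / config["spark.executor.memory"]
               + config["spark.sql.shuffle.partitions"] / 100.0)
    return True, runtime


choices = {
    "spark.executor.memory": [14, 16, 18, 20],
    "spark.sql.shuffle.partitions": [200, 300, 400],
}
history, best = sample_rounds(choices, fake_run_task, rounds=5)
print(len(history), best["config"])
```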
- Run the task with the optimal parameters found during sampling.
- Use the OmniAdvisor CLI to choose the Spark engine.
python $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/main.pyc

- Input parameter_recommend or press Tab to select parameter_recommend.

- Input the command of submitting the Spark task to be tuned, and submit the Spark task through OmniAdvisor.
- The following uses q10.sql as an example for an SQL task:

- The following uses HiBench WordCount as an example for an application task:

- Optional: Use OmniAdvisor to recommend and submit parameters.
python $OMNIADVISOR_HOME/BoostKit-omniadvisor_1.1.0/main.pyc -e spark -i parameter_recommend -c "spark-sql --master yarn --database tpcds_bin_partitioned_decimal_orc_2 -f q10.sql"
- During parameter recommendation, the task identification value is calculated based on identification_type in the configuration. The optimal parameters matched in best_config replace the original parameters, and the new parameters are submitted to Spark for execution.
- If no optimal parameters are matched in the best_config table or the matched parameters fail to be executed, the task is executed using the original parameters.
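The matching and fallback behavior described above can be summarized in a few lines. This is a sketch under assumptions, not the tool's code: recommend_and_run and fake_submit are hypothetical, a dictionary keyed by the task identification value stands in for the best_config table, and submit is assumed to return True when the task succeeds.

```python
def recommend_and_run(task_id, original_params, best_config, submit):
    """Run with the matched optimal parameters; fall back to the original ones."""
    best = best_config.get(task_id)
    if best is not None:
        # Optimal parameters replace the original values of the same keys.
        merged = {**original_params, **best}
        if submit(merged):
            return merged
    # No match, or the matched parameters failed: use the original parameters.
    submit(original_params)
    return original_params


calls = []


def fake_submit(params):
    """Hypothetical submitter: fails only for one deliberately bad value."""
    calls.append(dict(params))
    return params.get("spark.executor.memory") != "999g"


original = {"spark.executor.memory": "20g", "spark.locality.wait": "0"}
best_config = {
    "task-a": {"spark.executor.memory": "28g"},
    "task-b": {"spark.executor.memory": "999g"},
}

print(recommend_and_run("task-a", original, best_config, fake_submit))
print(recommend_and_run("task-b", original, best_config, fake_submit))
print(recommend_and_run("task-c", original, best_config, fake_submit))
```

In this sketch, task-a runs with the matched parameters, task-b falls back after the matched parameters fail, and task-c falls back because nothing is matched.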


