Tuning a Spark Task

Use OmniAdvisor to recommend optimal running parameters for Spark tasks and improve task performance.

  1. On the management node, initialize the database as the current user.
    1. Open the /opt/OmniAdvisor/BoostKit-omniadvisor_1.0.0/config/common_config.cfg file.
      vi /opt/OmniAdvisor/BoostKit-omniadvisor_1.0.0/config/common_config.cfg
      
    2. Press i to enter the insert mode and modify the following content of the file. For details about the common_config.cfg file, see common_config.cfg.
      [database]
      # MySQL database information, such as the user name and port number.
      db_name = test_spark
      db_host = localhost
      db_port = 3306
      
    3. Press Esc, type :wq!, and press Enter to save the file and exit.
    4. Run the initialization command:
      python main.pyc spark init_environment
      

      After the command is run, tables yarn_app_result, best_config, and sampling_config are created in the test_spark database.
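As a quick sanity check before running the initialization, the [database] section can be validated with Python's standard configparser module. This is only a sketch against a copy of the sample values from this guide, not part of OmniAdvisor itself:

```python
import configparser

# Copy of the [database] section from common_config.cfg (values from this guide).
sample = """
[database]
db_name = test_spark
db_host = localhost
db_port = 3306
"""

config = configparser.ConfigParser()
config.read_string(sample)

# Verify the expected keys are present and the port is numeric.
db = config["database"]
assert {"db_name", "db_host", "db_port"} <= set(db)
port = db.getint("db_port")
print(db["db_name"], db["db_host"], port)
```

The same check can be pointed at the real file with config.read("/opt/OmniAdvisor/BoostKit-omniadvisor_1.0.0/config/common_config.cfg").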

  2. Configure the log parsing module.
    Modify the /opt/OmniAdvisor/boostkit-omniadvisor-log-analyzer-1.0.0-aarch64/conf/omniAdvisorLogAnalyzer.properties configuration file.
    1. Open the file.
      vi /opt/OmniAdvisor/boostkit-omniadvisor-log-analyzer-1.0.0-aarch64/conf/omniAdvisorLogAnalyzer.properties
      
    2. Press i to enter the insert mode and add or modify the following settings. For details about the omniAdvisorLogAnalyzer.properties file, see omniAdvisorLogAnalyzer.properties.
      # Number of concurrent log parsing processes.
      log.analyzer.thread.count=3
      # Database driver. Currently, only MySQL is supported.
      datasource.db.driver=com.mysql.cj.jdbc.Driver
      # Database URL.
      datasource.db.url=jdbc:mysql://server1:3306/database
      
      # Indicates whether to enable Spark log parsing.
      spark.enable=true
      # Name of the database on which SQL execution depends.
      spark.workload=default
      # Spark log parsing mode, which can be rest or log.
      spark.eventLogs.mode=rest
      # Timeout for Spark log analysis, in seconds. If this timeout is exceeded, the task analysis fails.
      spark.timeout.seconds=30
      
    3. Press Esc, type :wq!, and press Enter to save the file and exit.
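Unlike common_config.cfg, omniAdvisorLogAnalyzer.properties is a Java-style properties file with no section headers. The following minimal parser (a sketch that ignores escape sequences and line continuations; key names are the ones shown above) can check that the required keys are present before starting the analyzer:

```python
# Minimal parser for Java-style .properties lines (sketch only;
# does not handle escapes or line continuations).
def parse_properties(text):
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

# Copy of a few settings from this guide.
sample = """
# Number of concurrent log parsing processes.
log.analyzer.thread.count=3
datasource.db.driver=com.mysql.cj.jdbc.Driver
spark.enable=true
spark.eventLogs.mode=rest
"""

props = parse_properties(sample)
assert props["spark.eventLogs.mode"] in ("rest", "log")
assert props["log.analyzer.thread.count"].isdigit()
print(len(props))
```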
  3. Configure the log collection mode. Spark logs can be collected in rest or log mode: in rest mode, the Spark history server REST API is invoked to obtain the log files to be parsed; in log mode, the Spark task log files are analyzed directly.
    • Configuring the rest mode: rest mode requires the Spark history server to be enabled. Then modify the /opt/OmniAdvisor/boostkit-omniadvisor-log-analyzer-1.0.0-aarch64/conf/omniAdvisorLogAnalyzer.properties configuration file.
      1. Open the file.
        vi /opt/OmniAdvisor/boostkit-omniadvisor-log-analyzer-1.0.0-aarch64/conf/omniAdvisorLogAnalyzer.properties
        
      2. Press i to enter the insert mode and add or modify the following settings. For details about the omniAdvisorLogAnalyzer.properties file, see omniAdvisorLogAnalyzer.properties.
        spark.eventLogs.mode=rest
        # URL of the Spark history server.
        spark.rest.url=http://server1:18080
        
      3. Press Esc, type :wq!, and press Enter to save the file and exit.
    • Configuring the log mode: Modify the /opt/OmniAdvisor/boostkit-omniadvisor-log-analyzer-1.0.0-aarch64/conf/omniAdvisorLogAnalyzer.properties configuration file.
      1. Open the file.
        vi /opt/OmniAdvisor/boostkit-omniadvisor-log-analyzer-1.0.0-aarch64/conf/omniAdvisorLogAnalyzer.properties
        
      2. Press i to enter the insert mode and add or modify the following settings. For details about the omniAdvisorLogAnalyzer.properties file, see omniAdvisorLogAnalyzer.properties.
        spark.eventLogs.mode=log
        # Directory of Spark log files.
        spark.log.directory=hdfs://server1:9000/spark2-history
        # Maximum size of a Spark log file, in MB. Log files larger than this size are not parsed.
        spark.log.maxSize.mb=500
        # User used for Kerberos authentication in secure mode. Skip this parameter in non-secure mode.
        kerberos.principal=principle
        # Keytab file path used for Kerberos authentication in secure mode. Skip this parameter in non-secure mode.
        kerberos.keytab.file=/usr/principle.keytab
        
      3. Press Esc, type :wq!, and press Enter to save the file and exit.
      4. Copy the Hadoop configuration files hdfs-site.xml and core-site.xml to the /opt/OmniAdvisor/boostkit-omniadvisor-log-analyzer-1.0.0-aarch64/conf directory.
        cp ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml /opt/OmniAdvisor/boostkit-omniadvisor-log-analyzer-1.0.0-aarch64/conf
        cp ${HADOOP_HOME}/etc/hadoop/core-site.xml /opt/OmniAdvisor/boostkit-omniadvisor-log-analyzer-1.0.0-aarch64/conf
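For rest mode above, the history server exposes its data through Spark's standard monitoring API under /api/v1. The sketch below shows the endpoints a rest-mode collector could call, using spark.rest.url from this guide; the /api/v1 paths are part of standard Spark, but whether OmniAdvisor calls these exact endpoints is an assumption:

```python
# spark.rest.url from the configuration above.
SPARK_REST_URL = "http://server1:18080"

def applications_endpoint(base_url):
    # Lists completed and running applications known to the history server.
    return f"{base_url}/api/v1/applications"

def event_logs_endpoint(base_url, app_id):
    # Downloads the event logs for one application as a zip file.
    return f"{base_url}/api/v1/applications/{app_id}/logs"

print(applications_endpoint(SPARK_REST_URL))
print(event_logs_endpoint(SPARK_REST_URL, "application_1693900000000_0001"))
```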
  4. Invoke the log parsing module to write the parsed data into the database.
    1. Open the /opt/OmniAdvisor/BoostKit-omniadvisor_1.0.0/config/common_config.cfg configuration file.
      vi /opt/OmniAdvisor/BoostKit-omniadvisor_1.0.0/config/common_config.cfg
      
    2. Press i to enter the insert mode and modify the start time and end time of the logs to be collected. For details about the common_config.cfg file, see common_config.cfg.
      [spark]
      # Start time of Spark logs.
      log_start_time = 2023-09-05 19:54:51
      # End time of Spark logs.
      log_end_time = 2023-09-05 19:57:17
      # Indicates whether to sample all SQL statements that have been run (obtain application_name from the database). If the value is true, the history_application_name configuration item is invalid.
      enable_sampling_all_sql = true
      
    3. Press Esc, type :wq!, and press Enter to save the file and exit.
    4. Run the collection command.
      python main.pyc spark fetch_history_data
      

      After the data is parsed successfully, the data result is written to the yarn_app_result and best_config tables, whereas the sampling_config table is still empty.
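Only applications that ran inside the [log_start_time, log_end_time] window are collected, so the window must be non-empty. The snippet below checks the sample window from this guide (the timestamp format is an assumption inferred from the values shown):

```python
from datetime import datetime

# Timestamp format matching the values shown above (assumed).
FMT = "%Y-%m-%d %H:%M:%S"
start = datetime.strptime("2023-09-05 19:54:51", FMT)
end = datetime.strptime("2023-09-05 19:57:17", FMT)

# The collection window must be non-empty.
assert start < end
print((end - start).total_seconds())
```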

  5. Sample and tune the parameters of historical tasks.
    • Sample the parameters of each historical task 40 times.
      1. Open the /opt/OmniAdvisor/BoostKit-omniadvisor_1.0.0/config/common_config.cfg configuration file.
        vi /opt/OmniAdvisor/BoostKit-omniadvisor_1.0.0/config/common_config.cfg
        
      2. Press i to enter the insert mode and modify the file as follows. For details about the common_config.cfg file, see common_config.cfg.
        [sampling]
        # Number of parameter sampling rounds.
        sampling_epochs = 40
        [spark]
        enable_sampling_all_sql = true
        
      3. Press Esc, type :wq!, and press Enter to save the file and exit.
      4. Open the /opt/OmniAdvisor/BoostKit-omniadvisor_1.0.0/config/spark/spark_config.yml configuration file. Modify the parameter value range or add tuning parameters based on your requirements and the comments in the configuration file.
      5. Run the collection command.
        python main.pyc spark parameter_sampling
        

        After the command is run, you can view the sampled data in the sampling_config table of the database.

    • Sample the parameters of a specified historical task 40 times.
      1. Open the /opt/OmniAdvisor/BoostKit-omniadvisor_1.0.0/config/common_config.cfg configuration file.
        vi /opt/OmniAdvisor/BoostKit-omniadvisor_1.0.0/config/common_config.cfg
        
      2. Press i to enter the insert mode and modify the file as follows. For details about the common_config.cfg file, see common_config.cfg.
        [sampling]
        # Number of parameter sampling rounds.
        sampling_epochs = 40
        
        [spark]
        enable_sampling_all_sql = false
        history_application_name = q12
        
      3. Press Esc, type :wq!, and press Enter to save the file and exit.
      4. Run the collection command.
        python main.pyc spark parameter_sampling
        

        After the command is run, you can view the sampled data in the sampling_config table of the database. Once sampling-based tuning is complete, optimal parameters can be recommended for the task.
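Conceptually, each sampling round draws one candidate configuration from the parameter ranges defined in spark_config.yml. The sketch below illustrates that idea with made-up parameter names and ranges; it is not OmniAdvisor's actual sampling algorithm:

```python
import random

# Hypothetical tuning ranges (illustrative only; the real ranges live in spark_config.yml).
PARAM_RANGES = {
    "spark.executor.cores": (1, 8),
    "spark.executor.memory.gb": (2, 16),
    "spark.sql.shuffle.partitions": (100, 1000),
}

def sample_config(rng):
    # Draw one candidate configuration uniformly from each range.
    return {name: rng.randint(lo, hi) for name, (lo, hi) in PARAM_RANGES.items()}

rng = random.Random(0)  # fixed seed so the sketch is reproducible
samples = [sample_config(rng) for _ in range(40)]  # sampling_epochs = 40
assert all(1 <= s["spark.executor.cores"] <= 8 for s in samples)
print(len(samples))
```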

  6. Modify the execution script.
    1. Copy the native spark-sql script and rename it spark-sql-recommend.
      rm -rf $SPARK_HOME/bin/spark-sql-recommend
      cp $SPARK_HOME/bin/spark-sql $SPARK_HOME/bin/spark-sql-recommend
    2. Open the spark-sql-recommend script.
      vi spark-sql-recommend
      
    3. Press i to enter the insert mode and modify line 25 of the script.
      exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"
      
      Replace it with the following:
      python /opt/OmniAdvisor/BoostKit-omniadvisor_1.0.0/main.pyc spark parameter_recommend "$@"
      
    4. Press Esc, type :wq!, and press Enter to save the file and exit.
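The same line-25 edit can be applied programmatically instead of in vi. The sketch below demonstrates the replacement on a temporary stand-in file; on a real system the target would be $SPARK_HOME/bin/spark-sql-recommend, and the old/new lines are the ones quoted above:

```python
import os
import tempfile

OLD = 'exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"'
NEW = 'python /opt/OmniAdvisor/BoostKit-omniadvisor_1.0.0/main.pyc spark parameter_recommend "$@"'

# Build a stand-in script whose line 25 is the exec line (for demonstration only).
lines = ["# placeholder"] * 24 + [OLD]
with tempfile.NamedTemporaryFile("w", suffix=".sh", delete=False) as f:
    f.write("\n".join(lines) + "\n")
    path = f.name

# Replace line 25 (index 24) and write the script back.
with open(path) as f:
    script = f.read().splitlines()
script[24] = NEW
with open(path, "w") as f:
    f.write("\n".join(script) + "\n")

with open(path) as f:
    assert f.read().splitlines()[24] == NEW
os.remove(path)
print("ok")
```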
  7. Run the task with the optimal parameters recommended from the sampling results.
    • Recommend parameters based on the task name application_name.
      1. Open the /opt/OmniAdvisor/BoostKit-omniadvisor_1.0.0/config/common_config.cfg configuration file.
        vi /opt/OmniAdvisor/BoostKit-omniadvisor_1.0.0/config/common_config.cfg
        
      2. Press i to enter the insert mode and modify the file as follows. For details about the common_config.cfg file, see common_config.cfg.
        [recommend]
        # After task sampling tuning is complete, you need to run a historical task again. You can use the task name (application_name) or query hash value (query_hash) to search the database for the optimal task parameters.
        # options: [application_name, query_hash]
        recommend_identifier = application_name
        
      3. Press Esc, type :wq!, and press Enter to save the file and exit.
      4. Run the command. Using q12 as an example, the following command looks up the optimal parameters in the database by task name and runs the task.
        $SPARK_HOME/bin/spark-sql-recommend --deploy-mode client --driver-cores 5 --driver-memory 5g --num-executors 18 --conf spark.memory.offHeap.size=2G --database tpcds_bin_partitioned_decimal_orc_3000 --name q12 -f /home/test_spark/q12.sql
    • Recommend parameters based on query_hash.
      1. Open the /opt/OmniAdvisor/BoostKit-omniadvisor_1.0.0/config/common_config.cfg configuration file.
        vi /opt/OmniAdvisor/BoostKit-omniadvisor_1.0.0/config/common_config.cfg
        
      2. Press i to enter the insert mode and modify the file as follows. For details about the common_config.cfg file, see common_config.cfg.
        [recommend]
        # After task sampling tuning is complete, you need to run a historical task again. You can use the task name (application_name) or query hash value (query_hash) to search the database for the optimal task parameters.
        # options: [application_name, query_hash]
        recommend_identifier = query_hash
        
      3. Press Esc, type :wq!, and press Enter to save the file and exit.
      4. Run the command. Using q12 as an example, the following command looks up the optimal parameters in the database by query_hash and runs the task.
        $SPARK_HOME/bin/spark-sql-recommend --deploy-mode client --driver-cores 5 --driver-memory 5g --num-executors 18 --conf spark.memory.offHeap.size=2G --database tpcds_bin_partitioned_decimal_orc_3000 --name q12_bak -f /home/test_spark/q12.sql