Executing Spark Services

Verify that SparkExtension takes effect and run a test case to show the performance optimization. Ensure that Spark engine services are running properly.

Spark uses interactive command lines to execute SQL tasks. To check whether SparkExtension has taken effect, add EXPLAIN before the SQL statement or view the Spark UI to check the operator names in the execution plan. If an operator name starting with Omni is displayed, SparkExtension has taken effect.
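
The operator-name check described above can also be scripted. The following is a minimal sketch, assuming the EXPLAIN output has been saved to a text file or is piped in; the helper name count_omni_operators is hypothetical and is not part of SparkExtension.

```shell
#!/bin/sh
# Hypothetical helper: reads an execution plan on stdin and prints the number
# of distinct names in it that start with "Omni".
count_omni_operators() {
    # grep -o prints every match on its own line; sort -u removes repeats;
    # tr strips the padding that some wc implementations add.
    grep -oE 'Omni[A-Za-z]+' | sort -u | wc -l | tr -d ' '
}
```

For example, with a plan saved to a hypothetical file plan.txt, running count_omni_operators < plan.txt prints the number of distinct Omni-prefixed names. A result of 0 suggests that SparkExtension has not taken effect.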

This test case uses tables from the tpcds_bin_partitioned_varchar_orc_2 database. Table 1 describes the test tables. The test SQL statement is Q82 of the TPC-DS benchmark.

Table 1 Test table information

| Table | Format | Rows |
|---|---|---|
| item | orc | 26000 |
| inventory | orc | 16966305 |
| date_dim | orc | 73049 |
| store_sales | orc | 5760749 |

  1. Start the Spark SQL CLI.
    • Command for starting open source Spark SQL:
      /usr/local/spark/bin/spark-sql --deploy-mode client --driver-cores 8 --driver-memory 20g --master yarn --executor-cores 8 --executor-memory 26g --num-executors 36 --conf spark.executor.extraJavaOptions='-XX:+UseG1GC -XX:+UseNUMA' --conf spark.locality.wait=0 --conf spark.network.timeout=600 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.adaptive.enabled=true --conf spark.sql.autoBroadcastJoinThreshold=100M --conf spark.sql.broadcastTimeout=600 --conf spark.sql.shuffle.partitions=1000 --conf spark.sql.orc.impl=native --conf spark.task.cpus=1 --database tpcds_bin_partitioned_varchar_orc_2
      
    • Perform the following operations to start the SparkExtension 3.1.1 plugin:
      1. Go to the /usr/local/spark/conf directory and create the spark-defaults-omnioperator.conf file.
        cd /usr/local/spark/conf
        cp spark-defaults.conf spark-defaults-omnioperator.conf
        
      2. Change the permission on spark-defaults-omnioperator.conf to 640.
        chmod 640 spark-defaults-omnioperator.conf
        
      3. Open spark-defaults-omnioperator.conf.
        vi spark-defaults-omnioperator.conf
        
      4. Press i to enter the insert mode and add the following content to the end of the file:
        spark.sql.optimizer.runtime.bloomFilter.enabled true
        spark.driverEnv.LD_LIBRARY_PATH /opt/omni-operator/lib
        spark.driverEnv.LD_PRELOAD /opt/omni-operator/lib/libjemalloc.so.2
        spark.driverEnv.OMNI_HOME /opt/omni-operator
        spark.driver.extraClassPath /opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-2.0.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-2.0.0-aarch64.jar:/opt/omni-operator/lib/dependencies/protobuf-java-3.15.8.jar:/opt/omni-operator/lib/dependencies/boostkit-omniop-native-reader-3.1.1-2.0.0.jar
        spark.driver.extraLibraryPath /opt/omni-operator/lib
        spark.driver.defaultJavaOptions -Djava.library.path=/opt/omni-operator/lib
        spark.executorEnv.LD_LIBRARY_PATH ${PWD}/omni/omni-operator/lib
        spark.executorEnv.LD_PRELOAD ${PWD}/omni/omni-operator/lib/libjemalloc.so.2
        spark.executorEnv.MALLOC_CONF narenas:2
        spark.executorEnv.OMNI_HOME ${PWD}/omni/omni-operator
        spark.executor.extraClassPath ${PWD}/omni/omni-operator/lib/boostkit-omniop-spark-3.1.1-2.0.0-aarch64.jar:${PWD}/omni/omni-operator/lib/boostkit-omniop-bindings-2.0.0-aarch64.jar:${PWD}/omni/omni-operator/lib/dependencies/protobuf-java-3.15.8.jar:${PWD}/omni/omni-operator/lib/dependencies/boostkit-omniop-native-reader-3.1.1-2.0.0.jar
        spark.executor.extraLibraryPath ${PWD}/omni/omni-operator/lib
        spark.omni.sql.columnar.fusion false
        spark.shuffle.manager org.apache.spark.shuffle.sort.OmniColumnarShuffleManager
        spark.sql.codegen.wholeStage false
        spark.sql.extensions com.huawei.boostkit.spark.ColumnarPlugin
        spark.omni.sql.columnar.RewriteSelfJoinInInPredicate true
        spark.sql.execution.filterMerge.enabled true
        spark.omni.sql.columnar.dedupLeftSemiJoin true
        spark.omni.sql.columnar.radixSort.enabled true
        spark.executorEnv.MALLOC_CONF tcache:false
        spark.sql.adaptive.coalescePartitions.minPartitionNum 200
        spark.sql.join.columnar.preferShuffledHashJoin true
        
      5. Press Esc, type :wq!, and press Enter to save the file and exit.
      6. Run the startup command.
        /usr/local/spark/bin/spark-sql --archives hdfs://server1:9000/user/root/omni-operator.tar.gz#omni --deploy-mode client --driver-cores 8 --driver-memory 40g --master yarn --executor-cores 12 --executor-memory 5g --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=35g --num-executors 24 --conf spark.executor.extraJavaOptions='-XX:+UseG1GC' --conf spark.locality.wait=0 --conf spark.network.timeout=600 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.skewedJoin.enabled=true --conf spark.sql.autoBroadcastJoinThreshold=100M --conf spark.sql.broadcastTimeout=600 --conf spark.sql.shuffle.partitions=600 --conf spark.sql.orc.impl=native --conf spark.task.cpus=1 --properties-file /usr/local/spark/conf/spark-defaults-omnioperator.conf --database tpcds_bin_partitioned_varchar_orc_2
        
    • Perform the following operations to start the SparkExtension 3.3.1 plugin:
      1. Go to the /usr/local/spark/conf directory and create the spark-defaults-omnioperator.conf file.
        cd /usr/local/spark/conf
        cp spark-defaults.conf spark-defaults-omnioperator.conf
        
      2. Change the permission on spark-defaults-omnioperator.conf to 640.
        chmod 640 spark-defaults-omnioperator.conf
        
      3. Open spark-defaults-omnioperator.conf.
        vi spark-defaults-omnioperator.conf
        
      4. Press i to enter the insert mode and add the following content to the end of the file:
        spark.sql.optimizer.runtime.bloomFilter.enabled true
        spark.driverEnv.LD_LIBRARY_PATH /opt/omni-operator/lib
        spark.driverEnv.LD_PRELOAD /opt/omni-operator/lib/libjemalloc.so.2
        spark.driverEnv.OMNI_HOME /opt/omni-operator
        spark.driver.extraClassPath /opt/omni-operator/lib/boostkit-omniop-spark-3.3.1-2.0.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-2.0.0-aarch64.jar:/opt/omni-operator/lib/dependencies/protobuf-java-3.15.8.jar:/opt/omni-operator/lib/dependencies/boostkit-omniop-native-reader-3.3.1-2.0.0.jar
        spark.driver.extraLibraryPath /opt/omni-operator/lib
        spark.driver.defaultJavaOptions -Djava.library.path=/opt/omni-operator/lib
        spark.executorEnv.LD_LIBRARY_PATH ${PWD}/omni/omni-operator/lib
        spark.executorEnv.LD_PRELOAD ${PWD}/omni/omni-operator/lib/libjemalloc.so.2
        spark.executorEnv.MALLOC_CONF narenas:2
        spark.executorEnv.OMNI_HOME ${PWD}/omni/omni-operator
        spark.executor.extraClassPath ${PWD}/omni/omni-operator/lib/boostkit-omniop-spark-3.3.1-2.0.0-aarch64.jar:${PWD}/omni/omni-operator/lib/boostkit-omniop-bindings-2.0.0-aarch64.jar:${PWD}/omni/omni-operator/lib/dependencies/protobuf-java-3.15.8.jar:${PWD}/omni/omni-operator/lib/dependencies/boostkit-omniop-native-reader-3.3.1-2.0.0.jar
        spark.executor.extraLibraryPath ${PWD}/omni/omni-operator/lib
        spark.omni.sql.columnar.fusion false
        spark.shuffle.manager org.apache.spark.shuffle.sort.OmniColumnarShuffleManager
        spark.sql.codegen.wholeStage false
        spark.sql.extensions com.huawei.boostkit.spark.ColumnarPlugin
        spark.omni.sql.columnar.RewriteSelfJoinInInPredicate true
        spark.sql.execution.filterMerge.enabled true
        spark.omni.sql.columnar.dedupLeftSemiJoin true
        spark.omni.sql.columnar.radixSort.enabled true
        spark.executorEnv.MALLOC_CONF tcache:false
        spark.sql.adaptive.coalescePartitions.minPartitionNum 200
        spark.sql.join.columnar.preferShuffledHashJoin true
        
      5. Press Esc, type :wq!, and press Enter to save the file and exit.
      6. Run the startup command.
        /usr/local/spark/bin/spark-sql --archives hdfs://server1:9000/user/root/omni-operator.tar.gz#omni --deploy-mode client --driver-cores 8 --driver-memory 40g --master yarn --executor-cores 12 --executor-memory 5g --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=35g --num-executors 24 --conf spark.executor.extraJavaOptions='-XX:+UseG1GC' --conf spark.locality.wait=0 --conf spark.network.timeout=600 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.skewedJoin.enabled=true --conf spark.sql.autoBroadcastJoinThreshold=100M --conf spark.sql.broadcastTimeout=600 --conf spark.sql.shuffle.partitions=600 --conf spark.sql.orc.impl=native --conf spark.task.cpus=1 --properties-file /usr/local/spark/conf/spark-defaults-omnioperator.conf --database tpcds_bin_partitioned_varchar_orc_2
        
    • Perform the following operations to start the SparkExtension 3.4.3 plugin:
      1. Go to the /usr/local/spark/conf directory and create the spark-defaults-omnioperator.conf file.
        cd /usr/local/spark/conf
        cp spark-defaults.conf spark-defaults-omnioperator.conf
        
      2. Change the permission on spark-defaults-omnioperator.conf to 640.
        chmod 640 spark-defaults-omnioperator.conf
        
      3. Open spark-defaults-omnioperator.conf.
        vi spark-defaults-omnioperator.conf
        
      4. Press i to enter the insert mode and add the following content to the end of the file:
        spark.sql.optimizer.runtime.bloomFilter.enabled true 
        spark.driverEnv.LD_LIBRARY_PATH /opt/omni-operator/lib 
        spark.driverEnv.LD_PRELOAD /opt/omni-operator/lib/libjemalloc.so.2 
        spark.driverEnv.OMNI_HOME /opt/omni-operator 
        spark.driver.extraClassPath /opt/omni-operator/lib/boostkit-omniop-spark-3.4.3-2.0.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-2.0.0-aarch64.jar:/opt/omni-operator/lib/dependencies/protobuf-java-3.15.8.jar:/opt/omni-operator/lib/dependencies/boostkit-omniop-native-reader-3.4.3-2.0.0.jar 
        spark.driver.extraLibraryPath /opt/omni-operator/lib 
        spark.driver.defaultJavaOptions -Djava.library.path=/opt/omni-operator/lib 
        spark.executorEnv.LD_LIBRARY_PATH ${PWD}/omni/omni-operator/lib
        spark.executorEnv.LD_PRELOAD ${PWD}/omni/omni-operator/lib/libjemalloc.so.2 
        spark.executorEnv.MALLOC_CONF narenas:2 
        spark.executorEnv.OMNI_HOME ${PWD}/omni/omni-operator 
        spark.executor.extraClassPath ${PWD}/omni/omni-operator/lib/boostkit-omniop-spark-3.4.3-2.0.0-aarch64.jar:${PWD}/omni/omni-operator/lib/boostkit-omniop-bindings-2.0.0-aarch64.jar:${PWD}/omni/omni-operator/lib/dependencies/protobuf-java-3.15.8.jar:${PWD}/omni/omni-operator/lib/dependencies/boostkit-omniop-native-reader-3.4.3-2.0.0.jar
        spark.executor.extraLibraryPath ${PWD}/omni/omni-operator/lib 
        spark.omni.sql.columnar.fusion false 
        spark.shuffle.manager org.apache.spark.shuffle.sort.OmniColumnarShuffleManager 
        spark.sql.codegen.wholeStage false 
        spark.sql.extensions com.huawei.boostkit.spark.ColumnarPlugin 
        spark.omni.sql.columnar.RewriteSelfJoinInInPredicate true 
        spark.sql.execution.filterMerge.enabled true 
        spark.omni.sql.columnar.dedupLeftSemiJoin true 
        spark.omni.sql.columnar.radixSort.enabled true 
        spark.executorEnv.MALLOC_CONF tcache:false 
        spark.sql.adaptive.coalescePartitions.minPartitionNum 200 
        spark.sql.join.columnar.preferShuffledHashJoin true
        
      5. Press Esc, type :wq!, and press Enter to save the file and exit.
      6. Run the startup command.
        /usr/local/spark/bin/spark-sql --archives hdfs://server1:9000/user/root/omni-operator.tar.gz#omni --deploy-mode client --driver-cores 8 --driver-memory 40g --master yarn --executor-cores 12 --executor-memory 5g --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=35g --num-executors 24 --conf spark.executor.extraJavaOptions='-XX:+UseG1GC' --conf spark.locality.wait=0 --conf spark.network.timeout=600 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.skewedJoin.enabled=true --conf spark.sql.autoBroadcastJoinThreshold=100M --conf spark.sql.broadcastTimeout=600 --conf spark.sql.shuffle.partitions=600 --conf spark.sql.orc.impl=native --conf spark.task.cpus=1 --properties-file /usr/local/spark/conf/spark-defaults-omnioperator.conf --database tpcds_bin_partitioned_varchar_orc_2
        
    • Perform the following operations to start the SparkExtension 3.5.2 plugin:
      1. Go to the /usr/local/spark/conf directory and create the spark-defaults-omnioperator.conf file.
        cd /usr/local/spark/conf
        cp spark-defaults.conf spark-defaults-omnioperator.conf
        
      2. Change the permission on spark-defaults-omnioperator.conf to 640.
        chmod 640 spark-defaults-omnioperator.conf
        
      3. Open spark-defaults-omnioperator.conf.
        vi spark-defaults-omnioperator.conf
        
      4. Press i to enter the insert mode and add the following content to the end of the file:
        spark.sql.optimizer.runtime.bloomFilter.enabled true
        spark.driverEnv.LD_LIBRARY_PATH /opt/omni-operator/lib
        spark.driverEnv.LD_PRELOAD /opt/omni-operator/lib/libjemalloc.so.2
        spark.driverEnv.OMNI_HOME /opt/omni-operator
        spark.driver.extraClassPath /opt/omni-operator/lib/boostkit-omniop-spark-3.5.2-2.0.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-2.0.0-aarch64.jar:/opt/omni-operator/lib/dependencies/protobuf-java-3.15.8.jar:/opt/omni-operator/lib/dependencies/boostkit-omniop-native-reader-3.5.2-2.0.0.jar
        spark.driver.extraLibraryPath /opt/omni-operator/lib
        spark.driver.defaultJavaOptions -Djava.library.path=/opt/omni-operator/lib
        spark.executorEnv.LD_LIBRARY_PATH ${PWD}/omni/omni-operator/lib
        spark.executorEnv.LD_PRELOAD ${PWD}/omni/omni-operator/lib/libjemalloc.so.2
        spark.executorEnv.MALLOC_CONF narenas:2
        spark.executorEnv.OMNI_HOME ${PWD}/omni/omni-operator
        spark.executor.extraClassPath ${PWD}/omni/omni-operator/lib/boostkit-omniop-spark-3.5.2-2.0.0-aarch64.jar:${PWD}/omni/omni-operator/lib/boostkit-omniop-bindings-2.0.0-aarch64.jar:${PWD}/omni/omni-operator/lib/dependencies/protobuf-java-3.15.8.jar:${PWD}/omni/omni-operator/lib/dependencies/boostkit-omniop-native-reader-3.5.2-2.0.0.jar
        spark.executor.extraLibraryPath ${PWD}/omni/omni-operator/lib
        spark.omni.sql.columnar.fusion false
        spark.shuffle.manager org.apache.spark.shuffle.sort.OmniColumnarShuffleManager
        spark.sql.codegen.wholeStage false
        spark.sql.extensions com.huawei.boostkit.spark.ColumnarPlugin
        spark.omni.sql.columnar.RewriteSelfJoinInInPredicate true
        spark.sql.execution.filterMerge.enabled true
        spark.omni.sql.columnar.dedupLeftSemiJoin true
        spark.omni.sql.columnar.radixSort.enabled true
        spark.executorEnv.MALLOC_CONF tcache:false
        spark.sql.adaptive.coalescePartitions.minPartitionNum 200
        spark.sql.join.columnar.preferShuffledHashJoin true
        
      5. Press Esc, type :wq!, and press Enter to save the file and exit.
      6. Run the startup command.
        /usr/local/spark/bin/spark-sql --archives hdfs://server1:9000/user/root/omni-operator.tar.gz#omni --deploy-mode client --driver-cores 8 --driver-memory 40g --master yarn --executor-cores 12 --executor-memory 5g --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=35g --num-executors 24 --conf spark.executor.extraJavaOptions='-XX:+UseG1GC' --conf spark.locality.wait=0 --conf spark.network.timeout=600 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.skewedJoin.enabled=true --conf spark.sql.autoBroadcastJoinThreshold=100M --conf spark.sql.broadcastTimeout=600 --conf spark.sql.shuffle.partitions=600 --conf spark.sql.orc.impl=native --conf spark.task.cpus=1 --properties-file /usr/local/spark/conf/spark-defaults-omnioperator.conf --database tpcds_bin_partitioned_varchar_orc_2
        
      • hdfs://server1:9000/user/root/omni-operator.tar.gz#omni: Set hdfs://server1:9000 to the actual value of fs.defaultFS in the Hadoop core-site.xml file. You can replace /user/root/omni-operator.tar.gz with a custom path; this path is associated with the operations in 2. #omni specifies the directory into which the omni-operator.tar.gz package is extracted. You can customize the directory name.
      • The preceding startup command is used in YARN mode. To start the SparkExtension plugin in local mode, change --master yarn to --master local. Before starting the plugin, add export LD_PRELOAD=/opt/omni-operator/lib/libjemalloc.so.2 to the ~/.bashrc file on all nodes and update the environment variables. Also replace ${PWD}/omni with /opt wherever it appears in the startup configuration (the spark-defaults-omnioperator.conf file referenced by the startup command).
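
The path substitution for local mode can be done with sed rather than by hand. The following is a minimal sketch, assuming that the ${PWD}/omni paths to be replaced are the ones in spark-defaults-omnioperator.conf; the helper name to_local_mode_conf and the output file name mentioned afterwards are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: reads a properties file on stdin and writes a copy to
# stdout in which every literal ${PWD}/omni is replaced with /opt.
to_local_mode_conf() {
    # Single quotes keep the shell from expanding ${PWD};
    # the backslash makes sed treat $ as a literal character.
    sed 's|\${PWD}/omni|/opt|g'
}
```

For example, to_local_mode_conf < spark-defaults-omnioperator.conf > spark-defaults-omnioperator-local.conf writes the rewritten settings to a separate file, leaving the YARN-mode file unchanged. The new file would then be passed to --properties-file together with --master local.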

    Table 2 describes the SparkExtension startup parameters.

    Table 2 SparkExtension startup parameters

    | Parameter | Default Value | Description |
    |---|---|---|
    | spark.sql.extensions | com.huawei.boostkit.spark.ColumnarPlugin | Starts SparkExtension. |
    | spark.shuffle.manager | sort | Indicates whether to enable columnar shuffle. To enable this function, configure the shuffleManager class of OmniShuffle by adding the configuration item --conf spark.shuffle.manager="org.apache.spark.shuffle.sort.OmniColumnarShuffleManager". By default, the open source sort shuffle is used. |
    | spark.omni.sql.columnar.hashagg | true | Indicates whether to enable columnar HashAgg. true: yes; false: no. |
    | spark.omni.sql.columnar.project | true | Indicates whether to enable columnar Project. true: yes; false: no. |
    | spark.omni.sql.columnar.projfilter | true | Indicates whether to enable columnar ConditionProject (the fused Project + Filter operator). true: yes; false: no. |
    | spark.omni.sql.columnar.filter | true | Indicates whether to enable columnar Filter. true: yes; false: no. |
    | spark.omni.sql.columnar.sort | true | Indicates whether to enable columnar Sort. true: yes; false: no. |
    | spark.omni.sql.columnar.window | true | Indicates whether to enable columnar Window. true: yes; false: no. |
    | spark.omni.sql.columnar.broadcastJoin | true | Indicates whether to enable columnar BroadcastHash Join. true: yes; false: no. |
    | spark.omni.sql.columnar.nativefilescan | true | Indicates whether to enable columnar NativeFilescan, including ORC and Parquet file formats. true: yes; false: no. |
    | spark.omni.sql.columnar.sortMergeJoin | true | Indicates whether to enable columnar SortMerge Join. true: yes; false: no. |
    | spark.omni.sql.columnar.takeOrderedAndProject | true | Indicates whether to enable columnar TakeOrderedAndProject. true: yes; false: no. |
    | spark.omni.sql.columnar.shuffledHashJoin | true | Indicates whether to enable columnar ShuffledHash Join. true: yes; false: no. |
    | spark.shuffle.columnar.shuffleSpillBatchRowNum | 10000 | Specifies the number of rows in each batch output by shuffle. Adjust the parameter value based on the actual memory specifications. You can increase the value to reduce the number of batches written to drive files and increase the write speed. |
    | spark.shuffle.columnar.shuffleSpillMemoryThreshold | 2147483648 | Specifies the upper limit of shuffle spill, in bytes. When the shuffle memory reaches this upper limit, data is spilled. Adjust the parameter value based on the actual memory specifications. You can increase the value to reduce the number of shuffle spills to drives and drive I/O operations. |
    | spark.omni.sql.columnar.sortMergeJoin.fusion | false | Indicates whether to enable SortMerge Join fusion. true: yes; false: no. |
    | spark.shuffle.columnar.compressBlockSize | 65536 | Specifies the size of a compressed shuffle data block, in bytes. Adjust the parameter value based on the actual memory specifications. The default value is recommended. |
    | spark.sql.execution.columnar.maxRecordsPerBatch | 4096 | Specifies the size of the initialized buffer for columnar shuffle, in bytes. Adjust the parameter value based on the actual memory specifications. You can increase the value to reduce the number of shuffle reads/writes and improve performance. |
    | spark.shuffle.compress | true | Indicates whether to enable compression for the shuffle output. true: yes; false: no. |
    | spark.io.compression.codec | lz4 | Specifies the compression format for the shuffle output. Possible values are uncompressed, zlib, snappy, lz4, and zstd. |
    | spark.omni.sql.columnar.sortSpill.rowThreshold | 214783647 | Specifies the threshold that triggers spilling for the Sort operator, in rows. When the number of data rows to be processed exceeds the specified value, data is spilled. Adjust the parameter value based on the actual memory specifications. You can increase the value to reduce the number of Sort operator spills to drives and drive I/O operations. |
    | spark.omni.sql.columnar.sortSpill.memFraction | 90 | Specifies the threshold that triggers spilling for the Sort operator. When the off-heap memory usage for data processing exceeds the specified value, data is spilled. This parameter is used together with the spark.memory.offHeap.size parameter, which specifies the total off-heap memory size. Adjust the parameter value based on the actual memory specifications. You can increase the value to reduce the number of Sort operator spills to drives and drive I/O operations. |
    | spark.omni.sql.columnar.broadcastJoin.shareHashtable | true | Indicates whether the builder constructs only one hash table that is shared by all lookup joins in Broadcast Join. true: yes; false: no. |
    | spark.omni.sql.columnar.sortSpill.dirDiskReserveSize | 10737418240 | Specifies the size of the available drive space reserved for data spilling of the Sort operator, in bytes. If the actual size is less than the specified value, an exception is thrown. Adjust the parameter value based on the actual drive capacity and service scenario. It is recommended that the value be less than or equal to the service data size. The upper limit of the value is the actual drive capacity. |
    | spark.omni.sql.columnar.sortSpill.enabled | false | Indicates whether to enable spilling for the Sort operator. true: yes; false: no. |
    | spark.omni.sql.columnar.JoinReorderEnhance | true | Indicates whether to enable the join reordering optimization policy. true: yes; false: no. The heuristic join reordering algorithm automatically optimizes join reordering based on the number of where filter criteria and the table size. |
    | spark.default.parallelism | 200 | Specifies the number of tasks concurrently executed by Spark. |
    | spark.sql.shuffle.partitions | 200 | Specifies the number of shuffle partitions when Spark performs aggregation or join operations. |
    | spark.sql.adaptive.enabled | false | Indicates whether to enable adaptive query optimization. The execution plan can be dynamically adjusted during query execution. true: yes; false: no. |
    | spark.executorEnv.MALLOC_CONF | narenas:1 | Controls the memory allocation policy of each Executor process in Spark. |
    | spark.sql.autoBroadcastJoinThreshold | 10M | Specifies the threshold for using Broadcast Join to join small tables during join operations. |
    | spark.sql.broadcastTimeout | 300 | Specifies the timeout duration of broadcasting small tables to other nodes. |
    | spark.omni.sql.columnar.fusion | false | Indicates whether to fuse multiple operators into one operator. true: yes; false: no. |
    | spark.locality.wait | 3 | Specifies the waiting duration for data localization. |
    | spark.sql.cbo.enabled | false | Indicates whether to enable CBO. true: yes; false: no. |
    | spark.sql.codegen.wholeStage | true | Indicates whether to enable whole stage code generation. true: yes; false: no. |
    | spark.sql.orc.impl | native | native indicates that an open source ORC library version is used, and hive indicates that the ORC library in Hive is used. |
    | spark.serializer | - | Specifies serialization with Kryo. |
    | spark.executor.extraJavaOptions | - | Specifies the path of the local Hadoop library that the Executor uses for acceleration. |
    | spark.driver.extraJavaOptions | - | Specifies the path to the local Hadoop library that the driver uses for acceleration. |
    | spark.network.timeout | 120 | Specifies the default timeout duration of all network interactions, in seconds. |
    | spark.omni.sql.columnar.RewriteSelfJoinInInPredicate | false | Indicates whether to convert Self Join in the in expression to HashAgg so as to delete unused columns to reduce the data volume. true: yes; false: no. |
    | spark.sql.execution.filterMerge.enabled | false | Indicates whether to combine expressions with similar structures in the same table so as to reduce the scan data volume. true: yes; false: no. |
    | spark.omni.sql.columnar.dedupLeftSemiJoin | false | Indicates whether to deduplicate the LeftSemi Join right table so as to reduce the join data volume. true: yes; false: no. |
    | spark.omni.sql.columnar.radixSort.enabled | false | Indicates whether to enable radix sort optimization. When the number of rows to be sorted in a single task exceeds the threshold (1000000 by default), radix sort is invoked. true: yes; false: no. |
    | spark.sql.join.columnar.preferShuffledHashJoin | false | Indicates whether to use ShuffledHashJoin whenever possible. true: yes; false: no. |
    | spark.sql.adaptive.skewedJoin.enabled | false | Indicates whether to enable adaptive skewed join optimization. During adaptive skewed join optimization, some special join algorithms are used to process skewed data if any, improving the join operation efficiency. true: yes; false: no. |
    | spark.sql.adaptive.coalescePartitions.minPartitionNum | 1 | Specifies the minimum number of shuffle partitions after merging. If this parameter is not set, the default degree of parallelism of the Spark cluster is used. |
    | spark.omni.sql.columnar.bloomfilterSubqueryReuse | false | Indicates whether to reuse the BloomFilter subquery, that is, reuse the data table so as to save one scan operation when BloomFilter takes effect. true: yes; false: no. |
    | spark.omni.sql.columnar.adaptivePartialAggregation.enabled | false | Indicates whether to enable adaptive skipping of the HashAgg group aggregation operation in the partial stage. This optimization is performed at run time. The partial stage of group aggregation is skipped and data is directly output to the downstream operator if the sampling scenario is identified as a high cardinality scenario and if group aggregation is performed but the first/last aggregation does not exist. true: yes; false: no. |
    | spark.omni.sql.columnar.adaptivePartialAggregationMinRows | 500000 | Specifies the minimum number of rows sampled for adaptivePartialAggregation optimization. When this number of rows has been sampled, the tool calculates the aggregation of the sampled data. |
    | spark.omni.sql.columnar.adaptivePartialAggregationRatio | 0.8 | Specifies the minimum aggregation threshold for adaptivePartialAggregation optimization. If the aggregation of sampled data has reached the threshold, this type of optimization is applied. |
    | spark.omni.sql.columnar.pushOrderedLimitThroughAggEnable.enabled | false | Indicates whether to enable pushOrderedLimitThroughAgg optimization. If the execution plan contains the Sort+Limit operator and the sorting field is a subset of the grouping field for the group aggregation operation, the TopNSort operator is pushed down to the partial stage of the group aggregation operation. This reduces the data processing volume of the downstream operator. true: yes; false: no. This type of optimization and the adaptivePartialAggregation optimization do not take effect at the same time. |
    | spark.omni.sql.columnar.combineJoinedAggregates.enabled | false | Indicates whether to enable combineJoinedAggregates optimization. This type of optimization reduces repeated table read operations by merging subqueries that are based on the same data. true: yes; false: no. |
    | spark.omni.sql.columnar.wholeStage.fallback.threshold | -1 | When AQE is enabled, if the number of operators that fall back in a stage is greater than or equal to the threshold, all operators of the stage (except OmniColumnarToRow and OmniAQEShuffleReadExec) fall back to open source operators. The value -1 indicates that this function is disabled. |
    | spark.omni.sql.columnar.query.fallback.threshold | -1 | When AQE is disabled, if the number of operators that fall back in the execution plan is greater than or equal to the threshold, all operators of the stage fall back to open source operators. The value -1 indicates that this function is disabled. |
    | spark.omni.sql.columnar.unixTimeFunc.enabled | true | Indicates whether to enable the from_unixtime and unix_timestamp expressions. true: yes; false: no. |
    | spark.sql.orc.filterPushdown | true | Indicates whether to enable predicate pushdown for data query in ORC format. |
    | spark.omni.sql.columnar.windowGroupLimit | true | Indicates whether to enable the columnar WindowGroupLimit operator. true: yes; false: no. |
    | spark.omni.sql.columnar.catalog.cache.size | 128 | Specifies the cache space size for the catalog metadata. If the value is less than or equal to 0, caching is disabled. |
    | spark.omni.sql.columnar.catalog.cache.expire.time | 600 | Specifies the expiration time of the cached catalog metadata, in seconds. |
    | spark.omni.sql.columnar.vec.predicate.enabled | false | Indicates whether to enable the vectorized predicate pushdown function. true: yes; false: no. |
    | spark.omni.sql.columnar.numaBinding | false | Indicates whether to enable NUMA binding. This parameter is available for the NUMA architecture. true: yes; false: no. To enable NUMA binding, set --conf spark.plugins=com.huawei.boostkit.spark.OmniSparkPlugin and also spark.omni.sql.columnar.coreRange. |
    | spark.omni.sql.columnar.coreRange | - | Set this parameter when enabling NUMA binding. It indicates the core ID range for each NUMA node. See the note below the table for the value format. |

    For spark.omni.sql.columnar.coreRange, separate the core ID ranges of different NUMA nodes with vertical bars (|). For example, if the machine has 96 cores across 4 NUMA nodes, set this parameter to 0-23|24-47|48-71|72-95.
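
The value format of spark.omni.sql.columnar.coreRange can be generated instead of typed by hand. The following is a minimal sketch, assuming that core IDs are numbered contiguously from 0 and are split evenly across the NUMA nodes; the helper name core_range is hypothetical. The actual core-to-node layout of a machine should be checked (for example, with lscpu) before the value is used.

```shell
#!/bin/sh
# Hypothetical helper: prints a coreRange value for a machine whose cores are
# numbered contiguously from 0 and divided evenly among the NUMA nodes.
# Usage: core_range TOTAL_CORES NUMA_NODES
core_range() {
    total=$1
    nodes=$2
    per_node=$((total / nodes))
    range=""
    node=0
    while [ "$node" -lt "$nodes" ]; do
        first=$((node * per_node))
        last=$((first + per_node - 1))
        # Append this node's range, adding a vertical bar only between entries.
        range="${range:+$range|}${first}-${last}"
        node=$((node + 1))
    done
    printf '%s\n' "$range"
}
```

Running core_range 96 4 reproduces the example value 0-23|24-47|48-71|72-95 given for this parameter.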

  2. Check whether SparkExtension has taken effect.

    Run the following SQL statement in the SparkExtension CLI and open source Spark SQL CLI:

    You are advised to launch two command-line interfaces simultaneously and start SparkExtension in one window and the open source Spark-SQL in the other, for easy comparison.

    set spark.sql.adaptive.enabled=false;
    explain select i_item_id
        ,i_item_desc
        ,i_current_price
    from item, inventory, date_dim, store_sales
    where i_current_price between 76 and 76+30
    and inv_item_sk = i_item_sk
    and d_date_sk=inv_date_sk
    and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date)
    and i_manufact_id in (512,409,677,16)
    and inv_quantity_on_hand between 100 and 500
    and ss_item_sk = i_item_sk
    group by i_item_id,i_item_desc,i_current_price
    order by i_item_id
    limit 100;
    

    The following figure shows the execution plan output in the SparkExtension CLI. If the operator name starts with Omni, SparkExtension has taken effect.

    Execution plan output by open source Spark SQL:

  3. Run the following SQL statement.

    Run the following SQL statement in the SparkExtension CLI and open source Spark SQL CLI:

    set spark.sql.adaptive.enabled=false;
    select i_item_id
        ,i_item_desc
        ,i_current_price
    from item, inventory, date_dim, store_sales
    where i_current_price between 76 and 76+30
    and inv_item_sk = i_item_sk
    and d_date_sk=inv_date_sk
    and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date)
    and i_manufact_id in (512,409,677,16)
    and inv_quantity_on_hand between 100 and 500
    and ss_item_sk = i_item_sk
    group by i_item_id,i_item_desc,i_current_price
    order by i_item_id
    limit 100;
    
  4. Compare the TPC-DS Q82 query results produced by open source Spark SQL and by SparkExtension, and check the performance difference before and after SparkExtension is enabled.
    • Open source Spark SQL execution result

      The execution plan is as follows:

    • Execution result after SparkExtension is enabled

      The execution plan is as follows:

    Execution result comparison: The query results of the two tests are the same. After SparkExtension is enabled, the time required for executing SQL statements is reduced. SparkExtension improves the Q82 query execution efficiency without affecting the query result.
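
The timing comparison can be quantified from the CLI output. The following is a minimal sketch, assuming that each CLI prints a line of the form "Time taken: N seconds" after a statement finishes and that this output has been captured to a file; the helper names elapsed_seconds and speedup are hypothetical, and no timings from an actual run are implied.

```shell
#!/bin/sh
# Hypothetical helper: reads CLI output on stdin and prints the seconds value
# from the last "Time taken: N seconds" line, which belongs to the last
# statement executed (the query, not the preceding set statement).
elapsed_seconds() {
    sed -n 's/.*Time taken: \([0-9.]*\) seconds.*/\1/p' | tail -n 1
}

# Hypothetical helper: prints BASELINE / ACCELERATED as a speedup factor
# rounded to two decimal places.
# Usage: speedup BASELINE_SECONDS ACCELERATED_SECONDS
speedup() {
    awk -v base="$1" -v fast="$2" 'BEGIN { printf "%.2f\n", base / fast }'
}
```

For example, with the output of the two sessions captured in hypothetical files spark.log and omni.log, speedup "$(elapsed_seconds < spark.log)" "$(elapsed_seconds < omni.log)" prints how many times faster the SparkExtension run was. Result sets saved to files can be compared with cmp or diff to confirm that they are identical.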