Executing Spark Services

Spark SQL tasks are run from the interactive spark-sql CLI. To check whether SparkExtension has taken effect, prefix the SQL statement with EXPLAIN or inspect the query plan on the Spark UI. If the operator names in the plan start with "Omni", SparkExtension has taken effect.

In this example, the tpcds_bin_partitioned_orc_3 database provides the test tables, and Q82 of the TPC-DS test dataset is used as the test SQL statement.

Table 1 lists the related table information.

Table 1 Table information

| Table Name  | Format | Total Number of Rows |
|-------------|--------|----------------------|
| item        | orc    | 36000                |
| inventory   | orc    | 28188000             |
| date_dim    | orc    | 73049                |
| store_sales | orc    | 8639377              |
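Before running the test, it can be useful to confirm that the local dataset matches the row counts in Table 1. The following is a minimal sketch that only generates the COUNT statements. The helper name print_count_queries is hypothetical, and the commented invocation assumes the spark-sql path and database name used in this example.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print one COUNT(*) statement per table listed in Table 1.
print_count_queries() {
    local table
    for table in item inventory date_dim store_sales; do
        printf 'SELECT COUNT(*) FROM %s;\n' "$table"
    done
}

# Example (needs a working Spark installation, so it is left commented out):
# /usr/local/spark/bin/spark-sql --database tpcds_bin_partitioned_orc_3 -e "$(print_count_queries)"
```

The counts returned by the CLI can then be compared by eye with the Total Number of Rows column.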

  1. Start the Spark SQL CLI.

    Run the following command to start the native Spark SQL CLI:

    /usr/local/spark/bin/spark-sql --deploy-mode client --driver-cores 8 --driver-memory 20g --master yarn --executor-cores 8 --executor-memory 26g --num-executors 36 --conf spark.executor.extraJavaOptions='-XX:+UseG1GC -XX:+UseNUMA' --conf spark.locality.wait=0 --conf spark.network.timeout=600 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.adaptive.enabled=true --conf spark.sql.autoBroadcastJoinThreshold=100M --conf spark.sql.broadcastTimeout=600 --conf spark.sql.shuffle.partitions=1000 --conf spark.sql.orc.impl=native --conf spark.task.cpus=1 --database tpcds_bin_partitioned_orc_3 
    
    Run the following command to start the SparkExtension plugin:

    /usr/local/spark/bin/spark-sql --deploy-mode client --driver-cores 8 --driver-memory 40g --conf spark.driverEnv.LD_LIBRARY_PATH=/opt/omni-operator/lib:/usr/local/lib/HMPP --conf spark.driverEnv.LD_PRELOAD=/opt/omni-operator/lib/libjemalloc.so.2 --conf spark.driverEnv.OMNI_HOME=/opt/omni-operator --conf spark.driver.extraClassPath=/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.3.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-1.3.0-aarch64.jar:/opt/omni-operator/lib/dependencies/* --conf spark.driver.extraLibraryPath=/opt/omni-operator/lib --driver-java-options -Djava.library.path=/opt/omni-operator/lib --master yarn --executor-cores 12 --executor-memory 5g --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=35g --num-executors 24 --conf spark.executorEnv.LD_LIBRARY_PATH=/opt/omni-operator/lib:/usr/local/lib/HMPP --conf spark.executorEnv.LD_PRELOAD=/opt/omni-operator/lib/libjemalloc.so.2 --conf spark.executorEnv.MALLOC_CONF=narenas:2 --conf spark.executorEnv.OMNI_HOME=/opt/omni-operator --conf spark.executor.extraClassPath=/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.3.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-1.3.0-aarch64.jar:/opt/omni-operator/lib/dependencies/* --conf spark.executor.extraJavaOptions='-XX:+UseG1GC -XX:+UseNUMA' --conf spark.executor.extraLibraryPath=/opt/omni-operator/lib --jars /opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.3.0-aarch64.jar,/opt/omni-operator/lib/boostkit-omniop-bindings-1.3.0-aarch64.jar --conf spark.omni.sql.columnar.fusion=false --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.OmniColumnarShuffleManager --conf spark.sql.adaptive.enabled=true --conf spark.sql.autoBroadcastJoinThreshold=100M --conf spark.sql.broadcastTimeout=500 --conf spark.network.timeout=600 --conf spark.sql.codegen.wholeStage=false --conf spark.sql.extensions=com.huawei.boostkit.spark.ColumnarPlugin --conf spark.sql.orc.impl=native --conf spark.sql.shuffle.partitions=600 --conf spark.task.cpus=1 --database tpcds_bin_partitioned_orc_3
    

    The preceding startup command runs in Yarn mode. To start the SparkExtension plugin in local mode, change --master yarn to --master local. Before starting the plugin, add export LD_PRELOAD=/opt/omni-operator/lib/libjemalloc.so.2 to the ~/.bashrc file on all nodes and reload the environment variables (for example, by running source ~/.bashrc). For details, see 6.
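The ~/.bashrc change can be scripted so that running it repeatedly does not add duplicate lines. The following is a minimal sketch, not an official procedure. The function name add_jemalloc_preload is hypothetical, and the script has to be run on each node separately.

```shell
#!/usr/bin/env bash
# Hypothetical helper: append the LD_PRELOAD export to an rc file
# only if the exact line is not already present.
add_jemalloc_preload() {
    local rc_file="${1:-$HOME/.bashrc}"
    local line='export LD_PRELOAD=/opt/omni-operator/lib/libjemalloc.so.2'
    touch "$rc_file"
    # -x matches whole lines only, -F treats the pattern as a fixed string.
    grep -qxF "$line" "$rc_file" || printf '%s\n' "$line" >> "$rc_file"
}

# Example: update the current user's rc file, then reload the environment.
# add_jemalloc_preload ~/.bashrc && source ~/.bashrc
```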

    Table 2 describes the SparkExtension startup parameters.

    Table 2 SparkExtension startup parameters

    | Parameter | Default Value | Description |
    |-----------|---------------|-------------|
    | spark.sql.extensions | com.huawei.boostkit.spark.ColumnarPlugin | Starts SparkExtension. |
    | spark.shuffle.manager | sort | Indicates whether to enable columnar shuffle. To enable this function, configure the shuffleManager class of OmniShuffle by adding the configuration item --conf spark.shuffle.manager="org.apache.spark.shuffle.sort.OmniColumnarShuffleManager". By default, the native sort-based shuffle of Spark is used. |
    | spark.omni.sql.columnar.hashagg | true | Indicates whether to enable columnar HashAgg. true: yes; false: no. |
    | spark.omni.sql.columnar.project | true | Indicates whether to enable columnar Project. true: yes; false: no. |
    | spark.omni.sql.columnar.projfilter | true | Indicates whether to enable columnar ConditionProject (the fused Project + Filter operator). true: yes; false: no. |
    | spark.omni.sql.columnar.filter | true | Indicates whether to enable columnar Filter. true: yes; false: no. |
    | spark.omni.sql.columnar.sort | true | Indicates whether to enable columnar Sort. true: yes; false: no. |
    | spark.omni.sql.columnar.window | true | Indicates whether to enable columnar Window. true: yes; false: no. |
    | spark.omni.sql.columnar.broadcastJoin | true | Indicates whether to enable columnar BroadcastHashJoin. true: yes; false: no. |
    | spark.omni.sql.columnar.nativefilescan | true | Indicates whether to enable columnar NativeFilescan, which covers the ORC and Parquet file formats. true: yes; false: no. |
    | spark.omni.sql.columnar.sortMergeJoin | true | Indicates whether to enable columnar SortMergeJoin. true: yes; false: no. |
    | spark.omni.sql.columnar.takeOrderedAndProject | true | Indicates whether to enable columnar TakeOrderedAndProject. true: yes; false: no. |
    | spark.omni.sql.columnar.shuffledHashJoin | true | Indicates whether to enable columnar ShuffledHashJoin. true: yes; false: no. |
    | spark.shuffle.columnar.shuffleSpillBatchRowNum | 10000 | Indicates the number of rows in each batch output by shuffle. Adjust the value based on the actual memory specifications. Increasing the value reduces the number of batches written to drive files and increases the write speed. |
    | spark.shuffle.columnar.shuffleSpillMemoryThreshold | 2147483648 | Indicates the upper limit of shuffle memory, in bytes. When shuffle memory usage reaches this limit, data is spilled. Adjust the value based on the actual memory specifications. Increasing the value reduces the number of shuffle spills to drives and drive I/O operations. |
    | spark.omni.sql.columnar.sortMergeJoin.fusion | false | Indicates whether to enable SortMergeJoin fusion. true: yes; false: no. |
    | spark.shuffle.columnar.compressBlockSize | 65536 | Indicates the size of a compressed shuffle data block, in bytes. Adjust the value based on the actual memory specifications. The default value is recommended. |
    | spark.sql.execution.columnar.maxRecordsPerBatch | 4096 | Indicates the size of the initialized buffer for columnar shuffle, in bytes. Adjust the value based on the actual memory specifications. Increasing the value reduces the number of shuffle reads/writes and improves performance. |
    | spark.shuffle.compress | true | Indicates whether to enable compression for the shuffle output. true: yes; false: no. |
    | spark.io.compression.codec | lz4 | Indicates the compression format for the shuffle output. Possible values are uncompressed, zlib, snappy, lz4, and zstd. |
    | spark.omni.sql.columnar.sortSpill.rowThreshold | 214783647 | Indicates the threshold that triggers spilling for the SORT operator, in rows. When the number of data rows to be processed exceeds this value, data is spilled. Adjust the value based on the actual memory specifications. Increasing the value reduces the number of SORT operator spills to drives and drive I/O operations. |
    | spark.omni.sql.columnar.sortSpill.memFraction | 90 | Indicates the threshold that triggers spilling for the SORT operator. When the off-heap memory usage for data processing exceeds this value, data is spilled. This parameter is used together with spark.memory.offHeap.size, which sets the total off-heap memory size. Adjust the value based on the actual memory specifications. Increasing the value reduces the number of SORT operator spills to drives and drive I/O operations. |
    | spark.omni.sql.columnar.broadcastJoin.shareHashtable | true | Indicates whether the builder constructs only one hash table that is shared by all lookup joins in Broadcast Join. true: yes; false: no. |
    | spark.omni.sql.columnar.sortSpill.dirDiskReserveSize | 10737418240 | Indicates the size of the available drive space reserved for data spilling of the SORT operator, in bytes. If the actual available space is less than this value, an exception is thrown. Adjust the value based on the actual drive capacity and service scenario. It is recommended that the value be less than or equal to the service data size. The upper limit of the value is the actual drive capacity. |
    | spark.omni.sql.columnar.sortSpill.enabled | false | Indicates whether to enable spilling for the SORT operator. true: yes; false: no. |
    | spark.omni.sql.columnar.heuristicJoinReorder | true | Indicates whether to enable the join reordering optimization policy. true: yes; false: no. The heuristic join reordering algorithm automatically reorders joins based on the number of WHERE filter criteria and the table size. |
    | spark.default.parallelism | 200 | Indicates the number of tasks concurrently executed by Spark. |
    | spark.sql.shuffle.partitions | 200 | Indicates the number of shuffle partitions when Spark performs aggregation or join operations. |
    | spark.sql.adaptive.enabled | false | Indicates whether to enable adaptive query optimization, which allows the execution plan to be adjusted dynamically during query execution. true: yes; false: no. |
    | spark.executorEnv.MALLOC_CONF | narenas:1 | Controls the memory allocation policy of each Executor process in Spark. The value is passed to jemalloc; narenas sets the number of memory arenas. |
    | spark.sql.autoBroadcastJoinThreshold | 10M | Specifies the maximum size of a table that is broadcast to all nodes when a join is performed. |
    | spark.sql.broadcastTimeout | 300 | Specifies the timeout duration of broadcasting small tables to other nodes, in seconds. |
    | spark.omni.sql.columnar.fusion | false | Indicates whether to fuse multiple operators into one operator. true: yes; false: no. |
    | spark.locality.wait | 3s | Indicates how long to wait to launch a data-local task before falling back to a less-local node. |
    | spark.sql.cbo.enabled | false | Specifies whether to enable cost-based optimization (CBO). true: yes; false: no. |
    | spark.sql.codegen.wholeStage | true | Indicates whether to enable whole-stage code generation. true: yes; false: no. |
    | spark.sql.orc.impl | native | Specifies the ORC implementation. native: the native ORC library is used; hive: the ORC library in Hive is used. |
    | spark.serializer | - | Specifies the serializer class. The startup commands set it to org.apache.spark.serializer.KryoSerializer to use Kryo serialization. |
    | spark.executor.extraJavaOptions | - | Specifies extra JVM options passed to executors, for example GC settings or -Djava.library.path to point to the local libraries used for acceleration. |
    | spark.driver.extraJavaOptions | - | Specifies extra JVM options passed to the driver, for example -Djava.library.path to point to the local libraries used for acceleration. |
    | spark.network.timeout | 120 | Specifies the default timeout duration of all network interactions, in seconds. |
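Any parameter in Table 2 can be overridden at startup by appending another --conf key=value pair to the spark-sql command. The following is a minimal sketch of a helper that expands a list of overrides into such arguments, for example to switch off one columnar operator while investigating a plan. The function name build_conf_args is hypothetical, and the values shown are examples rather than recommendations.

```shell
#!/usr/bin/env bash
# Hypothetical helper: turn each key=value pair into a "--conf" argument
# followed by the pair itself, printing one argument per line.
build_conf_args() {
    local pair
    for pair in "$@"; do
        printf '%s\n%s\n' '--conf' "$pair"
    done
}

# Example (bash 4 or later for mapfile; left commented out because it needs Spark):
# mapfile -t extra_args < <(build_conf_args \
#     spark.omni.sql.columnar.sort=false \
#     spark.omni.sql.columnar.sortSpill.enabled=true)
# /usr/local/spark/bin/spark-sql "${extra_args[@]}" --database tpcds_bin_partitioned_orc_3
```

Printing one argument per line and reading the lines into an array keeps values that contain spaces, such as JVM option strings, intact as single arguments.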

  2. Check whether SparkExtension takes effect.

    Run the following SQL statement in the SparkExtension CLI and native Spark SQL CLI:

    set spark.sql.adaptive.enabled=false;
    explain select  i_item_id
        ,i_item_desc
        ,i_current_price
    from item, inventory, date_dim, store_sales
    where i_current_price between 76 and 76+30
    and inv_item_sk = i_item_sk
    and d_date_sk=inv_date_sk
    and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date)
    and i_manufact_id in (512,409,677,16)
    and inv_quantity_on_hand between 100 and 500
    and ss_item_sk = i_item_sk
    group by i_item_id,i_item_desc,i_current_price
    order by i_item_id
    limit 100;
    

    The following figure shows the execution plan output in the SparkExtension CLI. If the operator name starts with "Omni", SparkExtension has taken effect.

    The following figure shows the execution plan output in the native Spark SQL CLI.
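The "Omni" prefix check can also be done on a saved copy of the EXPLAIN output instead of by eye. The following is a minimal sketch under that assumption. The function names list_omni_operators and plan_uses_omni are hypothetical, and the script treats every identifier that begins with "Omni" as an operator name.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read an execution plan on stdin and print the distinct
# identifiers that start with "Omni", one per line.
list_omni_operators() {
    # Split the plan into identifier-like tokens, then keep those beginning with "Omni".
    tr -cs 'A-Za-z0-9_' '\n' | grep -E '^Omni' | sort -u
}

# Succeeds (exit status 0) when the plan on stdin has at least one such identifier.
plan_uses_omni() {
    [ -n "$(list_omni_operators)" ]
}

# Example: save the EXPLAIN output from the CLI to plan.txt (placeholder name), then run:
# plan_uses_omni < plan.txt && echo "SparkExtension has taken effect"
```

Running list_omni_operators on the plan from the native Spark SQL CLI is expected to print nothing, because that plan contains no operators with the "Omni" prefix.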

  3. Execute the test SQL statement (Q82).

    Run the following SQL statement in the SparkExtension CLI and native Spark SQL CLI:

    select  i_item_id
        ,i_item_desc
        ,i_current_price
    from item, inventory, date_dim, store_sales
    where i_current_price between 76 and 76+30
    and inv_item_sk = i_item_sk
    and d_date_sk=inv_date_sk
    and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date)
    and i_manufact_id in (512,409,677,16)
    and inv_quantity_on_hand between 100 and 500
    and ss_item_sk = i_item_sk
    group by i_item_id,i_item_desc,i_current_price
    order by i_item_id
    limit 100;
    
  4. Compare the results.
    • Native Spark SQL CLI execution result

      The execution plan is as follows:

    • SparkExtension plugin execution result

      The execution plan is as follows:

    • Result comparison

      The output in the native Spark SQL CLI and SparkExtension CLI is the same.
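If the query output from each CLI is saved to a file, the comparison can be automated. The following is a minimal sketch, assuming that both runs return rows in the same order, which the ORDER BY and LIMIT clauses of the test statement make likely. The function name compare_results and the file names are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: compare two saved query outputs byte for byte
# and report whether they match.
compare_results() {
    local native_out="$1" omni_out="$2"
    if cmp -s "$native_out" "$omni_out"; then
        echo "identical"
    else
        echo "different"
        return 1
    fi
}

# Example (file names are placeholders): save the step 3 output from each CLI,
# for instance by passing the query in a file with -f and redirecting stdout,
# then run:
# compare_results native.out omni.out
```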