Executing Spark Engine Services

Spark executes SQL tasks through its interactive CLI. To check whether SparkExtension takes effect on Spark, add the EXPLAIN keyword before the SQL statement or check the Spark UI: if the operator names start with "Omni", SparkExtension has taken effect.
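The "Omni" prefix check can be expressed as a short script. The helper below is an illustrative sketch, not part of SparkExtension, and the sample plan fragments are hypothetical:

```python
import re

def uses_omni_operators(plan_text: str) -> bool:
    """Return True if any physical operator in an EXPLAIN output starts
    with the "Omni" prefix (e.g. OmniColumnarFilter), which indicates
    that SparkExtension rewrote the plan."""
    return re.search(r"\bOmni[A-Za-z]+", plan_text) is not None

# Hypothetical plan fragments: one rewritten by SparkExtension, one native.
columnar_plan = "+- OmniColumnarFilter ...\n   +- OmniColumnarFileScan orc ..."
vanilla_plan = "+- Filter ...\n   +- FileScan orc ..."
```

Feeding the two fragments to the helper returns True for the columnar plan and False for the native one, mirroring the manual check described above.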

In this example, the tables in the tpcds_bin_partitioned_orc_3 database are used as test data, and Q82 of the TPC-DS benchmark is used as the test SQL statement.

Table 1 lists the related table information.

Table 1 Table information

Table Name  | Format | Row Count
------------|--------|----------
item        | orc    | 36000
inventory   | orc    | 28188000
date_dim    | orc    | 73049
store_sales | orc    | 8639377

  1. Start the Spark-SQL CLI.

    Run the following command to start native Spark-SQL:

    /usr/local/spark/bin/spark-sql --deploy-mode client --driver-cores 5 --driver-memory 5g --num-executors 18 --executor-cores 21 --executor-memory 10g --master yarn  --conf spark.executor.memoryOverhead=5g --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=45g --conf spark.task.cpus=1 --database tpcds_bin_partitioned_orc_3
    Run the following command to start Spark-SQL with the SparkExtension plugin:

    /usr/local/spark/bin/spark-sql --deploy-mode client --driver-cores 5 --driver-memory 5g --num-executors 18 --executor-cores 21 --executor-memory 10g --master yarn --conf spark.executor.memoryOverhead=5g --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=45g --conf spark.task.cpus=1 --conf spark.driver.extraClassPath=/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.0.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-1.0.0-aarch64.jar:/opt/omni-operator/lib/dependencies/* --conf spark.executor.extraClassPath=/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.0.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-1.0.0-aarch64.jar:/opt/omni-operator/lib/dependencies/* --driver-java-options '-Djava.library.path=/opt/omni-operator/lib' --conf spark.sql.codegen.wholeStage=false --conf spark.executorEnv.LD_LIBRARY_PATH="/opt/omni-operator/lib/" --conf spark.driverEnv.LD_LIBRARY_PATH="/opt/omni-operator/lib/" --conf spark.executor.extraLibraryPath=/opt/omni-operator/lib --conf spark.driverEnv.LD_PRELOAD=/opt/omni-operator/lib/libjemalloc.so.2 --conf spark.executorEnv.LD_PRELOAD=/opt/omni-operator/lib/libjemalloc.so.2 --conf spark.sql.extensions="com.huawei.boostkit.spark.ColumnarPlugin" --jars /opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.0.0-aarch64.jar,/opt/omni-operator/lib/boostkit-omniop-bindings-1.0.0-aarch64.jar --conf spark.sql.orc.impl=native --conf spark.shuffle.manager="org.apache.spark.shuffle.sort.ColumnarShuffleManager" --conf spark.sql.join.columnar.preferShuffledHashJoin=true --database tpcds_bin_partitioned_orc_3
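The long command line above is easier to audit if the SparkExtension-specific settings are collected in one place and rendered as --conf flags. The sketch below is illustrative only, not part of Spark or SparkExtension; build_command and the subset of confs shown are assumptions for the example:

```python
# Illustrative helper (hypothetical): assemble a spark-sql command from
# a dict of Spark confs. Paths and values mirror the example command above.
OMNI_LIB = "/opt/omni-operator/lib"

confs = {
    "spark.sql.extensions": "com.huawei.boostkit.spark.ColumnarPlugin",
    "spark.shuffle.manager": "org.apache.spark.shuffle.sort.ColumnarShuffleManager",
    "spark.sql.orc.impl": "native",
    "spark.sql.codegen.wholeStage": "false",
    "spark.sql.join.columnar.preferShuffledHashJoin": "true",
    "spark.executorEnv.LD_LIBRARY_PATH": OMNI_LIB,
    "spark.executorEnv.LD_PRELOAD": f"{OMNI_LIB}/libjemalloc.so.2",
}

def build_command(confs: dict) -> str:
    """Render the confs as a multi-line spark-sql invocation."""
    parts = ["/usr/local/spark/bin/spark-sql", "--master yarn"]
    for key, value in sorted(confs.items()):
        parts.append(f"--conf {key}={value}")
    parts.append("--database tpcds_bin_partitioned_orc_3")
    return " \\\n  ".join(parts)

print(build_command(confs))
```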

    Table 2 lists the parameters for starting SparkExtension.

    Table 2 SparkExtension startup parameters

    Parameter | Default Value | Description
    ----------|---------------|------------
    spark.sql.extensions | com.huawei.boostkit.spark.ColumnarPlugin | Enables SparkExtension.
    spark.shuffle.manager | sort | Whether to enable columnar shuffle. To enable it, add --conf spark.shuffle.manager="org.apache.spark.shuffle.sort.ColumnarShuffleManager". By default, the native sort-based shuffle is used.
    spark.omni.sql.columnar.hashagg | true | Whether to enable columnar HashAgg. true: yes; false: no.
    spark.omni.sql.columnar.project | true | Whether to enable columnar Project. true: yes; false: no.
    spark.omni.sql.columnar.projfilter | true | Whether to enable columnar ConditionProject (fused Project + Filter operator). true: yes; false: no.
    spark.omni.sql.columnar.filter | true | Whether to enable columnar Filter. true: yes; false: no.
    spark.omni.sql.columnar.sort | true | Whether to enable columnar Sort. true: yes; false: no.
    spark.omni.sql.columnar.window | true | Whether to enable columnar Window. true: yes; false: no.
    spark.omni.sql.columnar.broadcastJoin | true | Whether to enable columnar BroadcastHashJoin. true: yes; false: no.
    spark.omni.sql.columnar.nativefilescan | true | Whether to enable columnar NativeFilescan. true: yes; false: no.
    spark.omni.sql.columnar.sortMergeJoin | true | Whether to enable columnar SortMergeJoin. true: yes; false: no.
    spark.omni.sql.columnar.takeOrderedAndProject | true | Whether to enable columnar TakeOrderedAndProject. true: yes; false: no.
    spark.omni.sql.columnar.shuffledHashJoin | true | Whether to enable columnar ShuffledHashJoin. true: yes; false: no.
    spark.shuffle.columnar.shuffleSpillBatchRowNum | 10000 | Number of rows in each batch output by shuffle spill.
    spark.shuffle.columnar.shuffleSpillMemoryThreshold | 2147483648 | Upper limit of shuffle memory usage, in bytes. When shuffle memory usage reaches this limit, data is spilled.
    spark.shuffle.columnar.compressBlockSize | 65536 | Size of a compressed shuffle data block, in bytes.
    spark.sql.execution.columnar.maxRecordsPerBatch | 4096 | Number of records in the initialized buffer for columnar shuffle.
    spark.sql.join.columnar.preferShuffledHashJoin | false | Whether to prefer ShuffledHashJoin in join operations. true: yes; false: no.
    spark.omni.sql.columnar.jit | false | Whether to enable columnar JIT compilation. true: yes; false: no. JIT requires the OMNI_HOME environment variable; in On Yarn mode, also add --conf spark.executorEnv.OMNI_HOME="/opt/omni-operator" --conf spark.driverEnv.OMNI_HOME="/opt/omni-operator".
    spark.shuffle.compress | true | Whether to compress the shuffle output. true: yes; false: no.
    spark.io.compression.codec | lz4 | Compression codec for the shuffle output. Possible values: uncompressed, zlib, snappy, lz4, and zstd.
    spark.omni.sql.columnar.sortSpill.rowThreshold | 200000 | Row-count threshold that triggers spilling for the Sort operator. When the number of rows to be processed exceeds this value, data is spilled.
    spark.omni.sql.columnar.sortSpill.dirDiskReserveSize | 10737418240 | Drive space reserved for Sort-operator spill data, in bytes. If the available space is less than this value, an exception is thrown.
    spark.omni.sql.columnar.sortSpill.enabled | false | Whether to enable spilling for the Sort operator. true: yes; false: no.
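Most of the columnar-operator switches in the table default to true, so only deviations from the defaults need to appear on the command line. The sketch below is a hypothetical helper, not a plugin API, and DEFAULTS lists only a subset of the table's parameters:

```python
# Subset of the defaults from the table above (illustrative only).
DEFAULTS = {
    "spark.omni.sql.columnar.hashagg": "true",
    "spark.omni.sql.columnar.sort": "true",
    "spark.omni.sql.columnar.sortMergeJoin": "true",
    "spark.sql.join.columnar.preferShuffledHashJoin": "false",
    "spark.omni.sql.columnar.sortSpill.enabled": "false",
}

def conf_overrides(desired: dict) -> list:
    """Emit --conf flags only for settings that differ from the defaults."""
    return [
        f"--conf {key}={value}"
        for key, value in sorted(desired.items())
        if DEFAULTS.get(key) != value
    ]
```

For example, requesting columnar Sort (already the default) together with preferShuffledHashJoin=true yields a single flag, matching the example launch command, which only overrides the non-default setting.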

  2. Check whether SparkExtension takes effect.

    Run the following SQL statement in both the SparkExtension CLI and the native Spark-SQL CLI:

    explain select  i_item_id
        ,i_item_desc
        ,i_current_price
    from item, inventory, date_dim, store_sales
    where i_current_price between 76 and 76+30
    and inv_item_sk = i_item_sk
    and d_date_sk=inv_date_sk
    and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date)
    and i_manufact_id in (512,409,677,16)
    and inv_quantity_on_hand between 100 and 500
    and ss_item_sk = i_item_sk
    group by i_item_id,i_item_desc,i_current_price
    order by i_item_id
    limit 100;

    The following figure shows the output execution plan of SparkExtension. If the operator name starts with "Omni", SparkExtension has taken effect.

    The following figure shows the native Spark-SQL output execution plan.

  3. Run the SQL statement.

    Run the following SQL statement in both the SparkExtension CLI and the native Spark-SQL CLI:

    select  i_item_id
        ,i_item_desc
        ,i_current_price
    from item, inventory, date_dim, store_sales
    where i_current_price between 76 and 76+30
    and inv_item_sk = i_item_sk
    and d_date_sk=inv_date_sk
    and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date)
    and i_manufact_id in (512,409,677,16)
    and inv_quantity_on_hand between 100 and 500
    and ss_item_sk = i_item_sk
    group by i_item_id,i_item_desc,i_current_price
    order by i_item_id
    limit 100;
  4. Compare the results.
    • Native Spark-SQL execution result

    The execution result is as follows:

    • SparkExtension plugin execution result

    The execution result is as follows:

    • Result comparison

      The SQL query results returned by native Spark-SQL and SparkExtension are identical.
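The comparison in this step can be sketched as a small script. results_match and the sample rows below are hypothetical, for illustration only; in practice the rows would come from the two CLI outputs:

```python
# Illustrative check (not a Spark API): verify that the rows returned
# by native Spark-SQL and by SparkExtension match.
def results_match(native_rows, omni_rows):
    """Sort both result sets before comparing, so the check does not
    depend on tie-breaking among rows that share the same i_item_id
    (the query orders by i_item_id only)."""
    return sorted(native_rows) == sorted(omni_rows)

# Hypothetical sample rows in (i_item_id, i_item_desc, i_current_price) form.
native = [("AAAAAAAAAAABAAAA", "desc one", 79.98),
          ("AAAAAAAAAAACAAAA", "desc two", 92.10)]
omni = [("AAAAAAAAAAACAAAA", "desc two", 92.10),
        ("AAAAAAAAAAABAAAA", "desc one", 79.98)]
```

Sorting before comparison is a deliberate choice: LIMIT 100 after an ORDER BY on a single column can emit tied rows in either order across engines.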