Executing Spark Engine Services
Spark executes SQL tasks through its interactive CLI. To check whether SparkExtension has taken effect, prefix the SQL statement with EXPLAIN or inspect the execution plan on the Spark UI. If the operator names start with "Omni", SparkExtension has taken effect.
In this example, the tpcds_bin_partitioned_orc_3 data table is used as the test table and Q82 of the TPC-DS test dataset is used as the test SQL statement.
Table 1 lists the related table information.
| Table Name | Format | Total Number of Rows |
|---|---|---|
| item | orc | 36000 |
| inventory | orc | 28188000 |
| date_dim | orc | 73049 |
| store_sales | orc | 8639377 |
- Start the Spark-SQL CLI.
Run the following command to start the native Spark-SQL CLI:

```shell
/usr/local/spark/bin/spark-sql --deploy-mode client --driver-cores 5 --driver-memory 5g \
  --num-executors 18 --executor-cores 21 --executor-memory 10g --master yarn \
  --conf spark.executor.memoryOverhead=5g \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=45g \
  --conf spark.task.cpus=1 \
  --database tpcds_bin_partitioned_orc_3
```
Run the following command to start the Spark-SQL CLI with the SparkExtension plugin:

```shell
/usr/local/spark/bin/spark-sql --deploy-mode client --driver-cores 5 --driver-memory 5g \
  --num-executors 3 --executor-cores 21 --executor-memory 5g --master yarn \
  --conf spark.executor.memoryOverhead=5g \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=5g \
  --conf spark.task.cpus=1 \
  --conf spark.driver.extraClassPath=/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.1.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-1.1.0-aarch64.jar:/opt/omni-operator/lib/dependencies/* \
  --conf spark.executor.extraClassPath=/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.1.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-1.1.0-aarch64.jar:/opt/omni-operator/lib/dependencies/* \
  --driver-java-options '-Djava.library.path=/opt/omni-operator/lib' \
  --conf spark.sql.codegen.wholeStage=false \
  --conf spark.executorEnv.LD_LIBRARY_PATH="/opt/omni-operator/lib/:/usr/local/lib/HMPP" \
  --conf spark.executorEnv.OMNI_HOME="/opt/omni-operator" \
  --conf spark.driverEnv.LD_LIBRARY_PATH="/opt/omni-operator/lib/:/usr/local/lib/HMPP" \
  --conf spark.driverEnv.OMNI_HOME="/opt/omni-operator" \
  --conf spark.executor.extraLibraryPath=/opt/omni-operator/lib \
  --conf spark.driverEnv.LD_PRELOAD=/opt/omni-operator/lib/libjemalloc.so.2 \
  --conf spark.executorEnv.LD_PRELOAD=/opt/omni-operator/lib/libjemalloc.so.2 \
  --conf spark.sql.extensions="com.huawei.boostkit.spark.ColumnarPlugin" \
  --jars /opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.1.0-aarch64.jar \
  --jars /opt/omni-operator/lib/boostkit-omniop-bindings-1.1.0-aarch64.jar \
  --conf spark.sql.orc.impl=native \
  --conf spark.shuffle.manager="org.apache.spark.shuffle.sort.OmniColumnarShuffleManager" \
  --conf spark.sql.join.columnar.preferShuffledHashJoin=true \
  --conf spark.executorEnv.OMNI_CONNECTED_ENGINE=Spark \
  --database tpcds_bin_partitioned_orc_3
```
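Before running the benchmark queries, it can help to confirm that the extension is actually registered. A minimal sketch, not part of the original procedure: Spark SQL's `SET <key>;` statement prints a key's effective value, so the check can be run non-interactively (only the flags relevant to the check are repeated here).

```shell
# Sketch: print the effective spark.sql.extensions value. The output should
# contain com.huawei.boostkit.spark.ColumnarPlugin if the plugin is loaded.
/usr/local/spark/bin/spark-sql --master yarn --deploy-mode client \
  --conf spark.sql.extensions="com.huawei.boostkit.spark.ColumnarPlugin" \
  --jars /opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.1.0-aarch64.jar \
  -e "SET spark.sql.extensions;"
```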
Table 2 describes the SparkExtension startup parameters.
Table 2 SparkExtension startup parameters

| Parameter | Default Value | Description |
|---|---|---|
| spark.sql.extensions | com.huawei.boostkit.spark.ColumnarPlugin | Starts SparkExtension. |
| spark.shuffle.manager | sort | Indicates whether to enable columnar shuffle. To enable it, add --conf spark.shuffle.manager="org.apache.spark.shuffle.sort.OmniColumnarShuffleManager". To use OCK, configure the OCK shuffleManager class instead. By default, the native sort-based shuffle is used. |
| spark.omni.sql.columnar.hashagg | true | Indicates whether to enable columnar HashAgg. true: yes; false: no. |
| spark.omni.sql.columnar.project | true | Indicates whether to enable columnar Project. true: yes; false: no. |
| spark.omni.sql.columnar.projfilter | true | Indicates whether to enable columnar ConditionProject (fused Project + Filter operator). true: yes; false: no. |
| spark.omni.sql.columnar.filter | true | Indicates whether to enable columnar Filter. true: yes; false: no. |
| spark.omni.sql.columnar.sort | true | Indicates whether to enable columnar Sort. true: yes; false: no. |
| spark.omni.sql.columnar.window | true | Indicates whether to enable columnar Window. true: yes; false: no. |
| spark.omni.sql.columnar.broadcastJoin | true | Indicates whether to enable columnar BroadcastHashJoin. true: yes; false: no. |
| spark.omni.sql.columnar.nativefilescan | true | Indicates whether to enable columnar NativeFilescan. true: yes; false: no. |
| spark.omni.sql.columnar.sortMergeJoin | true | Indicates whether to enable columnar SortMergeJoin. true: yes; false: no. |
| spark.omni.sql.columnar.takeOrderedAndProject | true | Indicates whether to enable columnar TakeOrderedAndProject. true: yes; false: no. |
| spark.omni.sql.columnar.shuffledHashJoin | true | Indicates whether to enable columnar ShuffledHashJoin. true: yes; false: no. |
| spark.shuffle.columnar.shuffleSpillBatchRowNum | 10000 | Number of rows in each batch written during shuffle spill. Adjust the value to the actual memory specifications; increasing it reduces the number of batches written to drive files and speeds up writes. |
| spark.shuffle.columnar.shuffleSpillMemoryThreshold | 2147483648 | Upper limit of shuffle memory before spilling, in bytes. When shuffle memory reaches this limit, data is spilled to drives. Adjust the value to the actual memory specifications; increasing it reduces the number of shuffle spills and the resulting drive I/O. |
| spark.shuffle.columnar.compressBlockSize | 65536 | Size of a compressed shuffle data block, in bytes. Adjust the value to the actual memory specifications; the default value is recommended. |
| spark.sql.execution.columnar.maxRecordsPerBatch | 4096 | Maximum number of records per batch, which determines the initial buffer size for columnar shuffle. Adjust the value to the actual memory specifications; increasing it reduces the number of shuffle reads/writes and improves performance. |
| spark.sql.join.columnar.preferShuffledHashJoin | false | Indicates whether to preferentially use ShuffledHashJoin in JOIN operations. true: yes; false: no. |
| spark.shuffle.compress | true | Indicates whether to compress the shuffle output. true: yes; false: no. |
| spark.io.compression.codec | lz4 | Compression codec for the shuffle output. Possible values are uncompressed, zlib, snappy, lz4, and zstd. |
| spark.omni.sql.columnar.sortSpill.rowThreshold | 200000 | Row-count threshold that triggers spilling in the SORT operator. When the number of rows to be processed exceeds this value, data is spilled to drives. Adjust the value to the actual memory specifications; increasing it reduces the number of SORT spills and the resulting drive I/O. |
| spark.omni.sql.columnar.sortSpill.dirDiskReserveSize | 10737418240 | Drive space reserved for SORT operator spilling, in bytes. If the available space is smaller than this value, an exception is thrown. Adjust the value to the actual drive capacity and service scenario; it should be less than or equal to the service data size and cannot exceed the actual drive capacity. |
| spark.omni.sql.columnar.sortSpill.enabled | false | Indicates whether to enable spilling for the SORT operator. true: yes; false: no. |
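Each spark.omni.sql.columnar.* switch in Table 2 can be flipped independently, which is useful for isolating a single operator when debugging a plan. A minimal sketch, not part of the original procedure (the choice of the Sort switch is illustrative):

```shell
# Sketch: start a session with columnar Sort disabled while the other Omni
# operators stay enabled; the EXPLAIN plan should then show a row-based
# Sort node instead of an Omni one. Only the flags relevant to the example
# are shown; combine with the full SparkExtension launch command above.
/usr/local/spark/bin/spark-sql --master yarn --deploy-mode client \
  --conf spark.sql.extensions="com.huawei.boostkit.spark.ColumnarPlugin" \
  --conf spark.omni.sql.columnar.sort=false \
  --database tpcds_bin_partitioned_orc_3
```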
- Check whether SparkExtension takes effect.
Run the following SQL statement in both the SparkExtension CLI and the native Spark-SQL CLI:

```sql
explain select i_item_id, i_item_desc, i_current_price
from item, inventory, date_dim, store_sales
where i_current_price between 76 and 76+30
  and inv_item_sk = i_item_sk
  and d_date_sk = inv_date_sk
  and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date)
  and i_manufact_id in (512,409,677,16)
  and inv_quantity_on_hand between 100 and 500
  and ss_item_sk = i_item_sk
group by i_item_id, i_item_desc, i_current_price
order by i_item_id
limit 100;
```

The following figure shows the execution plan output by SparkExtension. If the operator names start with "Omni", SparkExtension has taken effect.

The following figure shows the native Spark-SQL output execution plan.
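Because the marker is simply the "Omni" prefix, the check can also be automated instead of reading the plan by eye. A sketch that assumes the EXPLAIN output has been redirected to a hypothetical file /tmp/q82_plan.txt; the operator names written below are illustrative placeholders, not the actual plan:

```shell
# Illustrative placeholder for a saved plan; in practice redirect the real
# EXPLAIN output, e.g. spark-sql ... -e "explain select ..." > /tmp/q82_plan.txt
printf 'OmniColumnarProject\nOmniColumnarFilter\n' > /tmp/q82_plan.txt

# Count plan lines containing "Omni"; a non-zero count means SparkExtension
# has taken effect for at least one operator.
grep -c "Omni" /tmp/q82_plan.txt
```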

- Run the SQL statements.
Run the following SQL statement in both the SparkExtension CLI and the native Spark-SQL CLI:

```sql
select i_item_id, i_item_desc, i_current_price
from item, inventory, date_dim, store_sales
where i_current_price between 76 and 76+30
  and inv_item_sk = i_item_sk
  and d_date_sk = inv_date_sk
  and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date)
  and i_manufact_id in (512,409,677,16)
  and inv_quantity_on_hand between 100 and 500
  and ss_item_sk = i_item_sk
group by i_item_id, i_item_desc, i_current_price
order by i_item_id
limit 100;
```

- Compare the results.
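For the comparison, wall-clock time of the same statement in both CLIs is the most direct measure. A sketch, assuming the query has been saved to a hypothetical file q82.sql and that the native and SparkExtension launch flags from the start-up commands above are held in hypothetical NATIVE_OPTS and OMNI_OPTS shell variables:

```shell
# NATIVE_OPTS / OMNI_OPTS: hypothetical variables holding the launch flags
# of the native and SparkExtension CLIs shown earlier; q82.sql holds the query.
time /usr/local/spark/bin/spark-sql $NATIVE_OPTS -f q82.sql > native.out
time /usr/local/spark/bin/spark-sql $OMNI_OPTS -f q82.sql > omni.out

# Result sets should match; only the runtimes should differ.
diff native.out omni.out
```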



