Combining OmniData with OmniOperator on Spark
Installing the OmniData and OmniOperator Combination Feature
Before installing the combination feature, install OmniData and OmniOperator on Spark.
- Create an /opt/omni-operator-omnidata directory on the management node (server1) and copy all files in /opt/omni-operator to the new directory.
mkdir -p /opt/omni-operator-omnidata
cp -r /opt/omni-operator/* /opt/omni-operator-omnidata/
- Copy the OmniData plugin package to the /opt/omni-operator-omnidata/lib directory.
cp -r /opt/boostkit/* /opt/omni-operator-omnidata/lib
- Configure the omni.conf file.
- Open the file.
vim /opt/omni-operator-omnidata/conf/omni.conf
- Press i to enter insert mode, and add the following content to the file or modify the parameters as follows:
enableHMPP=true
enableBatchExprEvaluate=false
hiveUdfPropertyFilePath=/opt/omni-operator-omnidata/hive-udf/udf.properties
hiveUdfDir=/opt/omni-operator-omnidata/hive-udf/udf
RoundingRule=DOWN
CheckReScaleRule=CHECK_RESCALE
EmptySearchStrReplaceRule=NOT_REPLACE
CastDecimalToDoubleRule=CONVERT_WITH_STRING
NegativeStartIndexOutOfBoundsRule=INTERCEPT_FROM_BEYOND
SupportContainerVecRule=NOT_SUPPORT
StringToDateFormatRule=ALLOW_REDUCED_PRECISION
- Press Esc, type :wq!, and press Enter to save the file and exit.
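Because omni.conf is a plain key=value file, the settings above are easy to sanity-check before the directory is distributed. The parser below is an illustrative sketch (not part of the official tooling), exercised here on a fragment of the listed settings:

```python
import io

def parse_omni_conf(lines):
    """Parse simple key=value settings (one per line) into a dict."""
    settings = {}
    for line in lines:
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, _, value = line.partition("=")
            settings[key.strip()] = value.strip()
    return settings

# Exercise the parser on a fragment of the settings listed above.
sample = io.StringIO("enableHMPP=true\nRoundingRule=DOWN\n")
conf = parse_omni_conf(sample)
assert conf == {"enableHMPP": "true", "RoundingRule": "DOWN"}
```

On a deployed node, passing `open("/opt/omni-operator-omnidata/conf/omni.conf")` to the same function lets you assert that `enableHMPP` is `true` before starting Spark.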
- Copy the /opt/omni-operator-omnidata directory to the compute nodes (agent1, agent2, and agent3) and replace hostname with the host name of the corresponding node.
scp -r /opt/omni-operator-omnidata hostname:/opt/
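With only three compute nodes the copy is quick to do by hand; for larger clusters it can be scripted. A small sketch that just builds the per-node commands (the host names are this example's agent1 to agent3; run the commands with a shell loop or subprocess):

```python
def scp_commands(src, hosts, dest="/opt/"):
    """Build one scp command per compute node for distributing a directory."""
    return [f"scp -r {src} {host}:{dest}" for host in hosts]

# The three compute nodes from this example; extend the list as needed.
for cmd in scp_commands("/opt/omni-operator-omnidata", ["agent1", "agent2", "agent3"]):
    print(cmd)
```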
- Add parameters of the combination feature to the Spark configuration file ($SPARK_HOME/conf/spark-defaults.conf).
- Open the Spark configuration file.
vim /usr/local/spark/conf/spark-defaults.conf
- Press i to enter insert mode and add the following parameter setting to spark-defaults.conf:
spark.sql.ndp.operator.combine.enable true
- Press Esc, type :wq!, and press Enter to save the file and exit.
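When this step is repeated across environments, appending the setting can be automated. A minimal, idempotent sketch (the helper name is hypothetical; the real target file is $SPARK_HOME/conf/spark-defaults.conf, demonstrated here on a throwaway file):

```python
import os
import tempfile

def ensure_setting(path, key, value):
    """Append 'key value' to a Spark defaults file unless the key is already set."""
    try:
        with open(path) as f:
            if any(line.split(None, 1)[:1] == [key] for line in f):
                return False  # already configured; leave the file alone
    except FileNotFoundError:
        pass  # file does not exist yet; it will be created below
    with open(path, "a") as f:
        f.write(f"{key} {value}\n")
    return True

# Demonstrate on a temporary file instead of the live configuration.
path = os.path.join(tempfile.mkdtemp(), "spark-defaults.conf")
assert ensure_setting(path, "spark.sql.ndp.operator.combine.enable", "true")
assert not ensure_setting(path, "spark.sql.ndp.operator.combine.enable", "true")
```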
Executing Spark Services
Use the spark-sql command to run operator pushdown on Spark.
In this example, a non-partitioned TPC-H table with 1 TB of data is used as the test table, and the test SQL statement is TPC-H Q6 (tpch-sql6).
Table 1 lists the related table information.
Spark 3.1.1 (on YARN) does not print INFO-level logs to the console, so log redirection is required for Spark 3.1.1.
- Define the log file log4j.properties.
log4j.rootCategory=INFO, FILE
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
log4j.logger.org.apache.spark.sql.execution=DEBUG
log4j.logger.org.apache.spark.repl.Main=INFO
log4j.appender.FILE=org.apache.log4j.FileAppender
log4j.appender.FILE.file=/usr/local/spark/logs/file.log
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern=%m%n
- Change the value of log4j.appender.FILE.file in the log4j.properties file to a custom directory and file name.
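Any script that later inspects the redirected log needs the same path that log4j.appender.FILE.file points to. Rather than hard-coding the path twice, it can be read from the properties file; a sketch assuming the simple one-setting-per-line format shown above:

```python
def log_file_target(properties_text):
    """Return the FileAppender output path from log4j.properties content."""
    for line in properties_text.splitlines():
        if line.startswith("log4j.appender.FILE.file="):
            return line.partition("=")[2].strip()
    return None  # no FILE appender target configured

# A fragment of the properties listed above.
sample = (
    "log4j.rootCategory=INFO, FILE\n"
    "log4j.appender.FILE.file=/usr/local/spark/logs/file.log\n"
)
assert log_file_target(sample) == "/usr/local/spark/logs/file.log"
```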
- Start the Spark-SQL CLI.
Start the combination feature.
/usr/local/spark/bin/spark-sql \
--driver-cores 5 \
--driver-memory 5g \
--num-executors 18 \
--executor-cores 21 \
--executor-memory 10g \
--master yarn \
--conf spark.executor.memoryOverhead=5g \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=45g \
--conf spark.task.cpus=1 \
--conf spark.sql.codegen.wholeStage=true \
--conf spark.sql.orc.impl=native \
--conf spark.sql.extensions=com.huawei.boostkit.spark.ColumnarPlugin \
--conf spark.shuffle.manager=org.apache.spark.shuffle.sort.OmniColumnarShuffleManager \
--conf spark.sql.join.columnar.preferShuffledHashJoin=true \
--conf spark.executor.extraClassPath=/opt/omni-operator-omnidata/lib/* \
--conf spark.driver.extraClassPath=/opt/omni-operator-omnidata/lib/* \
--conf spark.executorEnv.LD_LIBRARY_PATH=/opt/omni-operator-omnidata/lib/:/usr/local/lib/HMPP:/home/omm/omnidata-install/haf-host/lib \
--conf spark.driverEnv.LD_LIBRARY_PATH=/opt/omni-operator-omnidata/lib/:/usr/local/lib/HMPP:/home/omm/omnidata-install/haf-host/lib \
--conf spark.executor.extraLibraryPath=/opt/omni-operator-omnidata/lib:/home/omm/omnidata-install/haf-host/lib \
--conf spark.driverEnv.extraLibraryPath=/opt/omni-operator-omnidata/lib:/home/omm/omnidata-install/haf-host/lib \
--conf spark.executorEnv.LD_PRELOAD=/opt/omni-operator-omnidata/lib/libjemalloc.so.2 \
--conf spark.driverEnv.LD_PRELOAD=/opt/omni-operator-omnidata/lib/libjemalloc.so.2 \
--jars /opt/omni-operator-omnidata/lib/boostkit-omniop-spark-3.1.1-1.3.0-aarch64.jar,/opt/omni-operator-omnidata/lib/boostkit-omniop-bindings-1.3.0-aarch64.jar \
--conf spark.executorEnv.OMNI_CONNECTED_ENGINE=Spark \
--conf spark.executorEnv.HAF_CONFIG_PATH=/home/omm/omnidata-install/haf-host/etc/ \
--conf spark.omni.sql.columnar.fusion=false \
--conf spark.omni.sql.columnar.sortSpill.enabled=true \
--conf spark.omni.sql.columnar.sortSpill.rowThreshold=400000 \
--conf spark.omni.sql.columnar.sortSpill.dirDiskReserveSize=214748364800 \
--driver-java-options "-Djava.library.path=/opt/omni-operator-omnidata/lib -Dlog4j.configuration=file:/usr/local/spark/conf/log4j.properties"
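The executor sizing in the command has to fit within the YARN cluster's resources. A quick sketch of the resource arithmetic, using the figures from the command above (whether off-heap memory counts toward the container request depends on the Spark version and configuration, and the per-node limits you compare against are deployment-specific):

```python
# Figures taken from the spark-sql command above.
num_executors = 18
executor_cores = 21
executor_memory_gb = 10  # --executor-memory
overhead_gb = 5          # spark.executor.memoryOverhead
offheap_gb = 45          # spark.memory.offHeap.size (allocated per executor)

total_cores = num_executors * executor_cores
per_executor_gb = executor_memory_gb + overhead_gb + offheap_gb
total_memory_gb = num_executors * per_executor_gb

print(f"cluster demand: {total_cores} cores, {total_memory_gb} GB across executors")
```

With the three compute nodes in this example, that demand averages out to 126 cores and 360 GB per node, so each agent must have at least that much allocatable in YARN.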
Table 2 describes the SparkExtension startup parameters.
Table 2 SparkExtension startup parameters

| Parameter | Default Value | Description |
|---|---|---|
| spark.sql.extensions | com.huawei.boostkit.spark.ColumnarPlugin | Starts SparkExtension. |
| spark.shuffle.manager | sort | Shuffle manager. The default (sort) uses native sort-based shuffle. To enable columnar shuffle, add --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.OmniColumnarShuffleManager; to enable OmniShuffle, configure the OmniShuffle ShuffleManager class instead. |
| spark.omni.sql.columnar.hashagg | true | Whether to enable columnar HashAgg. |
| spark.omni.sql.columnar.project | true | Whether to enable columnar Project. |
| spark.omni.sql.columnar.projfilter | true | Whether to enable columnar ConditionProject (fused Project + Filter operator). |
| spark.omni.sql.columnar.filter | true | Whether to enable columnar Filter. |
| spark.omni.sql.columnar.sort | true | Whether to enable columnar Sort. |
| spark.omni.sql.columnar.window | true | Whether to enable columnar Window. |
| spark.omni.sql.columnar.broadcastJoin | true | Whether to enable columnar BroadcastHashJoin. |
| spark.omni.sql.columnar.nativefilescan | true | Whether to enable columnar NativeFilescan. |
| spark.omni.sql.columnar.orcNativefilescan | true | Whether to enable ORC columnar NativeFilescan. |
| spark.omni.sql.columnar.sortMergeJoin | true | Whether to enable columnar SortMergeJoin. |
| spark.omni.sql.columnar.takeOrderedAndProject | true | Whether to enable columnar TakeOrderedAndProject. |
| spark.omni.sql.columnar.shuffledHashJoin | true | Whether to enable columnar ShuffledHashJoin. |
| spark.shuffle.columnar.shuffleSpillBatchRowNum | 10000 | Number of rows in each batch output by shuffle. Adjust based on the actual memory specifications; a larger value reduces the number of batches written to disk and increases the write speed. |
| spark.shuffle.columnar.shuffleSpillMemoryThreshold | 2147483648 | Upper limit of shuffle memory, in bytes; when it is reached, data is spilled. Adjust based on the actual memory specifications; a larger value reduces the number of shuffle spills and disk I/O operations. |
| spark.shuffle.columnar.compressBlockSize | 65536 | Size of a compressed shuffle data block, in bytes. The default value is recommended. |
| spark.sql.execution.columnar.maxRecordsPerBatch | 4096 | Number of records in the initialized buffer for columnar shuffle. Adjust based on the actual memory specifications; a larger value reduces the number of shuffle reads/writes and improves performance. |
| spark.sql.join.columnar.preferShuffledHashJoin | false | Whether to preferentially use ShuffledHashJoin in a JOIN operation. |
| spark.shuffle.compress | true | Whether to compress the shuffle output. |
| spark.io.compression.codec | lz4 | Compression format for the shuffle output. Possible values: uncompressed, zlib, snappy, lz4, and zstd. |
| spark.omni.sql.columnar.sortSpill.rowThreshold | 2147483647 | Row-count threshold that triggers spilling in the SORT operator; when the number of rows to be processed exceeds this value, data is spilled. A larger value reduces the number of SORT operator spills and disk I/O operations. |
| spark.omni.sql.columnar.sortSpill.dirDiskReserveSize | 10737418240 | Disk space reserved for data spilling of the SORT operator, in bytes; if less free space is available, an exception is thrown. Adjust based on the actual disk capacity and service scenario; keep the value at or below the service data size. The upper limit is the actual disk capacity. |
| spark.omni.sql.columnar.sortSpill.enabled | false | Whether to enable spilling for the SORT operator. |
- Run the sql6 statement.
select
  sum(l_extendedprice * l_discount) as revenue
from tpch_flat_orc_1000.lineitem
where l_shipdate >= '1993-01-01'
  and l_shipdate < '1994-01-01'
  and l_discount between 0.06 - 0.01 and 0.06 + 0.01
  and l_quantity < 25;
After the execution is complete, check the log file configured in log4j.properties. If the log contains pushdown information, OmniData has taken effect.
Selectivity: 0.014160436451808448 Push down with [PushDownInfo(ListBuffer(FilterExeInfo((((((((isnotnull(l_shipdate#11) AND isnotnull(l_discount#7)) AND isnotnull(l_quantity#5)) AND (l_shipdate#11 >= 8401)) AND (l_shipdate#11 < 8766)) AND (l_discount#7 >= 0.05)) AND (l_discount#7 <= 0.07)) AND (l_quantity#5 < 25.0)),List(l_quantity#5, l_extendedprice#6, l_discount#7, l_shipdate#11))),ListBuffer(AggExeInfo(List(sum((l_extendedprice#6 * l_discount#7))),List(),List(sum#36))),None,Map(server1 -> server1, agent2 -> agent2, agent1 -> agent1))]

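The manual log inspection can also be automated, for example when the same verification runs after every deployment. A sketch that scans the redirected log for the pushdown marker (the marker string is taken from the sample output above; the log path is whatever log4j.appender.FILE.file was set to):

```python
def pushdown_took_effect(log_lines, marker="Push down with"):
    """Return True if any line of the redirected log reports OmniData pushdown."""
    return any(marker in line for line in log_lines)

# Lines modeled on the sample output above.
sample = [
    "Selectivity: 0.014160436451808448",
    "Push down with [PushDownInfo(...)]",
]
assert pushdown_took_effect(sample)
assert not pushdown_took_effect(["ordinary INFO line"])
```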
Run the EXPLAIN statement. If the displayed execution plan contains OmniOperator operators, OmniOperator has taken effect.
explain select
  sum(l_extendedprice * l_discount) as revenue
from tpch_flat_orc_1000.lineitem
where l_shipdate >= '1993-01-01'
  and l_shipdate < '1994-01-01'
  and l_discount between 0.06 - 0.01 and 0.06 + 0.01
  and l_quantity < 25;

If both checks pass, the OmniData and OmniOperator combination feature is working.