
Combining OmniData with OmniOperator on Spark

Installing the OmniData and OmniOperator Combination Feature

Before installing the combination feature, install OmniData and OmniOperator on Spark.

  1. Create an /opt/omni-operator-omnidata directory on the management node (server1) and copy all files in /opt/omni-operator to the new directory.
    mkdir -p /opt/omni-operator-omnidata
    cp -r /opt/omni-operator/* /opt/omni-operator-omnidata/
  2. Copy the OmniData plugin package to the /opt/omni-operator-omnidata/lib directory.
    cp -r /opt/boostkit/* /opt/omni-operator-omnidata/lib
  3. Configure the omni.conf file.
    1. Open the file.
      vim /opt/omni-operator-omnidata/conf/omni.conf
    2. Press i to enter the insert mode and add the following content to the file or modify the parameters as follows:
      enableHMPP=true
      enableBatchExprEvaluate=false
      hiveUdfPropertyFilePath=/opt/omni-operator-omnidata/hive-udf/udf.properties   
      hiveUdfDir=/opt/omni-operator-omnidata/hive-udf/udf  
      RoundingRule=DOWN
      CheckReScaleRule=CHECK_RESCALE
      EmptySearchStrReplaceRule=NOT_REPLACE
      CastDecimalToDoubleRule=CONVERT_WITH_STRING
      NegativeStartIndexOutOfBoundsRule=INTERCEPT_FROM_BEYOND
      SupportContainerVecRule=NOT_SUPPORT
      StringToDateFormatRule=ALLOW_REDUCED_PRECISION
    3. Press Esc, type :wq!, and press Enter to save the file and exit.
  4. Copy the /opt/omni-operator-omnidata directory to each compute node (agent1, agent2, and agent3). In the following command, replace hostname with the host name of the corresponding node.
    scp -r /opt/omni-operator-omnidata hostname:/opt/
  5. Add parameters of the combination feature to the Spark configuration file ($SPARK_HOME/conf/spark-defaults.conf).
    1. Open the Spark configuration file.
      vim /usr/local/spark/conf/spark-defaults.conf
    2. Press i to enter the insert mode and add the following parameter setting to spark-defaults.conf:
      spark.sql.ndp.operator.combine.enable true
    3. Press Esc, type :wq!, and press Enter to save the file and exit.

Executing Spark Services

Use the spark-sql command for operator pushdown on Spark.

In this example, a non-partitioned TPC-H table with 1 TB of data is used as the test table. The test SQL statement is tpch-sql6.

Table 1 lists the related table information.

Table 1 Table information

Table Name | Format | Total Rows | Occupied Space
--- | --- | --- | ---
lineitem | orc | 5999989709 | 169.6 GB

Spark 3.1.1 (on YARN) does not print INFO-level logs to the console. Therefore, log redirection is required on Spark 3.1.1 to capture the pushdown information.

  1. Create a log4j.properties file with the following content.
    log4j.rootCategory=INFO, FILE
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
     
    log4j.logger.org.apache.spark.sql.execution=DEBUG
    log4j.logger.org.apache.spark.repl.Main=INFO
     
    log4j.appender.FILE=org.apache.log4j.FileAppender
    log4j.appender.FILE.file=/usr/local/spark/logs/file.log
    log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
     
    log4j.appender.FILE.layout.ConversionPattern=%m%n
  2. Change the value of log4j.appender.FILE.file in the log4j.properties file to the custom directory and file name.
  3. Start the Spark-SQL CLI.

    The following command starts Spark-SQL with the combination feature enabled.

    /usr/local/spark/bin/spark-sql \
    --driver-cores 5 \
    --driver-memory 5g \
    --num-executors 18 \
    --executor-cores 21 \
    --executor-memory 10g \
    --master yarn \
    --conf spark.executor.memoryOverhead=5g \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=45g \
    --conf spark.task.cpus=1 \
    --conf spark.sql.codegen.wholeStage=true \
    --conf spark.sql.orc.impl=native \
    --conf spark.sql.extensions=com.huawei.boostkit.spark.ColumnarPlugin \
    --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.OmniColumnarShuffleManager \
    --conf spark.sql.join.columnar.preferShuffledHashJoin=true \
    --conf spark.executor.extraClassPath=/opt/omni-operator-omnidata/lib/* \
    --conf spark.driver.extraClassPath=/opt/omni-operator-omnidata/lib/* \
    --conf spark.executorEnv.LD_LIBRARY_PATH=/opt/omni-operator-omnidata/lib/:/usr/local/lib/HMPP:/home/omm/omnidata-install/haf-host/lib \
    --conf spark.driverEnv.LD_LIBRARY_PATH=/opt/omni-operator-omnidata/lib/:/usr/local/lib/HMPP:/home/omm/omnidata-install/haf-host/lib \
    --conf spark.executor.extraLibraryPath=/opt/omni-operator-omnidata/lib:/home/omm/omnidata-install/haf-host/lib \
    --conf spark.driverEnv.extraLibraryPath=/opt/omni-operator-omnidata/lib:/home/omm/omnidata-install/haf-host/lib \
    --conf spark.executorEnv.LD_PRELOAD=/opt/omni-operator-omnidata/lib/libjemalloc.so.2 \
    --conf spark.driverEnv.LD_PRELOAD=/opt/omni-operator-omnidata/lib/libjemalloc.so.2 \
    --driver-java-options -Djava.library.path=/opt/omni-operator-omnidata/lib \
    --jars /opt/omni-operator-omnidata/lib/boostkit-omniop-spark-3.1.1-1.3.0-aarch64.jar,/opt/omni-operator-omnidata/lib/boostkit-omniop-bindings-1.3.0-aarch64.jar \
    --conf spark.executorEnv.OMNI_CONNECTED_ENGINE=Spark \
    --conf spark.executorEnv.HAF_CONFIG_PATH=/home/omm/omnidata-install/haf-host/etc/ \
    --conf spark.omni.sql.columnar.fusion=false \
    --conf spark.omni.sql.columnar.sortSpill.enabled=true \
    --conf spark.omni.sql.columnar.sortSpill.rowThreshold=400000 \
    --conf spark.omni.sql.columnar.sortSpill.dirDiskReserveSize=214748364800 \
    --driver-java-options -Dlog4j.configuration=file:/usr/local/spark/conf/log4j.properties
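    As a sanity check before launching, the memory footprint implied by the command above can be worked out, assuming YARN sizes each executor container as heap plus overhead plus off-heap memory (the figures below are taken directly from the command; adjust them to your own cluster):

    ```python
    # Rough per-executor and cluster memory footprint of the launch command above.
    executor_heap_gb = 10  # --executor-memory 10g
    overhead_gb = 5        # spark.executor.memoryOverhead=5g
    off_heap_gb = 45       # spark.memory.offHeap.size=45g
    num_executors = 18     # --num-executors 18

    per_executor_gb = executor_heap_gb + overhead_gb + off_heap_gb
    print(per_executor_gb)                  # 60 GB per executor container
    print(per_executor_gb * num_executors)  # 1080 GB across the cluster
    ```

    If the per-container figure exceeds yarn.scheduler.maximum-allocation-mb on your cluster, YARN will reject the executor request, so it is worth doing this arithmetic before tuning the flags.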

    Table 2 describes the SparkExtension startup parameters.

    Table 2 SparkExtension startup parameters

    Parameter | Default Value | Description
    --- | --- | ---
    spark.sql.extensions | com.huawei.boostkit.spark.ColumnarPlugin | Starts SparkExtension.
    spark.shuffle.manager | sort | Shuffle manager. The native sort-based shuffle is used by default. To enable columnar shuffle, add --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.OmniColumnarShuffleManager; to enable OmniShuffle, configure the OmniShuffle ShuffleManager class instead.
    spark.omni.sql.columnar.hashagg | true | Whether to enable columnar HashAgg. true: yes; false: no.
    spark.omni.sql.columnar.project | true | Whether to enable columnar Project. true: yes; false: no.
    spark.omni.sql.columnar.projfilter | true | Whether to enable columnar ConditionProject (fused Project + Filter operator). true: yes; false: no.
    spark.omni.sql.columnar.filter | true | Whether to enable columnar Filter. true: yes; false: no.
    spark.omni.sql.columnar.sort | true | Whether to enable columnar Sort. true: yes; false: no.
    spark.omni.sql.columnar.window | true | Whether to enable columnar Window. true: yes; false: no.
    spark.omni.sql.columnar.broadcastJoin | true | Whether to enable columnar BroadcastHashJoin. true: yes; false: no.
    spark.omni.sql.columnar.nativefilescan | true | Whether to enable columnar NativeFilescan. true: yes; false: no.
    spark.omni.sql.columnar.orcNativefilescan | true | Whether to enable ORC columnar NativeFilescan. true: yes; false: no.
    spark.omni.sql.columnar.sortMergeJoin | true | Whether to enable columnar SortMergeJoin. true: yes; false: no.
    spark.omni.sql.columnar.takeOrderedAndProject | true | Whether to enable columnar TakeOrderedAndProject. true: yes; false: no.
    spark.omni.sql.columnar.shuffledHashJoin | true | Whether to enable columnar ShuffledHashJoin. true: yes; false: no.
    spark.shuffle.columnar.shuffleSpillBatchRowNum | 10000 | Number of rows in each batch output by shuffle. Adjust based on the actual memory specifications; a larger value reduces the number of batches written to disk and increases write speed.
    spark.shuffle.columnar.shuffleSpillMemoryThreshold | 2147483648 | Upper limit of shuffle memory, in bytes; data is spilled when shuffle memory reaches it. Adjust based on the actual memory specifications; a larger value reduces the number of shuffle spills and disk I/O operations.
    spark.shuffle.columnar.compressBlockSize | 65536 | Size of a compressed shuffle data block, in bytes. Adjust based on the actual memory specifications; the default value is recommended.
    spark.sql.execution.columnar.maxRecordsPerBatch | 4096 | Number of records in the initialized buffer for columnar shuffle. Adjust based on the actual memory specifications; a larger value reduces the number of shuffle reads/writes and improves performance.
    spark.sql.join.columnar.preferShuffledHashJoin | false | Whether to preferentially use ShuffledHashJoin in a JOIN operation. true: yes; false: no.
    spark.shuffle.compress | true | Whether to compress the shuffle output. true: yes; false: no.
    spark.io.compression.codec | lz4 | Compression format for the shuffle output. Possible values: uncompressed, zlib, snappy, lz4, and zstd.
    spark.omni.sql.columnar.sortSpill.rowThreshold | 2147483647 | Row-count threshold that triggers spilling for the SORT operator; data is spilled when the number of rows to be processed exceeds it. Adjust based on the actual memory specifications; a larger value reduces the number of SORT operator spills and disk I/O operations.
    spark.omni.sql.columnar.sortSpill.dirDiskReserveSize | 10737418240 | Disk space, in bytes, reserved for SORT operator spilling; an exception is thrown if the available space is smaller. Adjust based on the actual disk capacity and service scenario. A value less than or equal to the service data size is recommended; the upper limit is the actual disk capacity.
    spark.omni.sql.columnar.sortSpill.enabled | false | Whether to enable spilling for the SORT operator. true: yes; false: no.
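    The byte-valued parameters above are powers of two; a small helper (hypothetical, not part of the product) makes the GiB conversions explicit when tuning them:

    ```python
    # Hypothetical helper: convert GiB to the byte values the spill parameters expect.
    def gib_to_bytes(gib: int) -> int:
        return gib * 1024 ** 3

    print(gib_to_bytes(10))   # 10737418240, the sortSpill.dirDiskReserveSize default
    print(gib_to_bytes(200))  # 214748364800, the value passed in the launch command
    print(gib_to_bytes(2))    # 2147483648, the shuffleSpillMemoryThreshold default
    ```

    This confirms, for example, that the launch command above reserves 200 GiB of disk space for SORT spilling instead of the default 10 GiB.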

  4. Run the tpch-sql6 statement.
    select
    sum(l_extendedprice * l_discount) as revenue
    from
    tpch_flat_orc_1000.lineitem
    where
    l_shipdate >= '1993-01-01'
    and l_shipdate < '1994-01-01'
    and l_discount between 0.06 - 0.01 and 0.06 + 0.01
    and l_quantity < 25;

    After the execution is complete, check the log file configured in log4j.properties. If the log contains pushdown information, OmniData has taken effect.

    Selectivity: 0.014160436451808448
    Push down with [PushDownInfo(ListBuffer(FilterExeInfo((((((((isnotnull(l_shipdate#11) AND isnotnull(l_discount#7)) AND isnotnull(l_quantity#5)) AND (l_shipdate#11 >= 8401)) AND (l_shipdate#11 < 8766)) AND (l_discount#7 >= 0.05)) AND (l_discount#7 <= 0.07)) AND (l_quantity#5 < 25.0)),List(l_quantity#5, l_extendedprice#6, l_discount#7, l_shipdate#11))),ListBuffer(AggExeInfo(List(sum((l_extendedprice#6 * l_discount#7))),List(),List(sum#36))),None,Map(server1 -> server1, agent2 -> agent2, agent1 -> agent1))]
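    The bounds in this pushdown log are the query literals after Spark's internal encoding: 0.05 and 0.07 come from evaluating 0.06 - 0.01 and 0.06 + 0.01, and 8401 and 8766 are the DATE literals expressed as days since the Unix epoch, the representation Spark uses internally for DATE columns. A quick check:

    ```python
    from datetime import date

    # DATE values appear in the pushed-down filter as days since 1970-01-01.
    epoch = date(1970, 1, 1)
    print((date(1993, 1, 1) - epoch).days)  # 8401 -> l_shipdate >= 8401
    print((date(1994, 1, 1) - epoch).days)  # 8766 -> l_shipdate < 8766
    ```

    Matching these values against the query literals is a convenient way to confirm the filter was pushed down unchanged.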

    Run the EXPLAIN statement. If the displayed execution plan contains OmniOperator operators, OmniOperator has taken effect.

    explain select
    sum(l_extendedprice * l_discount) as revenue
    from
    tpch_flat_orc_1000.lineitem
    where
    l_shipdate >= '1993-01-01'
    and l_shipdate < '1994-01-01'
    and l_discount between 0.06 - 0.01 and 0.06 + 0.01
    and l_quantity < 25;

    If both checks succeed, the OmniData and OmniOperator combination feature has taken effect.