Combining OmniShuffle with OmniOperator on Spark

Prerequisites

Before using the combination feature, install OmniOperator and OmniShuffle. For details, see Installing OmniOperator and Installing OmniShuffle.

Installing the OmniShuffle and OmniOperator Combination Feature

  1. Download BoostKit-omnishuffle-spark-3.1.1-1.3.0-aarch64.zip from Table 3 and upload it to the /home/ directory on the server.

    The /home directory can be customized.

  2. Decompress BoostKit-omnishuffle-spark-3.1.1-1.3.0-aarch64.zip to obtain the ock-omniop-shuffle-manager-23.0.0-for-spark-3.1.jar and ock-omniop-tuning-23.0.0-for-spark-3.1.jar files. Copy the two JAR files to the ${OCK_HOME}/jars directory, change their permissions to 550, and distribute them to the same directory on all subnodes.
    unzip BoostKit-omnishuffle-spark-3.1.1-1.3.0-aarch64.zip
    mv ock-omniop-shuffle-manager-23.0.0-for-spark-3.1.jar $OCK_HOME/jars
    mv ock-omniop-tuning-23.0.0-for-spark-3.1.jar $OCK_HOME/jars
    chmod 550 $OCK_HOME/jars/ock-omniop-shuffle-manager-23.0.0-for-spark-3.1.jar $OCK_HOME/jars/ock-omniop-tuning-23.0.0-for-spark-3.1.jar
    scp ock-omniop-shuffle-manager-23.0.0-for-spark-3.1.jar ock-omniop-tuning-23.0.0-for-spark-3.1.jar xx.xx.xx.xx:$OCK_HOME/jars

    xx.xx.xx.xx: IP address of each node.
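    The scp command above copies the JARs to one node at a time. As a minimal sketch (the distribute_jars helper and the example node IPs are illustrative, not part of BoostKit), the distribution can be looped over all subnodes:

```shell
# Hypothetical helper: copy both JARs to the same ${OCK_HOME}/jars
# directory on every subnode passed as an argument.
OCK_HOME=${OCK_HOME:-/opt/ock}
JARS=(
  "$OCK_HOME/jars/ock-omniop-shuffle-manager-23.0.0-for-spark-3.1.jar"
  "$OCK_HOME/jars/ock-omniop-tuning-23.0.0-for-spark-3.1.jar"
)
distribute_jars() {
  local node
  for node in "$@"; do
    # scp both JARs to the same directory on each subnode
    scp "${JARS[@]}" "$node:$OCK_HOME/jars/" || return 1
  done
}
# Example (replace with your subnode IPs):
# distribute_jars 192.168.0.11 192.168.0.12
```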

Executing Spark

  1. Create a shuffle_operator.conf file in the /home directory. For details about the parameters in the file, see spark.conf.
    1. Create a file.
      vim /home/shuffle_operator.conf
    2. Press i to enter insert mode and add the following content to the file:
      spark.master yarn 
      spark.task.cpus 1 
      spark.shuffle.compress true 
      spark.shuffle.spill.compress true 
      spark.rdd.compress true 
      spark.executor.extraClassPath     /usr/local/hive/lib/mysql-connector-java-8.0.13.jar:/usr/local/hive/lib:/opt/ock/jars/*:/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.3.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-1.3.0-aarch64.jar:/opt/omni-operator/lib/dependencies/*:.
      spark.driver.extraClassPath       /usr/local/hive/lib/mysql-connector-java-8.0.13.jar:/usr/local/hive/lib:/opt/ock/jars/*:/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.3.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-1.3.0-aarch64.jar:/opt/omni-operator/lib/dependencies/*:.
      spark.driver.extraJavaOptions -Djava.library.path=/opt/ock/ucache/23.0.0/linux-aarch64/lib/common:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx/ucx:/opt/ock/ucache/23.0.0/linux-aarch64/lib/datakit:/opt/ock/ucache/23.0.0/linux-aarch64/lib/mf:/opt/ock/jars:/opt/omni-operator/lib
      spark.executor.extraJavaOptions -Djava.library.path=/opt/ock/ucache/23.0.0/linux-aarch64/lib/common:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx/ucx:/opt/ock/ucache/23.0.0/linux-aarch64/lib/datakit:/opt/ock/ucache/23.0.0/linux-aarch64/lib/mf:/opt/ock/jars:/opt/omni-operator/lib 
      spark.driver.extraLibraryPath    /home/:/home/sasl/lib/:/opt/ock/security/kmc_tool/lib/:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx/ucx:/opt/ock/ucache/23.0.0/linux-aarch64/lib/datakit:/opt/ock/ucache/23.0.0/linux-aarch64/lib/mf:/opt/ock/jars:/opt/omni-operator/lib:. 
      spark.executor.extraLibraryPath  /home/:/home/sasl/lib/:/opt/ock/security/kmc_tool/lib/:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx/ucx:/opt/ock/ucache/23.0.0/linux-aarch64/lib/datakit:/opt/ock/ucache/23.0.0/linux-aarch64/lib/mf:/opt/ock/jars:/opt/omni-operator/lib:. 
      spark.shuffle.manager              org.apache.spark.shuffle.ock.OckColumnarShuffleManager 
      spark.shuffle.ock.manager true 
      spark.broadcast.ock.manager false 
      spark.blacklist.enabled true 
      spark.files.fetchFailure.unRegisterOutputOnHost true 
      spark.shuffle.service.enabled  false 
      spark.sql.adaptive.enabled true
      spark.blacklist.application.fetchFailure.enabled true 
      spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2 
      spark.driver.maxResultSize 2g 
      spark.serializer                        org.apache.spark.serializer.KryoSerializer 
      spark.executorEnv.OCK_HOME /opt/ock 
      spark.executorEnv.UCX_USE_MT_MUTEX y 
      spark.executorEnv.UCX_TLS  tcp 
      spark.sql.broadcastTimeout 3000 
      #omni 
      spark.executorEnv.LD_LIBRARY_PATH /opt/omni-operator/lib:/usr/local/lib/HMPP 
      spark.executorEnv.OMNI_HOME   /opt/omni-operator 
      spark.driverEnv.LD_LIBRARY_PATH /opt/omni-operator/lib:/usr/local/lib/HMPP 
      spark.driverEnv.OMNI_HOME /opt/omni-operator 
      spark.driverEnv.LD_PRELOAD   /opt/omni-operator/lib/libjemalloc.so.2 
      spark.executorEnv.LD_PRELOAD /opt/omni-operator/lib/libjemalloc.so.2 
      spark.sql.orc.impl native 
      spark.omni.sql.columnar.fusion false 
      spark.locality.wait 8 
      spark.sql.autoBroadcastJoinThreshold 100M 
      spark.executorEnv.MALLOC_CONF narenas:2 
      spark.sql.extensions com.huawei.boostkit.spark.ColumnarPlugin,org.apache.spark.sql.execution.adaptive.ock.OmniOpBoostTuningExtension
      spark.sql.ock.autoConfig.enabled true 
      spark.sql.ock.autoConfig.history true 
      spark.sql.ock.autoConfig.globalRuntimePartition false 
      spark.sql.ock.autoConfig.sample false 
      spark.kryoserializer.buffer.max 512m
    3. Press Esc, type :wq!, and press Enter to save the file and exit.
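    A broken classpath entry in shuffle_operator.conf only surfaces at runtime. As an optional sanity check (check_classpath is an illustrative helper, not a BoostKit tool), the concrete paths on spark.executor.extraClassPath can be verified before submitting a job:

```shell
# Hypothetical sanity check: verify that every concrete path listed on
# spark.executor.extraClassPath exists. Wildcard entries and "." are skipped.
check_classpath() {
  local conf=$1 cp entry entries missing=0
  # extract the value of spark.executor.extraClassPath from the conf file
  cp=$(awk '$1 == "spark.executor.extraClassPath" {print $2}' "$conf")
  IFS=':' read -r -a entries <<< "$cp"
  for entry in "${entries[@]}"; do
    case "$entry" in
      '.'|*\**) continue ;;                  # skip "." and glob entries
      *) [ -e "$entry" ] || { echo "missing: $entry"; missing=1; } ;;
    esac
  done
  return "$missing"
}
# Example: check_classpath /home/shuffle_operator.conf
```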
  2. Start the Spark-SQL CLI.

    Start the native Spark-SQL.

    /usr/local/spark/bin/spark-sql --deploy-mode client --driver-cores 8 --driver-memory 40g --num-executors 30 --executor-cores 6 --executor-memory 35g --master yarn --conf spark.task.cpus=1 --conf spark.default.parallelism=600 --conf spark.sql.broadcastTimeout=500 --conf spark.sql.shuffle.partitions=600 --conf spark.sql.adaptive.enabled=true --database tpcds_bin_partitioned_orc_3
    Start Spark-SQL with the SparkExtension plugin.
    spark-sql --deploy-mode client --driver-cores 8 \
                                   --driver-memory 40G \
                                   --num-executors 24 \
                                   --executor-cores 12 \
                                   --executor-memory 5g \
                                   --master yarn \
                                   --conf spark.memory.offHeap.enabled=true \
                                   --conf spark.memory.offHeap.size=26g \
                                   --conf spark.sql.codegen.wholeStage=false \
                                   --jars "/opt/ock/jars/*,/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.3.0-aarch64.jar,/opt/omni-operator/lib/boostkit-omniop-bindings-1.3.0-aarch64.jar" \
                                   --properties-file /home/shuffle_operator.conf \
                                   --database tpcds_bin_partitioned_orc_3
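    Note that spark-sql takes --jars as a single comma-separated list; repeating the flag keeps only the last value. If you prefer to expand the /opt/ock/jars/* wildcard in the shell rather than quoting it for Spark to resolve, the paths must be joined with commas first. A minimal sketch (join_jars is an illustrative helper, not part of BoostKit):

```shell
# Join all arguments into one comma-separated string, as expected by
# the --jars option of spark-sql/spark-submit.
join_jars() {
  local IFS=','
  echo "$*"
}
# Example (paths as in this guide):
#   EXTRA_JARS=$(join_jars /opt/ock/jars/*.jar \
#     /opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.3.0-aarch64.jar \
#     /opt/omni-operator/lib/boostkit-omniop-bindings-1.3.0-aarch64.jar)
#   spark-sql ... --jars "$EXTRA_JARS" --properties-file /home/shuffle_operator.conf
```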
  3. Check whether the OmniShuffle and OmniOperator combination has taken effect.

    Run the following SQL statement in the SparkExtension CLI and native Spark-SQL CLI:

    set spark.sql.adaptive.enabled=false;
    explain select  i_item_id
        ,i_item_desc
        ,i_current_price
    from item, inventory, date_dim, store_sales
    where i_current_price between 76 and 76+30
    and inv_item_sk = i_item_sk
    and d_date_sk=inv_date_sk
    and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date)
    and i_manufact_id in (512,409,677,16)
    and inv_quantity_on_hand between 100 and 500
    and ss_item_sk = i_item_sk
    group by i_item_id,i_item_desc,i_current_price
    order by i_item_id
    limit 100;

    The following figure shows the execution plan. If the shuffle operator in the plan has changed to BoostTuningOmniColumnarShuffleExchange, the combination feature has taken effect.
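    When running non-interactively, the same check can be scripted: save the EXPLAIN output to a file and search it for the operator name. The helper below is an illustrative sketch, not part of BoostKit:

```shell
# Hypothetical helper: succeeds if the saved plan contains the
# OmniOperator shuffle exchange operator.
plan_uses_omni_shuffle() {
  grep -q "BoostTuningOmniColumnarShuffleExchange" "$1"
}
# Example (paths and query are illustrative):
#   spark-sql ... -e "explain select ..." > /tmp/plan.txt
#   plan_uses_omni_shuffle /tmp/plan.txt && echo "combination active"
```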