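Using OmniShuffle Shuffle Acceleration Combined with OmniOperator Operator Acceleration on the Spark Engine

The OmniOperator operator acceleration feature improves operator execution efficiency, and the OmniShuffle shuffle acceleration feature optimizes data exchange between nodes. Used together, they improve end-to-end query performance of the engine.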

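Prerequisites

Before using the combined feature, install OmniOperator and OmniShuffle by following "Installing the OmniOperator Operator Acceleration Feature" and "Installing the OmniShuffle Feature".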

Installing OmniShuffle Shuffle Acceleration Combined with OmniOperator Operator Acceleration

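  1. Download BoostKit-omnishuffle-spark-3.1.1-1.3.0-aarch64.zip from Table 3 and upload it to the “/home/” directory on the server.

    The “/home” directory is only an example; you can use a different directory.

  2. Unzip BoostKit-omnishuffle-spark-3.1.1-1.3.0-aarch64.zip to obtain ock-omniop-shuffle-manager-23.0.0-for-spark-3.1.jar and ock-omniop-tuning-23.0.0-for-spark-3.1.jar. Place both JAR files in the “${OCK_HOME}/jars” directory, set their permissions to 550, and distribute them to the same directory on all child nodes.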

    unzip BoostKit-omnishuffle-spark-3.1.1-1.3.0-aarch64.zip
    mv ock-omniop-shuffle-manager-23.0.0-for-spark-3.1.jar $OCK_HOME/jars
    mv ock-omniop-tuning-23.0.0-for-spark-3.1.jar $OCK_HOME/jars
    chmod 550 $OCK_HOME/jars/ock-omniop-shuffle-manager-23.0.0-for-spark-3.1.jar $OCK_HOME/jars/ock-omniop-tuning-23.0.0-for-spark-3.1.jar
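    # The JAR files were moved to $OCK_HOME/jars above, so copy them from there.
    scp $OCK_HOME/jars/ock-omniop-shuffle-manager-23.0.0-for-spark-3.1.jar $OCK_HOME/jars/ock-omniop-tuning-23.0.0-for-spark-3.1.jar xx.xx.xx.xx:$OCK_HOME/jars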

    Replace xx.xx.xx.xx with the IP address of each child node, and run the scp command once per node.
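
When there are many child nodes, the distribution step can be wrapped in a small loop. The following is a minimal sketch, not part of BoostKit: the function name distribute_jars and the DRY_RUN switch are hypothetical, and the default value of OCK_HOME is assumed from the spark.shuffle.ock.home setting used later in this guide. By default it only prints the scp commands so that they can be reviewed first.

```shell
# Assumption: OCK_HOME defaults to the install path used in this guide's configuration.
OCK_HOME="${OCK_HOME:-/home/ockadmin/opt/ock}"
JARS="ock-omniop-shuffle-manager-23.0.0-for-spark-3.1.jar ock-omniop-tuning-23.0.0-for-spark-3.1.jar"

# distribute_jars NODE...   (hypothetical helper)
# With DRY_RUN=1 (the default) it prints the scp commands instead of running them.
distribute_jars() {
  for node in "$@"; do
    for jar in $JARS; do
      if [ "${DRY_RUN:-1}" = "1" ]; then
        echo "scp ${OCK_HOME}/jars/${jar} ${node}:${OCK_HOME}/jars/"
      else
        # Stop at the first failed copy so a missing node is noticed.
        scp "${OCK_HOME}/jars/${jar}" "${node}:${OCK_HOME}/jars/" || return 1
      fi
    done
  done
}
```

For example, `distribute_jars 192.0.2.11 192.0.2.12` prints four scp commands (two JAR files for each of two placeholder addresses). Setting DRY_RUN=0 before the call performs the copies.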

Running the Spark Engine

  1. Create the shuffle_operator.conf file in the “/home” directory. For a description of the parameters in shuffle_operator.conf, see spark.conf.

    1. Create the file.
      vi /home/shuffle_operator.conf
      
    2. Press “i” to enter edit mode and add the following content to the file.
      spark.master yarn 
      spark.task.cpus 1 
      spark.shuffle.compress true 
      spark.shuffle.spill.compress true 
      spark.rdd.compress true 
      spark.executor.extraClassPath     /usr/local/hive/lib/mysql-connector-java-8.0.13.jar:/usr/local/hive/lib:/home/ockadmin/opt/ock/jars/*:/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.3.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-1.3.0-aarch64.jar:/opt/omni-operator/lib/dependencies/*:.
      spark.driver.extraClassPath       /usr/local/hive/lib/mysql-connector-java-8.0.13.jar:/usr/local/hive/lib:/home/ockadmin/opt/ock/jars/*:/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.3.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-1.3.0-aarch64.jar:/opt/omni-operator/lib/dependencies/*:.
      spark.driver.extraJavaOptions -Djava.library.path=/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/common:/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx:/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx/ucx:/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/datakit:/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/mf:/home/ockadmin/opt/ock/jars:/opt/omni-operator/lib
      spark.executor.extraJavaOptions -Djava.library.path=/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/common:/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx:/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx/ucx:/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/datakit:/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/mf:/home/ockadmin/opt/ock/jars:/opt/omni-operator/lib 
      spark.driver.extraLibraryPath    /home/:/home/sasl/lib/:/home/ockadmin/opt/ock/security/kmc_tool/lib/:/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/common:/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx:/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx/ucx:/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/datakit:/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/mf:/home/ockadmin/opt/ock/jars:/opt/omni-operator/lib:. 
      spark.executor.extraLibraryPath  /home/:/home/sasl/lib/:/home/ockadmin/opt/ock/security/kmc_tool/lib/:/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/common:/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx:/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx/ucx:/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/datakit:/home/ockadmin/opt/ock/ucache/23.0.0/linux-aarch64/lib/mf:/home/ockadmin/opt/ock/jars:/opt/omni-operator/lib:. 
      spark.shuffle.manager              org.apache.spark.shuffle.ock.OckColumnarShuffleManager 
      spark.shuffle.ock.manager true 
      spark.broadcast.ock.manager false 
      spark.blacklist.enabled true 
      spark.files.fetchFailure.unRegisterOutputOnHost true 
      spark.shuffle.service.enabled  false 
      spark.sql.adaptive.enabled true
      spark.blacklist.application.fetchFailure.enabled true 
      spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2 
      spark.driver.maxResultSize 2g 
      spark.serializer                        org.apache.spark.serializer.KryoSerializer 
      spark.shuffle.ock.home /home/ockadmin/opt/ock
      spark.shuffle.ock.version 23.0.0
      spark.shuffle.ock.binaryType linux-aarch64
      spark.sql.broadcastTimeout 3000 
      #omni 
      spark.executorEnv.LD_LIBRARY_PATH /opt/omni-operator/lib:/usr/local/lib/HMPP 
      spark.executorEnv.OMNI_HOME   /opt/omni-operator 
      spark.driverEnv.LD_LIBRARY_PATH /opt/omni-operator/lib:/usr/local/lib/HMPP 
      spark.driverEnv.OMNI_HOME /opt/omni-operator 
      spark.driverEnv.LD_PRELOAD   /opt/omni-operator/lib/libjemalloc.so.2 
      spark.executorEnv.LD_PRELOAD /opt/omni-operator/lib/libjemalloc.so.2 
      spark.sql.orc.impl native 
      spark.omni.sql.columnar.fusion false 
      spark.locality.wait 8 
      spark.sql.autoBroadcastJoinThreshold 100M 
      spark.executorEnv.MALLOC_CONF narenas:2 
      spark.sql.extensions com.huawei.boostkit.spark.ColumnarPlugin,org.apache.spark.sql.execution.adaptive.ock.OmniOpBoostTuningExtension
      spark.sql.ock.autoConfig.enabled true 
      spark.sql.ock.autoConfig.history true 
      spark.sql.ock.autoConfig.globalRuntimePartition false 
      spark.sql.ock.autoConfig.sample false 
      spark.kryoserializer.buffer.max 512m
      
    3. Press “Esc”, type :wq!, and press “Enter” to save the file and exit.

  2. Start the Spark-SQL command-line window.

    The startup command for native Spark-SQL is as follows.

    /usr/local/spark/bin/spark-sql --deploy-mode client --driver-cores 8 --driver-memory 40g --num-executors 30 --executor-cores 6 --executor-memory 35g --master yarn --conf spark.task.cpus=1 --conf spark.default.parallelism=600 --conf spark.sql.broadcastTimeout=500 --conf spark.sql.shuffle.partitions=600 --conf spark.sql.adaptive.enabled=true --database tpcds_bin_partitioned_orc_3
    
    The startup command with the SparkExtension plugin is as follows.
    # --jars takes one comma-separated list (globs allowed); if the option is repeated, only the last value is kept.
    spark-sql --deploy-mode client \
              --driver-cores 8 \
              --driver-memory 40G \
              --num-executors 24 \
              --executor-cores 12 \
              --executor-memory 5g \
              --master yarn \
              --conf spark.memory.offHeap.enabled=true \
              --conf spark.memory.offHeap.size=26g \
              --conf spark.sql.codegen.wholeStage=false \
              --jars "/home/ockadmin/opt/ock/jars/*,/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.3.0-aarch64.jar,/opt/omni-operator/lib/boostkit-omniop-bindings-1.3.0-aarch64.jar" \
              --properties-file /home/shuffle_operator.conf \
              --database tpcds_bin_partitioned_orc_3
    

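  3. Check whether OmniShuffle shuffle acceleration combined with OmniOperator operator acceleration has taken effect.

    Run the following SQL statement in both the SparkExtension and the native Spark-SQL interactive command-line windows.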

    set spark.sql.adaptive.enabled=false;
    explain select  i_item_id
        ,i_item_desc
        ,i_current_price
    from item, inventory, date_dim, store_sales
    where i_current_price between 76 and 76+30
    and inv_item_sk = i_item_sk
    and d_date_sk=inv_date_sk
    and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date)
    and i_manufact_id in (512,409,677,16)
    and inv_quantity_on_hand between 100 and 500
    and ss_item_sk = i_item_sk
    group by i_item_id,i_item_desc,i_current_price
    order by i_item_id
    limit 100;
    

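    The resulting execution plan is shown in the following figure. If the Shuffle operator in the plan has changed to BoostTuningOmniColumnarShuffleExchange, the combined feature has taken effect.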
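
The check above can also be done without reading the plan by eye. The following is a minimal sketch, assuming the EXPLAIN output has been saved as text; the function name check_plan and the file name plan.txt are hypothetical. It only searches the plan for the operator name given above.

```shell
# check_plan: reads an execution plan on standard input (hypothetical helper).
# Prints "enabled" if the accelerated shuffle operator appears in the plan,
# otherwise prints "not enabled" and returns a non-zero status.
check_plan() {
  if grep -q 'BoostTuningOmniColumnarShuffleExchange'; then
    echo "enabled"
  else
    echo "not enabled"
    return 1
  fi
}
```

For example, after saving the plan from the SparkExtension session to plan.txt, run `check_plan < plan.txt`. A plan saved from the native Spark-SQL session is expected to report "not enabled", because that session does not load the plugin.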