Using OmniShuffle Shuffle Acceleration Combined with OmniOperator Operator Acceleration on the Spark Engine
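Prerequisites
Before using the combined feature, install OmniOperator and OmniShuffle by following "Installing the OmniOperator Operator Acceleration Feature" and "Installing the OmniShuffle Shuffle Acceleration Feature".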
Installing OmniShuffle Shuffle Acceleration Combined with OmniOperator Operator Acceleration
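- Download BoostKit-omnishuffle-spark-3.1.1-1.3.0-aarch64.zip from Table 3 and upload it to the "/home/" directory on the server.
The "/home" directory is only an example; you can choose a different directory.
- Decompress BoostKit-omnishuffle-spark-3.1.1-1.3.0-aarch64.zip to obtain ock-omniop-shuffle-manager-23.0.0-for-spark-3.1.jar and ock-omniop-tuning-23.0.0-for-spark-3.1.jar. Copy the two JAR packages to the "${OCK_HOME}/jars" directory, change their permissions to 550, and distribute them to the same directory on all worker nodes.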
unzip BoostKit-omnishuffle-spark-3.1.1-1.3.0-aarch64.zip
# Move both JAR packages into the OmniShuffle JAR directory and restrict permissions.
mv ock-omniop-shuffle-manager-23.0.0-for-spark-3.1.jar $OCK_HOME/jars
mv ock-omniop-tuning-23.0.0-for-spark-3.1.jar $OCK_HOME/jars
chmod 550 $OCK_HOME/jars/ock-omniop-shuffle-manager-23.0.0-for-spark-3.1.jar $OCK_HOME/jars/ock-omniop-tuning-23.0.0-for-spark-3.1.jar
# The JAR packages now live in $OCK_HOME/jars, so copy them from there to each worker node.
scp $OCK_HOME/jars/ock-omniop-shuffle-manager-23.0.0-for-spark-3.1.jar $OCK_HOME/jars/ock-omniop-tuning-23.0.0-for-spark-3.1.jar xx.xx.xx.xx:$OCK_HOME/jars
xx.xx.xx.xx is the IP address of a worker node. Repeat the scp command for every worker node.
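When there are many worker nodes, the distribution step can be scripted. The following is a minimal sketch, not part of the product tooling. It assumes that the worker addresses are stored one per line in a node list file (a hypothetical file that you create yourself), that passwordless SSH is configured for the current user, and that OCK_HOME falls back to /opt/ock when unset. The function name distribute_jars and the dry-run argument are illustrative.

```shell
# Names of the two JAR packages obtained from the ZIP package.
JAR_NAMES="ock-omniop-shuffle-manager-23.0.0-for-spark-3.1.jar ock-omniop-tuning-23.0.0-for-spark-3.1.jar"

# distribute_jars NODES_FILE [DRY_RUN]
# Copies both JAR packages to the same directory on every node listed in NODES_FILE.
# With DRY_RUN set to 1, the scp commands are printed instead of executed.
distribute_jars() (
  nodes_file=$1
  dry_run=${2:-0}
  jar_dir=${OCK_HOME:-/opt/ock}/jars   # assumption: default install path
  # Read the node list on file descriptor 3 so scp cannot consume it.
  while IFS= read -r node <&3 || [ -n "$node" ]; do
    # Skip blank lines and comment lines in the node list.
    case $node in ''|'#'*) continue ;; esac
    for jar in $JAR_NAMES; do
      if [ "$dry_run" = 1 ]; then
        echo "scp -p $jar_dir/$jar $node:$jar_dir/"
      else
        # -p preserves the 550 permissions set on the source files.
        scp -p "$jar_dir/$jar" "$node:$jar_dir/" || return 1
      fi
    done
  done 3< "$nodes_file"
)
```

Running distribute_jars with the dry-run argument first shows the exact commands that would be executed, which makes it easy to spot a wrong address before any file is copied.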
Running the Spark Engine
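- Create the shuffle_operator.conf file in the /home directory. For descriptions of the parameters in shuffle_operator.conf, see spark.conf.
  - Create the file.
vim /home/shuffle_operator.conf
  - Press "i" to enter insert mode and add the following content to the file.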
spark.master yarn
spark.task.cpus 1
spark.shuffle.compress true
spark.shuffle.spill.compress true
spark.rdd.compress true
spark.executor.extraClassPath /usr/local/hive/lib/mysql-connector-java-8.0.13.jar:/usr/local/hive/lib:/opt/ock/jars/*:/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.3.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-1.3.0-aarch64.jar:/opt/omni-operator/lib/dependencies/*:.
spark.driver.extraClassPath /usr/local/hive/lib/mysql-connector-java-8.0.13.jar:/usr/local/hive/lib:/opt/ock/jars/*:/opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.3.0-aarch64.jar:/opt/omni-operator/lib/boostkit-omniop-bindings-1.3.0-aarch64.jar:/opt/omni-operator/lib/dependencies/*:.
spark.driver.extraJavaOptions -Djava.library.path=/opt/ock/ucache/23.0.0/linux-aarch64/lib/common:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx/ucx:/opt/ock/ucache/23.0.0/linux-aarch64/lib/datakit:/opt/ock/ucache/23.0.0/linux-aarch64/lib/mf:/opt/ock/jars:/opt/omni-operator/lib
spark.executor.extraJavaOptions -Djava.library.path=/opt/ock/ucache/23.0.0/linux-aarch64/lib/common:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx/ucx:/opt/ock/ucache/23.0.0/linux-aarch64/lib/datakit:/opt/ock/ucache/23.0.0/linux-aarch64/lib/mf:/opt/ock/jars:/opt/omni-operator/lib
spark.driver.extraLibraryPath /home/:/home/sasl/lib/:/opt/ock/security/kmc_tool/lib/:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx/ucx:/opt/ock/ucache/23.0.0/linux-aarch64/lib/datakit:/opt/ock/ucache/23.0.0/linux-aarch64/lib/mf:/opt/ock/jars:/opt/omni-operator/lib:.
spark.executor.extraLibraryPath /home/:/home/sasl/lib/:/opt/ock/security/kmc_tool/lib/:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx:/opt/ock/ucache/23.0.0/linux-aarch64/lib/common/ucx/ucx:/opt/ock/ucache/23.0.0/linux-aarch64/lib/datakit:/opt/ock/ucache/23.0.0/linux-aarch64/lib/mf:/opt/ock/jars:/opt/omni-operator/lib:.
spark.shuffle.manager org.apache.spark.shuffle.ock.OckColumnarShuffleManager
spark.shuffle.ock.manager true
spark.broadcast.ock.manager false
spark.blacklist.enabled true
spark.files.fetchFailure.unRegisterOutputOnHost true
spark.shuffle.service.enabled false
spark.sql.adaptive.enabled true
spark.blacklist.application.fetchFailure.enabled true
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.driver.maxResultSize 2g
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.executorEnv.OCK_HOME /opt/ock
spark.executorEnv.UCX_USE_MT_MUTEX y
spark.executorEnv.UCX_TLS tcp
spark.sql.broadcastTimeout 3000
#omni
spark.executorEnv.LD_LIBRARY_PATH /opt/omni-operator/lib:/usr/local/lib/HMPP
spark.executorEnv.OMNI_HOME /opt/omni-operator
spark.driverEnv.LD_LIBRARY_PATH /opt/omni-operator/lib:/usr/local/lib/HMPP
spark.driverEnv.OMNI_HOME /opt/omni-operator
spark.driverEnv.LD_PRELOAD /opt/omni-operator/lib/libjemalloc.so.2
spark.executorEnv.LD_PRELOAD /opt/omni-operator/lib/libjemalloc.so.2
spark.sql.orc.impl native
spark.omni.sql.columnar.fusion false
spark.locality.wait 8
spark.sql.autoBroadcastJoinThreshold 100M
spark.executorEnv.MALLOC_CONF narenas:2
spark.sql.extensions=com.huawei.boostkit.spark.ColumnarPlugin,org.apache.spark.sql.execution.adaptive.ock.OmniOpBoostTuningExtension
spark.sql.ock.autoConfig.enabled true
spark.sql.ock.autoConfig.history true
spark.sql.ock.autoConfig.globalRuntimePartition false
spark.sql.ock.autoConfig.sample false
spark.kryoserializer.buffer.max 512m
  - Press "Esc", type :wq!, and press "Enter" to save the file and exit.
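Because the configuration file is long, a quick sanity check after saving it can catch a missing or mistyped entry. The sketch below is one possible check and is not part of the product. It only verifies the two settings that switch the combined feature on: spark.shuffle.manager and spark.sql.extensions. The helper names conf_value and check_conf are hypothetical.

```shell
# conf_value FILE KEY
# Prints the value of KEY from a Spark properties file. Both "key value" and
# "key=value" forms are handled; if the key occurs more than once, the last wins.
conf_value() {
  awk -v k="$2" '
    /^[ \t]*#/ { next }                  # skip comment lines such as "#omni"
    {
      line = $0
      sub(/^[ \t]+/, "", line)
      n = match(line, /[ \t=]+/)         # first run of separator characters
      if (n > 0 && substr(line, 1, n - 1) == k) {
        v = substr(line, n + RLENGTH)
        sub(/[ \t]+$/, "", v)
        print v
      }
    }' "$1" | tail -n 1
}

# check_conf FILE
# Returns 0 if the shuffle manager and both SQL extensions are configured as in
# this guide; otherwise prints what is wrong and returns 1.
check_conf() (
  conf=$1
  status=0
  manager=$(conf_value "$conf" spark.shuffle.manager)
  if [ "$manager" != "org.apache.spark.shuffle.ock.OckColumnarShuffleManager" ]; then
    echo "spark.shuffle.manager is '$manager'"
    status=1
  fi
  extensions=$(conf_value "$conf" spark.sql.extensions)
  for ext in com.huawei.boostkit.spark.ColumnarPlugin \
             org.apache.spark.sql.execution.adaptive.ock.OmniOpBoostTuningExtension; do
    # Wrap the list in commas so that only whole class names match.
    case ",$extensions," in
      *",$ext,"*) ;;
      *) echo "spark.sql.extensions lacks $ext"; status=1 ;;
    esac
  done
  return "$status"
)
```

For example, check_conf /home/shuffle_operator.conf prints nothing and returns 0 when both settings match the values in this guide. The check does not validate the library or class paths, which depend on the local installation.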
- Start the Spark-SQL command-line window.
The launch command for native Spark-SQL is as follows.
/usr/local/spark/bin/spark-sql --deploy-mode client \
--driver-cores 8 \
--driver-memory 40g \
--num-executors 30 \
--executor-cores 6 \
--executor-memory 35g \
--master yarn \
--conf spark.task.cpus=1 \
--conf spark.default.parallelism=600 \
--conf spark.sql.broadcastTimeout=500 \
--conf spark.sql.shuffle.partitions=600 \
--conf spark.sql.adaptive.enabled=true \
--database tpcds_bin_partitioned_orc_3
The launch command with the SparkExtension plugin is as follows.
spark-sql --deploy-mode client --driver-cores 8 \
--driver-memory 40G \
--num-executors 24 \
--executor-cores 12 \
--executor-memory 5g \
--master yarn \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=26g \
--conf spark.sql.codegen.wholeStage=false \
--jars /opt/ock/jars/* \
--jars /opt/omni-operator/lib/boostkit-omniop-spark-3.1.1-1.3.0-aarch64.jar \
--jars /opt/omni-operator/lib/boostkit-omniop-bindings-1.3.0-aarch64.jar \
--properties-file /home/shuffle_operator.conf \
--database tpcds_bin_partitioned_orc_3
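- Check whether OmniShuffle Shuffle acceleration combined with OmniOperator operator acceleration has taken effect.
Run the following SQL statements in both the SparkExtension and the native Spark-SQL interactive command-line windows.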
set spark.sql.adaptive.enabled=false;
explain select i_item_id
    ,i_item_desc
    ,i_current_price
from item, inventory, date_dim, store_sales
where i_current_price between 76 and 76+30
  and inv_item_sk = i_item_sk
  and d_date_sk=inv_date_sk
  and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date)
  and i_manufact_id in (512,409,677,16)
  and inv_quantity_on_hand between 100 and 500
  and ss_item_sk = i_item_sk
group by i_item_id,i_item_desc,i_current_price
order by i_item_id
limit 100;
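The execution plan is output as shown in the following figure. If the Shuffle operator in the plan has changed to BoostTuningOmniColumnarShuffleExchange, the combined feature has taken effect.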
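The same check can be automated by searching the plan text for the operator name instead of reading it by eye. The following is a small sketch under that assumption. The function name check_plan is hypothetical, and it tests only for the presence of the operator name given above, nothing more.

```shell
# check_plan
# Reads an EXPLAIN plan on standard input and reports whether the
# BoostTuningOmniColumnarShuffleExchange operator appears in it.
check_plan() {
  if grep -q 'BoostTuningOmniColumnarShuffleExchange'; then
    echo "enabled"
  else
    echo "not enabled"
    return 1
  fi
}

# Example use (check_plan.sql is a hypothetical file holding the two statements above):
#   spark-sql <same options as the SparkExtension launch command> -f check_plan.sql | check_plan
```

When the plan is fed from the native Spark-SQL session, the function is expected to report "not enabled", because that session does not load the OmniShuffle and OmniOperator extensions.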