鲲鹏社区首页
中文
注册
开发者
我要评分
获取效率
正确性
完整性
易理解
在线提单
论坛求助

执行Spark引擎任务

验证Gluten的生效情况,并通过测试示例展示其带来的性能优化效果,确保能够顺利执行Spark引擎任务。

Spark使用交互式页面命令行来执行SQL任务。如果需要确认Gluten是否生效,可以通过以下两种方式判断:在SQL语句前加EXPLAIN或查看Spark UI,观察执行计划中的算子名称,若出现以Omni开头或Transformer的结尾算子,则表明Gluten已生效。

本次任务示例使用tpcds_bin_partitioned_varchar_orc_2的数据表作为测试表,测试表的信息如表1所示。测试SQL为TPC-DS测试集的Q82。

表1 测试表信息

表名

表格式

总行数

item

orc

26000

inventory

orc

16966305

date_dim

orc

73049

store_sales

orc

5760749

  1. 启动Spark-SQL命令行窗口。
    • 开源版本Spark-SQL启动命令如下。
      1
      /usr/local/spark/bin/spark-sql --deploy-mode client --driver-cores 8 --driver-memory 20g --master yarn --executor-cores 8 --executor-memory 26g --num-executors 36 --conf spark.executor.extraJavaOptions='-XX:+UseG1GC -XX:+UseNUMA' --conf spark.locality.wait=0 --conf spark.network.timeout=600 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.adaptive.enabled=true --conf spark.sql.autoBroadcastJoinThreshold=100M --conf spark.sql.broadcastTimeout=600 --conf spark.sql.shuffle.partitions=1000 --conf spark.sql.orc.impl=native --conf spark.task.cpus=1 --database tpcds_bin_partitioned_varchar_orc_2
      
    • Gluten插件启动步骤如下。
      1. 进入“/usr/local/spark/conf”目录创建spark-defaults-omnioperator.conf文件。
        1
        2
        cd /usr/local/spark/conf
        cp spark-defaults.conf spark-defaults-omnioperator.conf
        
      2. 修改spark-defaults-omnioperator.conf文件权限为640。
        1
        chmod 640 spark-defaults-omnioperator.conf
        
      3. 打开“spark-defaults-omnioperator.conf”文件。
        1
        vi spark-defaults-omnioperator.conf
        
      4. “i”进入编辑模式,在文件末尾追加以下参数。
        spark.plugins org.apache.gluten.GlutenPlugin
        spark.shuffle.manager org.apache.spark.shuffle.sort.ColumnarShuffleManager
        spark.executor.memoryOverhead=3g
        spark.memory.offHeap.enabled true
        spark.memory.offHeap.size 35g
        spark.gluten.sql.columnar.backend.lib omni
        spark.executor.extraClassPath ${PWD}/omni/omni-operator/lib/gluten-omni-bundle-spark3.3_2.12-openEuler_22.03_aarch_64-1.3.0.jar
        spark.driver.extraClassPath /opt/omni-operator/lib/gluten-omni-bundle-spark3.3_2.12-openEuler_22.03_aarch_64-1.3.0.jar
        spark.executorEnv.LD_LIBRARY_PATH ${PWD}/omni/omni-operator/lib
        spark.executorEnv.OMNI_HOME ${PWD}/omni/omni-operator
        spark.driverEnv.LD_LIBRARY_PATH /opt/omni-operator/lib
        spark.driverEnv.OMNI_HOME /opt/omni-operator
        spark.executorEnv.MALLOC_CONF narenas:2
        spark.driverEnv.MALLOC_CONF tcache:false
        spark.driverEnv.LD_PRELOAD /opt/omni-operator/lib/libjemalloc.so.2
        spark.executorEnv.LD_PRELOAD ${PWD}/omni/omni-operator/lib/libjemalloc.so.2
        spark.gluten.sql.columnar.libpath /opt/omni-operator/lib/libspark_columnar_plugin.so
        spark.gluten.sql.columnar.executor.libpath ${PWD}/omni/omni-operator/lib/libspark_columnar_plugin.so
        spark.gluten.sql.native.union true
        spark.gluten.sql.columnar.forceShuffledHashJoin true
        spark.sql.ansi.enabled false
        spark.executorEnv.MALLOC_CONF tcache:false
        spark.driverEnv.MALLOC_CONF tcache:false
        spark.sql.parquet.datetimeRebaseModeInRead CORRECTED
        spark.sql.parquet.int96RebaseModeInRead CORRECTED
        spark.sql.optimizer.runtime.bloomfilter.enabled false
        spark.gluten.sql.columnar.backend.omni.combineJoinedAggregates true
        spark.gluten.sql.columnar.backend.omni.joinReorderEnhance true
        spark.gluten.sql.columnar.backend.omni.dedupLeftSemiJoin true
        spark.gluten.sql.columnar.backend.omni.pushOrderedLimitThroughAggEnable true
        spark.gluten.sql.columnar.backend.omni.adaptivePartialAggregation true
        spark.gluten.sql.columnar.backend.omni.filterMerge true
        spark.gluten.sql.columnar.backend.omni.preferShuffledHashJoin true
        spark.gluten.sql.columnar.backend.omni.aggregationSpillEnabled false
        spark.gluten.sql.columnar.backend.omni.vec.predicate.enabled true
        spark.sql.optimizer.runtime.bloomFilter.enabled false
        spark.gluten.sql.columnar.backend.omni.rewriteSelfJoinInInPredicate true
        spark.gluten.sql.columnar.physicalJoinOptimizeEnable true
        spark.gluten.sql.columnar.physicalJoinOptimizationLevel 19
        spark.driver.maxResultSize 2G
        spark.network.timeout 600
        spark.serializer org.apache.spark.serializer.KryoSerializer
        spark.sql.adaptive.enabled true
        spark.sql.adaptive.skewedJoin.enabled true
        spark.sql.autoBroadcastJoinThreshold 100M
        spark.sql.broadcastTimeout 600
        spark.sql.shuffle.partitions 200
        spark.sql.orc.impl native
        spark.task.cpus 1
        spark.sql.sources.parallelPartitionDiscovery.parallelism 60
        spark.sql.shuffle.partitions 1000
        spark.sql.adaptive.coalescePartitions.minPartitionNum 400
        spark.sql.adaptive.coalescePartitions.initialPartitionNum 400
        spark.kryoserializer.buffer.max 1024m
        spark.reducer.maxSizeInFlight 128m
        spark.gluten.sql.columnar.maxBatchSize 8192
      5. “Esc”键,输入:wq!,按“Enter”保存并退出编辑。
      6. 执行启动命令。
        /usr/local/spark/bin/spark-sql --archives hdfs://server1:9000/user/root/omni-operator.tar.gz#omni --deploy-mode client --driver-cores 8 --driver-memory 40g --master yarn --executor-cores 12 --executor-memory 5g --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=35g --num-executors 24 --conf spark.executor.extraJavaOptions='-XX:+UseG1GC' --conf spark.locality.wait=0 --conf spark.network.timeout=600 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.skewedJoin.enabled=true --conf spark.sql.autoBroadcastJoinThreshold=100M --conf spark.sql.broadcastTimeout=600 --conf spark.sql.shuffle.partitions=600 --conf spark.sql.orc.impl=native --conf spark.task.cpus=1 --properties-file /usr/local/spark/conf/spark-defaults-omnioperator.conf --database tpcds_bin_partitioned_varchar_orc_2
        • hdfs://server1:9000/user/root/omni-operator.tar.gz#omni:依据用户Hadoop的core-site.xml中配置的fs.defaultFS实际值设置“hdfs://server1:9000”。“/user/root/omni-operator.tar.gz”用户可自行定义,与2的操作关联。“#omni”表示实际运行时omni-operator.tar.gz解压的目录,用户可自行定义。
        • 上述启动命令为Yarn模式使用,若使用local模式启动SparkExtension插件,需将--master yarn改为--master local,同时在启动前需在所有节点的“~/.bashrc”文件中添加export LD_PRELOAD=/opt/omni-operator/lib/libjemalloc.so.2并更新环境变量。启动命令中“${PWD}/omni”全部替换为“/opt”。

        Gluten相关的启动参数信息如表2所示。

        表2 Gluten相关启动参数信息

        启动参数名称

        缺省值

        含义

        spark.plugins

        org.apache.gluten.GlutenPlugin

        启用Gluten。

        spark.shuffle.manager

        sort

        是否启用列式Shuffle,若启用请配置OmniShuffle Shuffle加速自有的shuffleManager类,需添加配置项--conf spark.shuffle.manager="org.apache.spark.shuffle.sort.OmniColumnarShuffleManager"。默认sort使用开源版本的Shuffle。

        spark.gluten.sql.columnar.hashagg

        true

        是否启用列式HashAgg,true表示启用,false表示关闭

        spark.gluten.sql.columnar.project

        true

        是否启用列式Project,true表示启用,false表示关闭。

        spark.gluten.sql.columnar.filter

        true

        是否启用列式Filter,true表示启用,false表示关闭。

        spark.gluten.sql.columnar.sort

        true

        是否启用列式Sort,true表示启用,false表示关闭。

        spark.gluten.sql.columnar.window

        true

        是否启用列式Window,true表示启用,false表示关闭。

        spark.gluten.sql.columnar.broadcastJoin

        true

        是否启用列式BroadcastHashJoin,true表示启用,false表示关闭。

        spark.gluten.sql.columnar.filescan

        true

        是否启用列式NativeFilescan,true表示启用,false表示关闭,包括ORC和Parquet的文件格式。

        spark.gluten.sql.columnar.sortMergeJoin

        true

        是否启用列式SortMergeJoin,true表示启用,false表示关闭。

        spark.gluten.sql.columnar.takeOrderedAndProject

        true

        是否启用列式TakeOrderedAndProject,true表示启用,false表示关闭。

        spark.gluten.sql.columnar.shuffledHashJoin

        true

        是否启用列式ShuffledHashJoin,true表示启用,false表示关闭。

        spark.gluten.sql.columnar.backend.omni.shuffleSpillBatchRowNum

        10000

        Shuffle输出的每个batch中包含数据的行数。请根据实际环境的内存调整参数,可以适当增大此参数,从而减少写入磁盘文件的批次,提升写入速度。

        spark.gluten.sql.columnar.backend.omni.shuffleTaskSpillMemoryThreshold

        2147483648

        Shuffle内存溢写上限,Shuffle内存上限达到缺省值时会发生溢写,单位:Byte。请根据实际环境的内存调整参数,可以适当增大此参数,从而减少Shuffle内存溢写到磁盘文件次数,减少磁盘IO操作。

        spark.gluten.sql.columnar.backend.omni.compressBlockSize

        65536

        Shuffle数据压缩块大小,单位:Byte。请根据实际环境的内存调整参数,建议采用缺省值。

        spark.gluten.sql.columnar.backend.omni.shuffleSpillBatchRowNum

        10000

        列式Shuffle初始化Buffer大小,单位:Byte。请根据实际环境的内存调整参数,可以适当增大此参数,从而减少Shuffle读写次数,提升性能。

        spark.shuffle.compress

        true

        Shuffle是否开启压缩。true表示压缩,false表示不压缩。

        spark.io.compression.codec

        lz4

        Shuffle压缩格式。支持uncompressed、zlib、snappy、lz4和zstd格式。

        spark.gluten.sql.columnar.backend.omni.sortSpill.rowThreshold

        214783647

        sort算子溢写触发条件,处理数据行超过此值触发溢写,单位:行。请根据实际环境的内存调整参数,可以适当增大此参数,从而减少sort算子溢写到磁盘文件的次数,减少磁盘IO操作。

        spark.gluten.sql.columnar.backend.omni.memFraction

        90

        sort算子溢写触发条件,处理数据使用堆外内存超过此百分比触发溢写,与堆外内存总大小参数“spark.memory.offHeap.size”同时使用。请根据实际环境的内存调整参数,可以适当增大此参数,从而减少sort算子溢写到磁盘文件的次数,减少磁盘IO操作。

        spark.gluten.sql.columnar.backend.omni.broadcastJoin.sharehashtable

        true

        在Broadcast Join场景下,是否开启builder侧只构建一份hash table,并允许所有lookup join侧共用。true表示开启,false表示关闭。

        spark.gluten.sql.columnar.backend.omni.spill.dirDiskReserveSize

        10737418240

        sort溢写磁盘预留可用空间大小,如果实际小于此值会抛异常,单位:Byte。根据实际环境的磁盘容量和业务场景调整参数,建议不超过业务数据大小,取值上限为实际环境的磁盘容量大小。

        spark.gluten.sql.columnar.backend.omni.joinReorderEnhance

        true

        是否开启join重排序优化策略。默认为true表示开启,false表示关闭。启发式join可以根据where过滤条件的数量和table表的大小,自动优化join的顺序。

        spark.default.parallelism

        200

        Spark并行执行的任务数。

        spark.sql.shuffle.partitions

        200

        Spark执行聚合操作或者Join操作时的Shuffle分区数。

        spark.sql.adaptive.enabled

        false

        是否启用自适应查询执行优化,可以在查询执行过程中动态地调整执行计划,true开启,false关闭。

        spark.executorEnv.MALLOC_CONF

        narenas:1

        控制Spark中每一个Executor进程中的内存分配策略。

        spark.sql.autoBroadcastJoinThreshold

        10M

        控制在执行Join操作时使用boradcastjoin小表的阈值大小。

        spark.sql.broadcastTimeout

        300

        控制广播小表到其它节点的超时时间。

        spark.locality.wait

        3

        数据本地化等待时长。

        spark.sql.cbo.enabled

        false

        是否开启CBO。true表示开启,false表示关闭。

        spark.sql.codegen.wholeStage

        true

        是否开启全阶段代码生成。true表示开启,false表示关闭。

        spark.sql.orc.impl

        native

        native表示使用开源版本的ORC库,hive表示使用Hive中的ORC库。

        spark.serializer

        使用Kryo序列化。

        spark.executor.extraJavaOptions

        Executor使用Hadoop本地库加速路径。

        spark.driver.extraJavaOptions

        Driver使用Hadoop本地库加速路径。

        spark.network.timeout

        120

        所有网络交互的默认超时时间,单位:s。

        spark.gluten.sql.columnar.backend.omni.rewriteSelfJoinInInPredicate

        false

        是否启用将in表达式中的self join转换为hashagg,删除没用到的列,减少数据量。true表示开启,false表示关闭。

        spark.gluten.sql.columnar.backend.omni.filterMerge

        false

        是否开启将在同一个表上的结构相似的多个表达式合并处理,减少Scan数据量。true表示开启,false表示关闭。

        spark.gluten.sql.columnar.backend.omni.dedupLeftSemiJoin

        false

        是否启用对leftsemi join右表去重,减少join数据量。true表示开启,false表示关闭。

        spark.gluten.sql.columnar.backend.omni.preferShuffledHashJoin

        false

        是否开启尽可能使用ShuffledHashJoin。true表示开启,false表示关闭。

        spark.sql.adaptive.skewedJoin.enabled

        false

        是否开启自适应倾斜连接优化。自适应倾斜连接优化会在连接操作中检测到数据倾斜的情况下,自动采用一些特殊的连接算法来处理倾斜数据,从而提高连接操作的效率。true表示开启,false表示关闭。

        spark.sql.adaptive.coalescePartitions.minPartitionNum

        1

        合并后的最小Shuffle分区数。如果不设置,默认为Spark集群的默认并行度。

        spark.gluten.sql.columnar.backend.omni.adaptivePartialAggregation

        false

        是否开启自适应跳过HashAgg分组聚合操作Partial阶段处理优化。该优化为运行时优化,在满足必要条件:存在分组聚合操作,但不存在First/Last聚合前提下,若采样识别为高基数场景,则跳过分组聚合Partial阶段处理,直接向下游算子输出数据。true表示开启,false表示关闭。

        spark.gluten.sql.columnar.backend.omni.pushOrderedLimitThroughAggEnable

        false

        是否开启pushOrderedLimitThroughAgg优化。在执行计划包含Sort+Limit Operator,且排序字段为分组聚合操作中分组字段的子集时,该优化将TopNSort Operator下推到分组聚合partial阶段后,以减少下游算子数据处理量。true表示开启,false表示关闭。

        该优化不会和adaptivePartialAggregation优化同时生效。

        spark.gluten.sql.columnar.backend.omni.combineJoinedAggregates

        false

        是否开启combineJoinedAggregates优化。该优化通过合并基于相同数据的子查询减少重复的读表操作。true表示开启,false表示关闭。

        spark.gluten.sql.columnar.wholeStage.fallback.threshold

        -1

        在AQE开启的情况下,如果Stage回退的算子个数大于等于这个阈值,则该Stage的全部算子(除OmniColumnarToRow和OmniAQEShuffleReadExec算子)全部回退为开源软件对应算子。当设置为-1时,关闭此功能。

        spark.gluten.sql.columnar.query.fallback.threshold

        -1

        在AQE关闭的情况下,如果整个执行计划回退的算子个数大于等于这个阈值,则该Stage的全部算子全部回退为开源软件对应算子。当设置为-1时,关闭此功能。

        spark.gluten.sql.columnar.backend.omni.unixTimeFunc.enabled

        true

        是否启用from_unixtime和unix_timestamp表达式,true表示启用,false表示关闭。

        spark.sql.orc.filterPushdown

        true

        控制ORC文件格式的数据查询时是否启用谓词下推功能。

        spark.gluten.sql.columnar.backend.omni.catalog.cache.size

        128

        设置缓存Catelog元数据的缓存空间大小。小于或等于0时为关闭缓存功能。

        spark.gluten.sql.columnar.backend.omni.catalog.cache.expire.time

        600

        设置缓存Catelog元数据的缓存过期时间,默认为600秒。

        spark.gluten.sql.columnar.backend.omni.vec.predicate.enabled

        false

        是否开启向量化谓词下推功能,true表示开启,false表示关闭。

  2. 查看Gluten是否生效。

    在Gluten和开源版本Spark-SQL交互式命令行窗口运行以下SQL语句。

    set spark.sql.adaptive.enabled=false;
    explain select i_item_id
        ,i_item_desc
        ,i_current_price
    from item, inventory, date_dim, store_sales
    where i_current_price between 76 and 76+30
    and inv_item_sk = i_item_sk
    and d_date_sk=inv_date_sk
    and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date)
    and i_manufact_id in (512,409,677,16)
    and inv_quantity_on_hand between 100 and 500
    and ss_item_sk = i_item_sk
    group by i_item_id,i_item_desc,i_current_price
    order by i_item_id
    limit 100;

    Gluten输出执行计划如下图。如果算子以Omni开头或者Transformer结尾,证明Gluten生效。

    开源版本Spark-SQL输出执行计划如下图。

  3. 运行SQL语句。

    在Gluten和开源版本Spark-SQL交互式命令行窗口运行以下SQL语句。

    set spark.sql.adaptive.enabled=false;
    select i_item_id
        ,i_item_desc
        ,i_current_price
    from item, inventory, date_dim, store_sales
    where i_current_price between 76 and 76+30
    and inv_item_sk = i_item_sk
    and d_date_sk=inv_date_sk
    and d_date between cast('1998-06-29' as date) and cast('1998-08-29' as date)
    and i_manufact_id in (512,409,677,16)
    and inv_quantity_on_hand between 100 and 500
    and ss_item_sk = i_item_sk
    group by i_item_id,i_item_desc,i_current_price
    order by i_item_id
    limit 100;
  4. 对比开源版本Spark-SQL和Gluten执行TPC-DS测试集Q82的查询结果,查看使能Gluten前后的性能差异。
    • 开源版本Spark-SQL执行结果。

    • 使能Gluten后的执行结果。

    执行结果对比:两项测试在查询结果上完全一致,使能Gluten后,执行SQL的耗时减少。Gluten在不影响查询结果的前提下,有效提升了Q82查询的执行效率。