EN
注册
我要评分
文档获取效率
文档正确性
内容完整性
文档易理解
在线提单
论坛求助
鲲鹏小智

OmniData算子下推叠加OmniOperator算子加速在Spark引擎上的应用

通过OmniData算子下推特性优化数据加载流程,同时使用OmniOperator算子加速特性提升算子执行效率,从而提升端到端引擎查询性能。

安装OmniData算子下推叠加OmniOperator算子加速特性

进行OmniData算子下推叠加OmniOperator算子加速特性的安装之前,需完成OmniData算子下推和OmniOperator算子加速两个特性在Spark上的安装。

  1. 在管理节点(server1)下创建目录“/opt/omni-operator-omnidata”,并将“/opt/omni-operator”中的全部文件拷贝至所建目录下。
    1
    2
    mkdir -p /opt/omni-operator-omnidata
    cp -r /opt/omni-operator/* /opt/omni-operator-omnidata/
    
  2. 将OmniData算子下推插件包拷贝至“/opt/omni-operator-omnidata/lib”目录下。
    1
    cp -r /opt/boostkit/* /opt/omni-operator-omnidata/lib
    
  3. 配置“omni.conf”文件。
    1. 打开文件。
      1
      vi /opt/omni-operator-omnidata/conf/omni.conf
      
    2. “i”进入编辑模式,添加或修改参数。
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      enableHMPP=true
      enableBatchExprEvaluate=false
      hiveUdfPropertyFilePath=/opt/omni-operator-omnidata/hive-udf/udf.properties   
      hiveUdfDir=/opt/omni-operator-omnidata/hive-udf/udf  
      RoundingRule=DOWN
      CheckReScaleRule=CHECK_RESCALE
      EmptySearchStrReplaceRule=NOT_REPLACE
      CastDecimalToDoubleRule=CONVERT_WITH_STRING
      NegativeStartIndexOutOfBoundsRule=INTERCEPT_FROM_BEYOND
      SupportContainerVecRule=NOT_SUPPORT
      StringToDateFormatRule=ALLOW_REDUCED_PRECISION
      
    3. “Esc”键,输入:wq!,按“Enter”保存并退出编辑。
  4. “/opt/omni-operator-omnidata”目录拷贝到计算节点(agent1、agent2、agent3),将“hostname”替换为相应节点的主机名。
    scp -r /opt/omni-operator-omnidata hostname:/opt/
  5. 新增OmniData算子下推叠加OmniOperator算子加速特性的参数到Spark的配置文件($SPARK_HOME/conf/spark-defaults.conf)中。
    1. 编辑Spark配置文件。
      1
      vi /usr/local/spark/conf/spark-defaults.conf
      
    2. “i”进入编辑模式,将以下参数配置添加到spark-defaults.conf中。
      1
      spark.sql.ndp.operator.combine.enable true
      
    3. “Esc”键,输入:wq!,按“Enter”保存并退出编辑。

执行Spark引擎业务

Spark算子下推使用spark-sql命令来执行。

本次任务示例使用tpch的1T数据的非分区表作为测试表,测试SQL为tpch-sql6。

相关的表信息如表1所示。

表1 相关表信息

表名

表格式

总数

占用空间

lineitem

orc

5999989709

169.6 G

由于Spark 3.1.1 Yarn模式下不打印INFO级别的日志信息,所以Spark 3.1.1需要做日志重定向。

  1. 定义日志文件log4j.properties。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    log4j.rootCategory=INFO, FILE
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
     
    log4j.logger.org.apache.spark.sql.execution=DEBUG
    log4j.logger.org.apache.spark.repl.Main=INFO
     
    log4j.appender.FILE=org.apache.log4j.FileAppender
    log4j.appender.FILE.file=/usr/local/spark/logs/file.log
    log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
     
    log4j.appender.FILE.layout.ConversionPattern=%m%n
    
  2. 修改log4j.properties中的log4j.appender.FILE.file为自定义的目录和文件名。
  3. 启动Spark-SQL命令行窗口。

    OmniData叠加OmniOperator特性启动命令如下。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    /usr/local/spark/bin/spark-sql \
    --driver-cores 5 \
    --driver-memory 5g \
    --num-executors 18 \
    --executor-cores 21 \
    --executor-memory 10g \
    --master yarn \
    --conf spark.executor.memoryOverhead=5g \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=45g \
    --conf spark.task.cpus=1 \
    --conf spark.sql.codegen.wholeStage=true \
    --conf spark.sql.orc.impl=native \
    --conf spark.sql.extensions=com.huawei.boostkit.spark.ColumnarPlugin \
    --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.OmniColumnarShuffleManager \
    --conf spark.sql.join.columnar.preferShuffledHashJoin=true \
    --conf spark.executor.extraClassPath=/opt/omni-operator-omnidata/lib/* \
    --conf spark.driver.extraClassPath=/opt/omni-operator-omnidata/lib/* \
    --conf spark.executorEnv.LD_LIBRARY_PATH=/opt/omni-operator-omnidata/lib/:/usr/local/lib/HMPP:/home/omm/omnidata-install/haf-host/lib \
    --conf spark.driverEnv.LD_LIBRARY_PATH=/opt/omni-operator-omnidata/lib/:/usr/local/lib/HMPP:/home/omm/omnidata-install/haf-host/lib \
    --conf spark.executor.extraLibraryPath=/opt/omni-operator-omnidata/lib:/home/omm/omnidata-install/haf-host/lib \
    --conf spark.driverEnv.extraLibraryPath=/opt/omni-operator-omnidata/lib:/home/omm/omnidata-install/haf-host/lib \
    --conf spark.executorEnv.LD_PRELOAD=/opt/omni-operator-omnidata/lib/libjemalloc.so.2 \
    --conf spark.driverEnv.LD_PRELOAD=/opt/omni-operator-omnidata/lib/libjemalloc.so.2 \
    --driver-java-options -Djava.library.path=/opt/omni-operator-omnidata/lib \
    --jars /opt/omni-operator-omnidata/lib/boostkit-omniop-spark-3.1.1-1.3.0-aarch64.jar \
    --jars /opt/omni-operator-omnidata/lib/boostkit-omniop-bindings-1.3.0-aarch64.jar  \
    --conf spark.executorEnv.OMNI_CONNECTED_ENGINE=Spark \
    --conf spark.executorEnv.HAF_CONFIG_PATH=/home/omm/omnidata-install/haf-host/etc/ \
    --conf spark.omni.sql.columnar.fusion=false \
    --conf spark.omni.sql.columnar.sortSpill.enabled=true \
    --conf spark.omni.sql.columnar.sortSpill.rowThreshold=400000 \
    --conf spark.omni.sql.columnar.sortSpill.dirDiskReserveSize=214748364800 \
    --driver-java-options -Dlog4j.configuration=file:/usr/local/spark/conf/log4j.properties
    

    SparkExtension相关的启动参数信息如表2所示。

    表2 SparkExtension相关启动参数信息

    启动参数名称

    缺省值

    含义

    spark.sql.extensions

    com.huawei.boostkit.spark.ColumnarPlugin

    启用SparkExtension。

    spark.shuffle.manager

    sort

    是否启用列式Shuffle,若启用则需添加配置项--conf spark.shuffle.manager=org.apache.spark.shuffle.sort.OmniColumnarShuffleManager。OmniShuffle Shuffle加速启用请配置OmniShuffle Shuffle加速自有的shuffleManager类。默认sort走原生的Shuffle。

    spark.omni.sql.columnar.hashagg

    true

    是否启用列式HashAgg,true表示启用,false表示关闭。

    spark.omni.sql.columnar.project

    true

    是否启用列式Project,true表示启用,false表示关闭。

    spark.omni.sql.columnar.projfilter

    true

    是否启用列式ConditionProject(Project + Filter融合算子),true表示启用,false表示关闭。

    spark.omni.sql.columnar.filter

    true

    是否启用列式Filter,true表示启用,false表示关闭。

    spark.omni.sql.columnar.sort

    true

    是否启用列式Sort,true表示启用,false表示关闭。

    spark.omni.sql.columnar.window

    true

    是否启用列式Window,true表示启用,false表示关闭。

    spark.omni.sql.columnar.broadcastJoin

    true

    是否启用列式BroadcastHashJoin,true表示启用,false表示关闭。

    spark.omni.sql.columnar.nativefilescan

    true

    是否启用列式NativeFilescan,true表示启用,false表示关闭。

    spark.omni.sql.columnar.orcNativefilescan

    true

    是否启用ORC列式NativeFilescan,true表示启用,false表示关闭。

    spark.omni.sql.columnar.sortMergeJoin

    true

    是否启用列式SortMergeJoin,true表示启用,false表示关闭。

    spark.omni.sql.columnar.takeOrderedAndProject

    true

    是否启用列式TakeOrderedAndProject,true表示启用,false表示关闭。

    spark.omni.sql.columnar.shuffledHashJoin

    true

    是否启用列式ShuffledHashJoin,true表示启用,false表示关闭。

    spark.shuffle.columnar.shuffleSpillBatchRowNum

    10000

    Shuffle输出的每个Batch中包含数据的行数。请根据实际环境的内存调整参数,可以适当增大此参数,从而减少写入磁盘文件的批次,提升写入速度。

    spark.shuffle.columnar.shuffleSpillMemoryThreshold

    2147483648

    Shuffle内存溢写上限,Shuffle内存上限达到缺省值时会发生溢写,单位:Byte。请根据实际环境的内存调整参数,可以适当增大此参数,从而减少Shuffle内存溢写到磁盘文件次数,减少磁盘IO操作。

    spark.shuffle.columnar.compressBlockSize

    65536

    Shuffle数据压缩块大小,单位:Byte。请根据实际环境的内存调整参数,建议采用缺省值。

    spark.sql.execution.columnar.maxRecordsPerBatch

    4096

    列式Shuffle初始化Buffer大小,单位:Byte。请根据实际环境的内存调整参数,可以适当增大此参数,从而减少Shuffle读写次数,提升性能。

    spark.sql.join.columnar.preferShuffledHashJoin

    false

    Join发生时,是否优先使用ShuffledHashJoin,true表示启用,false表示关闭。

    spark.shuffle.compress

    true

    Shuffle是否开启压缩。true表示压缩,false表示不压缩。

    spark.io.compression.codec

    lz4

    Shuffle压缩格式。支持uncompressed、zlib、snappy、lz4和zstd格式。

    spark.omni.sql.columnar.sortSpill.rowThreshold

    214783647

    sort算子溢写触发条件,处理数据行超过此值触发溢写,单位:行。请根据实际环境的内存调整参数,可以适当增大此参数,从而减少sort算子溢写到磁盘文件的次数,减少磁盘IO操作。

    spark.omni.sql.columnar.sortSpill.dirDiskReserveSize

    10737418240

    sort溢写磁盘预留可用空间大小,如果实际小于此值会抛异常,单位:Byte。根据实际环境的磁盘容量和业务场景调整参数,建议不超过业务数据大小,取值上限为实际环境的磁盘容量大小。

    spark.omni.sql.columnar.sortSpill.enabled

    false

    sort算子是否开启溢写能力。true表示开启溢写能力,false表示关闭。

  4. 执行sql6语句。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    select
    sum(l_extendedprice * l_discount) as revenue
    from
    tpch_flat_orc_1000.lineitem
    where
    l_shipdate >= '1993-01-01'
    and l_shipdate < '1994-01-01'
    and l_discount between 0.06 - 0.01 and 0.06 + 0.01
    and l_quantity < 25;
    

    待任务执行完毕,查看log4j.properties中所配置的日志文件,若日志中包含下推信息,则OmniData算子下推生效。

    1
    2
    Selectivity: 0.014160436451808448
    Push down with [PushDownInfo(ListBuffer(FilterExeInfo((((((((isnotnull(l_shipdate#11) AND isnotnull(l_discount#7)) AND isnotnull(l_quantity#5)) AND (l_shipdate#11 >= 8401)) AND (l_shipdate#11 < 8766)) AND (l_discount#7 >= 0.05)) AND (l_discount#7 <= 0.07)) AND (l_quantity#5 < 25.0)),List(l_quantity#5, l_extendedprice#6, l_discount#7, l_shipdate#11))),ListBuffer(AggExeInfo(List(sum((l_extendedprice#6 * l_discount#7))),List(),List(sum#36))),None,Map(server1 -> server1, agent2 -> agent2, agent1 -> agent1))]
    

    执行explain语句,若显示的执行计划中存在OmniOperator算子加速相关的算子,则OmniOperator算子加速生效。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    explain select
    sum(l_extendedprice * l_discount) as revenue
    from
    tpch_flat_orc_1000.lineitem
    where
    l_shipdate >= '1993-01-01'
    and l_shipdate < '1994-01-01'
    and l_discount between 0.06 - 0.01 and 0.06 + 0.01
    and l_quantity < 25;
    

    若两特性均生效,可证明OmniData算子下推叠加OmniOperator算子加速特性生效。

搜索结果
找到“0”个结果

当前产品无相关内容

未找到相关内容,请尝试其他搜索词