通过OmniData算子下推特性优化数据加载流程,同时使用OmniOperator算子加速特性提升算子执行效率,从而提升端到端引擎查询性能。
进行OmniData算子下推叠加OmniOperator算子加速特性的安装之前,需完成OmniData算子下推和OmniOperator算子加速两个特性在Spark上的安装。
1 2 |
mkdir -p /opt/omni-operator-omnidata cp -r /opt/omni-operator/* /opt/omni-operator-omnidata/ |
1
|
cp -r /opt/boostkit/* /opt/omni-operator-omnidata/lib |
1
|
vi /opt/omni-operator-omnidata/conf/omni.conf
|
1 2 3 4 5 6 7 8 9 10 11 |
enableHMPP=true enableBatchExprEvaluate=false hiveUdfPropertyFilePath=/opt/omni-operator-omnidata/hive-udf/udf.properties hiveUdfDir=/opt/omni-operator-omnidata/hive-udf/udf RoundingRule=DOWN CheckReScaleRule=CHECK_RESCALE EmptySearchStrReplaceRule=NOT_REPLACE CastDecimalToDoubleRule=CONVERT_WITH_STRING NegativeStartIndexOutOfBoundsRule=INTERCEPT_FROM_BEYOND SupportContainerVecRule=NOT_SUPPORT StringToDateFormatRule=ALLOW_REDUCED_PRECISION |
scp -r /opt/omni-operator-omnidata hostname:/opt/
1
|
vi /usr/local/spark/conf/spark-defaults.conf
|
1
|
spark.sql.ndp.operator.combine.enable true
|
Spark算子下推使用spark-sql命令来执行。
本次任务示例使用tpch的1T数据的非分区表作为测试表,测试SQL为tpch-sql6。
相关的表信息如表1所示。
由于Spark 3.1.1 Yarn模式下不打印INFO级别的日志信息,所以Spark 3.1.1需要做日志重定向。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
log4j.rootCategory=INFO, FILE log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n log4j.logger.org.apache.spark.sql.execution=DEBUG log4j.logger.org.apache.spark.repl.Main=INFO log4j.appender.FILE=org.apache.log4j.FileAppender log4j.appender.FILE.file=/usr/local/spark/logs/file.log log4j.appender.FILE.layout=org.apache.log4j.PatternLayout log4j.appender.FILE.layout.ConversionPattern=%m%n |
OmniData叠加OmniOperator特性启动命令如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
/usr/local/spark/bin/spark-sql \ --driver-cores 5 \ --driver-memory 5g \ --num-executors 18 \ --executor-cores 21 \ --executor-memory 10g \ --master yarn \ --conf spark.executor.memoryOverhead=5g \ --conf spark.memory.offHeap.enabled=true \ --conf spark.memory.offHeap.size=45g \ --conf spark.task.cpus=1 \ --conf spark.sql.codegen.wholeStage=true \ --conf spark.sql.orc.impl=native \ --conf spark.sql.extensions=com.huawei.boostkit.spark.ColumnarPlugin \ --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.OmniColumnarShuffleManager \ --conf spark.sql.join.columnar.preferShuffledHashJoin=true \ --conf spark.executor.extraClassPath=/opt/omni-operator-omnidata/lib/* \ --conf spark.driver.extraClassPath=/opt/omni-operator-omnidata/lib/* \ --conf spark.executorEnv.LD_LIBRARY_PATH=/opt/omni-operator-omnidata/lib/:/usr/local/lib/HMPP:/home/omm/omnidata-install/haf-host/lib \ --conf spark.driverEnv.LD_LIBRARY_PATH=/opt/omni-operator-omnidata/lib/:/usr/local/lib/HMPP:/home/omm/omnidata-install/haf-host/lib \ --conf spark.executor.extraLibraryPath=/opt/omni-operator-omnidata/lib:/home/omm/omnidata-install/haf-host/lib \ --conf spark.driverEnv.extraLibraryPath=/opt/omni-operator-omnidata/lib:/home/omm/omnidata-install/haf-host/lib \ --conf spark.executorEnv.LD_PRELOAD=/opt/omni-operator-omnidata/lib/libjemalloc.so.2 \ --conf spark.driverEnv.LD_PRELOAD=/opt/omni-operator-omnidata/lib/libjemalloc.so.2 \ --driver-java-options -Djava.library.path=/opt/omni-operator-omnidata/lib \ --jars /opt/omni-operator-omnidata/lib/boostkit-omniop-spark-3.1.1-1.3.0-aarch64.jar \ --jars /opt/omni-operator-omnidata/lib/boostkit-omniop-bindings-1.3.0-aarch64.jar \ --conf spark.executorEnv.OMNI_CONNECTED_ENGINE=Spark \ --conf spark.executorEnv.HAF_CONFIG_PATH=/home/omm/omnidata-install/haf-host/etc/ \ --conf spark.omni.sql.columnar.fusion=false \ --conf spark.omni.sql.columnar.sortSpill.enabled=true \ --conf spark.omni.sql.columnar.sortSpill.rowThreshold=400000 \ --conf spark.omni.sql.columnar.sortSpill.dirDiskReserveSize=214748364800 \ --driver-java-options -Dlog4j.configuration=file:/usr/local/spark/conf/log4j.properties |
SparkExtension相关的启动参数信息如表2所示。
启动参数名称 |
缺省值 |
含义 |
---|---|---|
spark.sql.extensions |
com.huawei.boostkit.spark.ColumnarPlugin |
启用SparkExtension。 |
spark.shuffle.manager |
sort |
是否启用列式Shuffle,若启用则需添加配置项--conf spark.shuffle.manager=org.apache.spark.shuffle.sort.OmniColumnarShuffleManager。OmniShuffle Shuffle加速启用请配置OmniShuffle Shuffle加速自有的shuffleManager类。默认sort走原生的Shuffle。 |
spark.omni.sql.columnar.hashagg |
true |
是否启用列式HashAgg,true表示启用,false表示关闭。 |
spark.omni.sql.columnar.project |
true |
是否启用列式Project,true表示启用,false表示关闭。 |
spark.omni.sql.columnar.projfilter |
true |
是否启用列式ConditionProject(Project + Filter融合算子),true表示启用,false表示关闭。 |
spark.omni.sql.columnar.filter |
true |
是否启用列式Filter,true表示启用,false表示关闭。 |
spark.omni.sql.columnar.sort |
true |
是否启用列式Sort,true表示启用,false表示关闭。 |
spark.omni.sql.columnar.window |
true |
是否启用列式Window,true表示启用,false表示关闭。 |
spark.omni.sql.columnar.broadcastJoin |
true |
是否启用列式BroadcastHashJoin,true表示启用,false表示关闭。 |
spark.omni.sql.columnar.nativefilescan |
true |
是否启用列式NativeFilescan,true表示启用,false表示关闭。 |
spark.omni.sql.columnar.orcNativefilescan |
true |
是否启用ORC列式NativeFilescan,true表示启用,false表示关闭。 |
spark.omni.sql.columnar.sortMergeJoin |
true |
是否启用列式SortMergeJoin,true表示启用,false表示关闭。 |
spark.omni.sql.columnar.takeOrderedAndProject |
true |
是否启用列式TakeOrderedAndProject,true表示启用,false表示关闭。 |
spark.omni.sql.columnar.shuffledHashJoin |
true |
是否启用列式ShuffledHashJoin,true表示启用,false表示关闭。 |
spark.shuffle.columnar.shuffleSpillBatchRowNum |
10000 |
Shuffle输出的每个Batch中包含数据的行数。请根据实际环境的内存调整参数,可以适当增大此参数,从而减少写入磁盘文件的批次,提升写入速度。 |
spark.shuffle.columnar.shuffleSpillMemoryThreshold |
2147483648 |
Shuffle内存溢写上限,Shuffle内存上限达到缺省值时会发生溢写,单位:Byte。请根据实际环境的内存调整参数,可以适当增大此参数,从而减少Shuffle内存溢写到磁盘文件次数,减少磁盘IO操作。 |
spark.shuffle.columnar.compressBlockSize |
65536 |
Shuffle数据压缩块大小,单位:Byte。请根据实际环境的内存调整参数,建议采用缺省值。 |
spark.sql.execution.columnar.maxRecordsPerBatch |
4096 |
列式Shuffle初始化Buffer大小,单位:Byte。请根据实际环境的内存调整参数,可以适当增大此参数,从而减少Shuffle读写次数,提升性能。 |
spark.sql.join.columnar.preferShuffledHashJoin |
false |
Join发生时,是否优先使用ShuffledHashJoin,true表示启用,false表示关闭。 |
spark.shuffle.compress |
true |
Shuffle是否开启压缩。true表示压缩,false表示不压缩。 |
spark.io.compression.codec |
lz4 |
Shuffle压缩格式。支持uncompressed、zlib、snappy、lz4和zstd格式。 |
spark.omni.sql.columnar.sortSpill.rowThreshold |
214783647 |
sort算子溢写触发条件,处理数据行超过此值触发溢写,单位:行。请根据实际环境的内存调整参数,可以适当增大此参数,从而减少sort算子溢写到磁盘文件的次数,减少磁盘IO操作。 |
spark.omni.sql.columnar.sortSpill.dirDiskReserveSize |
10737418240 |
sort溢写磁盘预留可用空间大小,如果实际小于此值会抛异常,单位:Byte。根据实际环境的磁盘容量和业务场景调整参数,建议不超过业务数据大小,取值上限为实际环境的磁盘容量大小。 |
spark.omni.sql.columnar.sortSpill.enabled |
false |
sort算子是否开启溢写能力。true表示开启溢写能力,false表示关闭。 |
1 2 3 4 5 6 7 8 9 |
select sum(l_extendedprice * l_discount) as revenue from tpch_flat_orc_1000.lineitem where l_shipdate >= '1993-01-01' and l_shipdate < '1994-01-01' and l_discount between 0.06 - 0.01 and 0.06 + 0.01 and l_quantity < 25; |
待任务执行完毕,查看log4j.properties中所配置的日志文件,若日志中包含下推信息,则OmniData算子下推生效。
1 2 |
Selectivity: 0.014160436451808448 Push down with [PushDownInfo(ListBuffer(FilterExeInfo((((((((isnotnull(l_shipdate#11) AND isnotnull(l_discount#7)) AND isnotnull(l_quantity#5)) AND (l_shipdate#11 >= 8401)) AND (l_shipdate#11 < 8766)) AND (l_discount#7 >= 0.05)) AND (l_discount#7 <= 0.07)) AND (l_quantity#5 < 25.0)),List(l_quantity#5, l_extendedprice#6, l_discount#7, l_shipdate#11))),ListBuffer(AggExeInfo(List(sum((l_extendedprice#6 * l_discount#7))),List(),List(sum#36))),None,Map(server1 -> server1, agent2 -> agent2, agent1 -> agent1))] |
执行explain语句,若显示的执行计划中存在OmniOperator算子加速相关的算子,则OmniOperator算子加速生效。
1 2 3 4 5 6 7 8 9 |
explain select sum(l_extendedprice * l_discount) as revenue from tpch_flat_orc_1000.lineitem where l_shipdate >= '1993-01-01' and l_shipdate < '1994-01-01' and l_discount between 0.06 - 0.01 and 0.06 + 0.01 and l_quantity < 25; |
若两特性均生效,可证明OmniData算子下推叠加OmniOperator算子加速特性生效。