开发者
鲲鹏 BoostKit 数据分流使能套件:核心内容与实战实践
鲲鹏 BoostKit 数据分流使能套件:核心内容与实战实践
发表于04/08
2100

鲲鹏 BoostKit 数据分流使能套件是华为鲲鹏架构面向大数据 / 存算融合场景的核心加速套件,核心通过OmniData 算子下推、OmniShuffle 分流、HAF 异构加速三大能力,实现 “计算下沉、数据减载、分流调度”,将存算分离场景下的网络传输量降低 60%+,整体查询性能提升 20%~40%,充分释放鲲鹏 CPU 多核高并发、大内存带宽优势,适配 Spark、Hive、openLooKeng 等主流大数据引擎。

一、核心技术与内容

1. 核心模块

  • OmniData 算子下推:将 Filter、Aggregation、Limit 等计算算子从计算节点下沉到存储节点(HDFS/Ceph)执行,只返回计算结果,减少无效数据跨网传输,典型场景如大规模数据过滤、聚合查询。
  • OmniShuffle 分流优化:重构 Spark Shuffle 管理器,采用 Native 列式 Shuffle 替代原生行式 Shuffle,结合鲲鹏 NEON 指令集加速数据序列化 / 压缩,降低 Shuffle 阶段 CPU 与 IO 开销,提升 Join、Group By 等复杂查询效率。
  • HAF 异构加速框架:统一计算节点与存储节点的通信协议,支持算子下推任务的分流调度,实现 “计算就近、负载均衡”,同时提供资源隔离与 QoS 保障,避免分流任务抢占核心业务资源。
  • 离线分流调度:支持按数据热度、节点负载自动分流,冷数据 / 低频计算下沉至存储节点,热数据保留在计算节点,兼顾性能与资源利用率。

2. 核心价值

  • 减载:网络传输量减少 60%~80%,降低带宽成本与延迟;
  • 提速:Spark SQL 复杂查询性能提升 20%~40%,Shuffle 阶段耗时降低 50%+;
  • 兼容:无需修改业务代码,透明适配 Spark 2.X/3.X、Hive 3.X 等组件,支持 x86 与鲲鹏混合部署;
  • 可控:支持分流任务的资源限制、优先级配置,保障核心业务稳定性。

二、实战配置与代码实践

1. 环境准备(openEuler 22.03 LTS)

安装 BoostKit 数据分流套件

bash

运行

# 配置鲲鹏YUM源
cat > /etc/yum.repos.d/kunpeng-boostkit.repo << EOF
[kunpeng-boostkit]
name=Kunpeng BoostKit - openEuler 22.03 LTS
baseurl=https://repo.oepkgs.net/openeuler/rpm/openEuler-22.03-LTS/extras/aarch64/
enabled=1
gpgcheck=0
EOF

# 安装核心套件(OmniData + OmniShuffle)
yum install -y boostkit-omnidata boostkit-omnioperator --nogpgcheck

2. Spark OmniData 算子下推配置

全局配置(spark-defaults.conf)

properties

# 启用OmniData算子下推扩展
spark.sql.extensions=com.huawei.boostkit.spark.OmniDataExtension

# 配置Native库路径(指向BoostKit安装目录)
spark.driver.extraLibraryPath=/opt/omnidata/lib
spark.executor.extraLibraryPath=/opt/omnidata/lib

# 开启算子下推(核心开关)
spark.sql.omnidata.enabled=true

# 配置存储节点列表(HDFS NameNode/存储节点IP)
spark.sql.omnidata.storage.nodes=192.168.1.10,192.168.1.11

# 配置分流策略:按数据大小自动下推(阈值10GB)
spark.sql.omnidata.pushdown.threshold=10737418240

# 关闭原生行式Shuffle,启用OmniShuffle
spark.shuffle.manager=org.apache.spark.shuffle.sort.OmniShuffleManager

# 堆外内存配置(适配Native执行)
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=30g

作业级配置(无需修改集群配置)

bash

运行

# 提交Spark作业,启用数据分流与算子下推
spark-submit \
  --class com.example.YourSparkJob \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 30g \
  --num-executors 50 \
  --executor-cores 16 \
  --conf spark.sql.extensions=com.huawei.boostkit.spark.OmniDataExtension \
  --conf spark.driver.extraLibraryPath=/opt/omnidata/lib \
  --conf spark.executor.extraLibraryPath=/opt/omnidata/lib \
  --conf spark.sql.omnidata.enabled=true \
  --conf spark.sql.omnidata.pushdown.threshold=10737418240 \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.OmniShuffleManager \
  your-spark-job.jar

3. 验证分流生效

代码验证(Scala)

scala

import org.apache.spark.sql.SparkSession

object DataOffloadVerify {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DataOffload-Verify")
      .getOrCreate()

    // 读取HDFS大表(10GB+),触发过滤下推
    val salesDF = spark.read.parquet("hdfs:///tpcds/1tb/store_sales")
    salesDF.createOrReplaceTempView("store_sales")

    // 复杂查询:多过滤+聚合,验证算子下推
    val query = """
      SELECT ss_item_id, SUM(ss_ext_sales_price) AS total_sales
      FROM store_sales
      WHERE ss_sold_date_sk BETWEEN 2451911 AND 2452000
        AND ss_quantity BETWEEN 1 AND 10
      GROUP BY ss_item_id
      ORDER BY total_sales DESC
      LIMIT 100
    """

    // 执行查询并打印物理计划(包含Columnar/OmniData标识则生效)
    spark.sql(query).explain(true)
    // 执行耗时统计
    val start = System.currentTimeMillis()
    spark.sql(query).collect()
    val cost = (System.currentTimeMillis() - start) / 1000
    println(s"查询耗时:${cost}s")

    spark.stop()
  }
}

验证标准

  • 物理计划中出现OmniDataFilterOmniDataAggregate等算子,说明算子下推生效
  • 日志中出现OmniShuffle initialized,说明分流 Shuffle 生效
  • 对比未启用套件的查询耗时,性能提升 20%+,网络传输量减少 60%+。

4. Hive OmniData 配置(补充)

properties

# hive-site.xml
<property>
  <name>hive.omnidata.enabled</name>
  <value>true</value>
</property>
<property>
  <name>hive.omnidata.storage.nodes</name>
  <value>192.168.1.10,192.168.1.11</value>
</property>

三、关键参数说明与调优

表格

参数含义建议值适用场景
spark.sql.omnidata.enabled算子下推总开关true所有存算融合场景
spark.sql.omnidata.pushdown.threshold下推数据阈值10737418240(10GB)大表过滤 / 聚合
spark.shuffle.managerShuffle 类型OmniShuffleManagerJoin/Group By 密集场景
spark.memory.offHeap.size堆外内存与 executor-memory 相当Native 执行密集场景
spark.sql.omnidata.parallelism分流任务并行度等于 executor-cores高并发计算场景

四、性能与实践案例

某终端平台采用 BoostKit 数据分流套件优化大规模存算融合场景,通过 OmniData 算子下推将 10TB + 数据的过滤聚合任务下沉至存储节点,网络传输量减少 68%,Spark SQL 查询性能提升 34%,同时降低了计算节点资源占用,保障了核心业务的稳定性。

总结

鲲鹏 BoostKit 数据分流使能套件通过算子下推、分流 Shuffle、异构调度三大核心能力,从根本上解决了大数据存算分离场景下 “数据传输多、计算效率低” 的痛点。无需修改业务代码,仅需简单配置即可实现性能跃迁,是鲲鹏平台大数据场景下提升资源利用率、降低成本的核心工具。

收藏举报
Level 1
0
帖子
0
粉丝
0
获赞