开发者
SVE ArrayMap GetOutput算子连续序列查找与向量化填充

SVE ArrayMap GetOutput算子连续序列查找与向量化填充

指令集

发表于 2026/04/15

0

作者 | 刘洋

一、函数功能

功能描述

TraverseArrayMapGetOutput是Hash聚合结果下沉阶段的核心算子,负责:

  • 遍历isAssigned数组,识别已分配(值为1)的槽位。
  • 将槽位索引转换为实际键值actualKey = index + minValue - 1,并填充到输出向量groupVector。
  • 若开启聚合(hasAgg),同步拷贝中间状态指针slots[index]到states容器。

接口说明

template<bool hasAgg, typename VECTOR_TYPE>
void TraverseArrayMapGetOutput(
    vec::HaasBaseVectorImpl* groupVector,
    std::vector<uint8_t*>* states,
    int64_t minValue
);

参数说明:

参数

说明

groupVector

输出向量,存储所有分组键。

states

聚合中间状态指针数组(仅hasAgg=true有效)。

minValue

ArrayMap的最小key偏移。

二、原始实现分析

while (outputRows) {
    while (!isAssigned[index]) {
        ++index;
    }
    if (index != 0) {
        groupVector->SetValue(lambdaRowIndex, index + minValue - 1);
    } else {
        groupVector->SetNull(lambdaRowIndex);
    }
    if constexpr (hasAgg) {
        (*states)[lambdaRowIndex] = reinterpret_cast<uint8_t*>(slots[index]);
    }
    ++lambdaRowIndex;
    ++index;
    --outputRows;
}

存在问题:

  • 分支预测开销:原代码使用while(not isAssigned[index])进行逐个查找。由于isAssigned的分布取决于输入数据的哈希碰撞情况,分支高度随机,导致严重的流水线冲刷。
  • 标量操作瓶颈:键值计算(index + minValue - 1)和状态指针拷贝均为标量操作。在处理大规模数据时,无法利用现代CPU的SIMD宽度。
  • 内存访问低效:逐个元素的写回操作无法有效利用存储带宽。

在实际SQL运行过程中打印出Hash聚合数据中,发现以下数据分布特征:

  • isAssigned[]绝大多数是连续1
  • key计算是线性递增
  • slot指针是连续内存访问

因此属于典型的“顺序 run + 无数据依赖”场景,表明数据模式非常适合向量化。

三、优化思路

经分析,实际数据中isAssigned数组的“已分配”位(值为1)常连续分布。利用此特征,将算法改为:

  1. 向量化连续序列查找:使用SVE指令批量加载isAssigned数组,快速定位连续为1的序列(即一段需输出的槽位区间)。
  2. 向量化键值填充:对找到的连续序列,根据数据类型生成连续的键值向量,用SVE指令批量存储到输出向量。
  3. 向量化聚合状态保存:如需保存聚合状态,同样使用向量加载/存储指令批量处理指针数组。

向量化连续序列查找

通过lambda函数findNextRun实现,其返回一个连续为1的序列[runStart, runEnd)。

利用SVE的svbrkb(Break before)指令,可以快速定位向量中第一个出现的0或1,从而确定连续段的边界。

//核心逻辑:利用SVE指令快速跳过连续的0或找到连续的1
auto findNextRun = [&](uint64_t startIdx) -> std::pair<uint64_t, uint64_t> {
    svbool_t pg = svptrue_b8();
    // 阶段1:定位第一个1 (Run Start)
    while (startIdx < totalSize) {
        svuint8_t loaded = svld1_u8(pg, &isAssigned[startIdx]);
        svbool_t trueMask = svcmpne_n_u8(pg, loaded, 0); // 找到为1的位
        if (svptest_any(pg, trueMask)) {
            startIdx += svcntp_b8(pg, svbrkb_z(pg, trueMask)); // 跳过领先的0
            break;
        }
        startIdx += svcntb(); // 全为0,直接跳过整个向量
    }
    // 阶段2:定位从Start开始后的第一个0 (Run End)
    uint64_t runEnd = startIdx;
    while (runEnd < totalSize) {
        svuint8_t loaded = svld1_u8(pg, &isAssigned[runEnd]);
        svbool_t falseMask = svcmpeq_n_u8(pg, loaded, 0); // 找到为0的位
        if (svptest_any(pg, falseMask)) {
            runEnd += svcntp_b8(pg, svbrkb_z(pg, falseMask));
            return {startIdx, runEnd};
        }
        runEnd += svcntb();
    }
    return {startIdx, runEnd};
};

关键代码细节:

  • 使用svld1_u8一次加载32个isAssigned元素(SVE 256位向量)。
  • svcmpne_n_u8/svcmpeq_n_u8与svptest_any结合,快速判断向量中是否存在目标值,避免逐lane标量判断。
  • svbrkb_z指令结合svcntp_b8可一次性计算出到第一个1(或第一个0)的位置,极大加速连续序列边界定位。

向量化连续序列查找

对找到的连续序列,根据数据类型选择不同的SVE向量宽度进行算术运算和内存读写:

auto processRunBlock = [&](uint64_t runStart, uint64_t runCount) {
    // 根据数据类型选择向量宽度
    uint64_t sve_elements;
    svbool_t pg_full;
    if constexpr (sizeof(VECTOR_TYPE) == 2) {        // int16_t 等
        sve_elements = 16;  // 256位 / 16位 = 16个元素
        pg_full = svptrue_b16();
    } else if constexpr (sizeof(VECTOR_TYPE) == 4) { // int32_t, float 等
        sve_elements = 8;
        pg_full = svptrue_b32();
    } else if constexpr (sizeof(VECTOR_TYPE) == 8 || std::is_same_v<VECTOR_TYPE, Decimal128>) {
        sve_elements = 4;   // int64_t, double, Decimal128
        pg_full = svptrue_b64();
    }
    // 生成连续索引向量
    svint16_t index16 = sizeof(VECTOR_TYPE) == 2 ? svindex_s16(0, 1) : svundef_s16();
    svint32_t index32 = sizeof(VECTOR_TYPE) == 4 ? svindex_s32(0, 1) : svundef_s32();
    svint64_t index64 = (sizeof(VECTOR_TYPE) == 8 || std::is_same_v<VECTOR_TYPE, Decimal128>) 
                        ? svindex_s64(0, 1) : svundef_s64();
    // 处理完整向量块
    for (uint64_t i = 0; i < fullBlocks; i++) {
        const uint64_t blockStart = runStart + i * sve_elements;
        const int64_t vectorOffset = static_cast<int64_t>(blockStart) + baseValue;
        const uint64_t outputOffset = outRow + i * sve_elements;
        if constexpr (std::is_same_v<VECTOR_TYPE, Decimal128>) {
            // Decimal128 由低64位与高64位(符号位)组成
            svint64_t val = svadd_s64_z(pg_full, index64, svdup_s64(vectorOffset));
            svbool_t negative = svcmplt_s64(pg_full, val, svdup_s64(0));
            svint64_t highBits = svneg_s64_z(negative, svdup_s64(1)); // 负数高位为 -1,正数为 0
            svuint64_t lowBits = svreinterpret_u64_s64(val);
            // 使用 svst2 同时存储低64位与高64位
            svuint64x2_t decimal_data = svcreate2_u64(lowBits, svreinterpret_u64_s64(highBits));
            svst2_u64(pg_full, reinterpret_cast<uint64_t*>(&rawValues[outputOffset]), decimal_data);
        } else if constexpr (ELEMENT_SIZE == 2) {
            svint16_t val = svadd_s16_z(pg_full, index16, svdup_s16(vectorOffset));
            svst1_s16(pg_full, &rawValues[outputOffset], val);
         } else if constexpr (ELEMENT_SIZE == 4) {
            svint32_t val = svadd_s32_z(pg_full, index32, svdup_s32(vectorOffset));
            svst1_s32(pg_full, &rawValues[outputOffset], val);
         } else if constexpr (ELEMENT_SIZE == 8) {
            if constexpr (std::is_same_v<VECTOR_TYPE, double>) {
              svint64_t ival = svadd_s64_z(pg_full, index64, svdup_s64(vectorOffset));
              svfloat64_t fval = svcvt_f64_s64_z(pg_full, ival);
              svst1_f64(pg_full, &rawValues[outputOffset], fval);
            } else {
                svint64_t val = svadd_s64_z(pg_full, index64, svdup_s64(vectorOffset));
                svst1_s64(pg_full, &rawValues[outputOffset], val);
            }
        }
    // 处理尾部剩余元素(不足一个向量)
    if (tailElements > 0) {
        const uint64_t tailStart = runStart + fullBlocks * sve_elements;
        const uint64_t tailOffset = outRow + fullBlocks * sve_elements;
        for (uint64_t k = 0; k < tailElements; ++k) {
            const uint64_t actualIdx = tailStart + k;
            if constexpr (isDecimal128) {
                rawValues[tailOffset + k] = Decimal128(static_cast<int128_t>(actualIdx + baseValue));
            } else {
                rawValues[tailOffset + k] = static_cast<VECTOR_TYPE>(actualIdx + baseValue);
            }
        }
    }
    // 向量化保存聚合状态指针
    if constexpr (hasAgg) {
        svbool_t pg = svptrue_b64();
        const uint64_t sve_ptr_elements = svcntd();  // 获取可存放的64位元素个数
        for (uint64_t i = 0; i < fullPtrBlocks; i++) {
            const uint64_t blockStart = runStart + i * sve_ptr_elements;
            const uint64_t outOffset = outRow + i * sve_ptr_elements;
            svuint64_t slotPtrs = svld1_u64(pg, reinterpret_cast<const uint64_t*>(&slots[blockStart]));
            svst1_u64(pg, reinterpret_cast<uint64_t*>(&(*states)[outOffset]), slotPtrs);
        }
        // 处理尾部...
    }
};

关键代码细节:

  • 连续索引生成:使用svindex_*系列指令生成[0, 1, 2, ...]的连续索引,再配合svadd和svdup实现批量键值计算。
  • Decimal128特殊处理:由于Decimal128由两个64位部分lowBits和highBits组成,svcmplt_s64判断符号,svneg_s64_z根据符号生成高位(负数高位为-1,正数为0),最后用svst2_u64一条指令存储两个64位部分,高效完成Decimal128的构造与存储。
  • 状态指针拷贝:将slots[]中的指针视为uint64_t数组,用svld1_u64和svst1_u64批量加载和存储,大幅减少指针操作开销。

四、优化效果

性能收益

  1. 减少分支预测失败:原实现中while (not isAssigned[index])构成难以预测的条件循环,优化后通过向量化查找连续序列,将多个标量判断合并为少数向量比较与位测试,极大降低分支误预测率。
  2. 提高数据局部性:连续序列的批量处理使得内存访问模式从随机跳变转为连续加载/存储,充分利用缓存行,提高缓存命中率。
  3. 利用SVE向量化并行:键值计算:从逐元素标量计算改为整个向量块的并行计算。数据存储:用向量存储指令一次性写入多个元素。聚合状态:指针数组的批量搬运同样受益于向量化。
  4. 自适应向量宽度:通过模板特化与if constexpr,针对不同数据类型(16/32/64位)选择最优的SVE向量宽度与对应指令,确保资源利用率最大化。
  5. 处理尾部高效:对不足一个向量的尾部元素,回退到标量处理,避免过度填充带来的计算浪费。

适用场景

  • 最佳场景:isAssigned数组中“已分配”槽位集中、连续分布的情况。在实际哈希聚合中,由于哈希值经映射后相对集中,此条件常满足。
  • 退化场景:若isAssigned极度稀疏(例如全为0),查找逻辑仍会高效跳过整个向量块,性能优于标量处理。

该优化通过将不规则的条件遍历转为批量连续序列处理,充分发挥SVE向量指令的并行能力,在哈希聚合等数据密集场景中可带来显著的性能提升。