SVE ArrayMap GetOutput算子连续序列查找与向量化填充
发表于 2026/04/15
0
作者 | 刘洋
一、函数功能
功能描述
TraverseArrayMapGetOutput是Hash聚合结果下沉阶段的核心算子,负责:
- 遍历isAssigned数组,识别已分配(值为1)的槽位。
- 将槽位索引转换为实际键值actualKey = index + minValue - 1,并填充到输出向量groupVector。
- 若开启聚合(hasAgg),同步拷贝中间状态指针slots[index]到states容器。

接口说明
template<bool hasAgg, typename VECTOR_TYPE>
void TraverseArrayMapGetOutput(
vec::HaasBaseVectorImpl* groupVector,
std::vector<uint8_t*>* states,
int64_t minValue
);参数说明:
参数 | 说明 |
groupVector | 输出向量,存储所有分组键。 |
states | 聚合中间状态指针数组(仅hasAgg=true有效)。 |
minValue | ArrayMap的最小key偏移。 |
二、原始实现分析
while (outputRows) {
while (!isAssigned[index]) {
++index;
}
if (index != 0) {
groupVector->SetValue(lambdaRowIndex, index + minValue - 1);
} else {
groupVector->SetNull(lambdaRowIndex);
}
if constexpr (hasAgg) {
(*states)[lambdaRowIndex] = reinterpret_cast<uint8_t*>(slots[index]);
}
++lambdaRowIndex;
++index;
--outputRows;
}存在问题:
- 分支预测开销:原代码使用while(not isAssigned[index])进行逐个查找。由于isAssigned的分布取决于输入数据的哈希碰撞情况,分支高度随机,导致严重的流水线冲刷。
- 标量操作瓶颈:键值计算(index + minValue - 1)和状态指针拷贝均为标量操作。在处理大规模数据时,无法利用现代CPU的SIMD宽度。
- 内存访问低效:逐个元素的写回操作无法有效利用存储带宽。
在实际SQL运行过程中打印出Hash聚合数据中,发现以下数据分布特征:
- isAssigned[]绝大多数是连续1
- key计算是线性递增
- slot指针是连续内存访问
因此属于典型的“顺序 run + 无数据依赖”场景,表明数据模式非常适合向量化。
三、优化思路
经分析,实际数据中isAssigned数组的“已分配”位(值为1)常连续分布。利用此特征,将算法改为:
- 向量化连续序列查找:使用SVE指令批量加载isAssigned数组,快速定位连续为1的序列(即一段需输出的槽位区间)。
- 向量化键值填充:对找到的连续序列,根据数据类型生成连续的键值向量,用SVE指令批量存储到输出向量。
- 向量化聚合状态保存:如需保存聚合状态,同样使用向量加载/存储指令批量处理指针数组。
向量化连续序列查找
通过lambda函数findNextRun实现,其返回一个连续为1的序列[runStart, runEnd)。
利用SVE的svbrkb(Break before)指令,可以快速定位向量中第一个出现的0或1,从而确定连续段的边界。
//核心逻辑:利用SVE指令快速跳过连续的0或找到连续的1
auto findNextRun = [&](uint64_t startIdx) -> std::pair<uint64_t, uint64_t> {
svbool_t pg = svptrue_b8();
// 阶段1:定位第一个1 (Run Start)
while (startIdx < totalSize) {
svuint8_t loaded = svld1_u8(pg, &isAssigned[startIdx]);
svbool_t trueMask = svcmpne_n_u8(pg, loaded, 0); // 找到为1的位
if (svptest_any(pg, trueMask)) {
startIdx += svcntp_b8(pg, svbrkb_z(pg, trueMask)); // 跳过领先的0
break;
}
startIdx += svcntb(); // 全为0,直接跳过整个向量
}
// 阶段2:定位从Start开始后的第一个0 (Run End)
uint64_t runEnd = startIdx;
while (runEnd < totalSize) {
svuint8_t loaded = svld1_u8(pg, &isAssigned[runEnd]);
svbool_t falseMask = svcmpeq_n_u8(pg, loaded, 0); // 找到为0的位
if (svptest_any(pg, falseMask)) {
runEnd += svcntp_b8(pg, svbrkb_z(pg, falseMask));
return {startIdx, runEnd};
}
runEnd += svcntb();
}
return {startIdx, runEnd};
};关键代码细节:
- 使用svld1_u8一次加载32个isAssigned元素(SVE 256位向量)。
- svcmpne_n_u8/svcmpeq_n_u8与svptest_any结合,快速判断向量中是否存在目标值,避免逐lane标量判断。
- svbrkb_z指令结合svcntp_b8可一次性计算出到第一个1(或第一个0)的位置,极大加速连续序列边界定位。
向量化连续序列查找
对找到的连续序列,根据数据类型选择不同的SVE向量宽度进行算术运算和内存读写:
auto processRunBlock = [&](uint64_t runStart, uint64_t runCount) {
// 根据数据类型选择向量宽度
uint64_t sve_elements;
svbool_t pg_full;
if constexpr (sizeof(VECTOR_TYPE) == 2) { // int16_t 等
sve_elements = 16; // 256位 / 16位 = 16个元素
pg_full = svptrue_b16();
} else if constexpr (sizeof(VECTOR_TYPE) == 4) { // int32_t, float 等
sve_elements = 8;
pg_full = svptrue_b32();
} else if constexpr (sizeof(VECTOR_TYPE) == 8 || std::is_same_v<VECTOR_TYPE, Decimal128>) {
sve_elements = 4; // int64_t, double, Decimal128
pg_full = svptrue_b64();
}
// 生成连续索引向量
svint16_t index16 = sizeof(VECTOR_TYPE) == 2 ? svindex_s16(0, 1) : svundef_s16();
svint32_t index32 = sizeof(VECTOR_TYPE) == 4 ? svindex_s32(0, 1) : svundef_s32();
svint64_t index64 = (sizeof(VECTOR_TYPE) == 8 || std::is_same_v<VECTOR_TYPE, Decimal128>)
? svindex_s64(0, 1) : svundef_s64();
// 处理完整向量块
for (uint64_t i = 0; i < fullBlocks; i++) {
const uint64_t blockStart = runStart + i * sve_elements;
const int64_t vectorOffset = static_cast<int64_t>(blockStart) + baseValue;
const uint64_t outputOffset = outRow + i * sve_elements;
if constexpr (std::is_same_v<VECTOR_TYPE, Decimal128>) {
// Decimal128 由低64位与高64位(符号位)组成
svint64_t val = svadd_s64_z(pg_full, index64, svdup_s64(vectorOffset));
svbool_t negative = svcmplt_s64(pg_full, val, svdup_s64(0));
svint64_t highBits = svneg_s64_z(negative, svdup_s64(1)); // 负数高位为 -1,正数为 0
svuint64_t lowBits = svreinterpret_u64_s64(val);
// 使用 svst2 同时存储低64位与高64位
svuint64x2_t decimal_data = svcreate2_u64(lowBits, svreinterpret_u64_s64(highBits));
svst2_u64(pg_full, reinterpret_cast<uint64_t*>(&rawValues[outputOffset]), decimal_data);
} else if constexpr (ELEMENT_SIZE == 2) {
svint16_t val = svadd_s16_z(pg_full, index16, svdup_s16(vectorOffset));
svst1_s16(pg_full, &rawValues[outputOffset], val);
} else if constexpr (ELEMENT_SIZE == 4) {
svint32_t val = svadd_s32_z(pg_full, index32, svdup_s32(vectorOffset));
svst1_s32(pg_full, &rawValues[outputOffset], val);
} else if constexpr (ELEMENT_SIZE == 8) {
if constexpr (std::is_same_v<VECTOR_TYPE, double>) {
svint64_t ival = svadd_s64_z(pg_full, index64, svdup_s64(vectorOffset));
svfloat64_t fval = svcvt_f64_s64_z(pg_full, ival);
svst1_f64(pg_full, &rawValues[outputOffset], fval);
} else {
svint64_t val = svadd_s64_z(pg_full, index64, svdup_s64(vectorOffset));
svst1_s64(pg_full, &rawValues[outputOffset], val);
}
}
// 处理尾部剩余元素(不足一个向量)
if (tailElements > 0) {
const uint64_t tailStart = runStart + fullBlocks * sve_elements;
const uint64_t tailOffset = outRow + fullBlocks * sve_elements;
for (uint64_t k = 0; k < tailElements; ++k) {
const uint64_t actualIdx = tailStart + k;
if constexpr (isDecimal128) {
rawValues[tailOffset + k] = Decimal128(static_cast<int128_t>(actualIdx + baseValue));
} else {
rawValues[tailOffset + k] = static_cast<VECTOR_TYPE>(actualIdx + baseValue);
}
}
}
// 向量化保存聚合状态指针
if constexpr (hasAgg) {
svbool_t pg = svptrue_b64();
const uint64_t sve_ptr_elements = svcntd(); // 获取可存放的64位元素个数
for (uint64_t i = 0; i < fullPtrBlocks; i++) {
const uint64_t blockStart = runStart + i * sve_ptr_elements;
const uint64_t outOffset = outRow + i * sve_ptr_elements;
svuint64_t slotPtrs = svld1_u64(pg, reinterpret_cast<const uint64_t*>(&slots[blockStart]));
svst1_u64(pg, reinterpret_cast<uint64_t*>(&(*states)[outOffset]), slotPtrs);
}
// 处理尾部...
}
};关键代码细节:
- 连续索引生成:使用svindex_*系列指令生成[0, 1, 2, ...]的连续索引,再配合svadd和svdup实现批量键值计算。
- Decimal128特殊处理:由于Decimal128由两个64位部分lowBits和highBits组成,svcmplt_s64判断符号,svneg_s64_z根据符号生成高位(负数高位为-1,正数为0),最后用svst2_u64一条指令存储两个64位部分,高效完成Decimal128的构造与存储。
- 状态指针拷贝:将slots[]中的指针视为uint64_t数组,用svld1_u64和svst1_u64批量加载和存储,大幅减少指针操作开销。
四、优化效果
性能收益
- 减少分支预测失败:原实现中while (not isAssigned[index])构成难以预测的条件循环,优化后通过向量化查找连续序列,将多个标量判断合并为少数向量比较与位测试,极大降低分支误预测率。
- 提高数据局部性:连续序列的批量处理使得内存访问模式从随机跳变转为连续加载/存储,充分利用缓存行,提高缓存命中率。
- 利用SVE向量化并行:键值计算:从逐元素标量计算改为整个向量块的并行计算。数据存储:用向量存储指令一次性写入多个元素。聚合状态:指针数组的批量搬运同样受益于向量化。
- 自适应向量宽度:通过模板特化与if constexpr,针对不同数据类型(16/32/64位)选择最优的SVE向量宽度与对应指令,确保资源利用率最大化。
- 处理尾部高效:对不足一个向量的尾部元素,回退到标量处理,避免过度填充带来的计算浪费。
适用场景
- 最佳场景:isAssigned数组中“已分配”槽位集中、连续分布的情况。在实际哈希聚合中,由于哈希值经映射后相对集中,此条件常满足。
- 退化场景:若isAssigned极度稀疏(例如全为0),查找逻辑仍会高效跳过整个向量块,性能优于标量处理。
该优化通过将不规则的条件遍历转为批量连续序列处理,充分发挥SVE向量指令的并行能力,在哈希聚合等数据密集场景中可带来显著的性能提升。


