利用NEON指令对NewPForDelta解压算法进行向量化改造
发表于 2026/04/15
0
作者 | 刘洋
一、概述
KNewPForDelta基于Google开源Pfordelta算法,以及开源Compression进行NewPfordelta算法的实现与优化,主要用于搜索引擎、推荐系统等需要快速处理大规模有序整数序列(如文档ID列表、词频位置)的场景。KNewPForDelta针对ARM架构进行深度优化,利用NEON SIMD指令集提升解压缩性能。
在应用场景中目前是以解压缩为主,压缩是在数据更新的时候用的,属于离线部分,解压缩是在数据查询访问的时候用的,属于在线业务,对延迟敏感。关键优化如下:
- 使用NEON指令实现高效位提取和移位操作
- 运用NEON数据重组技术处理跨字边界数据
- 针对不同位宽采用差异化向量化策略
二、算法核心思想
KNewPForDelta是在PForDelta算法基础上的改进。PForDelta的基础思想首次在论文《Super-Scalar RAM-CPU Cache Compression》中提出,引入了分块压缩、异常值分离和差分编码。其核心是对一个数据块(chunk,例如128个整数),通过遍历确定一个参数b,使得大部分数据(如90%)可以用b个比特存储,剩余少数异常值单独存储。压缩时,异常值的位置偏移量和值被记录在块末尾。
例如,对于数列23, 41, 8, 12, 30, 68, 18, 45, 21, 9,...,若确定b=5(即大部分数小于32),则正常值用5比特存储,异常值(如41, 68, 45)用32比特存储在末尾,并在正常值数组中记录异常值的位置偏移。
KNewPForDelta对此进行了关键改进(在论文《Inverted Index Compression and Query Processing with Optimized》中提出),不再在正常值数组中存储异常值的位置偏移,而是存储异常值的低b位。异常值的高位部分及其在块中的索引被存储在单独的两个数组中。此外,异常值的存储位数通过参数t动态选择(8、16或32比特),以进一步压缩空间。这种设计简化了解压时的数据重组流程,更有利于向量化优化。
KNewPForDelta压缩后的数据示例如下:

三、部分unpack解压函数优化示例
unpack16位提取优化(VLD1/VSHR/VAND/VZIP)
unpack16函数用于从压缩数据中提取16位宽的整数。在PForDelta算法中,当参数b=16时,每个压缩数据块使用16位存储一个数值。输入w是一个连续的比特流,其中每16位存储一个压缩的整数值,但为了对齐,实际上在w数组中,每32位存储了两个16位整数(高16位和低16位各一个)。unpack16的任务是将这些16位整数解压缩为32位的无符号整数,存储在输出数组p中。
原始标量代码需要逐个处理高低16位:
void unpack16(unsigned int* p, unsigned int* w, int BS) {
int i;
for (i = 0; i < BS; i += 32, p += 32, w += 16) {
p[0] = (w[0] >> 16);
p[1] = (w[0]) & 65535;
p[2] = (w[1] >> 16);
p[3] = (w[1]) & 65535;
// ... 重复16次类似操作
p[30] = (w[15] >> 16);
p[31] = (w[15]) & 65535;
}
}NEON优化代码使用向量化位提取和交错存储:
#include <arm_neon.h>
#define LANES_PER_BLOCK 32
void unpack16(unsigned int* p, unsigned int* w, int BS) {
static const uint32_t MASK = (1U << 16) - 1;
const uint32x4_t MASK_VEC = vdupq_n_u32(MASK);
for (size_t i = 0; i < BS; i += LANES_PER_BLOCK, p += LANES_PER_BLOCK, w += 16) {
// 使用vld1q_u32加载4个32位输入字
uint32x4_t input0 = vld1q_u32(w);
// 使用vshrq_n_u32提取高16位
uint32x4_t high0 = vshrq_n_u32(input0, 16);
// 使用vandq_u32提取低16位
uint32x4_t low0 = vandq_u32(input0, MASK_VEC);
// 使用vzipq_u32交错重组高16位和低16位
uint32x4x2_t inter0 = vzipq_u32(high0, low0);
// 使用vst1q_u32_x2一次性存储8个32位输出值
vst1q_u32_x2(p, inter0);
uint32x4_t input1 = vld1q_u32(w + 4);
uint32x4_t high1 = vshrq_n_u32(input1, 16);
uint32x4_t low1 = vandq_u32(input1, MASK_VEC);
uint32x4x2_t inter1 = vzipq_u32(high1, low1);
vst1q_u32_x2(p + 8, inter1);
// ... 剩余处理类似
}
}关键优化技术:
- 位操作向量化:使用vshrq_n_u32和vandq_u32实现并行移位和掩码操作
- 数据重组技术:使用vzipq_u32将高16位和低16位结果交错排列
- 向量存储:使用vst1q_u32批量存储输出结果
unpack20复杂位重组(VEXT/VGETQ_LANE_U32)
unpack20函数用于从压缩数据中提取20位宽的整数。这是PForDelta算法中比较复杂的解压情况,因为20位不是2的幂,也不是字节对齐的。输入w是一个连续的比特流,其中每20位存储一个压缩的整数值。由于20位不满足32位对齐,一个20位的值可能跨越两个32位字。unpack20的任务是从这些位流中准确提取每个20位整数,并将其解压缩为32位的无符号整数,存储在输出数组p中。这是一个典型的位打包和解包问题,需要复杂的移位和掩码操作。
原始标量代码需要复杂的位拼接操作:
void unpack20(unsigned int* p, unsigned int* w, int BS) {
int i;
for (i = 0; i < BS; i += 32, p += 32, w += 20) {
p[0] = (w[0] >> 12);
p[1] = (w[0] << 8) & ((1 << 20) - 1);
p[1] |= (w[1] >> 24);
p[2] = (w[1] >> 4) & ((1 << 20) - 1);
// ... 复杂的位拼接操作
p[31] = (w[19]) & ((1 << 20) - 1);
}
}NEON优化代码使用向量加载和标量位操作结合:
void unpack20_neon(unsigned int* p, unsigned int* w, int BS) {
static const uint32_t MASK = (1U << 20) - 1;
for (size_t i = 0; i < BS; i += LANES_PER_BLOCK, p += LANES_PER_BLOCK, w += 20) {
// 使用vld1q_u32向量化加载20个输入字
uint32x4_t v0 = vld1q_u32(w);
uint32x4_t v1 = vld1q_u32(w + 4);
uint32x4_t v2 = vld1q_u32(w + 8);
uint32x4_t v3 = vld1q_u32(w + 12);
uint32x4_t v4 = vld1q_u32(w + 16);
// 使用vgetq_lane_u32提取向量中的标量值进行位操作
p[0] = (vgetq_lane_u32(v0, 0) >> 12) & MASK;
p[1] = ((vgetq_lane_u32(v0, 0) << 8) | (vgetq_lane_u32(v0, 1) >> 24)) & MASK;
p[2] = (vgetq_lane_u32(v0, 1) >> 4) & MASK;
p[3] = ((vgetq_lane_u32(v0, 1) << 16) | (vgetq_lane_u32(v0, 2) >> 16)) & MASK;
p[4] = ((vgetq_lane_u32(v0, 2) << 4) | (vgetq_lane_u32(v0, 3) >> 28)) & MASK;
p[5] = (vgetq_lane_u32(v0, 3) >> 8) & MASK;
p[6] = ((vgetq_lane_u32(v0, 3) << 12) | (vgetq_lane_u32(v1, 0) >> 20)) & MASK;
p[7] = vgetq_lane_u32(v1, 0) & MASK;
// 使用vextq_u32进行跨字数据重组
uint32x4_t v5_8 = vextq_u32(v1, v2, 1);
uint32_t w9 = vgetq_lane_u32(v2, 1);
p[8] = (vgetq_lane_u32(v5_8, 0) >> 12) & MASK;
p[9] = ((vgetq_lane_u32(v5_8, 0) << 8) | (vgetq_lane_u32(v5_8, 1) >> 24)) & MASK;
p[10] = (vgetq_lane_u32(v5_8, 1) >> 4) & MASK;
p[11] = ((vgetq_lane_u32(v5_8, 1) << 16) | (vgetq_lane_u32(v5_8, 2) >> 16)) & MASK;
p[12] = ((vgetq_lane_u32(v5_8, 2) << 4) | (vgetq_lane_u32(v5_8, 3) >> 28)) & MASK;
p[13] = (vgetq_lane_u32(v5_8, 3) >> 8) & MASK;
p[14] = ((vgetq_lane_u32(v5_8, 3) << 12) | (w9 >> 20)) & MASK;
p[15] = w9 & MASK;
// ... 剩余处理类似
}
}关键优化技术:
- 数据重组:使用vextq_u32实现跨字边界的数据重组
- 标量提取:使用vgetq_lane_u32从向量寄存器提取单个元素
- 位操作优化:保持标量位操作的高效性,结合向量加载优势
unpack32直接内存拷贝(VLD1Q_U32_X4/VST1Q_U32_X4)
unpack32函数用于从压缩数据中提取32位宽的整数。在PForDelta算法中,当参数b=32时,每个压缩数据块使用完整的32位存储一个数值,实际上没有进行压缩,只是简单存储。这种情况下,unpack32的任务就是将输入数组w中的32位整数直接拷贝到输出数组p中。虽然这是简单的复制操作,但通过NEON向量化技术仍然可以优化内存拷贝性能。
原始标量代码为简单内存拷贝:
void unpack32(unsigned int* p, unsigned int* w, int BS) {
int i;
for (i = 0; i < BS; i += 32, p += 32, w += 32) {
p[0] = w[0];
p[1] = w[1];
// ... 直接复制32个值
p[31] = w[31];
}
}NEON优化代码使用批量加载存储:
void unpack32_neon(unsigned int* p, unsigned int* w, int BS) {
for (size_t i = 0; i < BS; i += LANES_PER_BLOCK, p += LANES_PER_BLOCK, w += LANES_PER_BLOCK) {
// 使用vld1q_u32_x4一次性加载16个32位整数
uint32x4x4_t block0 = vld1q_u32_x4(w);
uint32x4x4_t block1 = vld1q_u32_x4(w + 16);
// 使用vst1q_u32_x4一次性存储16个32位整数
vst1q_u32_x4(p, block0);
vst1q_u32_x4(p + 16, block1);
}
}关键优化技术:
内存访问优化:利用NEON的128位宽内存操作批量加载4个向量寄存器,减少内存访问次数
四、关键NEON指令总结
指令类别 | NEON指令 | 功能描述 | 应用场景 |
|---|---|---|---|
数据加载 | vld1q_u32 | 加载4个32位整数到128位寄存器 | 所有unpack函数的数据输入 |
数据存储 | vst1q_u32 | 从128位寄存器存储4个32位整数 | 所有unpack函数的结果输出 |
移位操作 | vshrq_n_u32 | 无符号整数右移指定位数 | unpack16的高16位提取 |
位与操作 | vandq_u32 | 向量按位与操作 | 掩码应用和位提取 |
数据重组 | vzipq_u32 | 交错重组两个向量的元素 | unpack16的高低16位交错 |
数据提取 | vextq_u32 | 从两个向量提取连续元素 | unpack20的跨字数据重组 |
标量提取 | vgetq_lane_u32 | 从向量提取指定位置的标量值 | unpack20的复杂位操作 |
五、可执行测试代码
#include <iostream>
#include <cstring>
#include <chrono>
#define LANES_PER_BLOCK 32
// NEON优化的unpack16实现
void unpack16_neon(unsigned int* p, unsigned int* w, int BS) {
static const uint32_t MASK = (1U << 16) - 1;
const uint32x4_t MASK_VEC = vdupq_n_u32(MASK);
for (size_t i = 0; i < BS; i += LANES_PER_BLOCK, p += LANES_PER_BLOCK, w += 16) {
uint32x4_t input0 = vld1q_u32(w);
uint32x4_t high0 = vshrq_n_u32(input0, 16);
uint32x4_t low0 = vandq_u32(input0, MASK_VEC);
uint32x4x2_t inter0 = vzipq_u32(high0, low0);
vst1q_u32_x2(p, inter0);
uint32x4_t input1 = vld1q_u32(w + 4);
uint32x4_t high1 = vshrq_n_u32(input1, 16);
uint32x4_t low1 = vandq_u32(input1, MASK_VEC);
uint32x4x2_t inter1 = vzipq_u32(high1, low1);
vst1q_u32_x2(p + 8, inter1);
uint32x4_t input2 = vld1q_u32(w + 8);
uint32x4_t high2 = vshrq_n_u32(input2, 16);
uint32x4_t low2 = vandq_u32(input2, MASK_VEC);
uint32x4x2_t inter2 = vzipq_u32(high2, low2);
vst1q_u32_x2(p + 16, inter2);
uint32x4_t input3 = vld1q_u32(w + 12);
uint32x4_t high3 = vshrq_n_u32(input3, 16);
uint32x4_t low3 = vandq_u32(input3, MASK_VEC);
uint32x4x2_t inter3 = vzipq_u32(high3, low3);
vst1q_u32_x2(p + 24, inter3);
}
}
// 标量版本用于对比
void unpack16_scalar(unsigned int* p, unsigned int* w, int BS) {
for (int i = 0; i < BS; i += 32, p += 32, w += 16) {
p[0] = (w[0] >> 16);
p[1] = (w[0]) & 65535;
p[2] = (w[1] >> 16);
p[3] = (w[1]) & 65535;
p[4] = (w[2] >> 16);
p[5] = (w[2]) & 65535;
p[6] = (w[3] >> 16);
p[7] = (w[3]) & 65535;
p[8] = (w[4] >> 16);
p[9] = (w[4]) & 65535;
p[10] = (w[5] >> 16);
p[11] = (w[5]) & 65535;
p[12] = (w[6] >> 16);
p[13] = (w[6]) & 65535;
p[14] = (w[7] >> 16);
p[15] = (w[7]) & 65535;
p[16] = (w[8] >> 16);
p[17] = (w[8]) & 65535;
p[18] = (w[9] >> 16);
p[19] = (w[9]) & 65535;
p[20] = (w[10] >> 16);
p[21] = (w[10]) & 65535;
p[22] = (w[11] >> 16);
p[23] = (w[11]) & 65535;
p[24] = (w[12] >> 16);
p[25] = (w[12]) & 65535;
p[26] = (w[13] >> 16);
p[27] = (w[13]) & 65535;
p[28] = (w[14] >> 16);
p[29] = (w[14]) & 65535;
p[30] = (w[15] >> 16);
p[31] = (w[15]) & 65535;
}
}
int main() {
const int BS = 32;
alignas(16) unsigned int input[16] = {
0x00020001, 0x00040003, 0x00060005, 0x00080007,
0x000A0009, 0x000C000B, 0x000E000D, 0x0010000F,
0x00120011, 0x00140013, 0x00160015, 0x00180017,
0x001A0019, 0x001C001B, 0x001E001D, 0x0020001F
};
alignas(16) unsigned int output_neon[32] = {0};
alignas(16) unsigned int output_scalar[32] = {0};
// 测试NEON版本
auto start_neon = std::chrono::high_resolution_clock::now();
for (int i = 0; i < 1000000; ++i) {
unpack16_neon(output_neon, input, BS);
}
auto end_neon = std::chrono::high_resolution_clock::now();
auto duration_neon = std::chrono::duration_cast<std::chrono::microseconds>(end_neon - start_neon);
// 测试标量版本
auto start_scalar = std::chrono::high_resolution_clock::now();
for (int i = 0; i < 1000000; ++i) {
unpack16_scalar(output_scalar, input, BS);
}
auto end_scalar = std::chrono::high_resolution_clock::now();
auto duration_scalar = std::chrono::duration_cast<std::chrono::microseconds>(end_scalar - start_scalar);
std::cout << "NEON版本耗时: " << duration_neon.count() << " 微秒" << std::endl;
std::cout << "标量版本耗时: " << duration_scalar.count() << " 微秒" << std::endl;
std::cout << "加速比: " << (double)duration_scalar.count() / duration_neon.count() << "x" << std::endl;
// 验证结果正确性
bool correct = true;
for (int i = 0; i < 32; ++i) {
if (output_neon[i] != output_scalar[i]) {
correct = false;
break;
}
}
std::cout << "结果正确性: " << (correct ? "通过" : "失败") << std::endl;
return 0;
}六、基准测试结果
测试环境 | 标量版本耗时 | NEON优化版本耗时 | 性能提升 |
|---|---|---|---|
| CPU:鲲鹏920新型号处理操作系统:openEuler 22.03 LTS SP3编译器:GCC 10.3.1 | 6542微秒 | 2596微秒 | 2.52倍 |
测试代码中对32个整数的数据块进行100万次解压操作,NEON优化版本相比标量版本性能提升2.52倍。测试验证了NEON向量化优化的正确性和性能优势。
测试方法:
# 绑定到指定CPU核运行测试
taskset -c 0 ./newpfordelta_neon

