开发者
利用NEON指令对NewPForDelta解压算法进行向量化改造

利用NEON指令对NewPForDelta解压算法进行向量化改造

指令集

发表于 2026/04/15

0

作者 | 刘洋

一、概述

KNewPForDelta基于Google开源Pfordelta算法,以及开源Compression进行NewPfordelta算法的实现与优化,主要用于搜索引擎、推荐系统等需要快速处理大规模有序整数序列(如文档ID列表、词频位置)的场景。KNewPForDelta针对ARM架构进行深度优化,利用NEON SIMD指令集提升解压缩性能。

在应用场景中目前是以解压缩为主,压缩是在数据更新的时候用的,属于离线部分,解压缩是在数据查询访问的时候用的,属于在线业务,对延迟敏感。关键优化如下:

  • 使用NEON指令实现高效位提取和移位操作
  • 运用NEON数据重组技术处理跨字边界数据
  • 针对不同位宽采用差异化向量化策略

二、算法核心思想

KNewPForDelta是在PForDelta算法基础上的改进。PForDelta的基础思想首次在论文《Super-Scalar RAM-CPU Cache Compression》中提出,引入了分块压缩、异常值分离和差分编码。其核心是对一个数据块(chunk,例如128个整数),通过遍历确定一个参数b,使得大部分数据(如90%)可以用b个比特存储,剩余少数异常值单独存储。压缩时,异常值的位置偏移量和值被记录在块末尾。

例如,对于数列23, 41, 8, 12, 30, 68, 18, 45, 21, 9,...,若确定b=5(即大部分数小于32),则正常值用5比特存储,异常值(如41, 68, 45)用32比特存储在末尾,并在正常值数组中记录异常值的位置偏移。

KNewPForDelta对此进行了关键改进(在论文《Inverted Index Compression and Query Processing with Optimized》中提出),不再在正常值数组中存储异常值的位置偏移,而是存储异常值的低b位。异常值的高位部分及其在块中的索引被存储在单独的两个数组中。此外,异常值的存储位数通过参数t动态选择(8、16或32比特),以进一步压缩空间。这种设计简化了解压时的数据重组流程,更有利于向量化优化。

KNewPForDelta压缩后的数据示例如下:

三、部分unpack解压函数优化示例

unpack16位提取优化(VLD1/VSHR/VAND/VZIP)

unpack16函数用于从压缩数据中提取16位宽的整数。在PForDelta算法中,当参数b=16时,每个压缩数据块使用16位存储一个数值。输入w是一个连续的比特流,其中每16位存储一个压缩的整数值,但为了对齐,实际上在w数组中,每32位存储了两个16位整数(高16位和低16位各一个)。unpack16的任务是将这些16位整数解压缩为32位的无符号整数,存储在输出数组p中。

原始标量代码需要逐个处理高低16位:

void unpack16(unsigned int* p, unsigned int* w, int BS) {
  int i;
  for (i = 0; i < BS; i += 32, p += 32, w += 16) {
    p[0] = (w[0] >> 16);
    p[1] = (w[0]) & 65535;
    p[2] = (w[1] >> 16);
    p[3] = (w[1]) & 65535;
    // ... 重复16次类似操作
    p[30] = (w[15] >> 16);
    p[31] = (w[15]) & 65535;
  }
}

NEON优化代码使用向量化位提取和交错存储:

#include <arm_neon.h>
#define LANES_PER_BLOCK 32
void unpack16(unsigned int* p, unsigned int* w, int BS) {
    static const uint32_t MASK = (1U << 16) - 1;
    const uint32x4_t MASK_VEC = vdupq_n_u32(MASK);
    for (size_t i = 0; i < BS; i += LANES_PER_BLOCK, p += LANES_PER_BLOCK, w += 16) {
        // 使用vld1q_u32加载4个32位输入字
        uint32x4_t input0 = vld1q_u32(w);
        // 使用vshrq_n_u32提取高16位
        uint32x4_t high0 = vshrq_n_u32(input0, 16);
        // 使用vandq_u32提取低16位
        uint32x4_t low0 = vandq_u32(input0, MASK_VEC);
        // 使用vzipq_u32交错重组高16位和低16位
        uint32x4x2_t inter0 = vzipq_u32(high0, low0);
        // 使用vst1q_u32_x2一次性存储8个32位输出值
        vst1q_u32_x2(p, inter0);
        uint32x4_t input1 = vld1q_u32(w + 4);
        uint32x4_t high1 = vshrq_n_u32(input1, 16);
        uint32x4_t low1 = vandq_u32(input1, MASK_VEC);
        uint32x4x2_t inter1 = vzipq_u32(high1, low1);
        vst1q_u32_x2(p + 8, inter1);
        // ... 剩余处理类似
    }
}

关键优化技术:

  • 位操作向量化:使用vshrq_n_u32和vandq_u32实现并行移位和掩码操作
  • 数据重组技术:使用vzipq_u32将高16位和低16位结果交错排列
  • 向量存储:使用vst1q_u32批量存储输出结果

unpack20复杂位重组(VEXT/VGETQ_LANE_U32)

unpack20函数用于从压缩数据中提取20位宽的整数。这是PForDelta算法中比较复杂的解压情况,因为20位不是2的幂,也不是字节对齐的。输入w是一个连续的比特流,其中每20位存储一个压缩的整数值。由于20位不满足32位对齐,一个20位的值可能跨越两个32位字。unpack20的任务是从这些位流中准确提取每个20位整数,并将其解压缩为32位的无符号整数,存储在输出数组p中。这是一个典型的位打包和解包问题,需要复杂的移位和掩码操作。

原始标量代码需要复杂的位拼接操作:

void unpack20(unsigned int* p, unsigned int* w, int BS) {
  int i;
  for (i = 0; i < BS; i += 32, p += 32, w += 20) {
    p[0] = (w[0] >> 12);
    p[1] = (w[0] << 8) & ((1 << 20) - 1);
    p[1] |= (w[1] >> 24);
    p[2] = (w[1] >> 4) & ((1 << 20) - 1);
    // ... 复杂的位拼接操作
    p[31] = (w[19]) & ((1 << 20) - 1);
  }
}

NEON优化代码使用向量加载和标量位操作结合:

void unpack20_neon(unsigned int* p, unsigned int* w, int BS) {
    static const uint32_t MASK = (1U << 20) - 1;
    for (size_t i = 0; i < BS; i += LANES_PER_BLOCK, p += LANES_PER_BLOCK, w += 20) {
        // 使用vld1q_u32向量化加载20个输入字
        uint32x4_t v0 = vld1q_u32(w);
        uint32x4_t v1 = vld1q_u32(w + 4);
        uint32x4_t v2 = vld1q_u32(w + 8);
        uint32x4_t v3 = vld1q_u32(w + 12);
        uint32x4_t v4 = vld1q_u32(w + 16);
        // 使用vgetq_lane_u32提取向量中的标量值进行位操作
        p[0] = (vgetq_lane_u32(v0, 0) >> 12) & MASK;
        p[1] = ((vgetq_lane_u32(v0, 0) << 8) | (vgetq_lane_u32(v0, 1) >> 24)) & MASK;
        p[2] = (vgetq_lane_u32(v0, 1) >> 4) & MASK;
        p[3] = ((vgetq_lane_u32(v0, 1) << 16) | (vgetq_lane_u32(v0, 2) >> 16)) & MASK;
        p[4] = ((vgetq_lane_u32(v0, 2) << 4) | (vgetq_lane_u32(v0, 3) >> 28)) & MASK;
        p[5] = (vgetq_lane_u32(v0, 3) >> 8) & MASK;
        p[6] = ((vgetq_lane_u32(v0, 3) << 12) | (vgetq_lane_u32(v1, 0) >> 20)) & MASK;
        p[7] = vgetq_lane_u32(v1, 0) & MASK;
        // 使用vextq_u32进行跨字数据重组
        uint32x4_t v5_8 = vextq_u32(v1, v2, 1);
        uint32_t w9 = vgetq_lane_u32(v2, 1);
        p[8] = (vgetq_lane_u32(v5_8, 0) >> 12) & MASK;
        p[9] = ((vgetq_lane_u32(v5_8, 0) << 8) | (vgetq_lane_u32(v5_8, 1) >> 24)) & MASK;
        p[10] = (vgetq_lane_u32(v5_8, 1) >> 4) & MASK;
        p[11] = ((vgetq_lane_u32(v5_8, 1) << 16) | (vgetq_lane_u32(v5_8, 2) >> 16)) & MASK;
        p[12] = ((vgetq_lane_u32(v5_8, 2) << 4) | (vgetq_lane_u32(v5_8, 3) >> 28)) & MASK;
        p[13] = (vgetq_lane_u32(v5_8, 3) >> 8) & MASK;
        p[14] = ((vgetq_lane_u32(v5_8, 3) << 12) | (w9 >> 20)) & MASK;
        p[15] = w9 & MASK;
        // ... 剩余处理类似
    }
}

关键优化技术:

  • 数据重组:使用vextq_u32实现跨字边界的数据重组
  • 标量提取:使用vgetq_lane_u32从向量寄存器提取单个元素
  • 位操作优化:保持标量位操作的高效性,结合向量加载优势

unpack32直接内存拷贝(VLD1Q_U32_X4/VST1Q_U32_X4)

unpack32函数用于从压缩数据中提取32位宽的整数。在PForDelta算法中,当参数b=32时,每个压缩数据块使用完整的32位存储一个数值,实际上没有进行压缩,只是简单存储。这种情况下,unpack32的任务就是将输入数组w中的32位整数直接拷贝到输出数组p中。虽然这是简单的复制操作,但通过NEON向量化技术仍然可以优化内存拷贝性能。

原始标量代码为简单内存拷贝:

void unpack32(unsigned int* p, unsigned int* w, int BS) {
  int i;
  for (i = 0; i < BS; i += 32, p += 32, w += 32) {
    p[0] = w[0];
    p[1] = w[1];
    // ... 直接复制32个值
    p[31] = w[31];
  }
}

NEON优化代码使用批量加载存储:

void unpack32_neon(unsigned int* p, unsigned int* w, int BS) {
    for (size_t i = 0; i < BS; i += LANES_PER_BLOCK, p += LANES_PER_BLOCK, w += LANES_PER_BLOCK) {
        // 使用vld1q_u32_x4一次性加载16个32位整数
        uint32x4x4_t block0 = vld1q_u32_x4(w);
        uint32x4x4_t block1 = vld1q_u32_x4(w + 16);
        // 使用vst1q_u32_x4一次性存储16个32位整数
        vst1q_u32_x4(p, block0);
        vst1q_u32_x4(p + 16, block1);
    }
}

关键优化技术:

内存访问优化:利用NEON的128位宽内存操作批量加载4个向量寄存器,减少内存访问次数

四、关键NEON指令总结

指令类别

NEON指令

功能描述

应用场景

数据加载

vld1q_u32

加载4个32位整数到128位寄存器

所有unpack函数的数据输入

数据存储

vst1q_u32

从128位寄存器存储4个32位整数

所有unpack函数的结果输出

移位操作

vshrq_n_u32

无符号整数右移指定位数

unpack16的高16位提取

位与操作

vandq_u32

向量按位与操作

掩码应用和位提取

数据重组

vzipq_u32

交错重组两个向量的元素

unpack16的高低16位交错

数据提取

vextq_u32

从两个向量提取连续元素

unpack20的跨字数据重组

标量提取

vgetq_lane_u32

从向量提取指定位置的标量值

unpack20的复杂位操作

五、可执行测试代码

#include <iostream>
#include <cstring>
#include <chrono>
#define LANES_PER_BLOCK 32
// NEON优化的unpack16实现
void unpack16_neon(unsigned int* p, unsigned int* w, int BS) {
    static const uint32_t MASK = (1U << 16) - 1;
    const uint32x4_t MASK_VEC = vdupq_n_u32(MASK);
    for (size_t i = 0; i < BS; i += LANES_PER_BLOCK, p += LANES_PER_BLOCK, w += 16) {
        uint32x4_t input0 = vld1q_u32(w);
        uint32x4_t high0 = vshrq_n_u32(input0, 16);
        uint32x4_t low0 = vandq_u32(input0, MASK_VEC);
        uint32x4x2_t inter0 = vzipq_u32(high0, low0);
        vst1q_u32_x2(p, inter0);
        uint32x4_t input1 = vld1q_u32(w + 4);
        uint32x4_t high1 = vshrq_n_u32(input1, 16);
        uint32x4_t low1 = vandq_u32(input1, MASK_VEC);
        uint32x4x2_t inter1 = vzipq_u32(high1, low1);
        vst1q_u32_x2(p + 8, inter1);
        uint32x4_t input2 = vld1q_u32(w + 8);
        uint32x4_t high2 = vshrq_n_u32(input2, 16);
        uint32x4_t low2 = vandq_u32(input2, MASK_VEC);
        uint32x4x2_t inter2 = vzipq_u32(high2, low2);
        vst1q_u32_x2(p + 16, inter2);
        uint32x4_t input3 = vld1q_u32(w + 12);
        uint32x4_t high3 = vshrq_n_u32(input3, 16);
        uint32x4_t low3 = vandq_u32(input3, MASK_VEC);
        uint32x4x2_t inter3 = vzipq_u32(high3, low3);
        vst1q_u32_x2(p + 24, inter3);
    }
}
// 标量版本用于对比
void unpack16_scalar(unsigned int* p, unsigned int* w, int BS) {
    for (int i = 0; i < BS; i += 32, p += 32, w += 16) {
        p[0] = (w[0] >> 16);
        p[1] = (w[0]) & 65535;
        p[2] = (w[1] >> 16);
        p[3] = (w[1]) & 65535;
        p[4] = (w[2] >> 16);
        p[5] = (w[2]) & 65535;
        p[6] = (w[3] >> 16);
        p[7] = (w[3]) & 65535;
        p[8] = (w[4] >> 16);
        p[9] = (w[4]) & 65535;
        p[10] = (w[5] >> 16);
        p[11] = (w[5]) & 65535;
        p[12] = (w[6] >> 16);
        p[13] = (w[6]) & 65535;
        p[14] = (w[7] >> 16);
        p[15] = (w[7]) & 65535;
        p[16] = (w[8] >> 16);
        p[17] = (w[8]) & 65535;
        p[18] = (w[9] >> 16);
        p[19] = (w[9]) & 65535;
        p[20] = (w[10] >> 16);
        p[21] = (w[10]) & 65535;
        p[22] = (w[11] >> 16);
        p[23] = (w[11]) & 65535;
        p[24] = (w[12] >> 16);
        p[25] = (w[12]) & 65535;
        p[26] = (w[13] >> 16);
        p[27] = (w[13]) & 65535;
        p[28] = (w[14] >> 16);
        p[29] = (w[14]) & 65535;
        p[30] = (w[15] >> 16);
        p[31] = (w[15]) & 65535;
    }
}
int main() {
    const int BS = 32;
    alignas(16) unsigned int input[16] = {
        0x00020001, 0x00040003, 0x00060005, 0x00080007,
        0x000A0009, 0x000C000B, 0x000E000D, 0x0010000F,
        0x00120011, 0x00140013, 0x00160015, 0x00180017,
        0x001A0019, 0x001C001B, 0x001E001D, 0x0020001F
    };
    alignas(16) unsigned int output_neon[32] = {0};
    alignas(16) unsigned int output_scalar[32] = {0};
    // 测试NEON版本
    auto start_neon = std::chrono::high_resolution_clock::now();
    for (int i = 0; i < 1000000; ++i) {
        unpack16_neon(output_neon, input, BS);
    }
    auto end_neon = std::chrono::high_resolution_clock::now();
    auto duration_neon = std::chrono::duration_cast<std::chrono::microseconds>(end_neon - start_neon);
    // 测试标量版本
    auto start_scalar = std::chrono::high_resolution_clock::now();
    for (int i = 0; i < 1000000; ++i) {
        unpack16_scalar(output_scalar, input, BS);
    }
    auto end_scalar = std::chrono::high_resolution_clock::now();
    auto duration_scalar = std::chrono::duration_cast<std::chrono::microseconds>(end_scalar - start_scalar);
    std::cout << "NEON版本耗时: " << duration_neon.count() << " 微秒" << std::endl;
    std::cout << "标量版本耗时: " << duration_scalar.count() << " 微秒" << std::endl;
    std::cout << "加速比: " << (double)duration_scalar.count() / duration_neon.count() << "x" << std::endl;
    // 验证结果正确性
    bool correct = true;
    for (int i = 0; i < 32; ++i) {
        if (output_neon[i] != output_scalar[i]) {
            correct = false;
            break;
        }
    }
    std::cout << "结果正确性: " << (correct ? "通过" : "失败") << std::endl;
    return 0;
}

六、基准测试结果

测试环境

标量版本耗时

NEON优化版本耗时

性能提升

CPU:鲲鹏920新型号处理操作系统:openEuler 22.03 LTS SP3编译器:GCC 10.3.1

6542微秒

2596微秒

2.52倍

测试代码中对32个整数的数据块进行100万次解压操作,NEON优化版本相比标量版本性能提升2.52倍。测试验证了NEON向量化优化的正确性和性能优势。

测试方法:

# 绑定到指定CPU核运行测试
taskset -c 0 ./newpfordelta_neon