KAEZlib Asynchronous Compression/Decompression Interface Usage Example
Interface Definitions
| Interface Definition | Description |
| --- | --- |
| void *KAEZIP_create_async_compress_session(iova_map_fn usr_map); | Creates an asynchronous compression session. |
| int KAEZIP_compress_async_in_session(void *sess, const struct kaezip_buffer_list *src, struct kaezip_buffer_list *dst, kaezip_async_callback callback, struct kaezip_result *result); | Submits an asynchronous compression task. |
| void KAEZIP_async_polling_in_session(void *sess, int budget); | Polls for the results of asynchronous compression/decompression tasks. |
| void KAEZIP_destroy_async_compress_session(void *sess); | Destroys a compression session. |
| void *KAEZIP_create_async_decompress_session(iova_map_fn usr_map); | Creates an asynchronous decompression session. |
| int KAEZIP_decompress_async_in_session(void *sess, const struct kaezip_buffer_list *src, struct kaezip_buffer_list *dst, kaezip_async_callback callback, struct kaezip_result *result); | Submits an asynchronous decompression task. |
| void KAEZIP_destroy_async_decompress_session(void *sess); | Destroys a decompression session. |
| void KAEZIP_reset_session(void *sess); | Resets a session. |
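For orientation, the minimal sketch below shows the expected call order on the asynchronous compression path: create a session, submit a task, poll until the completion callback fires, then destroy the session. It is only a sketch based on the signatures above and the structures used in the full example further below; buffer preparation and a real iova_map_fn implementation are omitted.

```c
/* Minimal call-order sketch (not a complete program). Buffer setup and the
 * iova_map_fn implementation are omitted; see the full example below. */
#include <unistd.h>
#include "kaezip.h"

static volatile int g_done = 0;

/* Completion callback: result->status is 0 on success and result->dst_len
 * holds the length of the compressed output. */
static void on_compressed(struct kaezip_result *result)
{
    g_done = (result->status == 0) ? 1 : -1;
}

static int compress_once(iova_map_fn usr_map,
                         const struct kaezip_buffer_list *src,
                         struct kaezip_buffer_list *dst)
{
    struct kaezip_result result = {0};
    void *sess = KAEZIP_create_async_compress_session(usr_map);        /* 1. create the session  */
    int ret = KAEZIP_compress_async_in_session(sess, src, dst,
                                               on_compressed, &result); /* 2. submit the task     */
    while (ret == 0 && g_done == 0) {
        KAEZIP_async_polling_in_session(sess, 1);                       /* 3. poll for completion */
        usleep(100);
    }
    KAEZIP_destroy_async_compress_session(sess);                        /* 4. destroy the session */
    return ret;
}
```

The asynchronous decompression path follows the same sequence with the corresponding decompress-session functions.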
Usage Constraints
- The supported hardware is the Kunpeng 920 7280Z processor.
- Only output in the deflate_raw format is supported.
- Each session can only be used from a single thread. The API functions are not guaranteed to be thread-safe, i.e. the same session must not be passed to API calls from multiple threads; otherwise correct compression/decompression behavior is not guaranteed. Resources of different sessions are isolated from each other, so each thread should create and use its own independent session (illustrated in the sketch after this list).
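To illustrate the last constraint, the sketch below gives each worker thread its own session, so no session handle is ever shared between threads. It is only a sketch: the mapping callback shown here is a placeholder, and a real program must return physical addresses as done by get_physical_address_wrapper in the full example below.

```c
/* One session per thread: each worker creates, uses, and destroys its own
 * session, so no session handle is shared across threads. */
#include <pthread.h>
#include <stddef.h>
#include "kaezip.h"

/* Placeholder mapping callback: a real implementation must return the
 * physical address of vaddr (see get_physical_address_wrapper below). */
static void *dummy_iova_map(void *usr, void *vaddr, size_t sz)
{
    (void)usr;
    (void)sz;
    return vaddr;
}

static void *worker(void *arg)
{
    (void)arg;
    void *sess = KAEZIP_create_async_compress_session(dummy_iova_map);
    /* ... submit tasks and poll on this session from this thread only ... */
    KAEZIP_destroy_async_compress_session(sess);
    return NULL;
}

int main(void)
{
    pthread_t tid[2];
    for (int i = 0; i < 2; i++) {
        pthread_create(&tid[i], NULL, worker, NULL);
    }
    for (int i = 0; i < 2; i++) {
        pthread_join(tid[i], NULL);
    }
    return 0;
}
```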
Usage Example
The following example code uses the KAEZip asynchronous compression interface to compress the original data and polls for the compressed result in deflate_raw format. The compressed result is then decompressed twice, once synchronously with standard zlib and once with the KAEZip asynchronous decompression interface, and each decompressed result is compared with the original data. This verifies both that the asynchronous compression interface outputs standard deflate_raw data and that the asynchronous decompression interface can decode it. The detailed code and the build and run steps are as follows.
- Create the main.c file.
- Press "i" to enter insert mode and write the following content into the file.
```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <sys/stat.h>
#include <zlib.h>     // for Bytef
#include <fcntl.h>    // for O_RDONLY and open
#include <sys/mman.h> // for munmap
#include <inttypes.h>
#include "kaezip.h"

#define HPAGE_SIZE (2 * 1024 * 1024)
#define PAGE_SHIFT 12
#define PAGE_SIZE (1UL << PAGE_SHIFT)
#define PFN_MASK ((1UL << 55) - 1)
#define TEST_FILE_PATH "../../../scripts/compressTestDataset/calgary"

static int g_has_done = 0;
static int g_inflate_type = 0; // whether to decompress asynchronously. 0: sync decompress; 1: async decompress.

struct my_custom_data {
    void *src;
    void *dst;
    struct kaezip_buffer_list src_list;
    struct kaezip_buffer_list dest_list;
    void *src_decompd;
    struct kaezip_buffer_list src_decompd_list;
    size_t src_len;
    size_t dst_len;
    size_t src_decompd_len;
};

struct cache_page_map {
    uint64_t *entries;
    size_t entries_num;
    void *base_vaddr;
};

static struct cache_page_map *init_cache_page_map(void *base_vaddr, size_t total_size)
{
    struct cache_page_map *cache = malloc(sizeof(struct cache_page_map));
    if (!cache) {
        return NULL;
    }
    int fd = open("/proc/self/pagemap", O_RDONLY);
    if (fd < 0) {
        perror("open /proc/self/pagemap failed");
        free(cache);
        return NULL;
    }
    // Number of pagemap entries to read, derived from the allocation size.
    size_t pages_num = total_size / PAGE_SIZE;
    cache->entries_num = pages_num;
    cache->base_vaddr = base_vaddr;
    // Allocate the entry cache.
    cache->entries = malloc(pages_num * sizeof(uint64_t));
    if (!cache->entries) {
        close(fd);
        free(cache);
        return NULL;
    }
    // File offset of the first entry, i.e. the page containing the allocated virtual address.
    uintptr_t base = (uintptr_t)base_vaddr;
    uintptr_t first_offset = (base / PAGE_SIZE) * sizeof(uint64_t);
    // Seek to the starting position.
    if (lseek(fd, first_offset, SEEK_SET) != first_offset) {
        perror("lseek failed");
        close(fd);
        free(cache->entries);
        free(cache);
        return NULL;
    }
    if (read(fd, cache->entries, pages_num * sizeof(uint64_t)) != (ssize_t)(pages_num * sizeof(uint64_t))) {
        perror("read cache failed");
        close(fd);
        free(cache->entries);
        free(cache);
        return NULL;
    }
    close(fd);
    return cache;
}

static void *get_huge_pages(size_t total_size)
{
    // Use the default huge page size (2 MB, matching HPAGE_SIZE).
    void *addr = mmap(NULL, total_size, PROT_READ | PROT_WRITE,
                      MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB, -1, 0);
    if (addr == MAP_FAILED) {
        fprintf(stderr, "Failed to allocate huge pages.\n");
        fprintf(stderr, "The system may not have enough huge pages available.\n");
        fprintf(stderr, "Try reserving more huge pages: sudo sysctl vm.nr_hugepages=10000\n");
        exit(EXIT_FAILURE);
    }
    return addr;
}

static uint64_t get_physical_address_cache_page_map(struct cache_page_map *cache, void *vaddr)
{
    uintptr_t virtual_addr = (uintptr_t)vaddr;
    uintptr_t base = (uintptr_t)cache->base_vaddr;
    uintptr_t index = (virtual_addr - base) / PAGE_SIZE;
    // printf("index = %lu, entries_num = %lu\n", index, cache->entries_num);
    if (index >= cache->entries_num) {
        fprintf(stderr, "Address is out of the cached range\n");
        return 0;
    }
    uint64_t entry = cache->entries[index];
    if (!(entry & (1ULL << 63))) {
        fprintf(stderr, "Page is not present\n");
        return 0;
    }
    // Extract the physical frame number (PFN).
    uint64_t pfn = entry & PFN_MASK;
    return (pfn << PAGE_SHIFT) | (virtual_addr & (PAGE_SIZE - 1));
}

static void *get_physical_address_wrapper(void *usr, void *vaddr, size_t sz)
{
    struct cache_page_map *cache = (struct cache_page_map *)usr;
    uint64_t phys_addr = get_physical_address_cache_page_map(cache, vaddr);
    return (void *)(uintptr_t)phys_addr;
}

// Back tuple_buf with input_size bytes of real physical memory, used either for
// the file data before compression or for the compressed output.
static int prepare_physical_buf(void **tuple_buf, size_t input_size, struct cache_page_map **page_cache, FILE *file)
{
    // The huge page size is 2 MB; the requested size must be a multiple of it.
    int huge_page_num = (int)(input_size * sizeof(Bytef) / HPAGE_SIZE) + 1;
    size_t total_size = huge_page_num * HPAGE_SIZE;
    *tuple_buf = get_huge_pages(total_size);
    if (*tuple_buf == NULL) {
        return -1;
    }
    if (file == NULL) {
        memset(*tuple_buf, 0, total_size);
    } else {
        (void)fread(*tuple_buf, 1, input_size, file);
    }
    struct cache_page_map *cache = init_cache_page_map(*tuple_buf, total_size);
    if (cache == NULL) {
        printf("init_cache_page_map failed\n");
        return -1;
    }
    *page_cache = cache;
    return 0;
}

static void *g_page_info = NULL;

static size_t read_inputFile(const char *fileName, void **input)
{
    FILE *sourceFile = fopen(fileName, "r");
    if (sourceFile == NULL) {
        fprintf(stderr, "%s not exist!\n", fileName);
        return 0;
    }
    int fd = fileno(sourceFile);
    struct stat fs;
    (void)fstat(fd, &fs);
    size_t input_size = fs.st_size;
    prepare_physical_buf(input, input_size, (struct cache_page_map **)&g_page_info, sourceFile);
    fclose(sourceFile);
    return input_size;
}

static void release_huge_pages(void *addr, size_t size)
{
    // munmap of a hugetlb mapping needs a length that is a multiple of the huge
    // page size; round up the same way prepare_physical_buf rounds the allocation.
    size_t total_size = (size / HPAGE_SIZE + 1) * HPAGE_SIZE;
    munmap(addr, total_size);
}

static void check_results(struct my_custom_data *my_data)
{
    if (my_data->src_decompd_len != my_data->src_len) {
        printf("Test Error: decompressed length differs from the original length. original=%zu decompressed=%zu\n",
               my_data->src_len, my_data->src_decompd_len);
    }
    // Compare the decompressed data with the original data.
    if (memcmp(my_data->src_decompd, my_data->src_list.buf[0].data, my_data->src_decompd_len) == 0) {
        if (g_inflate_type == 0) {
            printf("Test Success for zlib deflate raw with async deflate and sync inflate.\n");
        } else {
            printf("Test Success for zlib deflate raw with async deflate and async inflate.\n");
        }
    } else {
        printf("Test Error: Decompressed data does not match the original data.\n");
    }
}

static void decompression_callback3(struct kaezip_result *result)
{
    if (result->status != 0) {
        printf("DeCompression failed with status: %d\n", result->status);
        return;
    }
    // Fetch the decompressed data in the callback.
    struct my_custom_data *my_data = (struct my_custom_data *)result->user_data;
    void *decompressed_data = my_data->src_decompd_list.buf[0].data;
    my_data->src_decompd_len = result->dst_len;
    my_data->src_decompd = decompressed_data;
    check_results(my_data);
    g_has_done = 1;
}

static int decompressAsync(struct my_custom_data *mydata)
{
    iova_map_fn usr_map = get_physical_address_wrapper;
    void *desess = KAEZIP_create_async_decompress_session(usr_map);
    // Use the real compressed data length as the input length for decompression.
    mydata->dest_list.buf[0].buf_len = mydata->dst_len;
    // Provide a buffer larger than the original data so the decompressed output cannot overflow.
    size_t tmp_size = mydata->src_len * 2;
    void *tmp_buf = NULL;
    struct cache_page_map *tmp_page_info = NULL;
    prepare_physical_buf(&tmp_buf, tmp_size, &tmp_page_info, NULL);
    struct kaezip_buffer src_decomped_buf_array[128];
    mydata->src_decompd_list.buf_num = 1;
    mydata->src_decompd_list.buf = src_decomped_buf_array;
    mydata->src_decompd_list.buf[0].data = tmp_buf;
    mydata->src_decompd_list.buf[0].buf_len = tmp_size;
    mydata->src_decompd_list.usr_data = tmp_page_info;
    // Result of the asynchronous decompression.
    struct kaezip_result result = {0};
    result.user_data = mydata;
    // Decompress the data in mydata->dest_list.buf[0].data; the result is placed in mydata->src_decompd_list.buf[0].data.
    int compression_status = KAEZIP_decompress_async_in_session(desess, &mydata->dest_list,
                                                                &mydata->src_decompd_list,
                                                                decompression_callback3, &result);
    if (compression_status != 0) {
        printf("deCompression failed with error code: %d\n", compression_status);
        release_huge_pages(tmp_buf, tmp_size);
        return -1;
    }
    while (g_has_done != 1) {
        KAEZIP_async_polling_in_session(desess, 1);
        usleep(100);
    }
    KAEZIP_destroy_async_decompress_session(desess);
    release_huge_pages(tmp_buf, tmp_size);
    return compression_status;
}

static void compression_callback3(struct kaezip_result *result)
{
    if (result->status != 0) {
        printf("Compression failed with status: %d\n", result->status);
        return;
    }
    // Fetch the compressed data in the callback.
    struct my_custom_data *my_data = (struct my_custom_data *)result->user_data;
    size_t compressed_size = result->dst_len;
    void *compressed_data = my_data->dest_list.buf[0].data;
    my_data->dst_len = compressed_size;
    if (g_inflate_type == 0) {
        // Synchronous decompression with standard zlib.
        // Allocate memory for the decompressed data.
        size_t tmp_src_len = result->src_size * 10;
        void *dst_buffer = malloc(tmp_src_len);
        if (!dst_buffer) {
            printf("Memory allocation failed for decompressed data.\n");
            return;
        }
        int ret = -1;
        z_stream strm;
        strm.zalloc = (alloc_func)0;
        strm.zfree = (free_func)0;
        strm.opaque = (voidpf)0;
        (void)inflateInit2(&strm, -15); // negative windowBits: raw deflate stream
        strm.next_in = (z_const Bytef *)compressed_data;
        strm.next_out = (Bytef *)dst_buffer;
        strm.avail_in = compressed_size;
        strm.avail_out = tmp_src_len;
        ret = inflate(&strm, Z_FINISH);
        tmp_src_len = strm.total_out;
        // inflateReset(&strm);
        (void)inflateEnd(&strm);
        if (ret < Z_OK) {
            printf("[KAE_ERR]:uncompress2 failed, ret is:%d.\n", ret);
        }
        if (ret < 0) {
            printf("Decompression failed with error code: %d\n", ret);
            free(dst_buffer);
            return;
        }
        my_data->src_decompd = dst_buffer;
        my_data->src_decompd_len = tmp_src_len;
        check_results(my_data);
        // Release the decompressed data.
        free(dst_buffer);
        g_has_done = 1;
    } else {
        // Asynchronous decompression with KAEZip.
        decompressAsync(my_data);
    }
}

static int test_main()
{
    g_has_done = 0;
    size_t src_len = 0;
    void *inbuf = NULL;
    src_len = read_inputFile(TEST_FILE_PATH, &inbuf);
    iova_map_fn usr_map = get_physical_address_wrapper;
    void *sess = KAEZIP_create_async_compress_session(usr_map);
    // Asynchronous compression.
    struct kaezip_result result = {0};
    struct my_custom_data mydata = {0};
    struct kaezip_buffer src_buf[128];
    mydata.src_list.usr_data = g_page_info;
    mydata.src_list.buf_num = 1;
    mydata.src_list.buf = src_buf;
    mydata.src_list.buf[0].data = inbuf;
    mydata.src_list.buf[0].buf_len = src_len;
    size_t compressed_size = compressBound(src_len);
    void *tuple_buf = NULL;
    struct cache_page_map *tuple_page_info = NULL;
    prepare_physical_buf(&tuple_buf, compressed_size, &tuple_page_info, NULL);
    struct kaezip_buffer tuple_buf_array[128];
    mydata.dest_list.buf_num = 1;
    mydata.dest_list.buf = tuple_buf_array;
    mydata.dest_list.buf[0].data = tuple_buf;
    mydata.dest_list.buf[0].buf_len = compressed_size;
    mydata.dest_list.usr_data = tuple_page_info;
    mydata.src_len = src_len;
    result.user_data = &mydata;
    // Compress the data in mydata.src_list.buf[0].data; the result is placed in mydata.dest_list.buf[0].data.
    int compression_status = KAEZIP_compress_async_in_session(sess, &mydata.src_list, &mydata.dest_list,
                                                              compression_callback3, &result);
    if (compression_status != 0) {
        printf("Compression failed with error code: %d\n", compression_status);
        release_huge_pages(inbuf, src_len);
        return -1;
    }
    while (g_has_done != 1) {
        KAEZIP_async_polling_in_session(sess, 1);
        usleep(100);
    }
    KAEZIP_destroy_async_compress_session(sess);
    release_huge_pages(tuple_buf, compressed_size);
    release_huge_pages(inbuf, src_len);
    return compression_status;
}

int main()
{
    // Test: async compress -> sync decompress.
    int ret = test_main();
    // Test: async compress -> async decompress.
    g_inflate_type = 1;
    ret = test_main();
    return ret;
}
```
- Press "Esc", type :wq!, and press "Enter" to save and exit.
- Compile the main.c file (TEST_FILE_PATH in the example code must be changed to the path of a real test file).
```shell
gcc main.c -I/usr/local/kaezip/include -L/usr/local/kaezip/lib -lz -lkaezip -o kaezlib_deflate_async_test
```
- Run the test binary.
```shell
export LD_LIBRARY_PATH=/usr/local/kaezip/lib:$LD_LIBRARY_PATH
./kaezlib_deflate_async_test
```
The output is as follows:
```
Test Success for zlib deflate raw with async deflate and sync inflate.
Test Success for zlib deflate raw with async deflate and async inflate.
```
Test Tool
Use the kzip tool to run functional and performance tests on the asynchronous interfaces.
- Go to the kzip test tool directory "KAELz4/test/kzip".
```shell
cd KAELz4/test/kzip
```
- Build and package the kzip tool.
```shell
sh build.sh
```
- View the tool's parameter description.
```shell
export LD_LIBRARY_PATH=/usr/local/kaezip/lib/:$LD_LIBRARY_PATH
./kzip -h
```
- Single-IO latency test: an equivalent serial flow; the result indicates the compression latency of a single IO.
```shell
sh runPerf.sh -A kaezlibasync_deflate -m 1 -n 20000 -s [4/8/16/32/64] -r 1 -k 1 -i 1 -p 1 -f [path to calgary.tar]
```
- Single-core compression capability test: apply load with a single thread; the result indicates the compression bandwidth and latency that a single thread can provide.
```shell
sh runPerf.sh -A kaezlibasync_deflate -m 1 -n 20000 -s [4/8/16/32/64] -r 1 -k 1 -i 4 -p 1 -f [path to calgary.tar]
```
- Single-KAE capability test: keep increasing the load on a single thread; the result indicates the compression bandwidth and latency that a single KAE accelerator can provide.
```shell
sh runPerf.sh -A kaezlibasync_deflate -m 1 -n 20000 -s [4/8/16/32/64] -r 1 -k 1 -i 8 -p 1 -f [path to calgary.tar]
```