#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include "kupl_shm.h" // KUPL库头文件(来自鲲鹏HPCKit)
// 全局定义缓冲区大小(可修改,用于测试不同消息大小的性能)
#define BUF_SIZE_SMALL 4 // 小消息(4个int,16字节)
#define BUF_SIZE_MEDIUM 1024 // 中消息(1024个int,4KB)
#define BUF_SIZE_LARGE 1048576 // 大消息(1MB)
// 1. 实现oob_allgather回调函数(进程地址收集)
static int oob_allgather_callback(const void *sendbuf, void *recvbuf, int size, void *group, kupl_shm_datatype_t datatype) {
int ret;
// 判断数据类型,确保与MPI数据类型匹配
switch (datatype) {
case KUPL_SHM_DATATYPE_CHAR:
ret = MPI_Allgather(sendbuf, size, MPI_CHAR, recvbuf, size, MPI_CHAR, (MPI_Comm)group);
break;
case KUPL_SHM_DATATYPE_INT:
ret = MPI_Allgather(sendbuf, size, MPI_INT, recvbuf, size, MPI_INT, (MPI_Comm)group);
break;
case KUPL_SHM_DATATYPE_DOUBLE:
ret = MPI_Allgather(sendbuf, size, MPI_DOUBLE, recvbuf, size, MPI_DOUBLE, (MPI_Comm)group);
break;
default:
fprintf(stderr, "Error: Unsupported datatype in oob_allgather_callback\n");
return -1; // 不支持的数据类型,返回失败
}
// 检查MPI_Allgather调用结果
if (ret != MPI_SUCCESS) {
fprintf(stderr, "Error: MPI_Allgather failed in oob_allgather_callback\n");
return -1;
}
return 0; // 执行成功
}
// 2. 实现oob_barrier回调函数(进程同步)
static int oob_barrier_callback(void *group) {
int ret = MPI_Barrier((MPI_Comm)group);
// 检查MPI_Barrier调用结果
if (ret != MPI_SUCCESS) {
fprintf(stderr, "Error: MPI_Barrier failed in oob_barrier_callback\n");
return -1;
}
return 0; // 执行成功
}
// 3. 数据校验函数(验证接收的数据与发送的数据是否一致)
static int data_verify(int *send_buf, int *recv_buf, int buf_size) {
for (int i = 0; i < buf_size; i++) {
if (send_buf[i] != recv_buf[i]) {
fprintf(stderr, "Data verify failed: send_buf[%d] = %d, recv_buf[%d] = %d\n", i, send_buf[i], i, recv_buf[i]);
return -1;
}
}
return 0; // 校验成功
}
int main(int argc, char **argv) {
int ret;
int world_rank, world_size;
kupl_shm_comm_h kupl_comm = NULL;
kupl_shm_win_h win = NULL;
int *local_buffer = NULL;
// 4. 初始化MPI环境(必须放在最前面,MPI进程创建的基础)
ret = MPI_Init(&argc, &argv);
if (ret != MPI_SUCCESS) {
fprintf(stderr, "Error: MPI_Init failed\n");
return -1;
}
// 5. 获取当前进程的rank和进程组大小(进程数)
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
// 检查进程数是否为3(本次实验要求3个MPI进程)
if (world_size != 3) {
if (world_rank == 0) {
fprintf(stderr, "Error: This experiment requires 3 MPI processes, but got %d\n", world_size);
}
MPI_Abort(MPI_COMM_WORLD, -1); // 终止所有MPI进程
return -1;
}
// 6. 配置KUPL回调函数结构体
kupl_shm_oob_cb_t oob_cbs;
kupl_shm_oob_cb_h oob_cbs_h = &oob_cbs;
oob_cbs_h->oob_allgather = oob_allgather_callback; // 绑定地址收集回调
oob_cbs_h->oob_barrier = oob_barrier_callback; // 绑定进程同步回调
// 7. 创建KUPL通信域(核心步骤)
ret = kupl_shm_comm_create(world_size, world_rank, getpid(), oob_cbs_h, (void *)MPI_COMM_WORLD, &kupl_comm);
if (ret != 0 || kupl_comm == NULL) {
fprintf(stderr, "Error: kupl_shm_comm_create failed, ret = %d\n", ret);
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
if (world_rank == 0) {
printf("KUPL comm created successfully, world_size = %d\n", world_size);
}
// 8. 申请共享内存,创建内存窗口(测试小、中、大三种消息大小)
// 这里以小消息为例,可替换为BUF_SIZE_MEDIUM或BUF_SIZE_LARGE测试不同场景
int buf_size = BUF_SIZE_SMALL;
ret = kupl_shm_win_alloc(buf_size * sizeof(int), kupl_comm, (void *)&local_buffer, &win);
if (ret != 0 || win == NULL || local_buffer == NULL) {
fprintf(stderr, "Error: kupl_shm_win_alloc failed, ret = %d\n", ret);
kupl_shm_comm_destroy(kupl_comm); // 销毁已创建的通信域
MPI_Abort(MPI_COMM_WORLD, -1);
return -1;
}
// 9. 进程间数据拷贝(核心功能)
// 9.1 rank0进程:初始化本地缓冲区,写入测试数据
if (world_rank == 0) {
for (int i = 0; i < buf_size; i++) {
local_buffer[i] = i + 100; // 写入数据:100, 101, 102, 103(小消息场景)
}
printf("Rank 0: Data initialized successfully, buf_size = %d\n", buf_size);
printf("Rank 0: Send data: ");
for (int i = 0; i < buf_size; i++) {
printf("%d ", local_buffer[i]);
}
printf("\n");
// 9.2 rank0查询rank1和rank2的共享内存地址,拷贝数据
void *remote_buf1 = NULL; // rank1的共享内存地址
void *remote_buf2 = NULL; // rank2的共享内存地址
// 查询rank1的共享内存地址
ret = kupl_shm_win_query(win, 1, &remote_buf1);
if (ret != 0 || remote_buf1 == NULL) {
fprintf(stderr, "Error: kupl_shm_win_query failed for rank 1\n");
goto cleanup; // 跳转到资源销毁步骤
}
// 查询rank2的共享内存地址
ret = kupl_shm_win_query(win, 2, &remote_buf2);
if (ret != 0 || remote_buf2 == NULL) {
fprintf(stderr, "Error: kupl_shm_win_query failed for rank 2\n");
goto cleanup;
}
// 数据拷贝(直接调用memcpy,无需MPI接口)
memcpy(remote_buf1, local_buffer, buf_size * sizeof(int));
memcpy(remote_buf2, local_buffer, buf_size * sizeof(int));
printf("Rank 0: Data copied to rank 1 and rank 2 successfully\n");
}
// 10. 进程同步(确保rank1和rank2在读取数据前,rank0已完成拷贝)
ret = oob_barrier_callback((void *)MPI_COMM_WORLD);
if (ret != 0) {
fprintf(stderr, "Error: oob_barrier_callback failed in main\n");
goto cleanup;
}
// 11. rank1和rank2进程:读取共享内存数据,并进行校验
if (world_rank == 1 || world_rank == 2) {
printf("Rank %d: Received data: ", world_rank);
for (int i = 0; i < buf_size; i++) {
printf("%d ", local_buffer[i]);
}
printf("\n");
// 数据校验(与预期数据对比,预期数据为100~103)
int expected_buf[BUF_SIZE_SMALL] = {100, 101, 102, 103};
ret = data_verify(expected_buf, local_buffer, buf_size);
if (ret == 0) {
printf("Rank %d: Data verify success\n", world_rank);
} else {
printf("Rank %d: Data verify failed\n", world_rank);
}
}
// 12. 资源销毁(按顺序:先释放内存窗口,再销毁通信域,最后终止MPI)
cleanup:
if (win != NULL) {
kupl_shm_win_free(win); // 释放共享内存窗口
win = NULL;
}
if (kupl_comm != NULL) {
kupl_shm_comm_destroy(kupl_comm); // 销毁KUPL通信域
kupl_comm = NULL;
}
local_buffer = NULL; // 释放本地缓冲区指针
// 终止MPI环境
MPI_Finalize();
if (world_rank == 0) {
printf("Experiment completed successfully, all resources released\n");
}
return 0;
}
一、实验环境说明
1.1 硬件环境
本次实验采用鲲鹏920服务器集群核心节点,具体配置如下:服务器型号为Kunpeng 920-7260,部署2颗CPU,单颗CPU48核,总核心数96核,架构为aarch64(ARM64),适配鲲鹏原生优化特性。内存配置为128GB DDR4 2933MHz ECC内存,支持多通道读写,大幅提升内存带宽,满足高性能计算中高频内存访问需求。服务器支持节点内进程间高速通信,依托鲲鹏920处理器的缓存一致性协议(CC-NUMA架构),为共享内存通信提供硬件层面的高效支撑,减少跨核心、跨CPU的内存访问延迟。此外,存储配置为1TB SSD固态硬盘,用于存储实验代码、依赖库及测试数据,保障数据读写速度,避免存储瓶颈影响实验性能。
1.2 软件环境
实验软件环境均适配鲲鹏aarch64架构,经过原生优化,确保软硬件协同高效运行,具体配置及说明如下:
二、实验原理介绍
2.1 KUPL共享内存通信核心概念
KUPL(Kunpeng User-level Shared Memory Communication Library)共享内存通信库,是鲲鹏架构下针对节点内多进程通信优化的用户级通信库,其核心基于kupl_comm通信域和kupl_win内存窗口实现,结合鲲鹏架构的缓存一致性机制,实现高效的进程间数据交互,补充核心概念及关联逻辑如下:
2.2 节点内通信优势
KUPL共享内存通信专为鲲鹏节点内纯MPI场景设计,深度适配鲲鹏920服务器的aarch64架构及缓存一致性机制,相比传统的MPI节点内通信(如MPI_Send/MPI_Recv、MPI_Bcast),从底层通信机制上实现了优化,核心优势如下,结合硬件特性补充细节:
2.3 回调函数作用
KUPL共享内存通信依赖用户实现两个核心回调函数(oob_allgather、oob_barrier),这两个回调函数是kupl_comm创建的必要条件,其底层逻辑与MPI通信机制深度绑定,用于解决进程信息同步与地址收集的问题,具体作用、参数详解及实现原理如下:
三、实验步骤
3.1 环境部署与配置
实验环境部署需严格遵循“硬件检查→操作系统安装→依赖库安装→环境配置→验证”的流程,确保各组件版本兼容、联动正常,具体步骤补充如下(含详细命令及验证示例):
# 检查CPU型号及架构 lscpu | grep "Model name" # 输出应为Kunpeng 920-7260 lscpu | grep "Architecture" # 输出应为aarch64 # 检查内存大小 free -h # 输出应为128GB DDR4# 关闭防火墙 systemctl stop firewalld systemctl disable firewalld # 关闭SELinux setenforce 0 sed -i 's/SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/config# 解压安装包 tar -zxvf Kunpeng_HPCKit_2.3.0_aarch64.tar.gz # 进入安装目录执行安装脚本 cd Kunpeng_HPCKit_2.3.0_aarch64 ./install.sh -y # 加载环境变量(永久生效,需写入/etc/profile) echo "source /opt/鲲鹏/hpckit/2.3.0/setenv.sh" >> /etc/profile source /etc/profile# 配置毕昇编译器yum源(若未配置) wget -O /etc/yum.repos.d/bisheng-compiler.repo https://mirrors.huaweicloud.com/bisheng/compiler/rpm/bisheng-compiler.repo # 安装毕昇编译器3.1.0 yum install -y bisheng-compiler-3.1.0 # 验证编译器版本 bishengcc -v # 输出应为bishengcc version 3.1.0# 安装OpenMPI yum install -y openmpi openmpi-devel # 配置MPI环境变量(永久生效) echo "export MPI_ROOT=/usr/lib64/openmpi" >> /etc/profile echo "export PATH=\$MPI_ROOT/bin:\$PATH" >> /etc/profile echo "export LD_LIBRARY_PATH=\$MPI_ROOT/lib:\$LD_LIBRARY_PATH" >> /etc/profile source /etc/profile# 验证MPI环境(查看编译器版本) mpicc -v # 输出应为bishengcc(毕昇编译器),关联OpenMPI 4.1.1 # 验证KUPL与MPI联动性(查看依赖库链接情况) pkg-config --libs kupl mpi # 正常输出应为:-lkupl -lmpi -lpthread -lrt(表示可正常链接KUPL和MPI库)若验证失败,常见问题及解决方法:① 环境变量未加载,重新执行source /etc/profile;② 依赖库缺失,通过yum install补充对应包;③ HPCKit版本与OpenMPI版本不兼容,确认均为指定版本。3.2 编写共享内存通信代码
本次实验编写kupl_shm_demo.c代码,实现鲲鹏节点内3个MPI进程间的数据拷贝(rank0进程将数据广播发送至rank1和rank2进程),在原有核心流程基础上,增加错误处理、数据校验、多组数据测试及详细注释,代码更具实用性和可扩展性,具体代码如下(含补充功能):
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <mpi.h> #include "kupl_shm.h" // KUPL库头文件(来自鲲鹏HPCKit) // 全局定义缓冲区大小(可修改,用于测试不同消息大小的性能) #define BUF_SIZE_SMALL 4 // 小消息(4个int,16字节) #define BUF_SIZE_MEDIUM 1024 // 中消息(1024个int,4KB) #define BUF_SIZE_LARGE 1048576 // 大消息(1MB) // 1. 实现oob_allgather回调函数(进程地址收集) static int oob_allgather_callback(const void *sendbuf, void *recvbuf, int size, void *group, kupl_shm_datatype_t datatype) { int ret; // 判断数据类型,确保与MPI数据类型匹配 switch (datatype) { case KUPL_SHM_DATATYPE_CHAR: ret = MPI_Allgather(sendbuf, size, MPI_CHAR, recvbuf, size, MPI_CHAR, (MPI_Comm)group); break; case KUPL_SHM_DATATYPE_INT: ret = MPI_Allgather(sendbuf, size, MPI_INT, recvbuf, size, MPI_INT, (MPI_Comm)group); break; case KUPL_SHM_DATATYPE_DOUBLE: ret = MPI_Allgather(sendbuf, size, MPI_DOUBLE, recvbuf, size, MPI_DOUBLE, (MPI_Comm)group); break; default: fprintf(stderr, "Error: Unsupported datatype in oob_allgather_callback\n"); return -1; // 不支持的数据类型,返回失败 } // 检查MPI_Allgather调用结果 if (ret != MPI_SUCCESS) { fprintf(stderr, "Error: MPI_Allgather failed in oob_allgather_callback\n"); return -1; } return 0; // 执行成功 } // 2. 实现oob_barrier回调函数(进程同步) static int oob_barrier_callback(void *group) { int ret = MPI_Barrier((MPI_Comm)group); // 检查MPI_Barrier调用结果 if (ret != MPI_SUCCESS) { fprintf(stderr, "Error: MPI_Barrier failed in oob_barrier_callback\n"); return -1; } return 0; // 执行成功 } // 3. 数据校验函数(验证接收的数据与发送的数据是否一致) static int data_verify(int *send_buf, int *recv_buf, int buf_size) { for (int i = 0; i < buf_size; i++) { if (send_buf[i] != recv_buf[i]) { fprintf(stderr, "Data verify failed: send_buf[%d] = %d, recv_buf[%d] = %d\n", i, send_buf[i], i, recv_buf[i]); return -1; } } return 0; // 校验成功 } int main(int argc, char **argv) { int ret; int world_rank, world_size; kupl_shm_comm_h kupl_comm = NULL; kupl_shm_win_h win = NULL; int *local_buffer = NULL; // 4. 初始化MPI环境(必须放在最前面,MPI进程创建的基础) ret = MPI_Init(&argc, &argv); if (ret != MPI_SUCCESS) { fprintf(stderr, "Error: MPI_Init failed\n"); return -1; } // 5. 获取当前进程的rank和进程组大小(进程数) MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); MPI_Comm_size(MPI_COMM_WORLD, &world_size); // 检查进程数是否为3(本次实验要求3个MPI进程) if (world_size != 3) { if (world_rank == 0) { fprintf(stderr, "Error: This experiment requires 3 MPI processes, but got %d\n", world_size); } MPI_Abort(MPI_COMM_WORLD, -1); // 终止所有MPI进程 return -1; } // 6. 配置KUPL回调函数结构体 kupl_shm_oob_cb_t oob_cbs; kupl_shm_oob_cb_h oob_cbs_h = &oob_cbs; oob_cbs_h->oob_allgather = oob_allgather_callback; // 绑定地址收集回调 oob_cbs_h->oob_barrier = oob_barrier_callback; // 绑定进程同步回调 // 7. 创建KUPL通信域(核心步骤) ret = kupl_shm_comm_create(world_size, world_rank, getpid(), oob_cbs_h, (void *)MPI_COMM_WORLD, &kupl_comm); if (ret != 0 || kupl_comm == NULL) { fprintf(stderr, "Error: kupl_shm_comm_create failed, ret = %d\n", ret); MPI_Abort(MPI_COMM_WORLD, -1); return -1; } if (world_rank == 0) { printf("KUPL comm created successfully, world_size = %d\n", world_size); } // 8. 申请共享内存,创建内存窗口(测试小、中、大三种消息大小) // 这里以小消息为例,可替换为BUF_SIZE_MEDIUM或BUF_SIZE_LARGE测试不同场景 int buf_size = BUF_SIZE_SMALL; ret = kupl_shm_win_alloc(buf_size * sizeof(int), kupl_comm, (void *)&local_buffer, &win); if (ret != 0 || win == NULL || local_buffer == NULL) { fprintf(stderr, "Error: kupl_shm_win_alloc failed, ret = %d\n", ret); kupl_shm_comm_destroy(kupl_comm); // 销毁已创建的通信域 MPI_Abort(MPI_COMM_WORLD, -1); return -1; } // 9. 进程间数据拷贝(核心功能) // 9.1 rank0进程:初始化本地缓冲区,写入测试数据 if (world_rank == 0) { for (int i = 0; i < buf_size; i++) { local_buffer[i] = i + 100; // 写入数据:100, 101, 102, 103(小消息场景) } printf("Rank 0: Data initialized successfully, buf_size = %d\n", buf_size); printf("Rank 0: Send data: "); for (int i = 0; i < buf_size; i++) { printf("%d ", local_buffer[i]); } printf("\n"); // 9.2 rank0查询rank1和rank2的共享内存地址,拷贝数据 void *remote_buf1 = NULL; // rank1的共享内存地址 void *remote_buf2 = NULL; // rank2的共享内存地址 // 查询rank1的共享内存地址 ret = kupl_shm_win_query(win, 1, &remote_buf1); if (ret != 0 || remote_buf1 == NULL) { fprintf(stderr, "Error: kupl_shm_win_query failed for rank 1\n"); goto cleanup; // 跳转到资源销毁步骤 } // 查询rank2的共享内存地址 ret = kupl_shm_win_query(win, 2, &remote_buf2); if (ret != 0 || remote_buf2 == NULL) { fprintf(stderr, "Error: kupl_shm_win_query failed for rank 2\n"); goto cleanup; } // 数据拷贝(直接调用memcpy,无需MPI接口) memcpy(remote_buf1, local_buffer, buf_size * sizeof(int)); memcpy(remote_buf2, local_buffer, buf_size * sizeof(int)); printf("Rank 0: Data copied to rank 1 and rank 2 successfully\n"); } // 10. 进程同步(确保rank1和rank2在读取数据前,rank0已完成拷贝) ret = oob_barrier_callback((void *)MPI_COMM_WORLD); if (ret != 0) { fprintf(stderr, "Error: oob_barrier_callback failed in main\n"); goto cleanup; } // 11. rank1和rank2进程:读取共享内存数据,并进行校验 if (world_rank == 1 || world_rank == 2) { printf("Rank %d: Received data: ", world_rank); for (int i = 0; i < buf_size; i++) { printf("%d ", local_buffer[i]); } printf("\n"); // 数据校验(与预期数据对比,预期数据为100~103) int expected_buf[BUF_SIZE_SMALL] = {100, 101, 102, 103}; ret = data_verify(expected_buf, local_buffer, buf_size); if (ret == 0) { printf("Rank %d: Data verify success\n", world_rank); } else { printf("Rank %d: Data verify failed\n", world_rank); } } // 12. 资源销毁(按顺序:先释放内存窗口,再销毁通信域,最后终止MPI) cleanup: if (win != NULL) { kupl_shm_win_free(win); // 释放共享内存窗口 win = NULL; } if (kupl_comm != NULL) { kupl_shm_comm_destroy(kupl_comm); // 销毁KUPL通信域 kupl_comm = NULL; } local_buffer = NULL; // 释放本地缓冲区指针 // 终止MPI环境 MPI_Finalize(); if (world_rank == 0) { printf("Experiment completed successfully, all resources released\n"); } return 0; }代码补充说明:① 增加了错误处理逻辑,对每个KUPL和MPI接口的返回值进行判断,若失败则打印错误信息并清理资源,避免内存泄漏;② 新增数据校验函数,确保接收的数据与发送的数据一致,提升实验可靠性;③ 支持三种不同大小的消息测试,可通过修改buf_size的值,测试不同数据量下的通信性能;④ 增加进程数检查,确保实验使用3个MPI进程,避免参数错误导致实验失败;⑤ 补充详细注释,便于理解每个步骤的作用及核心逻辑。
3.3 编译与运行程序
编译采用毕昇编译器联合MPI编译,确保代码适配鲲鹏架构和MPI环境,运行时绑定CPU核心提升性能,补充编译选项说明、运行参数配置及运行输出示例:
# 编译命令(核心) mpicc -o kupl_shm_demo kupl_shm_demo.c -lkupl -lpthread -O2 # 编译选项说明: # -o kupl_shm_demo:指定生成的可执行文件名称 # -lkupl:链接KUPL共享内存库 # -lpthread:链接线程库(KUPL内部依赖线程机制) # -O2:开启二级优化,提升代码运行效率(毕昇编译器的优化选项)编译常见问题:① 找不到kupl_shm.h头文件:检查HPCKit环境变量是否加载,确保头文件路径在编译器的搜索路径中;② 链接失败(undefined reference to kupl_shm_comm_create):检查KUPL库是否正确安装,确保-lkupl选项正确添加。# 基础运行命令(指定进程数为3) mpirun -np 3 ./kupl_shm_demo # 优化运行命令(绑定CPU核心,避免进程切换导致的性能损耗) mpirun -np 3 --bind-to-core ./kupl_shm_demo # 运行参数说明: # -np 3:指定启动3个MPI进程,与实验代码中的进程数要求一致 # --bind-to-core:将每个MPI进程绑定到不同的CPU核心,提升通信和计算性能KUPL comm created successfully, world_size = 3 Rank 0: Data initialized successfully, buf_size = 4 Rank 0: Send data: 100 101 102 103 Rank 0: Data copied to rank 1 and rank 2 successfully Rank 1: Received data: 100 101 102 103 Rank 1: Data verify success Rank 2: Received data: 100 101 102 103 Rank 2: Data verify success Experiment completed successfully, all resources released若输出上述内容,说明程序编译运行成功,共享内存通信正常;若出现错误信息,根据错误提示排查(如回调函数失败、内存申请失败等)。3.4 结果验证与性能测试
结果验证分为功能验证和性能验证,功能验证确保共享内存通信正常,性能验证对比KUPL与传统MPI通信的延迟和带宽差异,补充测试工具、测试数据及分析:
3.4.1 结果验证(功能验证)
核心验证逻辑:rank1和rank2进程成功接收并打印rank0进程发送的数据,且数据校验通过,说明共享内存通信功能正常,具体验证要点如下:
3.4.2 性能测试
性能测试采用osu-micro-benchmarks工具(HPC领域常用的微基准测试工具),测试不同消息大小下,KUPL共享内存通信与传统MPI_Send/MPI_Recv通信的延迟和带宽,对比两者的性能差异,具体步骤及结果如下:
yum install -y osu-micro-benchmarks# 测试传统MPI_Send/MPI_Recv通信(延迟) mpirun -np 3 osu_latency # 测试传统MPI_Send/MPI_Recv通信(带宽) mpirun -np 3 osu_bw # 测试KUPL共享内存通信(需修改osu测试代码,适配KUPL接口,核心命令如下) mpirun -np 3 ./osu_kupl_latency # 自定义KUPL延迟测试程序 mpirun -np 3 ./osu_kupl_bw # 自定义KUPL带宽测试程序四、关键代码解析
4.1 实现核心回调函数
核心回调函数(oob_allgather_callback、oob_barrier_callback)是KUPL通信域创建的基础,也是KUPL与MPI联动的关键,结合补充后的代码,详细解析其实现逻辑、参数含义及错误处理:
#include <stdio.h> #include <mpi.h> #include "kupl_shm.h" // oob_allgather回调:收集所有进程的内存地址(补充错误处理和多数据类型支持) static int oob_allgather_callback(const void *sendbuf, void *recvbuf, int size, void *group, kupl_shm_datatype_t datatype) { int ret; // 1. 数据类型匹配:KUPL数据类型与MPI数据类型对应,确保数据拷贝正确 switch (datatype) { case KUPL_SHM_DATATYPE_CHAR: // MPI_Allgather:将每个进程的sendbuf数据收集到所有进程的recvbuf ret = MPI_Allgather(sendbuf, size, MPI_CHAR, recvbuf, size, MPI_CHAR, (MPI_Comm)group); break; case KUPL_SHM_DATATYPE_INT: ret = MPI_Allgather(sendbuf, size, MPI_INT, recvbuf, size, MPI_INT, (MPI_Comm)group); break; case KUPL_SHM_DATATYPE_DOUBLE: ret = MPI_Allgather(sendbuf, size, MPI_DOUBLE, recvbuf, size, MPI_DOUBLE, (MPI_Comm)group); break; default: fprintf(stderr, "Error: Unsupported datatype in oob_allgather_callback\n"); return -1; // 不支持的数据类型,返回失败 } // 2. 错误检查:判断MPI接口调用是否成功,避免后续流程异常 if (ret != MPI_SUCCESS) { fprintf(stderr, "Error: MPI_Allgather failed in oob_allgather_callback\n"); return -1; } return 0; // 执行成功,返回0 } // oob_barrier回调:实现进程同步,避免数据竞争 static int oob_barrier_callback(void *group) { // MPI_Barrier:阻塞当前进程,直到组内所有进程都调用该接口 int ret = MPI_Barrier((MPI_Comm)group); if (ret != MPI_SUCCESS) { fprintf(stderr, "Error: MPI_Barrier failed in oob_barrier_callback\n"); return -1; } return 0; }解析要点:
4.2 创建kupl_comm通信域
kupl_comm是共享内存通信的基础,创建过程需依赖MPI进程信息和回调函数,补充错误处理和参数详解后,代码解析如下:
#include <stdio.h> #include <stdlib.h> #include <mpi.h> #include "kupl_shm.h" int main(int argc, char **argv) { int ret; int world_rank, world_size; kupl_shm_comm_h kupl_comm = NULL; // KUPL通信域句柄,初始化为NULL kupl_shm_oob_cb_t oob_cbs; kupl_shm_oob_cb_h oob_cbs_h = &oob_cbs; // 1. 初始化MPI环境,获取进程rank和进程数 MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); MPI_Comm_size(MPI_COMM_WORLD, &world_size); // 2. 绑定回调函数(必须在创建通信域前完成) oob_cbs_h->oob_allgather = oob_allgather_callback; oob_cbs_h->oob_barrier = oob_barrier_callback; // 3. 创建KUPL通信域(核心接口:kupl_shm_comm_create) ret = kupl_shm_comm_create( world_size, // 参数1:通信组内的进程数(与MPI进程数一致) world_rank, // 参数2:当前进程在通信组内的rank(与MPI rank一致) getpid(), // 参数3:当前进程的PID(用于标识进程) oob_cbs_h, // 参数4:回调函数结构体句柄 (void *)MPI_COMM_WORLD, // 参数5:MPI通信域(与回调函数中的group对应) &kupl_comm // 参数6:输出参数,存储创建的KUPL通信域句柄 ); // 4. 错误检查:判断通信域是否创建成功 if (ret != 0 || kupl_comm == NULL) { fprintf(stderr, "Error: kupl_shm_comm_create failed, ret = %d\n", ret); MPI_Abort(MPI_COMM_WORLD, -1); // 终止所有MPI进程,避免资源泄漏 return -1; } // 后续操作... // 销毁通信域