kpccl_shm_allreduce_init
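初始化进程间 allreduce 操作，生成一个持久化 request。该 request 可通过 kpccl_shm_request_start 启动、kpccl_shm_request_wait 等待完成，使用完毕后通过 kpccl_shm_request_free 释放（参见下方示例）。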
接口定义
int kpccl_shm_allreduce_init(const void *sendbuf, void *recvbuf, int count, kpccl_shm_datatype_t datatype, kpccl_shm_reduce_op_t op, kpccl_shm_comm_h comm, kpccl_shm_request_t *request);
环境变量
allreduce 可以使用环境变量指定算法。
环境变量名称为KPCCL_SHM_ALLREDUCE_ALGORITHM,可选值:
- 0:代表auto tuning(自动调优，默认)
- 1:代表rb(rh_rd)算法
- 2:代表linear 算法
参数
| 参数名 | 类型 | 描述 | 输入/输出 |
|---|---|---|---|
| sendbuf | `const void *` | 发送缓冲区（send buffer），存放本进程参与归约的数据 | 输入 |
| recvbuf | `void *` | 接收缓冲区（recv buffer），request 执行完成后存放归约结果 | 输出 |
| count | `int` | sendbuf/recvbuf 中的元素个数（不是字节数） | 输入 |
| datatype | `kpccl_shm_datatype_t` | 数据类型，可设置为KPCCL_SHM_DATATYPE_CHAR, KPCCL_SHM_DATATYPE_INT, KPCCL_SHM_DATATYPE_LONG, KPCCL_SHM_DATATYPE_FLOAT, KPCCL_SHM_DATATYPE_DOUBLE | 输入 |
| op | `kpccl_shm_reduce_op_t` | 归约操作，可设置为KPCCL_SHM_REDUCE_OP_MAX, KPCCL_SHM_REDUCE_OP_MIN, KPCCL_SHM_REDUCE_OP_SUM | 输入 |
| comm | `kpccl_shm_comm_h` | kpccl 通信域句柄（由 kpccl_shm_comm_create 创建） | 输入 |
| request | `kpccl_shm_request_t *` | 生成的持久化 request | 输出 |
返回值
- 成功:返回KPCCL_OK
- 失败:返回KPCCL_ERROR
示例
#include <stdio.h>
#include <mpi.h>
#include <math.h>
#include "kpccl.h"
/*
 * Out-of-band allgather hook that main() registers as oob_cbs_h->oob_allgather.
 * Gathers `size` elements of `datatype` from every rank of the MPI communicator
 * carried in `group`.
 * Returns the MPI_Allgather status, or KPCCL_ERROR for an unsupported datatype.
 */
int oob_allgather_callback(const void *sendbuf, void *recvbuf, int size, void *group,kpccl_shm_datatype_t datatype)
{
    MPI_Datatype elem_type;

    /* Translate the kpccl datatype into its MPI equivalent; reject anything else. */
    if (datatype == KPCCL_SHM_DATATYPE_CHAR) {
        elem_type = MPI_CHAR;
    } else if (datatype == KPCCL_SHM_DATATYPE_INT) {
        elem_type = MPI_INT;
    } else if (datatype == KPCCL_SHM_DATATYPE_LONG) {
        elem_type = MPI_LONG;
    } else if (datatype == KPCCL_SHM_DATATYPE_FLOAT) {
        elem_type = MPI_FLOAT;
    } else if (datatype == KPCCL_SHM_DATATYPE_DOUBLE) {
        elem_type = MPI_DOUBLE;
    } else {
        kpccl_error("not support datatype");
        return KPCCL_ERROR;
    }

    /* main() hands the MPI communicator to kpccl as an opaque void * (see kpccl_shm_comm_create). */
    MPI_Comm mpi_comm = (MPI_Comm)group;
    return MPI_Allgather(sendbuf, size, elem_type, recvbuf, size, elem_type, mpi_comm);
}
int oob_barrier_callback(void *group)
{
return MPI_Barrier((MPI_Comm)group);
}
int main()
{
int check = 1;
int myid, numprocs;
MPI_Comm comm = MPI_COMM_WORLD;
MPI_Comm_size(comm, &numprocs);
MPI_Comm_rank(comm, &myid);
kpccl_shm_oob_cb_t oob_cbs;
kpccl_shm_oob_cb_h oob_cbs_h = &oob_cbs;
oob_cbs_h->oob_allgather = oob_allgather_callback;
oob_cbs_h->oob_barrier = oob_barrier_callback;
kpccl_shm_comm_h kpccl_comm;
kpccl_shm_comm_create(numprocs, myid, myid, oob_cbs_h, (void *)comm, &kpccl_comm);
kpccl_shm_win_h win_send;
kpccl_shm_win_h win_recv;
kpccl_shm_win_h ompi_win_recv;
void *sendbuf;
void *recvbuf;
void *ompibuf;
int count = 16000;
size_t buf_size = sizeof(float) * count;
kpccl_shm_win_alloc(buf_size, kpccl_comm, &sendbuf, &win_send);
kpccl_shm_win_alloc(buf_size, kpccl_comm, &recvbuf, &win_recv);
kpccl_shm_win_alloc(buf_size, kpccl_comm, &ompibuf, &ompi_win_recv);
auto t_sendbuf = static_cast<float *>(sendbuf);
auto t_recvbuf = static_cast<float *>(recvbuf);
auto t_ompibuf = static_cast<float *>(ompibuf);
for (int i = 0; i < count; i++) {
t_sendbuf[i] = myid + i;
}
kpccl_shm_datatype datatype = KPCCL_SHM_DATATYPE_FLOAT;
MPI_Datatype mpi_datatype = MPI_FLOAT;
kpccl_shm_request_h request;
MPI_Barrier(comm);
kpccl_shm_allreduce_init(sendbuf, recvbuf, count, datatype, KPCCL_SHM_REDUCE_OP_SUM, kpccl_comm, &request);
kpccl_shm_request_start(request);
kpccl_shm_request_wait(request);
kpccl_shm_request_free(request);
MPI_Barrier(comm);
MPI_Allreduce(sendbuf, ompibuf, count, mpi_datatype, MPI_SUM, comm);
for (int i = 0; i < count; i++) {
if (abs(t_recvbuf[i] - t_ompibuf[i]) > 1e-6) {
check = 0;
}
}
int result;
MPI_Reduce(&check, &result, 1, MPI_INT, MPI_SUM, 0, comm);
if (result == 0) {
printf("check success\n");
} else {
printf("check failed\n");
}
}
kpccl_shm_win_free(win_send);
kpccl_shm_win_free(win_recv);
kpccl_shm_win_free(ompi_win_recv);
kpccl_shm_comm_destroy(kpccl_comm);
}
运行结果如下。
check success
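上述示例演示了 kpccl allreduce 的完整流程。首先调用 kpccl_shm_allreduce_init 初始化持久化 request，然后依次调用 kpccl_shm_request_start 和 kpccl_shm_request_wait 执行并等待完成，最后用 kpccl_shm_request_free 释放 request。随后，示例将结果与 MPI_Allreduce 的结果逐元素比较，以验证正确性。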