
kpccl_shm_request_wait

Blocks until the collective communication operation in the request has completed.

Interface Definition

int kpccl_shm_request_wait(kpccl_shm_request_h request);

Parameters

Table 1 Parameter description

Parameter    Type                   Description                                    Input/Output
request      kpccl_shm_request_h    The request to block on until it completes     Input

Return Value

  • Success: KPCCL_OK is returned
  • Failure: KPCCL_ERROR is returned
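
In practice, a caller would typically check the return value against KPCCL_OK before releasing the request. The snippet below is only an illustrative sketch: it assumes that request has already been created (for example by kpccl_shm_allreduce_init, as in the example that follows) and started with kpccl_shm_request_start, and the error handling shown is an assumption rather than part of the API.

/* Illustrative sketch: block on a previously started request and check the
 * result before releasing it. */
if (kpccl_shm_request_wait(request) != KPCCL_OK) {
    fprintf(stderr, "kpccl_shm_request_wait returned KPCCL_ERROR\n");
}
kpccl_shm_request_free(request);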

Example

#include <stdio.h>
#include <math.h>
#include <mpi.h>
#include "kpccl.h"
 
/* OOB allgather callback: forwards to MPI_Allgather, mapping the KPCCL datatype to the matching MPI datatype. */
int oob_allgather_callback(const void *sendbuf, void *recvbuf, int size, void *group, kpccl_shm_datatype_t datatype)
{
    switch (datatype) {
        case KPCCL_SHM_DATATYPE_CHAR:
            return MPI_Allgather(sendbuf, size, MPI_CHAR, recvbuf, size, MPI_CHAR, (MPI_Comm)group);
        case KPCCL_SHM_DATATYPE_INT:
            return MPI_Allgather(sendbuf, size, MPI_INT, recvbuf, size, MPI_INT, (MPI_Comm)group);
        case KPCCL_SHM_DATATYPE_LONG:
            return MPI_Allgather(sendbuf, size, MPI_LONG, recvbuf, size, MPI_LONG, (MPI_Comm)group);
        case KPCCL_SHM_DATATYPE_FLOAT:
            return MPI_Allgather(sendbuf, size, MPI_FLOAT, recvbuf, size, MPI_FLOAT, (MPI_Comm)group);
        case KPCCL_SHM_DATATYPE_DOUBLE:
            return MPI_Allgather(sendbuf, size, MPI_DOUBLE, recvbuf, size, MPI_DOUBLE, (MPI_Comm)group);
        default:
            kpccl_error("not support datatype");
            return KPCCL_ERROR;
    }
}

/* OOB barrier callback: forwards to MPI_Barrier. */
int oob_barrier_callback(void *group)
{
    return MPI_Barrier((MPI_Comm)group);
}
 
int main(int argc, char *argv[])
{
    int check = 1;
    int myid, numprocs;
    MPI_Init(&argc, &argv);
    MPI_Comm comm = MPI_COMM_WORLD;
    MPI_Comm_size(comm, &numprocs);
    MPI_Comm_rank(comm, &myid);
    
    /* Set up the out-of-band (OOB) callbacks that are passed to kpccl_shm_comm_create. */
    kpccl_shm_oob_cb_t oob_cbs;
    kpccl_shm_oob_cb_h oob_cbs_h = &oob_cbs;
    oob_cbs_h->oob_allgather = oob_allgather_callback;
    oob_cbs_h->oob_barrier = oob_barrier_callback;
 
    /* Create the KPCCL shared-memory communicator, then allocate shared-memory windows for the buffers. */
    kpccl_shm_comm_h kpccl_comm;
    kpccl_shm_comm_create(numprocs, myid, myid, oob_cbs_h, (void *)comm, &kpccl_comm);
    kpccl_shm_win_h win_send;
    kpccl_shm_win_h win_recv;
    kpccl_shm_win_h ompi_win_recv;
    void *sendbuf;
    void *recvbuf;
    void *ompibuf;
    int count = 16000;
    size_t buf_size = sizeof(float) * count;
    kpccl_shm_win_alloc(buf_size, kpccl_comm, &sendbuf, &win_send);
    kpccl_shm_win_alloc(buf_size, kpccl_comm, &recvbuf, &win_recv);
    kpccl_shm_win_alloc(buf_size, kpccl_comm, &ompibuf, &ompi_win_recv);
 
    auto t_sendbuf = static_cast<float *>(sendbuf);
    auto t_recvbuf = static_cast<float *>(recvbuf);
    auto t_ompibuf = static_cast<float *>(ompibuf);
    for (int i = 0; i < count; i++) {
        t_sendbuf[i] = myid + i;
    }
 
    kpccl_shm_datatype_t datatype = KPCCL_SHM_DATATYPE_FLOAT;
    MPI_Datatype mpi_datatype = MPI_FLOAT;
 
    /* Create, start, wait for, and free the KPCCL allreduce request. */
    kpccl_shm_request_h request;
    MPI_Barrier(comm);
    kpccl_shm_allreduce_init(sendbuf, recvbuf, count, datatype, KPCCL_SHM_REDUCE_OP_SUM, kpccl_comm, &request);
    kpccl_shm_request_start(request);
    kpccl_shm_request_wait(request);
    kpccl_shm_request_free(request);
    MPI_Barrier(comm);

    /* Compute a reference result with MPI_Allreduce for verification. */
    MPI_Allreduce(sendbuf, ompibuf, count, mpi_datatype, MPI_SUM, comm);
 
    /* Compare the KPCCL result against the MPI reference element by element. */
    for (int i = 0; i < count; i++) {
        if (fabs(t_recvbuf[i] - t_ompibuf[i]) > 1e-6) {
            check = 0;
        }
    }
    int result;
    MPI_Reduce(&check, &result, 1, MPI_INT, MPI_SUM, 0, comm);
    if (myid == 0) {
        if (result == numprocs) {
            printf("check success\n");
        } else {
            printf("check failed\n");
        }
    }
 
    kpccl_shm_win_free(win_send);
    kpccl_shm_win_free(win_recv);
    kpccl_shm_win_free(ompi_win_recv);
    kpccl_shm_comm_destroy(kpccl_comm);
    MPI_Finalize();
    return 0;
}

The output is as follows.

check success

The example above demonstrates the KPCCL allreduce workflow. The kpccl_shm_request_wait function waits for the request to complete.
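
For quick reference, the core request lifecycle used in the example can be distilled as follows. This is a minimal sketch only: sendbuf, recvbuf, count, and kpccl_comm are assumed to be prepared exactly as in the full example above, and error checking is omitted.

/* Distilled request lifecycle from the example above (sketch only). */
kpccl_shm_request_h request;
kpccl_shm_allreduce_init(sendbuf, recvbuf, count, KPCCL_SHM_DATATYPE_FLOAT,
                         KPCCL_SHM_REDUCE_OP_SUM, kpccl_comm, &request);  /* create the request   */
kpccl_shm_request_start(request);  /* launch the collective operation          */
kpccl_shm_request_wait(request);   /* block until the operation has completed  */
kpccl_shm_request_free(request);   /* release the request object               */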