kupl_shm_request_wait

Blocks until the collective communication operation in the request has completed.

Interface Definition

int kupl_shm_request_wait(kupl_shm_request_h request);

Parameters

Table 1 Parameter definition

Parameter | Type               | Description                                  | Input/Output
request   | kupl_shm_request_h | The request to block on until it completes  | Input

Return Values

  • Success: returns KUPL_OK
  • Failure: returns KUPL_ERROR
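
A typical call sequence is kupl_shm_*_init, kupl_shm_request_start, kupl_shm_request_wait, and finally kupl_shm_request_free. The following minimal sketch checks the return value of the wait call; the helper name wait_and_free is hypothetical, and the sketch assumes the request has already been created and started, as in the full example below.

#include <stdio.h>
#include "kupl.h"

// Hypothetical helper: block on a started request, then release it.
static int wait_and_free(kupl_shm_request_h request)
{
    int ret = kupl_shm_request_wait(request);
    if (ret != KUPL_OK) {
        fprintf(stderr, "kupl_shm_request_wait failed: %d\n", ret);
    }
    kupl_shm_request_free(request);
    return ret;
}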

Example

#include <stdio.h> 
#include <mpi.h>
#include <unistd.h>
#include "kupl.h" 
 
// Callback 1 required for kupl communicator creation: out-of-band barrier
static int oob_barrier_callback(void *group)
{
    return MPI_Barrier((MPI_Comm)group);
}
// Callback 2 required for kupl communicator creation: out-of-band allgather
static int oob_allgather_callback(const void *sendbuf, void *recvbuf, int size, void *group,
                                  kupl_shm_datatype_t datatype)
{
    switch (datatype) {
        case KUPL_SHM_DATATYPE_CHAR:
            return MPI_Allgather(sendbuf, size, MPI_CHAR, recvbuf, size, MPI_CHAR, (MPI_Comm)group);
        default:
            fprintf(stderr, "not support datatype");
            return KUPL_ERROR;
    }
}
int main(int argc, char *argv[])
{
    // Initialize the MPI environment
    MPI_Init(&argc, &argv);
    MPI_Comm comm = MPI_COMM_WORLD;
    // Get the size of the MPI communicator
    int world_size;
    MPI_Comm_size(comm, &world_size);
    // Get the rank of this process
    int world_rank;
    MPI_Comm_rank(comm, &world_rank);
    // Get the pid of this process
    int pid = getpid();
    // Create the kupl communicator
    kupl_shm_oob_cb_t oob_cbs;
    kupl_shm_oob_cb_h oob_cbs_h = &oob_cbs;
    oob_cbs_h->oob_allgather = oob_allgather_callback;
    oob_cbs_h->oob_barrier = oob_barrier_callback;
    kupl_shm_comm_h kupl_comm;
    int ret = kupl_shm_comm_create(world_size, world_rank, pid, oob_cbs_h, (void *)comm, &kupl_comm);
    if (ret != KUPL_OK || kupl_comm == NULL) {
        fprintf(stderr, "kupl shm comm create failed: %d\n", ret);
        MPI_Finalize();
        return -1;
    }
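    // Allocate shared-memory windows for the send, receive, and verification buffers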
    kupl_shm_win_h win_send;
    kupl_shm_win_h win_recv;
    kupl_shm_win_h ompi_win_recv;
    void *sendbuf = NULL;
    void *recvbuf = NULL;
    void *ompibuf = NULL;
    int count = 16000;
    size_t buf_size = sizeof(int) * count;
    ret = kupl_shm_win_alloc(buf_size, kupl_comm, &sendbuf, &win_send);
    if (ret != KUPL_OK) {
        fprintf(stderr, "kupl sendbuf alloc failed: %d\n", ret);
        kupl_shm_comm_destroy(kupl_comm);
        MPI_Finalize();
        return -1;
    }
    ret = kupl_shm_win_alloc(buf_size, kupl_comm, &recvbuf, &win_recv);
    if (ret != KUPL_OK) {
        fprintf(stderr, "kupl recvbuf alloc failed: %d\n", ret);
        kupl_shm_win_free(win_send);
        win_send = NULL;
        kupl_shm_comm_destroy(kupl_comm);
        MPI_Finalize();
        return -1;
    }
    ret = kupl_shm_win_alloc(buf_size, kupl_comm, &ompibuf, &ompi_win_recv);
    if (ret != KUPL_OK) {
        fprintf(stderr, "kupl ompibuf alloc failed: %d\n", ret);
        kupl_shm_win_free(win_send);
        kupl_shm_win_free(win_recv);
        win_send = NULL;
        win_recv = NULL;
        kupl_shm_comm_destroy(kupl_comm);
        MPI_Finalize();
        return -1;
    }
    // View the shared buffers as int arrays (plain C casts keep the example valid C)
    int *t_sendbuf = (int *)sendbuf;
    int *t_recvbuf = (int *)recvbuf;
    int *t_ompibuf = (int *)ompibuf;
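    // Fill the send buffer with rank-dependent values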
    for (int i = 0; i < count; i++) {
        t_sendbuf[i] = world_rank + i;
    }
    kupl_shm_request_h request;
    MPI_Barrier(comm);
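    // kupl allreduce flow: init -> start -> wait -> free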
    kupl_shm_allreduce_init(sendbuf, recvbuf, count, KUPL_SHM_DATATYPE_INT, KUPL_SHM_REDUCE_OP_SUM, kupl_comm,
                            &request);
    kupl_shm_request_start(request);
    // Block until the allreduce in the request completes
    ret = kupl_shm_request_wait(request);
    if (ret != KUPL_OK) {
        fprintf(stderr, "kupl_shm_request_wait failed: %d\n", ret);
    }
    kupl_shm_request_free(request);
    MPI_Barrier(comm);
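    // Compute the reference result with MPI for verification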
    MPI_Allreduce(sendbuf, ompibuf, count, MPI_INT, MPI_SUM, comm);
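    // Compare the kupl result with the MPI result element by element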
    int check = 0;
    for (int i = 0; i < count; i++) {
        if (t_recvbuf[i] != t_ompibuf[i]) {
            check = 1;
            break;
        }
    }
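    // Aggregate the per-rank check flags on rank 0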
    int result = 0;
    MPI_Reduce(&check, &result, 1, MPI_INT, MPI_SUM, 0, comm);
    if (result == 0) {
        printf("check success\n");
    } else {
        printf("check failed\n");
    }
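    // Release the windows, the kupl communicator, and the MPI environment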
    kupl_shm_win_free(win_send);
    kupl_shm_win_free(win_recv);
    kupl_shm_win_free(ompi_win_recv);
    win_send = NULL;
    win_recv = NULL;
    ompi_win_recv = NULL;
    kupl_shm_comm_destroy(kupl_comm);
    MPI_Finalize();
    return 0;
}
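
The example uses only the MPI and KUPL public calls shown above; assuming a typical installation, it can be built with an MPI wrapper compiler such as mpicc, adding the KUPL include and link options for your installation (the exact library name is not specified on this page).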

The output is as follows.

check success

The example above demonstrates the kupl allreduce flow: the request is created by kupl_shm_allreduce_init and launched by kupl_shm_request_start, and kupl_shm_request_wait then blocks until the request completes.