kupl_shm_allreduce_batch_init

Creates persistent allreduce requests in a batch.

Interface Definition

int kupl_shm_allreduce_batch_init(const void **sendbuf, void **recvbuf, int *count, kupl_shm_datatype_t *datatype, kupl_shm_reduce_op_t *op, kupl_shm_comm_h comm, kupl_shm_request_h *request, int request_num);

Parameters

Table 1 Parameter definitions

| Parameter | Type | Description | Input/Output |
| --- | --- | --- | --- |
| sendbuf | const void ** | Array of send buffers to use | Input |
| recvbuf | void ** | Array of receive buffers to use | Input |
| count | int * | Array of send counts to use | Input |
| datatype | kupl_shm_datatype_t * | Array of data types to use; each entry can be set to KUPL_SHM_DATATYPE_CHAR, KUPL_SHM_DATATYPE_INT, KUPL_SHM_DATATYPE_LONG, KUPL_SHM_DATATYPE_FLOAT, or KUPL_SHM_DATATYPE_DOUBLE | Input |
| op | kupl_shm_reduce_op_t * | Array of reduction operations to use; each entry can be set to KUPL_SHM_REDUCE_OP_MAX, KUPL_SHM_REDUCE_OP_MIN, or KUPL_SHM_REDUCE_OP_SUM | Input |
| comm | kupl_shm_comm_h | The kupl communicator to use | Input |
| request | kupl_shm_request_h * | Array of generated persistent requests | Output |
| request_num | int | Number of requests to create in the batch | Input |

Return Value

  • On success: returns KUPL_OK
  • On failure: returns KUPL_ERROR
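
A minimal sketch of calling the interface and checking its return value follows. This is a hypothetical fragment: the parallel arrays (sendbufs, recvbufs, counts, types, ops) and the communicator are assumed to have been prepared by the caller, as in the full example below.

// Hypothetical helper: creates a batch of n allreduce requests and
// reports failure; all array arguments are assumed to be prepared by
// the caller.
static int create_allreduce_batch(const void **sendbufs, void **recvbufs,
                                  int *counts, kupl_shm_datatype_t *types,
                                  kupl_shm_reduce_op_t *ops,
                                  kupl_shm_comm_h comm,
                                  kupl_shm_request_h *requests, int n)
{
    int ret = kupl_shm_allreduce_batch_init(sendbufs, recvbufs, counts,
                                            types, ops, comm, requests, n);
    if (ret != KUPL_OK) {
        // The requests must not be started or freed if creation failed
        fprintf(stderr, "kupl_shm_allreduce_batch_init failed: %d\n", ret);
    }
    return ret;
}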

Example

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mpi.h>
#include "kupl.h"

// Callback 1 required for creating the kupl communicator
static int oob_barrier_callback(void *group)
{
    return MPI_Barrier((MPI_Comm)group);
}
// Callback 2 required for creating the kupl communicator
static int oob_allgather_callback(const void *sendbuf, void *recvbuf, int size, void *group,
                                  kupl_shm_datatype_t datatype)
{
    switch (datatype) {
        case KUPL_SHM_DATATYPE_CHAR:
            return MPI_Allgather(sendbuf, size, MPI_CHAR, recvbuf, size, MPI_CHAR, (MPI_Comm)group);
        default:
            fprintf(stderr, "not support datatype");
            return KUPL_ERROR;
    }
}
 
int main(int argc, char *argv[])
{
    int check = 1;
    // Initialize the MPI environment
    MPI_Init(&argc, &argv);
    MPI_Comm comm = MPI_COMM_WORLD;
    // Get the size of the MPI communicator
    int world_size;
    MPI_Comm_size(comm, &world_size);
    // Get the rank of the current process
    int world_rank;
    MPI_Comm_rank(comm, &world_rank);
    // Get the pid of the current process
    int pid = getpid();

    // Create the kupl communicator
    kupl_shm_oob_cb_t oob_cbs;
    kupl_shm_oob_cb_h oob_cbs_h = &oob_cbs;
    oob_cbs_h->oob_allgather = oob_allgather_callback;
    oob_cbs_h->oob_barrier = oob_barrier_callback;
    kupl_shm_comm_h kupl_comm;
    int ret = kupl_shm_comm_create(world_size, world_rank, pid, oob_cbs_h, (void *)comm, &kupl_comm);
    if (ret != KUPL_OK || kupl_comm == NULL) {
        fprintf(stderr, "kupl shm comm create failed: %d\n", ret);
        return -1;
    }
    kupl_shm_win_h win_send;
    kupl_shm_win_h win_recv;
    void *sendbuf;
    void *recvbuf;
    int iter = 2;
    int unit_count = 8000;
    int count = unit_count * iter;
    size_t buf_size = sizeof(double) * count;
    size_t dtmp_buf_size = sizeof(double) * unit_count;
    kupl_shm_win_alloc(buf_size, kupl_comm, &sendbuf, &win_send);
    kupl_shm_win_alloc(dtmp_buf_size, kupl_comm, &recvbuf, &win_recv);

    // Fill sendbuf with known values so the result can be verified
    for (int i = 0; i < count; i++) {
        ((double *)sendbuf)[i] = (i + 1) * 1.0;
    }

    kupl_shm_request_h *request = (kupl_shm_request_h *)malloc(sizeof(kupl_shm_request_h) * iter);
    kupl_shm_datatype_t datatype[iter];
    kupl_shm_reduce_op_t reduce[iter];
    int count_list[iter];
    void **sendbuf_list = (void **)malloc(sizeof(void *) * iter);
    void **recvbuf_list = (void **)malloc(sizeof(void *) * iter);

    // Each request reduces one unit_count-sized segment of sendbuf into recvbuf
    for (int i = 0; i < iter; i++) {
        sendbuf_list[i] = (double *)sendbuf + unit_count * i;
        recvbuf_list[i] = recvbuf;
        reduce[i] = KUPL_SHM_REDUCE_OP_SUM;
        count_list[i] = unit_count;
        datatype[i] = KUPL_SHM_DATATYPE_DOUBLE;
    }
    // Create the batch of persistent allreduce requests
    kupl_shm_allreduce_batch_init((const void **)sendbuf_list, recvbuf_list, count_list, datatype,
                                  reduce, kupl_comm, request, iter);
    for (int i = 0; i < iter; ++i) {
        kupl_shm_request_start(request[i]);
        // Copy the reduced result back into this segment of sendbuf for checking
        kupl_memcpy(sendbuf_list[i], recvbuf, unit_count * sizeof(double));
        kupl_shm_request_free(request[i]);
    }
 
    // Verify: every element should equal its initial value times world_size
    for (int i = 0; i < count; i++) {
        double expected = (i + 1) * 1.0 * world_size;
        if (expected != ((double *)sendbuf)[i]) {
            check = 0;
        }
    }

    // check is 1 on every rank that passed, so the sum must equal world_size
    int result;
    MPI_Reduce(&check, &result, 1, MPI_INT, MPI_SUM, 0, comm);
    if (world_rank == 0) {
        if (result == world_size) {
            printf("check success\n");
        } else {
            printf("check failed\n");
        }
    }
 
    free(request);
    free(sendbuf_list);
    free(recvbuf_list);
    kupl_shm_win_free(win_send);
    kupl_shm_win_free(win_recv);
    kupl_shm_comm_destroy(kupl_comm);
    MPI_Finalize();
    return 0;
}

The output is as follows.

check success

The example above demonstrates the kupl allreduce batch_init flow: kupl_shm_allreduce_batch_init creates the persistent requests in a batch, and each request is then started and freed individually.
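
Because the requests are persistent, they can in principle be restarted for several rounds before being freed, rather than created anew each time. A minimal sketch of that reuse pattern follows; it assumes, as the example above implies, that kupl_shm_request_start returns only after the reduction result is available in recvbuf, and num_rounds is a hypothetical round count.

// Hypothetical reuse sketch: request[], iter, sendbuf_list and recvbuf are
// the variables from the example above; num_rounds is an assumed round count.
for (int round = 0; round < num_rounds; round++) {
    for (int i = 0; i < iter; i++) {
        // Refill sendbuf_list[i] with this round's data here, then restart
        kupl_shm_request_start(request[i]);
        // recvbuf now holds the reduced result for segment i
    }
}
for (int i = 0; i < iter; i++) {
    kupl_shm_request_free(request[i]); // free only after the last round
}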