kupl_shm_allreduce_batch_init
Creates requests in a batch.
Interface definition
int kupl_shm_allreduce_batch_init(void **sendbuf, void **recvbuf, int *count, kupl_shm_datatype_t *datatype, kupl_shm_reduce_op_t *op, kupl_shm_comm_h comm, kupl_shm_request_h *request, int request_num);
Parameters
| Parameter | Type | Description | Input/Output |
|---|---|---|---|
| sendbuf | void ** | Array of send buffers | Input |
| recvbuf | void ** | Array of receive buffers | Input |
| count | int * | Array of send counts | Input |
| datatype | kupl_shm_datatype_t * | Array of data types; each entry may be set to KUPL_SHM_DATATYPE_CHAR, KUPL_SHM_DATATYPE_INT, KUPL_SHM_DATATYPE_LONG, KUPL_SHM_DATATYPE_FLOAT, or KUPL_SHM_DATATYPE_DOUBLE | Input |
| op | kupl_shm_reduce_op_t * | Array of reduce operations; each entry may be set to KUPL_SHM_REDUCE_OP_MAX, KUPL_SHM_REDUCE_OP_MIN, or KUPL_SHM_REDUCE_OP_SUM | Input |
| comm | kupl_shm_comm_h | The kupl communicator to use | Input |
| request | kupl_shm_request_h * | Array of the persistent requests that are created | Output |
| request_num | int | Number of requests to create in the batch | Input |
Return value
- On success: returns KUPL_OK.
- On failure: returns KUPL_ERROR.
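The call pattern is the same as for the other kupl_shm interfaces: fill one entry per request in each parameter array, then check the return code. The sketch below is illustrative only; REQ_NUM is a hypothetical constant, and sendbuf_list, recvbuf_list, count_list, datatype, reduce, and kupl_comm are assumed to have been prepared as in the full example that follows.

```c
/* Hypothetical sketch of the call pattern; the parameter arrays and
 * kupl_comm are assumed to be prepared as in the full example below. */
enum { REQ_NUM = 2 };  /* number of requests in the batch (illustrative) */
kupl_shm_request_h requests[REQ_NUM];
int ret = kupl_shm_allreduce_batch_init(sendbuf_list, recvbuf_list, count_list,
                                        datatype, reduce, kupl_comm,
                                        requests, REQ_NUM);
if (ret != KUPL_OK) {
    fprintf(stderr, "kupl_shm_allreduce_batch_init failed: %d\n", ret);
    return -1;
}
```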
Example
```c
#include <stdio.h>
#include <mpi.h>
#include <unistd.h>
#include "kupl.h"

// Callback 1 required for creating the kupl communicator
static int oob_barrier_callback(void *group)
{
    return MPI_Barrier((MPI_Comm)group);
}

// Callback 2 required for creating the kupl communicator
static int oob_allgather_callback(const void *sendbuf, void *recvbuf, int size,
                                  void *group, kupl_shm_datatype_t datatype)
{
    switch (datatype) {
        case KUPL_SHM_DATATYPE_CHAR:
            return MPI_Allgather(sendbuf, size, MPI_CHAR, recvbuf, size, MPI_CHAR, (MPI_Comm)group);
        default:
            fprintf(stderr, "unsupported datatype\n");
            return KUPL_ERROR;
    }
}

int main(int argc, char *argv[])
{
    int check = 1;

    // Initialize the MPI environment
    MPI_Init(&argc, &argv);
    MPI_Comm comm = MPI_COMM_WORLD;

    // Get the MPI communicator size
    int world_size;
    MPI_Comm_size(comm, &world_size);

    // Get the rank of this process
    int world_rank;
    MPI_Comm_rank(comm, &world_rank);

    // Get the pid of this process
    int pid = getpid();

    // Create the kupl communicator
    kupl_shm_oob_cb_t oob_cbs;
    kupl_shm_oob_cb_h oob_cbs_h = &oob_cbs;
    oob_cbs_h->oob_allgather = oob_allgather_callback;
    oob_cbs_h->oob_barrier = oob_barrier_callback;
    kupl_shm_comm_h kupl_comm;
    int ret = kupl_shm_comm_create(world_size, world_rank, pid, oob_cbs_h, (void *)comm, &kupl_comm);
    if (ret != KUPL_OK || kupl_comm == NULL) {
        fprintf(stderr, "kupl shm comm create failed: %d\n", ret);
        return -1;
    }

    // Allocate shared-memory windows for the send and recv buffers
    kupl_shm_win_h win_send;
    kupl_shm_win_h win_recv;
    void *sendbuf;
    void *recvbuf;
    int iter = 2;
    int unit_count = 8000;
    int count = unit_count * iter;
    size_t buf_size = sizeof(double) * count;
    size_t dtmp_buf_size = sizeof(double) * unit_count;
    kupl_shm_win_alloc(buf_size, kupl_comm, &sendbuf, &win_send);
    kupl_shm_win_alloc(dtmp_buf_size, kupl_comm, &recvbuf, &win_recv);

    // Fill sendbuf
    for (int i = 0; i < count; i++) {
        ((double *)sendbuf)[i] = (i + 1) * 1.0;
    }

    // Prepare one entry per request in each parameter array
    kupl_shm_request_h *request = (kupl_shm_request_h *)malloc(sizeof(kupl_shm_request_h) * iter);
    kupl_shm_datatype_t datatype[iter];
    kupl_shm_reduce_op_t reduce[iter];
    int count_list[iter];
    void **sendbuf_list = (void **)malloc(sizeof(void *) * iter);
    void **recvbuf_list = (void **)malloc(sizeof(void *) * iter);
    for (int i = 0; i < iter; i++) {
        sendbuf_list[i] = (double *)sendbuf + unit_count * i;
        recvbuf_list[i] = recvbuf;
        reduce[i] = KUPL_SHM_REDUCE_OP_SUM;
        count_list[i] = unit_count;
        datatype[i] = KUPL_SHM_DATATYPE_DOUBLE;
    }

    // Batch-create the persistent allreduce requests
    kupl_shm_allreduce_batch_init(sendbuf_list, recvbuf_list, count_list, datatype, reduce, kupl_comm, request, iter);

    // Start each request, copy the result back into sendbuf, then free the request
    for (int i = 0; i < iter; ++i) {
        kupl_shm_request_start(request[i]);
        kupl_memcpy(sendbuf_list[i], recvbuf, unit_count * sizeof(double));
        kupl_shm_request_free(request[i]);
    }

    // Verify the result
    for (int i = 0; i < count; i++) {
        double expected = (i + 1) * 1.0 * world_size;
        if (expected != ((double *)sendbuf)[i]) {
            check = 0;
        }
    }
    int result;
    MPI_Reduce(&check, &result, 1, MPI_INT, MPI_SUM, 0, comm);
    if (world_rank == 0) {
        if (result == world_size) {
            printf("check success\n");
        } else {
            printf("check failed\n");
        }
    }

    // Release resources
    free(request);
    free(sendbuf_list);
    free(recvbuf_list);
    kupl_shm_win_free(win_send);
    kupl_shm_win_free(win_recv);
    kupl_shm_comm_destroy(kupl_comm);
    MPI_Finalize();
    return 0;
}
```
The output is as follows.
check success
The example above demonstrates the kupl allreduce batch_init workflow: kupl_shm_allreduce_batch_init creates the persistent requests in a batch, and each request is then started and freed individually.
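Because the requests are persistent, it may also be possible to restart the same request several times before freeing it, analogous to MPI persistent requests; the example above does not state this explicitly, so the sketch below is only an assumption-labeled illustration that reuses the request, iter, recvbuf, sendbuf_list, and unit_count variables from the example.

```c
/* Assumption: a persistent kupl_shm request can be started repeatedly before
 * kupl_shm_request_free is called (analogous to MPI persistent requests);
 * this behavior is not confirmed by the example above. */
for (int round = 0; round < 3; ++round) {
    for (int i = 0; i < iter; ++i) {
        kupl_shm_request_start(request[i]);
        // consume the reduction result in recvbuf here, before the next
        // start overwrites it, e.g. copy it out:
        kupl_memcpy(sendbuf_list[i], recvbuf, unit_count * sizeof(double));
    }
}
for (int i = 0; i < iter; ++i) {
    kupl_shm_request_free(request[i]);
}
```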