From d64eeb395eafd2ae19aadc87cbcd60e17b8db0b0 Mon Sep 17 00:00:00 2001 From: Xiao Yang Date: Fri, 13 May 2022 10:32:15 +0800 Subject: [PATCH] rpma: make gpspm server use separate RCQ Also use shared completion channel. Signed-off-by: Xiao Yang --- engines/librpma_gpspm.c | 229 +++++++++++++++++++++++++++------------- 1 file changed, 155 insertions(+), 74 deletions(-) diff --git a/engines/librpma_gpspm.c b/engines/librpma_gpspm.c index b0b12ee81f..1564853c92 100644 --- a/engines/librpma_gpspm.c +++ b/engines/librpma_gpspm.c @@ -374,11 +374,10 @@ struct server_data { /* resources for messaging buffer from DRAM allocated by fio */ struct rpma_mr_local *msg_mr; - uint32_t msg_sqe_available; /* # of free SQ slots */ - - /* in-memory queues */ - struct ibv_wc *msgs_queued; - uint32_t msg_queued_nr; + /* # of free SQ slots */ + uint32_t msg_sqe_available; + /* receive CQ */ + struct rpma_cq *rcq; librpma_fio_persist_fn persist; }; @@ -401,13 +400,6 @@ static int server_init(struct thread_data *td) goto err_server_cleanup; } - /* allocate in-memory queue */ - sd->msgs_queued = calloc(td->o.iodepth, sizeof(*sd->msgs_queued)); - if (sd->msgs_queued == NULL) { - td_verror(td, errno, "calloc"); - goto err_free_sd; - } - #ifdef CONFIG_LIBPMEM2_INSTALLED /* get libpmem2 persist function from pmem2_map */ sd->persist = pmem2_get_persist_fn(csd->mem.map); @@ -427,9 +419,6 @@ static int server_init(struct thread_data *td) return 0; -err_free_sd: - free(sd); - err_server_cleanup: librpma_fio_server_cleanup(td); @@ -500,7 +489,6 @@ static void server_cleanup(struct thread_data *td) if ((ret = rpma_mr_dereg(&sd->msg_mr))) librpma_td_verror(td, ret, "rpma_mr_dereg"); - free(sd->msgs_queued); free(sd); } @@ -533,6 +521,7 @@ static int prepare_connection(struct thread_data *td, static int server_open_file(struct thread_data *td, struct fio_file *f) { struct librpma_fio_server_data *csd = td->io_ops_data; + struct server_data *sd = csd->server_data; struct rpma_conn_cfg *cfg = NULL; uint16_t max_msg_num = td->o.iodepth; int ret; @@ -546,7 +535,7 @@ static int server_open_file(struct thread_data *td, struct fio_file *f) } /* - * Calculate the required queue sizes where: + * The required queue sizes are: * - the send queue (SQ) has to be big enough to accommodate * all possible flush requests (SENDs) * - the receive queue (RQ) has to be big enough to accommodate @@ -562,12 +551,25 @@ static int server_open_file(struct thread_data *td, struct fio_file *f) librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size"); goto err_cfg_delete; } - if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num * 2))) { + if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num))) { librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size"); goto err_cfg_delete; } + if ((ret = rpma_conn_cfg_set_rcq_size(cfg, max_msg_num))) { + librpma_td_verror(td, ret, "rpma_conn_cfg_set_rcq_size"); + goto err_cfg_delete; + } + if ((ret = rpma_conn_cfg_set_compl_channel(cfg, true))) { + librpma_td_verror(td, ret, "rpma_conn_cfg_set_compl_channel"); + goto err_cfg_delete; + } + + if ((ret = librpma_fio_server_open_file(td, f, cfg))) + goto err_cfg_delete; - ret = librpma_fio_server_open_file(td, f, cfg); + /* get the connection's receive CQ */ + if ((ret = rpma_conn_get_rcq(csd->conn, &sd->rcq))) + librpma_td_verror(td, ret, "rpma_conn_get_rcq"); err_cfg_delete: (void) rpma_conn_cfg_delete(&cfg); @@ -660,92 +662,171 @@ static int server_qe_process(struct thread_data *td, struct ibv_wc *wc) return -1; } -static inline int server_queue_process(struct thread_data *td) +/* + * server_cmpl_poll - poll and process a completion + * + * Return value: + * 0 or 1 - number of received completions + * -1 - in case of an error + */ +static int server_cmpl_poll(struct thread_data *td, struct rpma_cq *cq, + struct ibv_wc *wc) { struct librpma_fio_server_data *csd = td->io_ops_data; struct server_data *sd = csd->server_data; int ret; - int i; - /* min(# of queue entries, # of SQ entries available) */ - uint32_t qes_to_process = min(sd->msg_queued_nr, sd->msg_sqe_available); - if (qes_to_process == 0) + ret = rpma_cq_get_wc(cq, 1, wc, NULL); + if (ret == RPMA_E_NO_COMPLETION) { + /* lack of completion is not an error */ return 0; + } + if (ret) { + librpma_td_verror(td, ret, "rpma_cq_get_wc"); + goto err_terminate; + } + + /* validate the completion */ + if (wc->status != IBV_WC_SUCCESS) + goto err_terminate; + + if (wc->opcode == IBV_WC_SEND) + ++sd->msg_sqe_available; + + return 1; + +err_terminate: + td->terminate = true; + + return -1; +} + +static int server_queue_poll(struct thread_data *td) +{ + struct librpma_fio_server_data *csd = td->io_ops_data; + struct server_data *sd = csd->server_data; + struct ibv_wc cq_wc, rcq_wc; + int ret; + + /* process a receive completion */ + ret = server_cmpl_poll(td, sd->rcq, &rcq_wc); + if (ret < 0) + return ret; - /* process queued completions */ - for (i = 0; i < qes_to_process; ++i) { - if ((ret = server_qe_process(td, &sd->msgs_queued[i]))) + if (ret == 0) { + /* process a send completion if nothing to be done with RCQ */ + ret = server_cmpl_poll(td, csd->cq, &cq_wc); + if (ret < 0) return ret; - } - /* progress the queue */ - for (i = 0; i < sd->msg_queued_nr - qes_to_process; ++i) { - memcpy(&sd->msgs_queued[i], - &sd->msgs_queued[qes_to_process + i], - sizeof(sd->msgs_queued[i])); + return 0; } - sd->msg_queued_nr -= qes_to_process; + /* here means rcq_wc.opcode == IBV_WC_RECV */ - return 0; + /* ensure that at least one SQ slot is available */ + while (sd->msg_sqe_available == 0) { + /* process a send completion */ + ret = server_cmpl_poll(td, csd->cq, &cq_wc); + if (ret < 0) + return ret; + } + + return server_qe_process(td, &rcq_wc); } -static int server_cmpl_process(struct thread_data *td) +static int server_queue_wait_poll(struct thread_data *td) { struct librpma_fio_server_data *csd = td->io_ops_data; struct server_data *sd = csd->server_data; - struct ibv_wc *wc = &sd->msgs_queued[sd->msg_queued_nr]; - struct librpma_fio_options_values *o = td->eo; + struct rpma_cq *cq; + struct ibv_wc cq_wc, rcq_wc; + bool is_rcq; int ret; - ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL); - if (ret == RPMA_E_NO_COMPLETION) { - if (o->busy_wait_polling) - return 0; /* lack of completion is not an error */ - - ret = rpma_cq_wait(csd->cq); - if (ret == RPMA_E_NO_COMPLETION) - return 0; /* lack of completion is not an error */ - if (ret) { - librpma_td_verror(td, ret, "rpma_cq_wait"); - goto err_terminate; - } + /* process a receive completion */ + ret = server_cmpl_poll(td, sd->rcq, &rcq_wc); + if (ret < 0) + return ret; - ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL); - if (ret == RPMA_E_NO_COMPLETION) - return 0; /* lack of completion is not an error */ - if (ret) { - librpma_td_verror(td, ret, "rpma_cq_get_wc"); - goto err_terminate; - } - } else if (ret) { - librpma_td_verror(td, ret, "rpma_cq_get_wc"); - goto err_terminate; + if (ret == 0) { + /* process all available send completions before wait for RCQ */ + do { + ret = server_cmpl_poll(td, csd->cq, &cq_wc); + if (ret < 0) + return ret; + } while (ret); + + do { + ret = rpma_conn_wait(csd->conn, &cq, &is_rcq); + if (ret) { + librpma_td_verror(td, ret, "rpma_conn_wait"); + td->terminate = true; + return ret; + } + + if (!is_rcq) { + /* process all available send completions */ + do { + ret = server_cmpl_poll(td, cq, &cq_wc); + if (ret < 0) + return ret; + } while (ret); + } else { + /* process a receive completion */ + ret = server_cmpl_poll(td, cq, &rcq_wc); + if (ret < 0) + return ret; + } + } while (!is_rcq); + + /* return if there is still no receive completion */ + if (ret == 0) + return ret; } - /* validate the completion */ - if (wc->status != IBV_WC_SUCCESS) - goto err_terminate; + /* here means rcq_wc.opcode == IBV_WC_RECV */ - if (wc->opcode == IBV_WC_RECV) - ++sd->msg_queued_nr; - else if (wc->opcode == IBV_WC_SEND) - ++sd->msg_sqe_available; + /* ensure that at least one SQ slot is available */ + while (sd->msg_sqe_available == 0) { + /* process a send completion */ + ret = server_cmpl_poll(td, csd->cq, &cq_wc); + if (ret < 0) + return ret; - return 0; + if (ret == 0) { + do { + ret = rpma_conn_wait(csd->conn, &cq, &is_rcq); + if (ret) { + librpma_td_verror(td, ret, "rpma_conn_wait"); + td->terminate = true; + return ret; + } + } while (is_rcq); + + /* process a send completion again */ + ret = server_cmpl_poll(td, cq, &cq_wc); + if (ret < 0) + return ret; + } + } -err_terminate: - td->terminate = true; + return server_qe_process(td, &rcq_wc); +} - return -1; +static inline int server_queue_process(struct thread_data *td) +{ + struct librpma_fio_options_values *o = td->eo; + + if (o->busy_wait_polling) + return server_queue_poll(td); + else + return server_queue_wait_poll(td); } static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u) { do { - if (server_cmpl_process(td)) - return FIO_Q_BUSY; - if (server_queue_process(td)) return FIO_Q_BUSY;