diff --git a/src/client/array/dc_array.c b/src/client/array/dc_array.c index 10b6324b915..ec0288a1459 100644 --- a/src/client/array/dc_array.c +++ b/src/client/array/dc_array.c @@ -70,7 +70,15 @@ struct io_params { char akey_val; }; +struct io_splits { + daos_size_t cnt; + daos_size_t *nrec; + daos_array_iod_t *iod_v; + d_sg_list_t *sgl_v; +}; + unsigned int array_list_io_limit; +unsigned int array_iod_split; void daos_array_env_init() @@ -86,6 +94,12 @@ daos_array_env_init() } else { D_DEBUG(DB_TRACE, "ARRAY List IO limit = %u\n", array_list_io_limit); } + array_iod_split = DAOS_ARRAY_IOD_SPLIT; + d_getenv_uint("DAOS_ARRAY_IOD_SPLIT", &array_iod_split); + if (array_iod_split == 0 || array_iod_split > array_list_io_limit) { + array_iod_split = array_list_io_limit; + } + D_DEBUG(DB_TRACE, "ARRAY IOD split = %u\n", array_iod_split); } static void @@ -1025,6 +1039,133 @@ io_extent_same(daos_array_iod_t *iod, d_sg_list_t *sgl, daos_size_t cell_size, return (rgs_len * cell_size == sgl_len); } +static void +free_splits(struct io_splits *split) +{ + daos_size_t spl_ix; + + if (split->cnt > 1) { + for (spl_ix = 0; spl_ix < split->cnt; spl_ix++) + D_FREE(split->sgl_v[spl_ix].sg_iovs); + D_FREE(split->sgl_v); + D_FREE(split->iod_v); + D_FREE(split->nrec); + } + return; +} + +static int +iod_split(daos_array_iod_t *iod, d_sg_list_t *sgl, daos_size_t cell_size, daos_size_t *num_records, + struct io_splits *split) +{ + daos_size_t rgs_sz = 0; /* sizes in bytes */ + daos_size_t sgl_sz = 0; + daos_size_t spl_sz = 0; + daos_size_t sgl_ix, spl_ix, len, ix, tail; + daos_off_t boff = 0; + char *bptr; + int rc = 0; + + split->cnt = 0; + if (iod->arr_nr < array_iod_split) { + /* No need to split io */ + split->iod_v = iod; + split->sgl_v = sgl; + split->cnt = 1; + if (!io_extent_same(iod, sgl, cell_size, num_records)) + return -DER_INVAL; + return 0; + } + + tail = iod->arr_nr % array_iod_split; + split->cnt = iod->arr_nr / array_iod_split + (tail ? 1 : 0); + D_ALLOC_ARRAY(split->iod_v, split->cnt); + if (split->iod_v == NULL) + goto mem_err; + D_ALLOC_ARRAY(split->nrec, split->cnt); + if (split->nrec == NULL) + goto mem_err; + if (sgl) { + D_ALLOC_ARRAY(split->sgl_v, split->cnt); + if (split->iod_v == NULL) + goto mem_err; + } + + /* Prepare iod batches */ + for (spl_ix = 0; spl_ix < split->cnt; spl_ix++) { + split->iod_v[spl_ix].arr_nr = array_iod_split; + split->iod_v[spl_ix].arr_rgs = &iod->arr_rgs[spl_ix * array_iod_split]; + if (sgl) { + D_ALLOC_ARRAY(split->sgl_v[spl_ix].sg_iovs, sgl->sg_nr / split->cnt + 1); + if (split->sgl_v[spl_ix].sg_iovs == NULL) + goto mem_err; + split->sgl_v[spl_ix].sg_nr = sgl->sg_nr / split->cnt + 1; + } + } + if (tail) + split->iod_v[split->cnt - 1].arr_nr = tail; + if (sgl == NULL) + return 0; + + /* Fill up sgl batches to fulfill iod splits */ + sgl_ix = 0; + boff = 0; + for (spl_ix = 0; spl_ix < split->cnt; spl_ix++) { + /* Calculate each split's byte size */ + spl_sz = 0; + for (ix = 0; ix < split->iod_v[spl_ix].arr_nr; ix++) + spl_sz += split->iod_v[spl_ix].arr_rgs[ix].rg_len; + split->nrec[spl_ix] = spl_sz; + rgs_sz += spl_sz * cell_size; /* iod size running total */ + /* stuff sgl split */ + ix = 0; + do { + len = sgl->sg_iovs[sgl_ix].iov_len - boff; + bptr = (char *)sgl->sg_iovs[sgl_ix].iov_buf + boff; + split->sgl_v[spl_ix].sg_iovs[ix].iov_buf = bptr; + if (len <= spl_sz) { + /* the rest of this sgl segment fits in current iod split */ + split->sgl_v[spl_ix].sg_iovs[ix].iov_len = len; + split->sgl_v[spl_ix].sg_iovs[ix].iov_buf_len = len; + sgl_ix++; + boff = 0; + sgl_sz += len; + } else { + /* currentt sgl segment is larger than the remaining split size */ + tail = len - spl_sz; + split->sgl_v[spl_ix].sg_iovs[ix].iov_len = spl_sz; + split->sgl_v[spl_ix].sg_iovs[ix].iov_buf_len = spl_sz; + boff += spl_sz; + sgl_sz += spl_sz; + } + spl_sz -= split->sgl_v[spl_ix].sg_iovs[ix].iov_len; + ix++; + if (ix == split->sgl_v[spl_ix].sg_nr) { + d_iov_t *new; + daos_size_t n = split->sgl_v[spl_ix].sg_nr + 4; + D_REALLOC_ARRAY(new, split->sgl_v[spl_ix].sg_iovs, + split->sgl_v[spl_ix].sg_nr, n); + if (new == NULL) + goto mem_err; + split->sgl_v[spl_ix].sg_iovs = new; + split->sgl_v[spl_ix].sg_nr = n; + } + } while (spl_sz); + split->sgl_v[spl_ix].sg_nr = ix; /* update sgl split to correct count */ + } + if (sgl_sz != rgs_sz) + goto len_err; /* all this hard work for nothing :( */ + + return 0; + +len_err: + rc = -DER_INVAL; +mem_err: + /* free all allocated memory */ + free_splits(split); + return rc ? rc : -DER_NOMEM; +} + /* * Compute the dkey given the array index for this range. Also compute: - the * number of records that the dkey can hold starting at the index where we start @@ -1429,11 +1570,10 @@ check_short_read_cb(tse_task_t *task, void *data) } static int -dc_array_io(daos_handle_t array_oh, daos_handle_t th, - daos_array_iod_t *rg_iod, d_sg_list_t *user_sgl, - daos_opc_t op_type, tse_task_t *task) +dc_array_io_int(struct dc_array *array, daos_handle_t th, daos_array_iod_t *rg_iod, + d_sg_list_t *user_sgl, daos_opc_t op_type, tse_task_t *task, + daos_size_t tot_num_records) { - struct dc_array *array = NULL; daos_handle_t oh; daos_off_t cur_off; /* offset into user buf to track current pos */ daos_size_t cur_i; /* index into user sgl to track current pos */ @@ -1445,72 +1585,17 @@ dc_array_io(daos_handle_t array_oh, daos_handle_t th, struct io_params *head = NULL; bool head_cb_registered = false; daos_size_t num_ios; - d_list_t io_task_list; - daos_size_t tot_num_records = 0; + d_list_t io_task_list; tse_task_t *stask; /* task for short read and hole mgmt */ - int rc; - - if (rg_iod == NULL) { - D_ERROR("NULL iod passed\n"); - D_GOTO(err_task, rc = -DER_INVAL); - } - - /* - * If we are above the limit, check for small recx size. Just a best effort check for - * extreme cases to reject. - */ - if (rg_iod->arr_nr > array_list_io_limit) { - daos_size_t i; - daos_size_t tiny_count = 0; - - /* quick shortcut check */ - for (i = 0; i < rg_iod->arr_nr; i = i * 2) { - if (rg_iod->arr_rgs[i].rg_len > DAOS_ARRAY_RG_LEN_THD) - break; - if (i == 0) - i++; - } - - /** Full check if quick check fails */ - if (i >= rg_iod->arr_nr) { - for (i = 0; i < rg_iod->arr_nr; i++) { - if (rg_iod->arr_rgs[i].rg_len <= DAOS_ARRAY_RG_LEN_THD) - tiny_count++; - if (tiny_count > array_list_io_limit) - break; - } - if (tiny_count > array_list_io_limit) { - D_ERROR("List io supports a max of %u offsets (using %zu)", - array_list_io_limit, rg_iod->arr_nr); - D_GOTO(err_task, rc = -DER_NOTSUPPORTED); - } - } - } - - array = array_hdl2ptr(array_oh); - if (array == NULL) { - D_ERROR("Invalid array handle: "DF_RC"\n", DP_RC(-DER_NO_HDL)); - D_GOTO(err_task, rc = -DER_NO_HDL); - } - - if (op_type == DAOS_OPC_ARRAY_PUNCH) { - D_ASSERT(user_sgl == NULL); - } else if (user_sgl == NULL) { - D_ERROR("NULL scatter-gather list passed\n"); - D_GOTO(err_task, rc = -DER_INVAL); - } else if (!io_extent_same(rg_iod, user_sgl, array->cell_size, &tot_num_records)) { - rc = -DER_INVAL; - D_ERROR("Unequal extents of memory and array descriptors: " DF_RC "\n", DP_RC(rc)); - D_GOTO(err_task, rc); - } + int rc; oh = array->daos_oh; - cur_off = 0; - cur_i = 0; - u = 0; - num_ios = 0; - records = rg_iod->arr_rgs[0].rg_len; + cur_off = 0; + cur_i = 0; + u = 0; + num_ios = 0; + records = rg_iod->arr_rgs[0].rg_len; array_idx = rg_iod->arr_rgs[0].rg_idx; head = NULL; @@ -1537,21 +1622,21 @@ dc_array_io(daos_handle_t array_oh, daos_handle_t th, * the separating ranges also belong to the same dkey. */ while (u < rg_iod->arr_nr) { - daos_iod_t *iod; - daos_iom_t *iom; - d_sg_list_t *sgl; - daos_key_t *dkey; - uint64_t dkey_val; - daos_size_t dkey_records; - tse_task_t *io_task = NULL; + daos_iod_t *iod; + daos_iom_t *iom; + d_sg_list_t *sgl; + daos_key_t *dkey; + uint64_t dkey_val; + daos_size_t dkey_records; + tse_task_t *io_task = NULL; struct io_params *params; - daos_size_t i; /* index for iod recx */ + daos_size_t i; /* index for iod recx */ /** In some cases, users can pass an empty range, so skip it. */ if (rg_iod->arr_rgs[u].rg_len == 0) { u++; if (u < rg_iod->arr_nr) { - records = rg_iod->arr_rgs[u].rg_len; + records = rg_iod->arr_rgs[u].rg_len; array_idx = rg_iod->arr_rgs[u].rg_idx; } continue; @@ -1563,8 +1648,10 @@ dc_array_io(daos_handle_t array_oh, daos_handle_t th, D_GOTO(err_iotask, rc); } - D_DEBUG(DB_IO, "DKEY IOD "DF_U64": idx = "DF_U64"\t num_records = %zu" - "\t record_i = "DF_U64"\n", dkey_val, array_idx, num_records, record_i); + D_DEBUG(DB_IO, + "DKEY IOD " DF_U64 ": idx = " DF_U64 "\t num_records = %zu" + "\t record_i = " DF_U64 "\n", + dkey_val, array_idx, num_records, record_i); /** allocate params for this dkey io */ D_ALLOC_PTR(params); @@ -1582,7 +1669,7 @@ dc_array_io(daos_handle_t array_oh, daos_handle_t th, struct io_params *prev, *current; current = head; - prev = NULL; + prev = NULL; while (1) { D_ASSERT(current); if (current->dkey_val <= params->dkey_val) { @@ -1598,21 +1685,21 @@ dc_array_io(daos_handle_t array_oh, daos_handle_t th, break; } - prev = current; + prev = current; current = current->next; } } /** Object IO params for the fetch/update */ - iod = ¶ms->iod; - iom = ¶ms->iom; - sgl = ¶ms->sgl; - dkey = ¶ms->dkey; - - params->akey_val = '0'; - params->user_sgl_used = false; - params->cell_size = array->cell_size; - params->chunk_size = array->chunk_size; + iod = ¶ms->iod; + iom = ¶ms->iom; + sgl = ¶ms->sgl; + dkey = ¶ms->dkey; + + params->akey_val = '0'; + params->user_sgl_used = false; + params->cell_size = array->cell_size; + params->chunk_size = array->chunk_size; num_ios++; /** Set integer dkey descriptor */ @@ -1620,19 +1707,19 @@ dc_array_io(daos_handle_t array_oh, daos_handle_t th, /** Set character akey descriptor - TODO: should be NULL */ d_iov_set(&iod->iod_name, ¶ms->akey_val, 1); /** Initialize the rest of the IOD fields */ - iod->iod_nr = 0; - iod->iod_recxs = NULL; - iod->iod_type = DAOS_IOD_ARRAY; + iod->iod_nr = 0; + iod->iod_recxs = NULL; + iod->iod_type = DAOS_IOD_ARRAY; if (op_type == DAOS_OPC_ARRAY_PUNCH) iod->iod_size = 0; else iod->iod_size = array->cell_size; /* Initialize the IOM - used for fetch */ - iom->iom_type = DAOS_IOD_ARRAY; - iom->iom_nr = 0; + iom->iom_type = DAOS_IOD_ARRAY; + iom->iom_nr = 0; - i = 0; + i = 0; dkey_records = 0; /* @@ -1641,8 +1728,8 @@ dc_array_io(daos_handle_t array_oh, daos_handle_t th, * current dkey IOD. */ do { - daos_off_t old_array_idx; - daos_recx_t *new_recxs; + daos_off_t old_array_idx; + daos_recx_t *new_recxs; /** add another element to recxs */ D_REALLOC_ARRAY(new_recxs, iod->iod_recxs, iod->iod_nr, iod->iod_nr + 1); @@ -1654,10 +1741,10 @@ dc_array_io(daos_handle_t array_oh, daos_handle_t th, /** set the record access for this range */ iod->iod_recxs[i].rx_idx = record_i; - iod->iod_recxs[i].rx_nr = (num_records > records) ? records : num_records; + iod->iod_recxs[i].rx_nr = (num_records > records) ? records : num_records; - D_DEBUG(DB_IO, "%zu: index = "DF_U64", size = %zu\n", - u, iod->iod_recxs[i].rx_idx, iod->iod_recxs[i].rx_nr); + D_DEBUG(DB_IO, "%zu: index = " DF_U64 ", size = %zu\n", u, + iod->iod_recxs[i].rx_idx, iod->iod_recxs[i].rx_nr); /* * If the current range is bigger than what the dkey can hold, update the @@ -1681,8 +1768,8 @@ dc_array_io(daos_handle_t array_oh, daos_handle_t th, break; old_array_idx = array_idx; - records = rg_iod->arr_rgs[u].rg_len; - array_idx = rg_iod->arr_rgs[u].rg_idx; + records = rg_iod->arr_rgs[u].rg_len; + array_idx = rg_iod->arr_rgs[u].rg_idx; /* * Boundary case where number of records align with the end boundary of the @@ -1712,7 +1799,7 @@ dc_array_io(daos_handle_t array_oh, daos_handle_t th, } } while (1); - D_DEBUG(DB_IO, "DKEY IOD "DF_U64" ---------------\n", dkey_val); + D_DEBUG(DB_IO, "DKEY IOD " DF_U64 " ---------------\n", dkey_val); /* * if the user sgl maps directly to the array range, no need to partition it. @@ -1720,7 +1807,7 @@ dc_array_io(daos_handle_t array_oh, daos_handle_t th, if ((op_type == DAOS_OPC_ARRAY_PUNCH) || (1 == rg_iod->arr_nr && 1 == user_sgl->sg_nr && dkey_records == rg_iod->arr_rgs[0].rg_len)) { - sgl = user_sgl; + sgl = user_sgl; params->user_sgl_used = true; } /** create an sgl from the user sgl for the current IOD */ @@ -1729,7 +1816,7 @@ dc_array_io(daos_handle_t array_oh, daos_handle_t th, rc = create_sgl(user_sgl, array->cell_size, dkey_records, &cur_off, &cur_i, sgl); if (rc != 0) { - D_ERROR("Failed to create sgl "DF_RC"\n", DP_RC(rc)); + D_ERROR("Failed to create sgl " DF_RC "\n", DP_RC(rc)); D_GOTO(err_iotask, rc); } } @@ -1742,25 +1829,25 @@ dc_array_io(daos_handle_t array_oh, daos_handle_t th, rc = daos_task_create(DAOS_OPC_OBJ_FETCH, tse_task2sched(task), 0, NULL, &io_task); if (rc != 0) { - D_ERROR("Fetch dkey "DF_U64" failed "DF_RC"\n", params->dkey_val, - DP_RC(rc)); + D_ERROR("Fetch dkey " DF_U64 " failed " DF_RC "\n", + params->dkey_val, DP_RC(rc)); D_GOTO(err_iotask, rc); } - io_arg = daos_task_get_args(io_task); - io_arg->oh = oh; - io_arg->th = th; - io_arg->dkey = dkey; - io_arg->nr = 1; - io_arg->iods = iod; - io_arg->sgls = sgl; + io_arg = daos_task_get_args(io_task); + io_arg->oh = oh; + io_arg->th = th; + io_arg->dkey = dkey; + io_arg->nr = 1; + io_arg->iods = iod; + io_arg->sgls = sgl; /** if this is a byte array, add ioms for hole mgmt */ if (array->byte_array) { - iom->iom_nr = 0; + iom->iom_nr = 0; iom->iom_recxs = NULL; iom->iom_flags = DAOS_IOMF_DETAIL; - io_arg->ioms = iom; - rc = tse_task_register_deps(stask, 1, &io_task); + io_arg->ioms = iom; + rc = tse_task_register_deps(stask, 1, &io_task); if (rc) { tse_task_complete(io_task, rc); D_GOTO(err_iotask, rc); @@ -1779,18 +1866,18 @@ dc_array_io(daos_handle_t array_oh, daos_handle_t th, rc = daos_task_create(DAOS_OPC_OBJ_UPDATE, tse_task2sched(task), 0, NULL, &io_task); if (rc != 0) { - D_ERROR("Update dkey "DF_U64" failed "DF_RC"\n", params->dkey_val, - DP_RC(rc)); + D_ERROR("Update dkey " DF_U64 " failed " DF_RC "\n", + params->dkey_val, DP_RC(rc)); D_GOTO(err_iotask, rc); } - io_arg = daos_task_get_args(io_task); - io_arg->oh = oh; - io_arg->th = th; - io_arg->dkey = dkey; - io_arg->nr = 1; - io_arg->iods = iod; - io_arg->sgls = sgl; - rc = tse_task_register_deps(task, 1, &io_task); + io_arg = daos_task_get_args(io_task); + io_arg->oh = oh; + io_arg->th = th; + io_arg->dkey = dkey; + io_arg->nr = 1; + io_arg->iods = iod; + io_arg->sgls = sgl; + rc = tse_task_register_deps(task, 1, &io_task); if (rc) { tse_task_complete(io_task, rc); D_GOTO(err_iotask, rc); @@ -1815,16 +1902,16 @@ dc_array_io(daos_handle_t array_oh, daos_handle_t th, if (head == NULL) { tse_task_complete(stask, 0); } else { - struct hole_params *sparams; + struct hole_params *sparams; D_ALLOC_PTR(sparams); if (sparams == NULL) D_GOTO(err_iotask, rc = -DER_NOMEM); - sparams->io_list = head; - sparams->records_req = tot_num_records; - sparams->ptask = task; - sparams->oh = oh; + sparams->io_list = head; + sparams->records_req = tot_num_records; + sparams->ptask = task; + sparams->oh = oh; rc = tse_task_register_deps(task, 1, &stask); if (rc != 0) { @@ -1844,7 +1931,6 @@ dc_array_io(daos_handle_t array_oh, daos_handle_t th, } tse_task_list_sched(&io_task_list, true); - array_decref(array); return 0; err_iotask: @@ -1854,6 +1940,53 @@ dc_array_io(daos_handle_t array_oh, daos_handle_t th, if (op_type == DAOS_OPC_ARRAY_READ && array->byte_array) tse_task_complete(stask, rc); err_task: + return rc; +} + +static int +dc_array_io(daos_handle_t array_oh, daos_handle_t th, daos_array_iod_t *rg_iod, + d_sg_list_t *user_sgl, daos_opc_t op_type, tse_task_t *task) +{ + struct dc_array *array = NULL; + struct io_splits splits; + daos_size_t total_nr; + int sn, rc; + + splits.cnt = 0; + if (rg_iod == NULL) { + D_ERROR("NULL iod passed\n"); + D_GOTO(err_task, rc = -DER_INVAL); + } + array = array_hdl2ptr(array_oh); + if (array == NULL) { + D_ERROR("Invalid array handle: " DF_RC "\n", DP_RC(-DER_NO_HDL)); + D_GOTO(err_task, rc = -DER_NO_HDL); + } + + rc = iod_split(rg_iod, user_sgl, array->cell_size, &total_nr, &splits); + if (rc) { + if (rc == -DER_INVAL) + D_ERROR("Unequal extents of memory and array descriptors: " DF_RC "\n", + DP_RC(rc)); + else + D_ERROR("Array descriptors split failed: " DF_RC "\n", DP_RC(rc)); + D_GOTO(err_task, rc); + } + for (sn = 0; sn < splits.cnt; sn++) { + daos_array_iod_t *iod = &splits.iod_v[sn]; + d_sg_list_t *sgl = &splits.sgl_v[sn]; + daos_size_t nr = splits.cnt > 1 ? splits.nrec[sn] : total_nr; + rc = dc_array_io_int(array, th, iod, sgl, op_type, task, nr); + if (rc) + D_GOTO(err_task, rc); + } + + free_splits(&splits); + array_decref(array); + return 0; + +err_task: + free_splits(&splits); if (array) array_decref(array); tse_task_complete(task, rc); diff --git a/src/include/daos_array.h b/src/include/daos_array.h index ace0735b3a7..e77fc14609f 100644 --- a/src/include/daos_array.h +++ b/src/include/daos_array.h @@ -1,6 +1,6 @@ /* * (C) Copyright 2016-2024 Intel Corporation. - * (C) Copyright 2025 Hewlett Packard Enterprise Development LP + * (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -28,6 +28,8 @@ extern "C" { #define DAOS_ARRAY_LIST_IO_LIMIT 16384 /** Tiny recx limit (in bytes) in the array IODs where the list limit is high */ #define DAOS_ARRAY_RG_LEN_THD 16 +/** array iod split size */ +#define DAOS_ARRAY_IOD_SPLIT 4096 /** Range of contiguous records */ typedef struct {