diff --git a/src/cart/crt_bulk.c b/src/cart/crt_bulk.c
index ed645ffabaa..7b9502dc8cd 100644
--- a/src/cart/crt_bulk.c
+++ b/src/cart/crt_bulk.c
@@ -1,7 +1,7 @@
 /*
  * (C) Copyright 2016-2024 Intel Corporation.
  * (C) Copyright 2025 Google LLC
- * (C) Copyright 2025 Hewlett Packard Enterprise Development LP
+ * (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
  */
@@ -106,6 +106,7 @@ crt_bulk_create(crt_context_t crt_ctx, d_sg_list_t *sgl,
         D_ALLOC_PTR(ret_hdl);
         if (ret_hdl == NULL)
                 D_GOTO(out, rc = -DER_NOMEM);
+        ret_hdl->refcount = 1;
 
         quota_rc = get_quota_resource(crt_ctx, CRT_QUOTA_BULKS);
         if (quota_rc == -DER_QUOTA_LIMIT) {
@@ -182,20 +183,14 @@ int
 crt_bulk_addref(crt_bulk_t crt_bulk)
 {
         struct crt_bulk *bulk = crt_bulk;
-        int              rc   = -DER_SUCCESS;
-        hg_return_t      hg_ret;
+        int              rc   = DER_SUCCESS;
 
         if (bulk == NULL) {
                 D_ERROR("invalid parameter, NULL bulk\n");
                 D_GOTO(out, rc = -DER_INVAL);
         }
 
-        hg_ret = HG_Bulk_ref_incr(bulk->hg_bulk_hdl);
-        if (hg_ret != HG_SUCCESS) {
-                D_ERROR("HG_Bulk_ref_incr failed, hg_ret: %d.\n", hg_ret);
-                rc = crt_hgret_2_der(hg_ret);
-        }
-
+        atomic_fetch_add(&bulk->refcount, 1);
 out:
         return rc;
 }
@@ -204,40 +199,41 @@ int
 crt_bulk_free(crt_bulk_t crt_bulk)
 {
         struct crt_bulk *bulk = crt_bulk;
-        int              rc   = -DER_SUCCESS;
-        hg_return_t      hg_ret;
 
         if (bulk == NULL) {
                 D_ERROR("invalid parameter, NULL bulk\n");
-                D_GOTO(out, rc = -DER_INVAL);
+                return -DER_INVAL;
         }
 
-        /* This can happen if D_QUOTA_BULKS is enabled on a client */
-        if (bulk->hg_bulk_hdl == HG_BULK_NULL) {
-                if (bulk->deferred) {
-                        /* Treat as success */
-                        D_GOTO(out, rc = DER_SUCCESS);
-                } else {
-                        D_ASSERTF(0, "Bulk handle should not be NULL\n");
-                }
-        }
+        if (atomic_fetch_sub(&bulk->refcount, 1) > 1)
+                return DER_SUCCESS;
+
+        crt_bulk_free_common(bulk);
+
+        return DER_SUCCESS;
+}
 
-        hg_ret = HG_Bulk_free(bulk->hg_bulk_hdl);
-        if (hg_ret != HG_SUCCESS) {
-                D_ERROR("HG_Bulk_free failed, hg_ret: %d.\n", hg_ret);
-                rc = crt_hgret_2_der(hg_ret);
+void
+crt_bulk_free_common(struct crt_bulk *bulk)
+{
+        hg_return_t hg_ret;
+
+        D_ASSERT(bulk != NULL);
+
+        if (bulk->hg_bulk_hdl != HG_BULK_NULL) {
+                hg_ret = HG_Bulk_free(bulk->hg_bulk_hdl);
+                if (hg_ret != HG_SUCCESS) {
+                        D_ERROR("HG_Bulk_free() failed (%s)\n", HG_Error_to_string(hg_ret));
+                        /* Ignore the error, as we are already in a cleanup path */
+                }
         }
 
         /* decoded bulks are not counted towards quota; such bulks have crt_ctx set to NULL */
-        if (bulk->crt_ctx)
+        if (!bulk->deferred && bulk->crt_ctx != NULL)
                 put_quota_resource(bulk->crt_ctx, CRT_QUOTA_BULKS);
 
-out:
-        if (bulk != NULL) {
-                if (bulk->iovs)
-                        D_FREE(bulk->iovs);
-                D_FREE(bulk);
-        }
-        return rc;
+
+        D_FREE(bulk->iovs);
+        D_FREE(bulk);
 }
 
 /* Helper function to check for bulk expiration */
diff --git a/src/cart/crt_hg_proc.c b/src/cart/crt_hg_proc.c
index 839fd45d49d..967c07505af 100644
--- a/src/cart/crt_hg_proc.c
+++ b/src/cart/crt_hg_proc.c
@@ -1,7 +1,7 @@
 /*
  * (C) Copyright 2016-2024 Intel Corporation.
  * (C) Copyright 2025 Google LLC
- * (C) Copyright 2025 Hewlett Packard Enterprise Development LP
+ * (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
  */
@@ -115,12 +115,45 @@ CRT_PROC_TYPE_FUNC(int64_t)
 CRT_PROC_TYPE_FUNC(uint64_t)
 CRT_PROC_TYPE_FUNC(bool)
 
+static int
+crt_proc_crt_bulk_t_deferred(struct crt_bulk *bulk)
+{
+        struct crt_context *ctx;
+        int                 rc;
+
+        /* Create mercury handle based on saved params */
+        ctx = bulk->crt_ctx;
+        D_ASSERT(ctx != NULL);
+
+        rc = crt_hg_bulk_create(&ctx->cc_hg_ctx, &bulk->sgl, bulk->bulk_perm, &bulk->hg_bulk_hdl);
+        if (rc != DER_SUCCESS)
+                return rc;
+
+        record_quota_resource(ctx, CRT_QUOTA_BULKS);
+
+        if (bulk->bound) {
+                rc = crt_hg_bulk_bind(bulk->hg_bulk_hdl, &ctx->cc_hg_ctx);
+                if (rc != 0) {
+                        DL_ERROR(rc, "Failed to bind bulk during proc");
+                        put_quota_resource(ctx, CRT_QUOTA_BULKS);
+                        (void)HG_Bulk_free(bulk->hg_bulk_hdl);
+                        bulk->hg_bulk_hdl = HG_BULK_NULL;
+                        return rc;
+                }
+        }
+        /* Mark as no longer deferred once allocation is complete */
+        bulk->deferred = false;
+
+        return 0;
+}
+
 int
 crt_proc_crt_bulk_t(crt_proc_t proc, crt_proc_op_t proc_op, crt_bulk_t *pcrt_bulk)
 {
         struct crt_bulk *bulk = NULL;
+        hg_bulk_t        hg_bulk;
         hg_return_t      hg_ret;
-        hg_bulk_t        tmp_hg_bulk;
+        int              rc;
 
         /*
          * We only send 'hg_bulk_t' over the wire. During encoding stage we
@@ -131,70 +164,50 @@ crt_proc_crt_bulk_t(crt_proc_t proc, crt_proc_op_t proc_op, crt_bulk_t *pcrt_bul
         case CRT_PROC_ENCODE:
                 bulk = *pcrt_bulk;
 
-                /* RPC can have a NULL bulk. if so, encode a NULL value */
-                if (!bulk) {
-                        tmp_hg_bulk = HG_BULK_NULL;
-                        hg_ret      = hg_proc_hg_bulk_t(proc, (hg_bulk_t *)&tmp_hg_bulk);
-                        return (hg_ret == HG_SUCCESS) ? 0 : -DER_HG;
-                }
-
                 /* Deferred allocation as a result of D_QUOTA_BULKS limit */
-                if (bulk->deferred) {
-                        struct crt_context *ctx;
-                        int                 rc;
-
-                        /* Create mercury handle based on saved params */
-                        ctx = bulk->crt_ctx;
-                        D_ASSERT(ctx != NULL);
-
-                        rc = crt_hg_bulk_create(&ctx->cc_hg_ctx, &bulk->sgl, bulk->bulk_perm,
-                                                &bulk->hg_bulk_hdl);
-                        if (rc != DER_SUCCESS)
+                if (bulk != CRT_BULK_NULL) {
+                        if (bulk->deferred && (rc = crt_proc_crt_bulk_t_deferred(bulk)) != 0) {
+                                DL_ERROR(rc, "Failed to do deferred bulk allocation during proc");
                                 return rc;
-
-                        record_quota_resource(ctx, CRT_QUOTA_BULKS);
-
-                        if (bulk->bound) {
-                                rc = crt_hg_bulk_bind(bulk->hg_bulk_hdl, &ctx->cc_hg_ctx);
-                                if (rc != 0) {
-                                        D_ERROR("Failed to bind bulk during proc\n");
-                                        /* free will return quota resource */
-                                        crt_bulk_free(bulk->hg_bulk_hdl);
-                                        return rc;
-                                }
                         }
-                        bulk->deferred = false;
+                        hg_bulk = bulk->hg_bulk_hdl;
+                } else {
+                        /* RPC can have a NULL bulk. if so, encode a NULL value */
+                        hg_bulk = HG_BULK_NULL;
                 }
 
                 /* Pack mercury bulk handle to send over the wire */
-                hg_ret = hg_proc_hg_bulk_t(proc, (hg_bulk_t *)&bulk->hg_bulk_hdl);
+                hg_ret = hg_proc_hg_bulk_t(proc, &hg_bulk);
                 return (hg_ret == HG_SUCCESS) ? 0 : -DER_HG;
-                break;
 
         case CRT_PROC_DECODE:
                 /* unpack mercury handle and wrap it around crt_bulk_t struct */
-                hg_ret = hg_proc_hg_bulk_t(proc, &tmp_hg_bulk);
+                hg_ret = hg_proc_hg_bulk_t(proc, &hg_bulk);
                 if (hg_ret != HG_SUCCESS)
                         return -DER_HG;
 
                 /* don't create a bulk wrapper if null bulk was transmitted */
-                if (tmp_hg_bulk == HG_BULK_NULL) {
+                if (hg_bulk == HG_BULK_NULL) {
                         *pcrt_bulk = NULL;
                         return 0;
                 }
 
                 /* Allocate space for a wrapper struct */
                 D_ALLOC_PTR(bulk);
-                if (!bulk)
+                if (bulk == NULL)
                         return -DER_NOMEM;
 
-                bulk->hg_bulk_hdl = tmp_hg_bulk;
-                bulk->deferred    = false;
-                bulk->crt_ctx     = NULL;
+                *bulk = (struct crt_bulk){.hg_bulk_hdl = hg_bulk,
+                                          .crt_ctx     = NULL,
+                                          .iovs        = NULL,
+                                          .sgl         = {0},
+                                          .bulk_perm   = 0, /* unused */
+                                          .refcount    = 1,
+                                          .bound       = false,
+                                          .deferred    = false};
 
                 *pcrt_bulk = bulk;
                 return 0;
-                break;
 
         case CRT_PROC_FREE:
                 bulk = *pcrt_bulk;
@@ -202,15 +215,29 @@ crt_proc_crt_bulk_t(crt_proc_t proc, crt_proc_op_t proc_op, crt_bulk_t *pcrt_bul
                 if (bulk == NULL)
                         return 0;
 
-                hg_ret = hg_proc_hg_bulk_t(proc, &bulk->hg_bulk_hdl);
+                /**
+                 * Prevent HG proc from assigning NULL if refcount is not zero and keep reference on
+                 * HG bulk, we'll free it ourselves. hg_proc_hg_bulk_t() will decrement refcount on
+                 * the HG bulk.
+                 */
+                hg_bulk = bulk->hg_bulk_hdl;
+                (void)HG_Bulk_ref_incr(hg_bulk);
+                hg_ret = hg_proc_hg_bulk_t(proc, &hg_bulk);
+                if (hg_ret != HG_SUCCESS) {
+                        /* For correctness, call HG_Bulk_free() here but this is theoretically not
+                         * needed as hg_proc_hg_bulk_t() cannot fail in this context */
+                        (void)HG_Bulk_free(hg_bulk);
+                        D_ERROR("Failed to free bulk during proc (%s)\n",
+                                HG_Error_to_string(hg_ret));
+                }
+
+                if (atomic_fetch_sub(&bulk->refcount, 1) > 1)
+                        return 0;
 
-                /* Free the wrapper struct */
-                if (bulk->iovs)
-                        D_FREE(bulk->iovs);
-                D_FREE(bulk);
+                /* This is the real free */
+                crt_bulk_free_common(bulk);
                 *pcrt_bulk = NULL;
-                return (hg_ret == HG_SUCCESS) ? 0 : -DER_HG;
-                break;
+                return 0;
         }
 
         /* Should not get here */
diff --git a/src/cart/crt_internal_fns.h b/src/cart/crt_internal_fns.h
index 4128052518d..720d5d291bf 100644
--- a/src/cart/crt_internal_fns.h
+++ b/src/cart/crt_internal_fns.h
@@ -1,6 +1,6 @@
 /*
  * (C) Copyright 2016-2024 Intel Corporation.
- * (C) Copyright 2025 Hewlett Packard Enterprise Development LP
+ * (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
 *
 * SPDX-License-Identifier: BSD-2-Clause-Patent
 */
@@ -62,6 +62,9 @@ crt_bulk_desc_dup(struct crt_bulk_desc *bulk_desc_new,
         *bulk_desc_new = *bulk_desc;
 }
 
+void
+crt_bulk_free_common(struct crt_bulk *bulk);
+
 void
 crt_hdlr_proto_query(crt_rpc_t *rpc_req);
 
diff --git a/src/cart/crt_internal_types.h b/src/cart/crt_internal_types.h
index 02fbe3eea0c..7e0190bdaef 100644
--- a/src/cart/crt_internal_types.h
+++ b/src/cart/crt_internal_types.h
@@ -422,13 +422,14 @@ struct crt_quotas {
  * Deferred allocation is only supported on clients through D_QUOTA_BULKS env
  */
 struct crt_bulk {
+        d_sg_list_t     sgl;         /** original sgl */
+        d_iov_t        *iovs;        /** original iovs */
         hg_bulk_t       hg_bulk_hdl; /** mercury bulk handle */
-        bool            deferred;    /** whether handle allocation was deferred */
         crt_context_t   crt_ctx;     /** context on which bulk is to be created */
-        bool            bound;       /** whether crt_bulk_bind() was used on it */
-        d_iov_t        *iovs;        /** original iovs */
-        d_sg_list_t     sgl;         /** original sgl */
         crt_bulk_perm_t bulk_perm;   /** bulk permissions */
+        ATOMIC uint32_t refcount;    /** reference count for this struct */
+        bool            bound;       /** whether crt_bulk_bind() was used on it */
+        bool            deferred;    /** whether handle allocation was deferred */
 };
 
 /* crt_context */
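Illustrative note (not part of the patch): with this change, reference counting moves from the underlying mercury handle to the crt_bulk_t wrapper itself. crt_bulk_create() starts the wrapper at refcount 1, crt_bulk_addref() bumps it, and each crt_bulk_free() (or the CRT_PROC_FREE path in crt_proc_crt_bulk_t()) drops it; only the final release runs the real teardown in crt_bulk_free_common(). This also covers deferred bulks that do not yet have an HG handle. A minimal caller-side sketch, assuming a valid crt_context_t created elsewhere and keeping error handling deliberately small:

#include <gurt/common.h>
#include <cart/api.h>

/*
 * Sketch only: shows the intended addref/free pairing on the crt_bulk_t
 * wrapper. 'ctx' is assumed to be a crt_context_t created elsewhere and
 * 'buf' a caller-owned buffer of 'len' bytes.
 */
static int
bulk_refcount_example(crt_context_t ctx, void *buf, size_t len)
{
        d_sg_list_t sgl;
        crt_bulk_t  bulk;
        int         rc;

        rc = d_sgl_init(&sgl, 1);
        if (rc != 0)
                return rc;
        d_iov_set(&sgl.sg_iovs[0], buf, len);

        /* wrapper refcount starts at 1 (set in crt_bulk_create()) */
        rc = crt_bulk_create(ctx, &sgl, CRT_BULK_RW, &bulk);
        if (rc != 0)
                goto out;

        /* take an extra reference, e.g. while the bulk is attached to an RPC */
        rc = crt_bulk_addref(bulk);
        if (rc != 0)
                goto out_free;

        crt_bulk_free(bulk);    /* drops refcount to 1, wrapper stays alive */
out_free:
        crt_bulk_free(bulk);    /* drops refcount to 0, crt_bulk_free_common() runs */
out:
        d_sgl_fini(&sgl, false);
        return rc;
}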