Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 30 additions & 32 deletions src/cart/crt_bulk.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Google LLC
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -106,6 +106,7 @@ crt_bulk_create(crt_context_t crt_ctx, d_sg_list_t *sgl,
D_ALLOC_PTR(ret_hdl);
if (ret_hdl == NULL)
D_GOTO(out, rc = -DER_NOMEM);
ret_hdl->refcount = 1;

quota_rc = get_quota_resource(crt_ctx, CRT_QUOTA_BULKS);
if (quota_rc == -DER_QUOTA_LIMIT) {
Expand Down Expand Up @@ -183,19 +184,13 @@ crt_bulk_addref(crt_bulk_t crt_bulk)
{
struct crt_bulk *bulk = crt_bulk;
int rc = -DER_SUCCESS;
hg_return_t hg_ret;

if (bulk == NULL) {
D_ERROR("invalid parameter, NULL bulk\n");
D_GOTO(out, rc = -DER_INVAL);
}

hg_ret = HG_Bulk_ref_incr(bulk->hg_bulk_hdl);
if (hg_ret != HG_SUCCESS) {
D_ERROR("HG_Bulk_ref_incr failed, hg_ret: %d.\n", hg_ret);
rc = crt_hgret_2_der(hg_ret);
}

atomic_fetch_add(&bulk->refcount, 1);
out:
return rc;
}
Expand All @@ -204,40 +199,43 @@ int
crt_bulk_free(crt_bulk_t crt_bulk)
{
struct crt_bulk *bulk = crt_bulk;
int rc = -DER_SUCCESS;
hg_return_t hg_ret;

if (bulk == NULL) {
D_ERROR("invalid parameter, NULL bulk\n");
D_GOTO(out, rc = -DER_INVAL);
return -DER_INVAL;
}

/* This can happen if D_QUOTA_BULKS is enabled on a client */
if (bulk->hg_bulk_hdl == HG_BULK_NULL) {
if (bulk->deferred) {
/* Treat as success */
D_GOTO(out, rc = DER_SUCCESS);
} else {
D_ASSERTF(0, "Bulk handle should not be NULL\n");
}
}
if (atomic_fetch_sub(&bulk->refcount, 1) > 1)
return DER_SUCCESS;

crt_bulk_free_common(bulk);

return DER_SUCCESS;
}

hg_ret = HG_Bulk_free(bulk->hg_bulk_hdl);
if (hg_ret != HG_SUCCESS) {
D_ERROR("HG_Bulk_free failed, hg_ret: %d.\n", hg_ret);
rc = crt_hgret_2_der(hg_ret);
void
crt_bulk_free_common(struct crt_bulk *bulk)
{
hg_return_t hg_ret;

D_ASSERT(bulk != NULL);

/* Free the underlying Mercury bulk handle */
if (bulk->hg_bulk_hdl != HG_BULK_NULL) {
hg_ret = HG_Bulk_free(bulk->hg_bulk_hdl);
if (hg_ret != HG_SUCCESS) {
D_ERROR("HG_Bulk_free() failed (%s)\n", HG_Error_to_string(hg_ret));
/* Ignore the error, as we are already in a cleanup path */
}
}

/* decoded bulks are not counted towards quota; such bulks have crt_ctx set to NULL */
if (bulk->crt_ctx)
if (!bulk->deferred && bulk->crt_ctx != NULL)
put_quota_resource(bulk->crt_ctx, CRT_QUOTA_BULKS);
out:
if (bulk != NULL) {
if (bulk->iovs)
D_FREE(bulk->iovs);
D_FREE(bulk);
}
return rc;

if (bulk->iovs != NULL)
D_FREE(bulk->iovs);
D_FREE(bulk);
}

/* Helper function to check for bulk expiration */
Expand Down
119 changes: 73 additions & 46 deletions src/cart/crt_hg_proc.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Google LLC
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -115,12 +115,44 @@ CRT_PROC_TYPE_FUNC(int64_t)
CRT_PROC_TYPE_FUNC(uint64_t)
CRT_PROC_TYPE_FUNC(bool)

static int
crt_proc_crt_bulk_t_deferred(struct crt_bulk *bulk)
{
struct crt_context *ctx;
int rc;

/* Create mercury handle based on saved params */
ctx = bulk->crt_ctx;
D_ASSERT(ctx != NULL);

rc = crt_hg_bulk_create(&ctx->cc_hg_ctx, &bulk->sgl, bulk->bulk_perm, &bulk->hg_bulk_hdl);
if (rc != DER_SUCCESS)
return rc;

record_quota_resource(ctx, CRT_QUOTA_BULKS);

if (bulk->bound) {
rc = crt_hg_bulk_bind(bulk->hg_bulk_hdl, &ctx->cc_hg_ctx);
if (rc != 0) {
DL_ERROR(rc, "Failed to bind bulk during proc");
/* free will return quota resource */
crt_bulk_free(bulk);
return rc;
}
}
/* Mark as no longer deferred once allocation is complete */
bulk->deferred = false;

return 0;
}

int
crt_proc_crt_bulk_t(crt_proc_t proc, crt_proc_op_t proc_op, crt_bulk_t *pcrt_bulk)
{
struct crt_bulk *bulk = NULL;
hg_bulk_t hg_bulk;
hg_return_t hg_ret;
hg_bulk_t tmp_hg_bulk;
int rc;

/*
* We only send 'hg_bulk_t' over the wire. During encoding stage we
Expand All @@ -131,86 +163,81 @@ crt_proc_crt_bulk_t(crt_proc_t proc, crt_proc_op_t proc_op, crt_bulk_t *pcrt_bul
case CRT_PROC_ENCODE:
bulk = *pcrt_bulk;

/* RPC can have a NULL bulk. if so, encode a NULL value */
if (!bulk) {
tmp_hg_bulk = HG_BULK_NULL;
hg_ret = hg_proc_hg_bulk_t(proc, (hg_bulk_t *)&tmp_hg_bulk);
return (hg_ret == HG_SUCCESS) ? 0 : -DER_HG;
}

/* Deferred allocation as a result of D_QUOTA_BULKS limit */
if (bulk->deferred) {
struct crt_context *ctx;
int rc;

/* Create mercury handle based on saved params */
ctx = bulk->crt_ctx;
D_ASSERT(ctx != NULL);

rc = crt_hg_bulk_create(&ctx->cc_hg_ctx, &bulk->sgl, bulk->bulk_perm,
&bulk->hg_bulk_hdl);
if (rc != DER_SUCCESS)
if (bulk != CRT_BULK_NULL) {
if (bulk->deferred && (rc = crt_proc_crt_bulk_t_deferred(bulk)) != 0) {
DL_ERROR(rc, "Failed to do deferred bulk allocation during proc");
return rc;

record_quota_resource(ctx, CRT_QUOTA_BULKS);

if (bulk->bound) {
rc = crt_hg_bulk_bind(bulk->hg_bulk_hdl, &ctx->cc_hg_ctx);
if (rc != 0) {
D_ERROR("Failed to bind bulk during proc\n");
/* free will return quota resource */
crt_bulk_free(bulk->hg_bulk_hdl);
return rc;
}
}
bulk->deferred = false;
hg_bulk = bulk->hg_bulk_hdl;
} else {
/* RPC can have a NULL bulk. if so, encode a NULL value */
hg_bulk = HG_BULK_NULL;
}

/* Pack mercury bulk handle to send over the wire */
hg_ret = hg_proc_hg_bulk_t(proc, (hg_bulk_t *)&bulk->hg_bulk_hdl);
hg_ret = hg_proc_hg_bulk_t(proc, &hg_bulk);
return (hg_ret == HG_SUCCESS) ? 0 : -DER_HG;
break;

case CRT_PROC_DECODE:
/* unpack mercury handle and wrap it around crt_bulk_t struct */
hg_ret = hg_proc_hg_bulk_t(proc, &tmp_hg_bulk);
hg_ret = hg_proc_hg_bulk_t(proc, &hg_bulk);
if (hg_ret != HG_SUCCESS)
return -DER_HG;

/* don't create a bulk wrapper if null bulk was transmitted */
if (tmp_hg_bulk == HG_BULK_NULL) {
if (hg_bulk == HG_BULK_NULL) {
*pcrt_bulk = NULL;
return 0;
}

/* Allocate space for a wrapper struct */
D_ALLOC_PTR(bulk);
if (!bulk)
if (bulk == NULL)
return -DER_NOMEM;

bulk->hg_bulk_hdl = tmp_hg_bulk;
bulk->hg_bulk_hdl = hg_bulk;
bulk->refcount = 1;
bulk->deferred = false;
bulk->bound = false;
bulk->crt_ctx = NULL;
bulk->iovs = NULL;

*pcrt_bulk = bulk;
return 0;
break;

case CRT_PROC_FREE:
bulk = *pcrt_bulk;

if (bulk == NULL)
return 0;

hg_ret = hg_proc_hg_bulk_t(proc, &bulk->hg_bulk_hdl);
/**
* Prevent HG proc from assigning NULL if refcount is not zero and keep reference on
* HG bulk, we'll free it ourselves. hg_proc_hg_bulk_t() will decrement refcount on
* the HG bulk.
*/
hg_bulk = bulk->hg_bulk_hdl;
HG_Bulk_ref_incr(hg_bulk);
hg_ret = hg_proc_hg_bulk_t(proc, &hg_bulk);
if (hg_ret != HG_SUCCESS) {
D_ERROR("Failed to free bulk during proc (%s)\n",
HG_Error_to_string(hg_ret));
/* Decrement the refcount we just incremented since proc failed */
hg_ret = HG_Bulk_free(hg_bulk);
if (hg_ret != HG_SUCCESS)
D_ERROR("HG_Bulk_free() failed during error cleanup (%s)\n",
HG_Error_to_string(hg_ret));
return -DER_HG;
}

if (atomic_fetch_sub(&bulk->refcount, 1) > 1)
return 0;

/* Free the wrapper struct */
if (bulk->iovs)
D_FREE(bulk->iovs);
D_FREE(bulk);
/* This is the real free */
crt_bulk_free_common(bulk);
*pcrt_bulk = NULL;
return (hg_ret == HG_SUCCESS) ? 0 : -DER_HG;
break;
return 0;
}

/* Should not get here */
Expand Down
5 changes: 4 additions & 1 deletion src/cart/crt_internal_fns.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -62,6 +62,9 @@ crt_bulk_desc_dup(struct crt_bulk_desc *bulk_desc_new,
*bulk_desc_new = *bulk_desc;
}

void
crt_bulk_free_common(struct crt_bulk *bulk);

void
crt_hdlr_proto_query(crt_rpc_t *rpc_req);

Expand Down
1 change: 1 addition & 0 deletions src/cart/crt_internal_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ struct crt_bulk {
d_iov_t *iovs; /** original iovs */
d_sg_list_t sgl; /** original sgl */
crt_bulk_perm_t bulk_perm; /** bulk permissions */
ATOMIC uint32_t refcount; /** reference count for this struct */
};

/* crt_context */
Expand Down
Loading