Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions src/cart/crt_bulk.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
static inline bool
crt_sgl_valid(d_sg_list_t *sgl)
{
d_iov_t *iov;
int i;
d_iov_t *iov;
int i;

if (sgl == NULL || sgl->sg_nr == 0) {
if (sgl == NULL)
Expand Down Expand Up @@ -130,6 +130,7 @@ crt_bulk_create(crt_context_t crt_ctx, d_sg_list_t *sgl,
ret_hdl->hg_bulk_hdl = HG_BULK_NULL;
ret_hdl->crt_ctx = crt_ctx;
ret_hdl->deferred = true;

D_GOTO(out, rc = DER_SUCCESS);
}

Expand All @@ -138,6 +139,7 @@ crt_bulk_create(crt_context_t crt_ctx, d_sg_list_t *sgl,

rc = crt_hg_bulk_create(&ctx->cc_hg_ctx, sgl, bulk_perm, &ret_hdl->hg_bulk_hdl);
if (rc != 0) {
CRT_METRIC_INC(ctx, CM_BULK_CREATE_FAILED);
D_ERROR("crt_hg_bulk_create() failed, rc: " DF_RC "\n", DP_RC(rc));
if (ret_hdl->iovs != NULL)
D_FREE(ret_hdl->iovs);
Expand All @@ -146,6 +148,8 @@ crt_bulk_create(crt_context_t crt_ctx, d_sg_list_t *sgl,
D_GOTO(out, rc);
}

CRT_METRIC_INC(ctx, CM_BULK_CREATE);

out:
if (rc == 0 && bulk_hdl)
*bulk_hdl = ret_hdl;
Expand Down Expand Up @@ -176,6 +180,8 @@ crt_bulk_bind(crt_bulk_t crt_bulk, crt_context_t crt_ctx)
}

out:
if (rc == 0)
CRT_METRIC_INC(ctx, CM_BULK_BOUND);
return rc;
}

Expand Down Expand Up @@ -216,7 +222,8 @@ crt_bulk_free(crt_bulk_t crt_bulk)
void
crt_bulk_free_common(struct crt_bulk *bulk)
{
hg_return_t hg_ret;
struct crt_context *ctx;
hg_return_t hg_ret;

D_ASSERT(bulk != NULL);

Expand All @@ -228,9 +235,15 @@ crt_bulk_free_common(struct crt_bulk *bulk)
}
}

ctx = bulk->crt_ctx;

/* bulks that are decoded don't have a cart context associated with them */
if (ctx)
CRT_METRIC_INC(ctx, CM_BULK_FREE);

/* decoded bulks are not counted towards quota; such bulks have crt_ctx set to NULL */
if (!bulk->deferred && bulk->crt_ctx != NULL)
put_quota_resource(bulk->crt_ctx, CRT_QUOTA_BULKS);
if (!bulk->deferred && ctx != NULL)
put_quota_resource(ctx, CRT_QUOTA_BULKS);

D_FREE(bulk->iovs);
D_FREE(bulk);
Expand Down
127 changes: 57 additions & 70 deletions src/cart/crt_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,29 @@ crt_context_ep_empty(crt_context_t crt_ctx)
return rc == 0;
}

/* helper function to add counters */
static void
crt_context_add_counters(struct crt_context *ctx)
{
char *prov;
int idx;
int rc = 0;

D_ASSERT(ctx != NULL);

prov = crt_provider_name_get(ctx->cc_hg_ctx.chc_provider);
idx = ctx->cc_idx;

#define X(name, type, desc, unit_desc) \
rc = d_tm_add_metric(&ctx->cc_metrics.name, type, desc, unit_desc, "net/%s/%s/ctx_%u", \
prov, #name, idx); \
if (rc != 0) \
DL_WARN(rc, "Failed to add metric " #name "\n");

CRT_METRICS_LIST;
#undef X
}

static int
crt_context_init(struct crt_context *ctx)
{
Expand Down Expand Up @@ -272,56 +295,7 @@ crt_context_provider_create(crt_context_t *crt_ctx, crt_provider_t provider, boo

/** initialize sensors for servers */
if (crt_gdata.cg_use_sensors && crt_is_service()) {
int ret;
char *prov;

prov = crt_provider_name_get(ctx->cc_hg_ctx.chc_provider);
ret = d_tm_add_metric(&ctx->cc_timedout, D_TM_COUNTER,
"Total number of timed out RPC requests",
"reqs", "net/%s/req_timeout/ctx_%u",
prov, ctx->cc_idx);
if (ret)
DL_WARN(ret, "Failed to create timed out req counter");

ret = d_tm_add_metric(&ctx->cc_timedout_uri, D_TM_COUNTER,
"Total number of timed out URI lookup "
"requests", "reqs",
"net/%s/uri_lookup_timeout/ctx_%u",
prov, ctx->cc_idx);
if (ret)
DL_WARN(ret, "Failed to create timed out uri req counter");

ret = d_tm_add_metric(&ctx->cc_failed_addr, D_TM_COUNTER,
"Total number of failed address "
"resolution attempts", "reqs",
"net/%s/failed_addr/ctx_%u",
prov, ctx->cc_idx);
if (ret)
DL_WARN(ret, "Failed to create failed addr counter");

ret = d_tm_add_metric(&ctx->cc_net_glitches, D_TM_COUNTER,
"Total number of network glitch errors", "errors",
"net/%s/glitch/ctx_%u", prov, ctx->cc_idx);
if (ret)
DL_WARN(ret, "Failed to create network glitch counter");

ret = d_tm_add_metric(&ctx->cc_swim_delay, D_TM_STATS_GAUGE,
"SWIM delay measurements", "delay",
"net/%s/swim_delay/ctx_%u", prov, ctx->cc_idx);
if (ret)
DL_WARN(ret, "Failed to create SWIM delay gauge");

ret = d_tm_add_metric(&ctx->cc_quotas.rpc_waitq_depth, D_TM_GAUGE,
"Current count of enqueued RPCs", "rpcs",
"net/%s/waitq_depth/ctx_%u", prov, ctx->cc_idx);
if (ret)
DL_WARN(ret, "Failed to create rpc waitq gauge");

ret = d_tm_add_metric(&ctx->cc_quotas.rpc_quota_exceeded, D_TM_COUNTER,
"Total number of exceeded RPC quota errors", "errors",
"net/%s/quota_exceeded/ctx_%u", prov, ctx->cc_idx);
if (ret)
DL_WARN(ret, "Failed to create quota exceeded counter");
crt_context_add_counters(ctx);
Comment thread
frostedcmos marked this conversation as resolved.
}

if (crt_is_service() && crt_gdata.cg_auto_swim_disable == 0 &&
Expand Down Expand Up @@ -501,14 +475,42 @@ crt_rpc_completed(struct crt_rpc_priv *rpc_priv)
void
crt_rpc_complete_and_unlock(struct crt_rpc_priv *rpc_priv, int rc)
{
struct crt_context *ctx;
struct crt_cb_info cbinfo;

D_ASSERT(rpc_priv != NULL);
ctx = rpc_priv->crp_pub.cr_ctx;

if (crt_rpc_completed(rpc_priv)) {
CRT_METRIC_INC(ctx, CM_RPC_DOUBLE_COMPLETE);
crt_rpc_unlock(rpc_priv);
RPC_ERROR(rpc_priv, "already completed, possibly due to duplicated completions.\n");
return;
}

cbinfo.cci_rpc = &rpc_priv->crp_pub;
cbinfo.cci_arg = rpc_priv->crp_arg;
cbinfo.cci_rc = rc;

if (cbinfo.cci_rc == 0)
cbinfo.cci_rc = rpc_priv->crp_reply_hdr.cch_rc;

if (cbinfo.cci_rc != 0)
RPC_CWARN(crt_quiet_error(cbinfo.cci_rc), DB_NET, rpc_priv, "failed, " DF_RC "\n",
DP_RC(cbinfo.cci_rc));

switch (cbinfo.cci_rc) {
case DER_SUCCESS:
CRT_METRIC_INC(ctx, CM_RPC_COMPLETED);
break;
case -DER_TIMEDOUT:
CRT_METRIC_INC(ctx, CM_RPC_TIMEDOUT);
break;
default:
CRT_METRIC_INC(ctx, CM_RPC_COMPLETED_ERR);
break;
}

if (rc == -DER_CANCELED)
rpc_priv->crp_state = RPC_STATE_CANCELED;
else if (rc == -DER_TIMEDOUT)
Expand All @@ -521,18 +523,6 @@ crt_rpc_complete_and_unlock(struct crt_rpc_priv *rpc_priv, int rc)
crt_rpc_unlock(rpc_priv);

if (rpc_priv->crp_complete_cb != NULL) {
struct crt_cb_info cbinfo;

cbinfo.cci_rpc = &rpc_priv->crp_pub;
cbinfo.cci_arg = rpc_priv->crp_arg;
cbinfo.cci_rc = rc;
if (cbinfo.cci_rc == 0)
cbinfo.cci_rc = rpc_priv->crp_reply_hdr.cch_rc;

if (cbinfo.cci_rc != 0)
RPC_CWARN(crt_quiet_error(cbinfo.cci_rc), DB_NET, rpc_priv,
"failed, " DF_RC "\n", DP_RC(cbinfo.cci_rc));

RPC_TRACE(DB_TRACE, rpc_priv,
"Invoking RPC callback (rank %d tag %d) rc: "
DF_RC "\n",
Expand Down Expand Up @@ -1147,9 +1137,6 @@ crt_req_timeout_hdlr(struct crt_rpc_priv *rpc_priv)
grp_priv = crt_grp_pub2priv(tgt_ep->ep_grp);
crt_ctx = rpc_priv->crp_pub.cr_ctx;

if (crt_gdata.cg_use_sensors)
d_tm_inc_counter(crt_ctx->cc_timedout, 1);

switch (rpc_priv->crp_state) {
case RPC_STATE_INITED:
case RPC_STATE_QUEUED:
Expand All @@ -1170,8 +1157,8 @@ crt_req_timeout_hdlr(struct crt_rpc_priv *rpc_priv)
container_of(ul_req, struct crt_rpc_priv, crp_pub), ul_in->ul_grp_id,
ul_in->ul_rank, ul_req->cr_ep.ep_rank);

if (crt_gdata.cg_use_sensors)
d_tm_inc_counter(crt_ctx->cc_timedout_uri, 1);
CRT_METRIC_INC(crt_ctx, CM_URI_LOOKUP_TIMEDOUT);

crt_req_abort(ul_req);
/*
* don't crt_rpc_complete_and_unlock rpc_priv here, because crt_req_abort
Expand Down Expand Up @@ -1410,7 +1397,7 @@ crt_context_req_track(struct crt_rpc_priv *rpc_priv)

if (quota_rc == -DER_QUOTA_LIMIT) {
d_list_add_tail(&rpc_priv->crp_waitq_link, &crt_ctx->cc_quotas.rpc_waitq);
d_tm_inc_gauge(crt_ctx->cc_quotas.rpc_waitq_depth, 1);
CRT_METRIC_INC_GAUGE(crt_ctx, CM_RPC_WAITQ_DEPTH, 1);
}

D_MUTEX_UNLOCK(&crt_ctx->cc_mutex);
Expand Down Expand Up @@ -1589,7 +1576,7 @@ crt_context_req_untrack(struct crt_rpc_priv *rpc_priv)
D_INIT_LIST_HEAD(&submit_list);
if (tmp_rpc != NULL) {
add_rpc_to_list(tmp_rpc, &submit_list);
d_tm_dec_gauge(crt_ctx->cc_quotas.rpc_waitq_depth, 1);
CRT_METRIC_DEC_GAUGE(crt_ctx, CM_RPC_WAITQ_DEPTH, 1);
} else {
put_quota_resource(rpc_priv->crp_pub.cr_ctx, CRT_QUOTA_RPCS);
}
Expand Down Expand Up @@ -2155,7 +2142,7 @@ get_quota_resource(crt_context_t crt_ctx, crt_quota_type_t quota)
} else {
D_DEBUG(DB_TRACE, "Quota limit (%d) reached for quota_type=%d\n",
ctx->cc_quotas.limit[quota], quota);
d_tm_inc_counter(ctx->cc_quotas.rpc_quota_exceeded, 1);
CRT_METRIC_INC(ctx, CM_RPC_QUOTA_EXCEEDED);
rc = -DER_QUOTA_LIMIT;
}

Expand Down
18 changes: 17 additions & 1 deletion src/cart/crt_corpc.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -348,12 +348,16 @@ crt_corpc_req_create(crt_context_t crt_ctx, crt_group_t *grp,
bool filter_invert;
d_rank_t grp_root, pri_root;
uint32_t grp_ver;
struct crt_context *ctx;
int rc = 0;

if (crt_ctx == CRT_CONTEXT_NULL || req == NULL) {
D_ERROR("invalid parameter (NULL crt_ctx or req).\n");
D_GOTO(out, rc = -DER_INVAL);
}

ctx = crt_ctx;

if (!crt_initialized()) {
D_ERROR("CaRT not initialized yet.\n");
D_GOTO(out, rc = -DER_UNINIT);
Expand Down Expand Up @@ -438,6 +442,8 @@ crt_corpc_req_create(crt_context_t crt_ctx, crt_group_t *grp,
D_GOTO(out, rc);
}

CRT_METRIC_INC(ctx, CM_CORPC_CREATED);

*req = &rpc_priv->crp_pub;
out:
if (rc < 0)
Expand Down Expand Up @@ -540,7 +546,17 @@ crt_corpc_complete(struct crt_rpc_priv *rpc_priv)
myrank = co_info->co_grp_priv->gp_self;
am_root = (myrank == co_info->co_root);
if (am_root) {
struct crt_context *ctx;

crt_rpc_lock(rpc_priv);

ctx = rpc_priv->crp_pub.cr_ctx;

if (co_info->co_rc == 0)
CRT_METRIC_INC(ctx, CM_CORPC_COMPLETED);
else
CRT_METRIC_INC(ctx, CM_CORPC_COMPLETED_ERR);

crt_rpc_complete_and_unlock(rpc_priv, co_info->co_rc);
} else {
if (co_info->co_rc != 0)
Expand Down
21 changes: 17 additions & 4 deletions src/cart/crt_hg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,8 @@ crt_rpc_handler_common(hg_handle_t hg_hdl)
rpc_tmp.crp_hg_hdl = hg_hdl;
rpc_tmp.crp_pub.cr_ctx = crt_ctx;

CRT_METRIC_INC(crt_ctx, CM_RPC_RECV);

rc = crt_hg_unpack_header(hg_hdl, &rpc_tmp, &proc);
if (unlikely(rc != 0)) {
D_ERROR("crt_hg_unpack_header failed, rc: %d.\n", rc);
Expand Down Expand Up @@ -1597,23 +1599,29 @@ crt_hg_reply_send_cb(const struct hg_cb_info *hg_cbinfo)
int
crt_hg_reply_send(struct crt_rpc_priv *rpc_priv)
{
hg_return_t hg_ret;
int rc = 0;
struct crt_context *crt_ctx;
hg_return_t hg_ret;
int rc = 0;

D_ASSERT(rpc_priv != NULL);

crt_ctx = rpc_priv->crp_pub.cr_ctx;

/* corresponds to decref in crt_hg_reply_send_cb */
RPC_ADDREF(rpc_priv);
hg_ret = HG_Respond(rpc_priv->crp_hg_hdl, crt_hg_reply_send_cb,
rpc_priv, &rpc_priv->crp_pub.cr_output);
if (hg_ret != HG_SUCCESS) {
CRT_METRIC_INC(crt_ctx, CM_RPC_REPLY_FAILED);
RPC_ERROR(rpc_priv, "HG_Respond failed, hg_ret: " DF_HG_RC "\n",
DP_HG_RC(hg_ret));
/* should success as addref above */
RPC_DECREF(rpc_priv);
D_GOTO(out, rc = crt_hgret_2_der(hg_ret));
}

CRT_METRIC_INC(crt_ctx, CM_RPC_REPLIED);

/* Release input buffer */
if (rpc_priv->crp_release_input_early && !rpc_priv->crp_forward) {
hg_ret = HG_Release_input_buf(rpc_priv->crp_hg_hdl);
Expand All @@ -1631,8 +1639,11 @@ crt_hg_reply_send(struct crt_rpc_priv *rpc_priv)
void
crt_hg_reply_error_send(struct crt_rpc_priv *rpc_priv, int error_code)
{
void *hg_out_struct;
int hg_ret;
struct crt_context *crt_ctx;
void *hg_out_struct;
int hg_ret;

crt_ctx = rpc_priv->crp_pub.cr_ctx;

D_ASSERT(rpc_priv != NULL);
D_ASSERT(error_code != 0);
Expand All @@ -1641,10 +1652,12 @@ crt_hg_reply_error_send(struct crt_rpc_priv *rpc_priv, int error_code)
rpc_priv->crp_reply_hdr.cch_rc = error_code;
hg_ret = HG_Respond(rpc_priv->crp_hg_hdl, NULL, NULL, hg_out_struct);
if (hg_ret != HG_SUCCESS) {
CRT_METRIC_INC(crt_ctx, CM_RPC_REPLY_FAILED);
RPC_ERROR(rpc_priv,
"HG_Respond failed, hg_ret: " DF_HG_RC "\n",
DP_HG_RC(hg_ret));
} else {
CRT_METRIC_INC(crt_ctx, CM_RPC_REPLIED);
RPC_TRACE(DB_NET, rpc_priv,
"Sent CART level error message back to client. error_code: %d\n",
error_code);
Expand Down
Loading
Loading