Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 65 additions & 20 deletions src/common/mem.c
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,49 @@ pmem_defer_free(struct umem_instance *umm, umem_off_t off, void *act)
pmemobj_defer_free(pop, id, (struct pobj_action *)act);
}

/**
 * Apply the requested umem TX failure behavior to libpmemobj.
 *
 * TX_FAILURE_ABORT and TX_FAILURE_RETURN are forwarded as POBJ_TX_FAILURE_ABORT
 * and POBJ_TX_FAILURE_RETURN respectively. Any other value trips an assertion.
 *
 * \param behavior	[IN]	Failure behavior to set.
 */
static void
pmem_tx_set_failure_behavior(enum umem_tx_failure_behavior behavior)
{
	if (behavior == TX_FAILURE_ABORT) {
		pmemobj_tx_set_failure_behavior(POBJ_TX_FAILURE_ABORT);
		return;
	}

	if (behavior == TX_FAILURE_RETURN) {
		pmemobj_tx_set_failure_behavior(POBJ_TX_FAILURE_RETURN);
		return;
	}

	/* Neither of the known values: this is a caller bug. */
	D_ASSERTF(0, "Unknown TX failure behavior %d\n", behavior);
}

/**
 * Read the failure behavior reported by pmemobj_tx_get_failure_behavior() and
 * translate it into the umem representation.
 *
 * \return	TX_FAILURE_ABORT or TX_FAILURE_RETURN for the two known
 *		libpmemobj values; an unknown value trips an assertion and
 *		the statement after it returns -DER_INVAL.
 */
static int
pmem_tx_get_failure_behavior(void)
{
enum pobj_tx_failure_behavior behavior;

behavior = pmemobj_tx_get_failure_behavior();

/* Map each POBJ_TX_FAILURE_* value onto its TX_FAILURE_* counterpart. */
switch (behavior) {
case POBJ_TX_FAILURE_ABORT:
return TX_FAILURE_ABORT;
case POBJ_TX_FAILURE_RETURN:
return TX_FAILURE_RETURN;
default:
D_ASSERTF(0, "Unknown TX failure behavior %d\n", behavior);
/* Explicit return so that every path of this non-void function yields a value. */
return -DER_INVAL;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No return after unconditional assert.

Suggested change
return -DER_INVAL;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some static analysis tools may warning as "miss return" or similar for such case.

}
}

/**
 * Hand a caller-provided buffer to libpmemobj as snapshot log space.
 *
 * The offset is resolved to an address with umem_off2ptr() and passed to
 * pmemobj_tx_log_append_buffer() with TX_LOG_TYPE_SNAPSHOT.
 *
 * \param umm		[IN]	umem instance used to resolve \a snapbuf.
 * \param snapbuf	[IN]	Offset of the buffer.
 * \param size		[IN]	Size of the buffer.
 *
 * \return	0 on success, otherwise the libpmemobj error converted by
 *		umem_tx_errno().
 */
static int
pmem_tx_set_snapbuf(struct umem_instance *umm, umem_off_t snapbuf, size_t size)
{
	void	*addr = umem_off2ptr(umm, snapbuf);
	int	 err;

	err = pmemobj_tx_log_append_buffer(TX_LOG_TYPE_SNAPSHOT, addr, size);
	if (err == 0)
		return 0;

	return umem_tx_errno(err);
}

static int
pmem_tx_stage(void)
{
Expand Down Expand Up @@ -1135,28 +1178,30 @@ umem_tx_add_cb(struct umem_instance *umm, struct umem_tx_stage_data *txd,
return 0;
}

static umem_ops_t pmem_ops = {
.mo_tx_free = pmem_tx_free,
.mo_tx_alloc = pmem_tx_alloc,
.mo_tx_add = pmem_tx_add,
.mo_tx_xadd = pmem_tx_xadd,
.mo_tx_add_ptr = pmem_tx_add_ptr,
.mo_tx_abort = pmem_tx_abort,
.mo_tx_begin = pmem_tx_begin,
.mo_tx_commit = pmem_tx_commit,
.mo_tx_stage = pmem_tx_stage,
.mo_reserve = pmem_reserve,
.mo_defer_free = pmem_defer_free,
.mo_cancel = pmem_cancel,
.mo_tx_publish = pmem_tx_publish,
.mo_atomic_copy = pmem_atomic_copy,
.mo_atomic_alloc = pmem_atomic_alloc,
.mo_atomic_free = pmem_atomic_free,
.mo_atomic_flush = pmem_atomic_flush,
.mo_tx_add_callback = umem_tx_add_cb,
/**
 * Operation table that binds the umem_ops_t callbacks to the pmem_*
 * implementations (the TX callback registration uses umem_tx_add_cb).
 */
static umem_ops_t pmem_ops = {
.mo_tx_free = pmem_tx_free,
.mo_tx_alloc = pmem_tx_alloc,
.mo_tx_add = pmem_tx_add,
.mo_tx_xadd = pmem_tx_xadd,
.mo_tx_add_ptr = pmem_tx_add_ptr,
.mo_tx_abort = pmem_tx_abort,
.mo_tx_begin = pmem_tx_begin,
.mo_tx_commit = pmem_tx_commit,
/* Failure-behavior setter/getter and the snapshot buffer hook. */
.mo_tx_set_failure_behavior = pmem_tx_set_failure_behavior,
.mo_tx_get_failure_behavior = pmem_tx_get_failure_behavior,
.mo_tx_set_snapbuf = pmem_tx_set_snapbuf,
.mo_tx_stage = pmem_tx_stage,
.mo_reserve = pmem_reserve,
.mo_defer_free = pmem_defer_free,
.mo_cancel = pmem_cancel,
.mo_tx_publish = pmem_tx_publish,
.mo_atomic_copy = pmem_atomic_copy,
.mo_atomic_alloc = pmem_atomic_alloc,
.mo_atomic_free = pmem_atomic_free,
.mo_atomic_flush = pmem_atomic_flush,
.mo_tx_add_callback = umem_tx_add_cb,
};


/** BMEM operations (depends on dav) */

static int
Expand Down
28 changes: 28 additions & 0 deletions src/dtx/dtx_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,34 @@ dtx_sub_init(struct dtx_handle *dth, daos_unit_oid_t *oid, uint64_t dkey_hash)
return rc;
}

/**
 * Commit an array of DTX entries in bounded batches.
 *
 * Entries are handed to vos_dtx_commit() at most DTX_YIELD_CYCLE at a time.
 * When a batch fails with -DER_NOSPACE or -DER_OVERFLOW the batch size is
 * halved and the same range is retried; the reduced size is then kept for the
 * remaining entries. Any other error, or one of those two errors on a batch of
 * a single entry, stops the processing.
 *
 * \param coh		[IN]	Handle passed through to vos_dtx_commit().
 * \param dtis		[IN]	Array of the DTX identifiers to commit.
 * \param cnt		[IN]	Number of entries in \a dtis.
 * \param keep_act	[IN]	Passed through to vos_dtx_commit().
 * \param rm_cos	[OUT]	Optional (may be NULL) per-entry output array,
 *				indexed in parallel with \a dtis.
 *
 * \return	Sum of the non-negative values returned by vos_dtx_commit()
 *		when all batches succeed, otherwise the negative error of the
 *		failed batch (the count of earlier batches is not reported).
 */
int
dtx_commit_large(daos_handle_t coh, struct dtx_id *dtis, int cnt, bool keep_act, bool *rm_cos)
{
	int step      = DTX_YIELD_CYCLE;
	int committed = 0;
	int rc;
	int i = 0;

	while (i < cnt) {
		/* Clamp the last batch; written as a subtraction to stay overflow-free. */
		if (step > cnt - i)
			step = cnt - i;

		/*
		 * rm_cos must advance together with dtis, otherwise every batch
		 * would rewrite the flags of the first batch and the entries
		 * beyond it would never be reported.
		 */
		rc = vos_dtx_commit(coh, dtis + i, step, keep_act,
				    rm_cos != NULL ? rm_cos + i : NULL);
		if (rc >= 0) {
			committed += rc;
			i += step;
			continue;
		}

		if ((rc != -DER_NOSPACE && rc != -DER_OVERFLOW) || step <= 1)
			return rc;

		/* If out of space, reduce TX size and retry. */
		step >>= 1;
	}

	return committed;
}

/**
* Prepare the leader DTX handle in DRAM.
*
Expand Down
5 changes: 3 additions & 2 deletions src/dtx/dtx_internal.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/**
* (C) Copyright 2019-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -101,7 +101,7 @@ CRT_RPC_DECLARE(dtx, DAOS_ISEQ_DTX, DAOS_OSEQ_DTX);

CRT_RPC_DECLARE(dtx_coll, DAOS_ISEQ_COLL_DTX, DAOS_OSEQ_COLL_DTX);

#define DTX_YIELD_CYCLE (DTX_THRESHOLD_COUNT >> 3)
#define DTX_YIELD_CYCLE DTX_PIGGYBACK_COUNT

/* The count threshold (per pool) for triggering DTX aggregation. */
#define DTX_AGG_THD_CNT_MAX (1 << 24)
Expand Down Expand Up @@ -263,6 +263,7 @@ extern btr_ops_t dtx_btr_cos_ops;
/* dtx_common.c */
int dtx_handle_reinit(struct dtx_handle *dth);
void dtx_batched_commit(void *arg);
int dtx_commit_large(daos_handle_t coh, struct dtx_id *dtis, int cnt, bool keep_act, bool *rm_cos);
void dtx_aggregation_main(void *arg);
int start_dtx_reindex_ult(struct ds_cont_child *cont);
void dtx_merge_check_result(int *tgt, int src);
Expand Down
4 changes: 2 additions & 2 deletions src/dtx/dtx_rpc.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/**
* (C) Copyright 2019-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -879,7 +879,7 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes,
* the DTX entries (in the dtis) as "PARTIAL_COMMITTED" and re-commit them later.
* It is harmless to re-commit the DTX that has ever been committed.
*/
rc1 = vos_dtx_commit(cont->sc_hdl, dca.dca_dtis, count, rc != 0, rm_cos);
rc1 = dtx_commit_large(cont->sc_hdl, dca.dca_dtis, count, rc != 0, rm_cos);
if (rc1 > 0) {
dra->dra_committed += rc1;
rc1 = 0;
Expand Down
20 changes: 7 additions & 13 deletions src/dtx/dtx_srv.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/**
* (C) Copyright 2019-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -196,19 +196,13 @@ dtx_handler(crt_rpc_t *rpc)
if (unlikely(din->di_epoch == 1))
D_GOTO(out, rc = -DER_IO);

while (i < din->di_dtx_array.ca_count) {
if (i + count > din->di_dtx_array.ca_count)
count = din->di_dtx_array.ca_count - i;
/* The count of DTX entries will not exceed DTX_THRESHOLD_COUNT. */
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/* The count of DTX entries will not exceed DTX_THRESHOLD_COUNT. */
D_ASSERT(din->di_dtx_array.ca_count <= DTX_THRESHOLD_COUNT);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot directly assert on a value that comes from the network. If the sender provides an invalid value, whether by mistake or intentionally, dtx_commit_large() will handle it.

rc1 = dtx_commit_large(cont->sc_hdl, (struct dtx_id *)din->di_dtx_array.ca_arrays,
(int)din->di_dtx_array.ca_count, false, NULL);
if (rc1 < 0)
Comment thread
knard38 marked this conversation as resolved.
D_GOTO(out, rc = rc1);

dtis = (struct dtx_id *)din->di_dtx_array.ca_arrays + i;
rc1 = vos_dtx_commit(cont->sc_hdl, dtis, count, false, NULL);
if (rc1 > 0)
committed += rc1;
else if (rc == 0 && rc1 < 0)
rc = rc1;

i += count;
}
committed += rc1;

if (din->di_flags.ca_count > 0)
flags = din->di_flags.ca_arrays;
Expand Down
1 change: 1 addition & 0 deletions src/include/daos/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,7 @@ enum {
#define DAOS_OBJ_COLL_SPARSE (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x4d)

#define DAOS_DTX_RESEND_NONLEADER (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x4e)
#define DAOS_DTX_NOSPACE_NOREFRESH (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x4f)

#define DAOS_NVME_FAULTY (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x50)
#define DAOS_NVME_WRITE_ERR (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x51)
Expand Down
4 changes: 3 additions & 1 deletion src/include/daos/dtx.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/**
* (C) Copyright 2019-2023 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand All @@ -25,6 +25,8 @@
*/
#define DTX_THRESHOLD_COUNT (1 << 9)

/*
 * Count passed to dtx_cos_get_piggyback() by the object RW and punch handlers.
 * DTX_YIELD_CYCLE is defined as this value.
 */
#define DTX_PIGGYBACK_COUNT (1 << 6)

/* The time (in second) threshold for batched DTX commit. */
#define DTX_COMMIT_THRESHOLD_AGE 10

Expand Down
38 changes: 38 additions & 0 deletions src/include/daos/mem.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ struct umem_pool {
struct umem_slab_desc up_slabs[0];
};

/** What to do when a failure is hit during a umem TX. */
enum umem_tx_failure_behavior {
/** Abort the TX on failure; reported as the default when a backend has no getter. */
TX_FAILURE_ABORT,
/**
 * Counterpart of POBJ_TX_FAILURE_RETURN in the pmem backend.
 * NOTE(review): presumably the error is returned to the caller instead of
 * aborting the TX - confirm against the libpmemobj documentation.
 */
TX_FAILURE_RETURN,
};

#ifdef DAOS_PMEM_BUILD
#define UMEM_CACHE_PAGE_SZ_SHIFT 24 /* 16MB */
#define UMEM_CACHE_PAGE_SZ (1 << UMEM_CACHE_PAGE_SZ_SHIFT)
Expand Down Expand Up @@ -750,7 +755,14 @@ typedef struct {
/** commit memory transaction */
int (*mo_tx_commit)(struct umem_instance *umm, void *data);

/** set TX_FAILURE_ABORT or TX_FAILURE_RETURN when hit failure during TX. */
void (*mo_tx_set_failure_behavior)(enum umem_tx_failure_behavior behavior);

/** query the failure behavior for current TX. */
int (*mo_tx_get_failure_behavior)(void);
#ifdef DAOS_PMEM_BUILD
/** Set emergency buffer for transaction snapshot */
int (*mo_tx_set_snapbuf)(struct umem_instance *umm, umem_off_t snap_buf, size_t size);
/** get TX stage */
int (*mo_tx_stage)(void);

Expand Down Expand Up @@ -1074,12 +1086,38 @@ umem_tx_end(struct umem_instance *umm, int err)
return umem_tx_end_ex(umm, err, NULL);
}

/**
 * Set the TX failure behavior through the backend of \a umm.
 *
 * Nothing is done when the backend does not provide
 * mo_tx_set_failure_behavior.
 *
 * \param umm		[IN]	umem instance whose operation table is used.
 * \param behavior	[IN]	TX_FAILURE_ABORT or TX_FAILURE_RETURN.
 */
static inline void
umem_tx_set_failure_behavior(struct umem_instance *umm, enum umem_tx_failure_behavior behavior)
{
	/* Optional callback: silently skip backends without it. */
	if (!umm->umm_ops->mo_tx_set_failure_behavior)
		return;

	umm->umm_ops->mo_tx_set_failure_behavior(behavior);
}

/**
 * Query the TX failure behavior through the backend of \a umm.
 *
 * \param umm	[IN]	umem instance whose operation table is used.
 *
 * \return	Whatever the backend's mo_tx_get_failure_behavior reports, or
 *		TX_FAILURE_ABORT when the backend does not provide it.
 */
static inline int
umem_tx_get_failure_behavior(struct umem_instance *umm)
{
	/* Abort TX on failure by default. */
	if (!umm->umm_ops->mo_tx_get_failure_behavior)
		return TX_FAILURE_ABORT;

	return umm->umm_ops->mo_tx_get_failure_behavior();
}

#ifdef DAOS_PMEM_BUILD
bool umem_tx_inprogress(struct umem_instance *umm);
bool umem_tx_none(struct umem_instance *umm);

int umem_tx_errno(int err);

/**
 * Set the emergency buffer for transaction snapshot through the backend of
 * \a umm.
 *
 * \param umm		[IN]	umem instance whose operation table is used.
 * \param snap_buf	[IN]	Offset of the buffer.
 * \param size		[IN]	Size of the buffer.
 *
 * \return	The backend's result, or 0 when the backend does not provide
 *		mo_tx_set_snapbuf.
 */
static inline int
umem_tx_set_snapbuf(struct umem_instance *umm, umem_off_t snap_buf, size_t size)
{
	/* Optional callback: treat a missing one as success. */
	if (!umm->umm_ops->mo_tx_set_snapbuf)
		return 0;

	return umm->umm_ops->mo_tx_set_snapbuf(umm, snap_buf, size);
}

static inline int
umem_tx_stage(struct umem_instance *umm)
{
Expand Down
16 changes: 10 additions & 6 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -2093,14 +2093,18 @@ obj_local_rw_internal_wrap(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dt
static int
obj_local_rw(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth)
{
struct obj_rw_in *orw = crt_req_get(rpc);
struct dtx_share_peer *dsp;
uint32_t retry = 0;
int rc;
struct obj_rw_in *orw = crt_req_get(rpc);
struct dtx_share_peer *dsp;
uint32_t retry = 0;
uint32_t opc = opc_get(rpc->cr_opc);
int rc;

again:
rc = obj_local_rw_internal_wrap(rpc, ioc, dth);
if (dth != NULL && obj_dtx_need_refresh(dth, rc)) {
if (opc == DAOS_OBJ_RPC_FETCH && DAOS_FAIL_CHECK(DAOS_DTX_NOSPACE_NOREFRESH))
return -DER_NONEXIST;

if (++retry < 3) {
rc = dtx_refresh(dth, ioc->ioc_coc);
if (rc == 0)
Expand Down Expand Up @@ -3166,7 +3170,7 @@ ds_obj_rw_handler(crt_rpc_t *rpc)
*/
D_FREE(dti_cos);
dti_cos_cnt = dtx_cos_get_piggyback(ioc.ioc_coc, &orw->orw_oid, orw->orw_dkey_hash,
DTX_THRESHOLD_COUNT, &dti_cos);
DTX_PIGGYBACK_COUNT, &dti_cos);
if (dti_cos_cnt < 0)
D_GOTO(out, rc = dti_cos_cnt);

Expand Down Expand Up @@ -4073,7 +4077,7 @@ ds_obj_punch_handler(crt_rpc_t *rpc)
*/
D_FREE(dti_cos);
dti_cos_cnt = dtx_cos_get_piggyback(ioc.ioc_coc, &opi->opi_oid, opi->opi_dkey_hash,
DTX_THRESHOLD_COUNT, &dti_cos);
DTX_PIGGYBACK_COUNT, &dti_cos);
if (dti_cos_cnt < 0)
D_GOTO(out, rc = dti_cos_cnt);

Expand Down
Loading
Loading