From 51c08d0bf3c6ef7c8664560e21a99f60d155ea03 Mon Sep 17 00:00:00 2001
From: Fan Yong
Date: Wed, 29 Apr 2026 23:37:49 +0800
Subject: [PATCH] DAOS-18690 vos: handle DTX commit under space pressure

If we cannot normally allocate space to hold the committed DTX table,
then release some old DTX entries from some container in the same pool
to make room for the newly committed ones.

The patch also uses preallocated space for TX snapshots under space
pressure.

Signed-off-by: Fan Yong
---
 src/common/mem.c               |  85 ++++--
 src/dtx/dtx_common.c           |  28 ++
 src/dtx/dtx_internal.h         |   5 +-
 src/dtx/dtx_rpc.c              |  10 +-
 src/dtx/dtx_srv.c              |  24 +-
 src/include/daos/common.h      |   1 +
 src/include/daos/dtx.h         |   4 +-
 src/include/daos/mem.h         |  38 +++
 src/object/srv_obj.c           |  16 +-
 src/tests/suite/daos_base_tx.c | 195 +++++++++++++
 src/vos/vos_container.c        |  11 +-
 src/vos/vos_dtx.c              | 507 +++++++++++++++++++++++++--------
 src/vos/vos_internal.h         |   8 +
 src/vos/vos_pool.c             |   1 +
 14 files changed, 765 insertions(+), 168 deletions(-)

diff --git a/src/common/mem.c b/src/common/mem.c
index e473cb8bfe5..f64d744be72 100644
--- a/src/common/mem.c
+++ b/src/common/mem.c
@@ -997,6 +997,49 @@ pmem_defer_free(struct umem_instance *umm, umem_off_t off, void *act)
 	pmemobj_defer_free(pop, id, (struct pobj_action *)act);
 }
 
+static void
+pmem_tx_set_failure_behavior(enum umem_tx_failure_behavior behavior)
+{
+	switch (behavior) {
+	case TX_FAILURE_ABORT:
+		pmemobj_tx_set_failure_behavior(POBJ_TX_FAILURE_ABORT);
+		break;
+	case TX_FAILURE_RETURN:
+		pmemobj_tx_set_failure_behavior(POBJ_TX_FAILURE_RETURN);
+		break;
+	default:
+		D_ASSERTF(0, "Unknown TX failure behavior %d\n", behavior);
+	}
+}
+
+static int
+pmem_tx_get_failure_behavior(void)
+{
+	enum pobj_tx_failure_behavior behavior;
+
+	behavior = pmemobj_tx_get_failure_behavior();
+
+	switch (behavior) {
+	case POBJ_TX_FAILURE_ABORT:
+		return TX_FAILURE_ABORT;
+	case POBJ_TX_FAILURE_RETURN:
+		return TX_FAILURE_RETURN;
+	default:
+		D_ASSERTF(0, "Unknown TX failure behavior %d\n", behavior);
+		return -DER_INVAL;
+	}
+}
+
+static int
+pmem_tx_set_snapbuf(struct umem_instance *umm, umem_off_t snapbuf, size_t size)
+{
+	void	*buf = umem_off2ptr(umm, snapbuf);
+	int	 rc;
+
+	rc = pmemobj_tx_log_append_buffer(TX_LOG_TYPE_SNAPSHOT, buf, size);
+	return rc ? 
umem_tx_errno(rc) : 0;
+}
+
 static int
 pmem_tx_stage(void)
 {
@@ -1135,28 +1178,30 @@ umem_tx_add_cb(struct umem_instance *umm, struct umem_tx_stage_data *txd,
 	return 0;
 }
 
-static umem_ops_t	pmem_ops = {
-	.mo_tx_free		= pmem_tx_free,
-	.mo_tx_alloc		= pmem_tx_alloc,
-	.mo_tx_add		= pmem_tx_add,
-	.mo_tx_xadd		= pmem_tx_xadd,
-	.mo_tx_add_ptr		= pmem_tx_add_ptr,
-	.mo_tx_abort		= pmem_tx_abort,
-	.mo_tx_begin		= pmem_tx_begin,
-	.mo_tx_commit		= pmem_tx_commit,
-	.mo_tx_stage		= pmem_tx_stage,
-	.mo_reserve		= pmem_reserve,
-	.mo_defer_free		= pmem_defer_free,
-	.mo_cancel		= pmem_cancel,
-	.mo_tx_publish		= pmem_tx_publish,
-	.mo_atomic_copy		= pmem_atomic_copy,
-	.mo_atomic_alloc	= pmem_atomic_alloc,
-	.mo_atomic_free		= pmem_atomic_free,
-	.mo_atomic_flush	= pmem_atomic_flush,
-	.mo_tx_add_callback	= umem_tx_add_cb,
+static umem_ops_t pmem_ops = {
+    .mo_tx_free                 = pmem_tx_free,
+    .mo_tx_alloc                = pmem_tx_alloc,
+    .mo_tx_add                  = pmem_tx_add,
+    .mo_tx_xadd                 = pmem_tx_xadd,
+    .mo_tx_add_ptr              = pmem_tx_add_ptr,
+    .mo_tx_abort                = pmem_tx_abort,
+    .mo_tx_begin                = pmem_tx_begin,
+    .mo_tx_commit               = pmem_tx_commit,
+    .mo_tx_set_failure_behavior = pmem_tx_set_failure_behavior,
+    .mo_tx_get_failure_behavior = pmem_tx_get_failure_behavior,
+    .mo_tx_set_snapbuf          = pmem_tx_set_snapbuf,
+    .mo_tx_stage                = pmem_tx_stage,
+    .mo_reserve                 = pmem_reserve,
+    .mo_defer_free              = pmem_defer_free,
+    .mo_cancel                  = pmem_cancel,
+    .mo_tx_publish              = pmem_tx_publish,
+    .mo_atomic_copy             = pmem_atomic_copy,
+    .mo_atomic_alloc            = pmem_atomic_alloc,
+    .mo_atomic_free             = pmem_atomic_free,
+    .mo_atomic_flush            = pmem_atomic_flush,
+    .mo_tx_add_callback         = umem_tx_add_cb,
 };
-
 /** BMEM operations (depends on dav) */
 
 static int
diff --git a/src/dtx/dtx_common.c b/src/dtx/dtx_common.c
index 169c2b2d6a2..d27963b5bf8 100644
--- a/src/dtx/dtx_common.c
+++ b/src/dtx/dtx_common.c
@@ -1032,6 +1032,34 @@ dtx_sub_init(struct dtx_handle *dth, daos_unit_oid_t *oid, uint64_t dkey_hash)
 	return rc;
 }
 
+int
+dtx_commit_large(daos_handle_t coh, struct dtx_id *dtis, int cnt, bool keep_act, bool *rm_cos)
+{
+	int	step = DTX_YIELD_CYCLE;
+	int	committed = 0;
+	int	rc = 0;
+	int	i = 0;
+
+	while (i < cnt) {
+		if (i + step > cnt)
+			step = cnt - i;
+
+		rc = vos_dtx_commit(coh, dtis + i, step, keep_act, rm_cos ? rm_cos + i : NULL);
+		if (rc >= 0) {
+			committed += rc;
+			i += step;
+		} else {
+			if ((rc != -DER_NOSPACE && rc != -DER_OVERFLOW) || step <= 1)
+				return rc;
+
+			/* If out of space, reduce TX size and retry. */
+			step >>= 1;
+		}
+	}
+
+	return committed;
+}
+
 /**
  * Prepare the leader DTX handle in DRAM.
  *
diff --git a/src/dtx/dtx_internal.h b/src/dtx/dtx_internal.h
index 1298b4b350a..49fafa83523 100644
--- a/src/dtx/dtx_internal.h
+++ b/src/dtx/dtx_internal.h
@@ -1,6 +1,6 @@
 /**
  * (C) Copyright 2019-2024 Intel Corporation.
- * (C) Copyright 2025 Hewlett Packard Enterprise Development LP
+ * (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
  */
@@ -101,7 +101,7 @@ CRT_RPC_DECLARE(dtx, DAOS_ISEQ_DTX, DAOS_OSEQ_DTX);
 
 CRT_RPC_DECLARE(dtx_coll, DAOS_ISEQ_COLL_DTX, DAOS_OSEQ_COLL_DTX);
 
-#define DTX_YIELD_CYCLE		(DTX_THRESHOLD_COUNT >> 3)
+#define DTX_YIELD_CYCLE		DTX_PIGGYBACK_COUNT
 
 /* The count threshold (per pool) for triggering DTX aggregation. 
*/
 #define DTX_AGG_THD_CNT_MAX	(1 << 24)
@@ -263,6 +263,7 @@ extern btr_ops_t dtx_btr_cos_ops;
 /* dtx_common.c */
 int dtx_handle_reinit(struct dtx_handle *dth);
 void dtx_batched_commit(void *arg);
+int dtx_commit_large(daos_handle_t coh, struct dtx_id *dtis, int cnt, bool keep_act, bool *rm_cos);
 void dtx_aggregation_main(void *arg);
 int start_dtx_reindex_ult(struct ds_cont_child *cont);
 void dtx_merge_check_result(int *tgt, int src);
diff --git a/src/dtx/dtx_rpc.c b/src/dtx/dtx_rpc.c
index 140c93f0b7a..1203ee058eb 100644
--- a/src/dtx/dtx_rpc.c
+++ b/src/dtx/dtx_rpc.c
@@ -878,11 +878,13 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes,
 	}
 
 	/*
-	 * Some DTX entries may have been committed on some participants. Then mark all
-	 * the DTX entries (in the dtis) as "PARTIAL_COMMITTED" and re-commit them later.
-	 * It is harmless to re-commit the DTX that has ever been committed.
+	 * Some DTX entries may have been committed on some of the (remote) participants.
+	 * There is no way to revert such partially committed DTX entries since we do not
+	 * know whether someone has already read the related data. So let's mark all the
+	 * DTX entries in the @dtis as "PARTIAL_COMMITTED" and re-commit them later. It
+	 * is safe to re-commit them.
 	 */
-	rc1 = vos_dtx_commit(cont->sc_hdl, dca.dca_dtis, count, rc != 0, rm_cos);
+	rc1 = dtx_commit_large(cont->sc_hdl, dca.dca_dtis, count, rc != 0, rm_cos);
 	if (rc1 > 0) {
 		dra->dra_committed += rc1;
 		rc1 = 0;
diff --git a/src/dtx/dtx_srv.c b/src/dtx/dtx_srv.c
index 5b68645cf54..7a738437fee 100644
--- a/src/dtx/dtx_srv.c
+++ b/src/dtx/dtx_srv.c
@@ -1,6 +1,6 @@
 /**
  * (C) Copyright 2019-2024 Intel Corporation.
- * (C) Copyright 2025 Hewlett Packard Enterprise Development LP
+ * (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
  */
@@ -196,19 +196,17 @@ dtx_handler(crt_rpc_t *rpc)
 	if (unlikely(din->di_epoch == 1))
 		D_GOTO(out, rc = -DER_IO);
 
-	while (i < din->di_dtx_array.ca_count) {
-		if (i + count > din->di_dtx_array.ca_count)
-			count = din->di_dtx_array.ca_count - i;
-
-		dtis = (struct dtx_id *)din->di_dtx_array.ca_arrays + i;
-		rc1 = vos_dtx_commit(cont->sc_hdl, dtis, count, false, NULL);
-		if (rc1 > 0)
-			committed += rc1;
-		else if (rc == 0 && rc1 < 0)
-			rc = rc1;
+	/*
+	 * The count of DTX entries will not exceed DTX_THRESHOLD_COUNT; that
+	 * is guaranteed by the caller. Even if a wrong count was offered (via
+	 * the network), dtx_commit_large will handle such cases properly.
+ */ + rc1 = dtx_commit_large(cont->sc_hdl, (struct dtx_id *)din->di_dtx_array.ca_arrays, + (int)din->di_dtx_array.ca_count, false, NULL); + if (rc1 < 0) + D_GOTO(out, rc = rc1); - i += count; - } + committed += rc1; if (din->di_flags.ca_count > 0) flags = din->di_flags.ca_arrays; diff --git a/src/include/daos/common.h b/src/include/daos/common.h index 47f10646076..85cf9d9dadb 100644 --- a/src/include/daos/common.h +++ b/src/include/daos/common.h @@ -860,6 +860,7 @@ enum { #define DAOS_OBJ_COLL_SPARSE (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x4d) #define DAOS_DTX_RESEND_NONLEADER (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x4e) +#define DAOS_DTX_NOSPACE_NOREFRESH (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x4f) #define DAOS_NVME_FAULTY (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x50) #define DAOS_NVME_WRITE_ERR (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x51) diff --git a/src/include/daos/dtx.h b/src/include/daos/dtx.h index b91e5a1084c..db175503646 100644 --- a/src/include/daos/dtx.h +++ b/src/include/daos/dtx.h @@ -1,6 +1,6 @@ /** * (C) Copyright 2019-2023 Intel Corporation. - * (C) Copyright 2025 Hewlett Packard Enterprise Development LP + * (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -25,6 +25,8 @@ */ #define DTX_THRESHOLD_COUNT (1 << 9) +#define DTX_PIGGYBACK_COUNT (1 << 6) + /* The time (in second) threshold for batched DTX commit. */ #define DTX_COMMIT_THRESHOLD_AGE 10 diff --git a/src/include/daos/mem.h b/src/include/daos/mem.h index d451f26704d..eb4bdbf3552 100644 --- a/src/include/daos/mem.h +++ b/src/include/daos/mem.h @@ -190,6 +190,11 @@ struct umem_pool { struct umem_slab_desc up_slabs[0]; }; +enum umem_tx_failure_behavior { + TX_FAILURE_ABORT, + TX_FAILURE_RETURN, +}; + #ifdef DAOS_PMEM_BUILD #define UMEM_CACHE_PAGE_SZ_SHIFT 24 /* 16MB */ #define UMEM_CACHE_PAGE_SZ (1 << UMEM_CACHE_PAGE_SZ_SHIFT) @@ -750,7 +755,14 @@ typedef struct { /** commit memory transaction */ int (*mo_tx_commit)(struct umem_instance *umm, void *data); + /** set TX_FAILURE_ABORT or TX_FAILURE_RETURN when hit failure during TX. */ + void (*mo_tx_set_failure_behavior)(enum umem_tx_failure_behavior behavior); + + /** query the failure behavior for current TX. */ + int (*mo_tx_get_failure_behavior)(void); #ifdef DAOS_PMEM_BUILD + /** Set emergency buffer for transaction snapshot */ + int (*mo_tx_set_snapbuf)(struct umem_instance *umm, umem_off_t snap_buf, size_t size); /** get TX stage */ int (*mo_tx_stage)(void); @@ -1074,12 +1086,38 @@ umem_tx_end(struct umem_instance *umm, int err) return umem_tx_end_ex(umm, err, NULL); } +static inline void +umem_tx_set_failure_behavior(struct umem_instance *umm, enum umem_tx_failure_behavior behavior) +{ + if (umm->umm_ops->mo_tx_set_failure_behavior) + umm->umm_ops->mo_tx_set_failure_behavior(behavior); +} + +static inline int +umem_tx_get_failure_behavior(struct umem_instance *umm) +{ + if (umm->umm_ops->mo_tx_get_failure_behavior) + return umm->umm_ops->mo_tx_get_failure_behavior(); + else + /* Abort TX on failure by default. 
*/ + return TX_FAILURE_ABORT; +} + #ifdef DAOS_PMEM_BUILD bool umem_tx_inprogress(struct umem_instance *umm); bool umem_tx_none(struct umem_instance *umm); int umem_tx_errno(int err); +static inline int +umem_tx_set_snapbuf(struct umem_instance *umm, umem_off_t snap_buf, size_t size) +{ + if (umm->umm_ops->mo_tx_set_snapbuf) + return umm->umm_ops->mo_tx_set_snapbuf(umm, snap_buf, size); + else + return 0; +} + static inline int umem_tx_stage(struct umem_instance *umm) { diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c index f8224af6183..5d3c5adb60e 100644 --- a/src/object/srv_obj.c +++ b/src/object/srv_obj.c @@ -2093,14 +2093,18 @@ obj_local_rw_internal_wrap(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dt static int obj_local_rw(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth) { - struct obj_rw_in *orw = crt_req_get(rpc); - struct dtx_share_peer *dsp; - uint32_t retry = 0; - int rc; + struct obj_rw_in *orw = crt_req_get(rpc); + struct dtx_share_peer *dsp; + uint32_t retry = 0; + uint32_t opc = opc_get(rpc->cr_opc); + int rc; again: rc = obj_local_rw_internal_wrap(rpc, ioc, dth); if (dth != NULL && obj_dtx_need_refresh(dth, rc)) { + if (opc == DAOS_OBJ_RPC_FETCH && DAOS_FAIL_CHECK(DAOS_DTX_NOSPACE_NOREFRESH)) + return -DER_NONEXIST; + if (++retry < 3) { rc = dtx_refresh(dth, ioc->ioc_coc); if (rc == 0) @@ -3167,7 +3171,7 @@ ds_obj_rw_handler(crt_rpc_t *rpc) */ D_FREE(dti_cos); dti_cos_cnt = dtx_cos_get_piggyback(ioc.ioc_coc, &orw->orw_oid, orw->orw_dkey_hash, - DTX_THRESHOLD_COUNT, &dti_cos); + DTX_PIGGYBACK_COUNT, &dti_cos); if (dti_cos_cnt < 0) D_GOTO(out, rc = dti_cos_cnt); @@ -4073,7 +4077,7 @@ ds_obj_punch_handler(crt_rpc_t *rpc) */ D_FREE(dti_cos); dti_cos_cnt = dtx_cos_get_piggyback(ioc.ioc_coc, &opi->opi_oid, opi->opi_dkey_hash, - DTX_THRESHOLD_COUNT, &dti_cos); + DTX_PIGGYBACK_COUNT, &dti_cos); if (dti_cos_cnt < 0) D_GOTO(out, rc = dti_cos_cnt); diff --git a/src/tests/suite/daos_base_tx.c b/src/tests/suite/daos_base_tx.c index 0c6bd80d71c..e70383ef40e 100644 --- a/src/tests/suite/daos_base_tx.c +++ b/src/tests/suite/daos_base_tx.c @@ -1024,6 +1024,197 @@ dtx_23(void **state) ioreq_fini(&req); } +#define TSIZE 8 +#define CONTS 5 + +static void +dtx_24(void **state) +{ + test_arg_t *arg = *state; + const char *dkey = dts_dtx_dkey; + char akey[TSIZE] = {0}; + char wbuf[TSIZE]; + char rbuf[TSIZE]; + daos_obj_id_t oid; + struct ioreq req; + char i; + char j; + + FAULT_INJECTION_REQUIRED(); + + print_message("DTX24: DTX commit under space pressure with single container\n"); + + if (!test_runable(arg, dts_dtx_replica_cnt)) + return; + + oid = daos_test_oid_gen(arg->coh, dts_dtx_class, 0, 0, arg->myrank); + ioreq_init(&req, arg->coh, oid, DAOS_IOD_ARRAY, arg); + + print_message("Filling DTX committed table...\n"); + + for (i = 'a'; i <= 'z'; i++) { + snprintf(akey, TSIZE, "akey-%c", i); + for (j = 'A'; j <= 'Z'; j++) { + memset(wbuf, j, TSIZE); + insert_single(dkey, akey, (j - 'A') * TSIZE, wbuf, TSIZE, DAOS_TX_NONE, + &req); + } + } + + /* Wait for batched commit. */ + sleep(DTX_COMMIT_THRESHOLD_AGE + 3); + + /* Simulate DER_NOSPACE when DTX commit via DAOS_DTX_NOSPACE_NOREFRESH. 
*/
+	dtx_set_fail_loc(arg, DAOS_DTX_NOSPACE_NOREFRESH | DAOS_FAIL_ALWAYS);
+
+	print_message("Writing more after space pressure...\n");
+
+	for (i = 'N'; i <= 'Z'; i++) {
+		snprintf(akey, TSIZE, "akey-%c", i);
+		for (j = 'a'; j <= 'm'; j++) {
+			memset(wbuf, j, TSIZE);
+			insert_single(dkey, akey, (j - 'a') * TSIZE, wbuf, TSIZE, DAOS_TX_NONE,
+				      &req);
+		}
+	}
+
+	/* Wait for batched commit. */
+	sleep(DTX_COMMIT_THRESHOLD_AGE + 3);
+
+	print_message("Verifying all written data...\n");
+
+	/*
+	 * DAOS_DTX_NOSPACE_NOREFRESH will prevent DTX refresh. If the former batched commit
+	 * failed to commit some DTX because of the simulated space exhaustion, the related
+	 * lookup will return IO failure and the fetch result will differ from what was written.
+	 */
+	for (i = 'N'; i <= 'Z'; i++) {
+		snprintf(akey, TSIZE, "akey-%c", i);
+		for (j = 'a'; j <= 'm'; j++) {
+			memset(wbuf, j, TSIZE);
+			lookup_single(dkey, akey, (j - 'a') * TSIZE, rbuf, TSIZE, DAOS_TX_NONE,
+				      &req);
+			assert_memory_equal(wbuf, rbuf, TSIZE);
+		}
+	}
+
+	dtx_set_fail_loc(arg, 0);
+	ioreq_fini(&req);
+}
+
+static void
+dtx_25(void **state)
+{
+	test_arg_t	*arg = *state;
+	struct test_cont conts[CONTS] = {0};
+	daos_obj_id_t	 oids[CONTS] = {0};
+	struct ioreq	 reqs[CONTS] = {0};
+	char		 dkey[TSIZE] = {0};
+	char		 akey[TSIZE] = {0};
+	char		 wbuf[TSIZE];
+	char		 rbuf[TSIZE];
+	daos_prop_t	*redun_prop;
+	int		 rc;
+	int		 m;
+	char		 n;
+	char		 i;
+	char		 j;
+
+	FAULT_INJECTION_REQUIRED();
+
+	print_message("DTX25: DTX commit under space pressure with multiple containers\n");
+
+	if (!test_runable(arg, 3))
+		return;
+
+	redun_prop = daos_prop_alloc(1);
+	assert_non_null(redun_prop);
+
+	redun_prop->dpp_entries[0].dpe_type = DAOS_PROP_CO_REDUN_LVL;
+	redun_prop->dpp_entries[0].dpe_val = DAOS_PROP_CO_REDUN_RANK;
+
+	for (m = 0; m < CONTS; m++) {
+		rc = daos_cont_create(arg->pool.poh, &conts[m].uuid, redun_prop, NULL);
+		assert_rc_equal(rc, 0);
+
+		uuid_unparse_lower(conts[m].uuid, conts[m].label);
+		rc = daos_cont_open(arg->pool.poh, conts[m].label, DAOS_COO_RW, &conts[m].coh, NULL,
+				    NULL);
+		assert_rc_equal(rc, 0);
+
+		oids[m] = daos_test_oid_gen(conts[m].coh, OC_RP_3GX, 0, 0, arg->myrank);
+		ioreq_init(&reqs[m], conts[m].coh, oids[m], DAOS_IOD_ARRAY, arg);
+
+		print_message("Filling DTX committed table for the container " DF_UUID "\n",
+			      DP_UUID(conts[m].uuid));
+
+		for (n = '0'; n < '4'; n++) {
+			snprintf(dkey, TSIZE, "dkey-%c", n);
+			for (i = 'a'; i <= 'm'; i++) {
+				snprintf(akey, TSIZE, "akey-%c", i);
+				for (j = 'A'; j <= 'Z'; j++) {
+					memset(wbuf, j, TSIZE);
+					insert_single(dkey, akey, (j - 'A') * TSIZE, wbuf, TSIZE,
+						      DAOS_TX_NONE, &reqs[m]);
+				}
+			}
+		}
+	}
+
+	/* Wait for batched commit. */
+	sleep(DTX_COMMIT_THRESHOLD_AGE + 3);
+
+	/* Simulate DER_NOSPACE when DTX commit via DAOS_DTX_NOSPACE_NOREFRESH. */
+	dtx_set_fail_loc(arg, DAOS_DTX_NOSPACE_NOREFRESH | DAOS_FAIL_ALWAYS);
+
+	for (m = 0; m < CONTS; m++) {
+		print_message("Writing more to cont " DF_UUID " after space pressure...\n",
+			      DP_UUID(conts[m].uuid));
+
+		for (i = 'N'; i <= 'Z'; i++) {
+			snprintf(akey, TSIZE, "akey-%c", i);
+			for (j = 'a'; j <= 'm'; j++) {
+				memset(wbuf, j, TSIZE);
+				insert_single(dkey, akey, (j - 'a') * TSIZE, wbuf, TSIZE,
+					      DAOS_TX_NONE, &reqs[m]);
+			}
+		}
+	}
+
+	/* Wait for batched commit. */
+	sleep(DTX_COMMIT_THRESHOLD_AGE + 3);
+
+	print_message("Verifying all written data...\n");
+
+	/*
+	 * DAOS_DTX_NOSPACE_NOREFRESH will prevent DTX refresh. 
If the former batched commit
+	 * failed to commit some DTX because of the simulated space exhaustion, the related
+	 * lookup will return IO failure and the fetch result will differ from what was written.
+	 */
+	for (m = 0; m < CONTS; m++) {
+		for (i = 'N'; i <= 'Z'; i++) {
+			snprintf(akey, TSIZE, "akey-%c", i);
+			for (j = 'a'; j <= 'm'; j++) {
+				memset(wbuf, j, TSIZE);
+				lookup_single(dkey, akey, (j - 'a') * TSIZE, rbuf, TSIZE,
+					      DAOS_TX_NONE, &reqs[m]);
+				assert_memory_equal(wbuf, rbuf, TSIZE);
+			}
+		}
+	}
+
+	daos_prop_free(redun_prop);
+	dtx_set_fail_loc(arg, 0);
+
+	for (m = 0; m < CONTS; m++) {
+		ioreq_fini(&reqs[m]);
+		if (daos_handle_is_valid(conts[m].coh))
+			daos_cont_close(conts[m].coh, NULL);
+		if (daos_is_valid_uuid_string(conts[m].label, UUID_SST_NONE))
+			daos_cont_destroy(arg->pool.poh, conts[m].label, 0, NULL);
+	}
+}
+
 static int
 dtx_base_rf0_setup(void **state)
 {
@@ -1092,6 +1283,10 @@ static const struct CMUnitTest dtx_tests[] = {
      dtx_22, NULL, test_case_teardown},
     {"DTX23: Resend with lost reply from non-leader",
      dtx_23, NULL, test_case_teardown},
+    {"DTX24: DTX under space pressure with single container",
+     dtx_24, NULL, test_case_teardown},
+    {"DTX25: DTX under space pressure with multiple containers",
+     dtx_25, NULL, test_case_teardown},
 };
 /* clang-format on */
 
diff --git a/src/vos/vos_container.c b/src/vos/vos_container.c
index 8c63f7c0f09..388a065d605 100644
--- a/src/vos/vos_container.c
+++ b/src/vos/vos_container.c
@@ -240,9 +240,8 @@ struct d_ulink_ops co_hdl_uh_ops = {
 	.uop_cmp	= cont_cmp,
 };
 
-int
-cont_insert(struct vos_container *cont, struct d_uuid *key, struct d_uuid *pkey,
-	    daos_handle_t *coh)
+static int
+cont_insert(struct vos_container *cont, struct d_uuid *key, struct d_uuid *pkey, daos_handle_t *coh)
 {
 	int	rc = 0;
 
@@ -497,6 +496,8 @@ vos_cont_open(daos_handle_t poh, uuid_t co_uuid, daos_handle_t *coh)
 		goto exit;
 	}
 
+	d_list_add_tail(&cont->vc_pool_link, &pool->vp_cont_list);
+
 	cont->vc_open_count = 1;
 
 	D_DEBUG(DB_TRACE, "Inert cont "DF_UUID" into hash table.\n", DP_UUID(cont->vc_id));
@@ -527,8 +528,10 @@ vos_cont_close(daos_handle_t coh)
 		DP_UUID(cont->vc_id), cont->vc_open_count);
 
 	cont->vc_open_count--;
-	if (cont->vc_open_count == 0)
+	if (cont->vc_open_count == 0) {
+		d_list_del_init(&cont->vc_pool_link);
 		vos_obj_cache_evict(cont);
+	}
 
 	D_DEBUG(DB_TRACE, "Close cont "DF_UUID", open count: %d\n",
 		DP_UUID(cont->vc_id), cont->vc_open_count);
 
diff --git a/src/vos/vos_dtx.c b/src/vos/vos_dtx.c
index d50f2b87cc8..778b012e97d 100644
--- a/src/vos/vos_dtx.c
+++ b/src/vos/vos_dtx.c
@@ -420,6 +420,32 @@ static btr_ops_t dtx_committed_btr_ops = {
 	.to_rec_update	= dtx_cmt_ent_update,
 };
 
+static inline int
+vos_dtx_add_ptr(struct vos_pool *pool, void *ptr, size_t size)
+{
+	struct umem_instance	*umm = &pool->vp_umm;
+	int			 rc;
+
+	rc = umem_tx_add_ptr(umm, ptr, size);
+#ifdef DAOS_PMEM_BUILD
+	if (unlikely(rc == -DER_NOSPACE)) {
+		struct vos_pool_ext_df *ext_df = umem_off2ptr(umm, pool->vp_pool_df->pd_ext);
+		int behavior = umem_tx_get_failure_behavior(umm);
+
+		if (ext_df != NULL && !UMOFF_IS_NULL(ext_df->ped_emerg_buf) &&
+		    behavior == TX_FAILURE_RETURN) {
+			rc =
+			    umem_tx_set_snapbuf(umm, ext_df->ped_emerg_buf, VOS_SNAPBUF_EMERG_SIZE);
+			if (rc == 0)
+				rc = umem_tx_add_ptr(umm, ptr, size);
+			else
+				rc = -DER_NOSPACE;
+		}
+	}
+#endif
+	return rc;
+}
+
 int
 vos_dtx_table_register(void)
 {
@@ -473,7 +499,7 @@ vos_dtx_table_destroy(struct umem_instance *umm, struct vos_cont_df *cont_df)
 
 		/* cd_dtx_active_tail is next to cd_dtx_active_head */
 		rc = umem_tx_add_ptr(umm, 
&cont_df->cd_dtx_active_head, sizeof(cont_df->cd_dtx_active_head) + - sizeof(cont_df->cd_dtx_active_tail)); + sizeof(cont_df->cd_dtx_active_tail)); if (rc != 0) return rc; @@ -571,7 +597,7 @@ do_dtx_rec_release(struct umem_instance *umm, struct vos_container *cont, } if (abort) { if (DAE_INDEX(dae) != DTX_INDEX_INVAL) { - rc = umem_tx_add_ptr(umm, &svt->ir_dtx, + rc = vos_dtx_add_ptr(cont->vc_pool, &svt->ir_dtx, sizeof(svt->ir_dtx)); if (rc != 0) return rc; @@ -579,8 +605,7 @@ do_dtx_rec_release(struct umem_instance *umm, struct vos_container *cont, dtx_set_aborted(&svt->ir_dtx); } else { - rc = umem_tx_add_ptr(umm, &svt->ir_dtx, - sizeof(svt->ir_dtx)); + rc = vos_dtx_add_ptr(cont->vc_pool, &svt->ir_dtx, sizeof(svt->ir_dtx)); if (rc != 0) return rc; @@ -600,7 +625,7 @@ do_dtx_rec_release(struct umem_instance *umm, struct vos_container *cont, if (abort) { if (DAE_INDEX(dae) != DTX_INDEX_INVAL) { - rc = umem_tx_add_ptr(umm, &evt->dc_dtx, + rc = vos_dtx_add_ptr(cont->vc_pool, &evt->dc_dtx, sizeof(evt->dc_dtx)); if (rc != 0) return rc; @@ -608,8 +633,7 @@ do_dtx_rec_release(struct umem_instance *umm, struct vos_container *cont, dtx_set_aborted(&evt->dc_dtx); } else { - rc = umem_tx_add_ptr(umm, &evt->dc_dtx, - sizeof(evt->dc_dtx)); + rc = vos_dtx_add_ptr(cont->vc_pool, &evt->dc_dtx, sizeof(evt->dc_dtx)); if (rc != 0) return rc; @@ -708,7 +732,8 @@ dtx_rec_release(struct vos_container *cont, struct vos_dtx_act_ent *dae, bool ab return rc; if (!invalid && keep_act) { - rc = umem_tx_add_ptr(umm, &dae_df->dae_rec_off, sizeof(dae_df->dae_rec_off)); + rc = vos_dtx_add_ptr(cont->vc_pool, &dae_df->dae_rec_off, + sizeof(dae_df->dae_rec_off)); if (rc != 0) return rc; dae_df->dae_rec_off = UMOFF_NULL; @@ -718,7 +743,7 @@ dtx_rec_release(struct vos_container *cont, struct vos_dtx_act_ent *dae, bool ab if (!invalid && keep_act) { /* When re-commit partial committed DTX, the count can be zero. 
*/ if (dae_df->dae_rec_cnt > 0) { - rc = umem_tx_add_ptr(umm, &dae_df->dae_rec_cnt, + rc = vos_dtx_add_ptr(cont->vc_pool, &dae_df->dae_rec_cnt, sizeof(dae_df->dae_rec_cnt)); if (rc != 0) return rc; @@ -733,7 +758,7 @@ dtx_rec_release(struct vos_container *cont, struct vos_dtx_act_ent *dae, bool ab if (DAE_FLAGS(dae) & DTE_PARTIAL_COMMITTED) return 0; - rc = umem_tx_add_ptr(umm, &dae_df->dae_flags, sizeof(dae_df->dae_flags)); + rc = vos_dtx_add_ptr(cont->vc_pool, &dae_df->dae_flags, sizeof(dae_df->dae_flags)); if (rc != 0) return rc; @@ -753,11 +778,11 @@ dtx_rec_release(struct vos_container *cont, struct vos_dtx_act_ent *dae, bool ab } if (dbd->dbd_count > 1 || dbd->dbd_index < dbd->dbd_cap) { - rc = umem_tx_add_ptr(umm, &dae_df->dae_flags, sizeof(dae_df->dae_flags)); + rc = vos_dtx_add_ptr(cont->vc_pool, &dae_df->dae_flags, sizeof(dae_df->dae_flags)); if (rc != 0) return rc; - rc = umem_tx_add_ptr(umm, &dbd->dbd_count, sizeof(dbd->dbd_count)); + rc = vos_dtx_add_ptr(cont->vc_pool, &dbd->dbd_count, sizeof(dbd->dbd_count)); if (rc != 0) return rc; @@ -772,8 +797,7 @@ dtx_rec_release(struct vos_container *cont, struct vos_dtx_act_ent *dae, bool ab dbd_off = umem_ptr2off(umm, dbd); tmp = umem_off2ptr(umm, dbd->dbd_prev); if (tmp != NULL) { - rc = umem_tx_add_ptr(umm, &tmp->dbd_next, - sizeof(tmp->dbd_next)); + rc = vos_dtx_add_ptr(cont->vc_pool, &tmp->dbd_next, sizeof(tmp->dbd_next)); if (rc != 0) return rc; @@ -782,8 +806,7 @@ dtx_rec_release(struct vos_container *cont, struct vos_dtx_act_ent *dae, bool ab tmp = umem_off2ptr(umm, dbd->dbd_next); if (tmp != NULL) { - rc = umem_tx_add_ptr(umm, &tmp->dbd_prev, - sizeof(tmp->dbd_prev)); + rc = vos_dtx_add_ptr(cont->vc_pool, &tmp->dbd_prev, sizeof(tmp->dbd_prev)); if (rc != 0) return rc; @@ -791,8 +814,8 @@ dtx_rec_release(struct vos_container *cont, struct vos_dtx_act_ent *dae, bool ab } if (cont_df->cd_dtx_active_head == dbd_off) { - rc = umem_tx_add_ptr(umm, &cont_df->cd_dtx_active_head, - sizeof(cont_df->cd_dtx_active_head)); + rc = vos_dtx_add_ptr(cont->vc_pool, &cont_df->cd_dtx_active_head, + sizeof(cont_df->cd_dtx_active_head)); if (rc != 0) return rc; @@ -800,8 +823,8 @@ dtx_rec_release(struct vos_container *cont, struct vos_dtx_act_ent *dae, bool ab } if (cont_df->cd_dtx_active_tail == dbd_off) { - rc = umem_tx_add_ptr(umm, &cont_df->cd_dtx_active_tail, - sizeof(cont_df->cd_dtx_active_tail)); + rc = vos_dtx_add_ptr(cont->vc_pool, &cont_df->cd_dtx_active_tail, + sizeof(cont_df->cd_dtx_active_tail)); if (rc != 0) return rc; @@ -987,23 +1010,22 @@ vos_dtx_extend_act_table(struct vos_container *cont) D_ASSERT(UMOFF_IS_NULL(cont_df->cd_dtx_active_head)); /* cd_dtx_active_tail is next to cd_dtx_active_head */ - rc = umem_tx_add_ptr(umm, &cont_df->cd_dtx_active_head, + rc = vos_dtx_add_ptr(cont->vc_pool, &cont_df->cd_dtx_active_head, sizeof(cont_df->cd_dtx_active_head) + - sizeof(cont_df->cd_dtx_active_tail)); + sizeof(cont_df->cd_dtx_active_tail)); if (rc != 0) goto out; cont_df->cd_dtx_active_head = dbd_off; } else { - rc = umem_tx_add_ptr(umm, &tmp->dbd_next, - sizeof(tmp->dbd_next)); + rc = vos_dtx_add_ptr(cont->vc_pool, &tmp->dbd_next, sizeof(tmp->dbd_next)); if (rc != 0) goto out; tmp->dbd_next = dbd_off; - dbd->dbd_prev = cont_df->cd_dtx_active_tail; - rc = umem_tx_add_ptr(umm, &cont_df->cd_dtx_active_tail, + + rc = vos_dtx_add_ptr(cont->vc_pool, &cont_df->cd_dtx_active_tail, sizeof(cont_df->cd_dtx_active_tail)); if (rc != 0) goto out; @@ -1018,6 +1040,298 @@ vos_dtx_extend_act_table(struct vos_container *cont) return rc; } +static 
inline bool
+vos_dtx_cmt_victim_candidate(struct vos_container *cont, uint64_t *time_l, uint64_t *time_h)
+{
+	struct vos_cont_df	*cont_df = cont->vc_cont_df;
+	struct vos_dtx_blob_df	*dbd;
+
+	/*
+	 * Keep at least one blob for each container's committed DTX table.
+	 * For the case of "cd_dtx_committed_head == cd_dtx_committed_tail",
+	 * the committed DTX table is either empty or has only one blob.
+	 */
+	if (cont_df->cd_dtx_committed_head == cont_df->cd_dtx_committed_tail)
+		return false;
+
+	dbd = umem_off2ptr(vos_cont2umm(cont), cont_df->cd_dtx_committed_head);
+	D_ASSERT(dbd != NULL);
+
+	if ((*time_h > dbd->dbd_committed_data[dbd->dbd_count - 1].dce_cmt_time) ||
+	    (*time_h == dbd->dbd_committed_data[dbd->dbd_count - 1].dce_cmt_time &&
+	     *time_l > dbd->dbd_committed_data[0].dce_cmt_time)) {
+		*time_l = dbd->dbd_committed_data[0].dce_cmt_time;
+		*time_h = dbd->dbd_committed_data[dbd->dbd_count - 1].dce_cmt_time;
+		return true;
+	}
+
+	return false;
+}
+
+/*
+ * Space is almost exhausted. In such a case, we must reclaim space so that the current
+ * DTX commit can proceed; otherwise, uncommitted DTX entries may block VOS aggregation
+ * and thus prevent further space release. The most direct approach is to reclaim some
+ * old blob from the current container's committed DTX table. That may be unfair since
+ * other containers could hold older committed DTX entries, but it is not worth scanning
+ * all pools/containers on the target to find the globally oldest committed DTX blob
+ * under space pressure. So we make a tradeoff: first try to choose the victim from the
+ * neighbors of the current container, i.e. the previous one, the next one and itself
+ * on the container list of the same pool. If no suitable candidate can be found there,
+ * walk further along the list for one that has more than one committed DTX blob. DAOS-18690.
+ */
+static struct vos_container *
+vos_dtx_choose_victim(struct vos_container *cont)
+{
+	struct vos_pool		*pool = cont->vc_pool;
+	struct vos_container	*victim = NULL;
+	struct vos_container	*prev = NULL;
+	struct vos_container	*next = NULL;
+	uint64_t		 time_l = (uint64_t)(-1);
+	uint64_t		 time_h = (uint64_t)(-1);
+
+	D_ASSERT(!d_list_empty(&pool->vp_cont_list));
+
+	if (vos_dtx_cmt_victim_candidate(cont, &time_l, &time_h))
+		victim = cont;
+
+	if (cont->vc_pool_link.prev == &pool->vp_cont_list)
+		prev = d_list_entry(pool->vp_cont_list.prev, struct vos_container, vc_pool_link);
+	else
+		prev = d_list_entry(cont->vc_pool_link.prev, struct vos_container, vc_pool_link);
+
+	if (prev == cont)
+		goto out;
+
+	if (vos_dtx_cmt_victim_candidate(prev, &time_l, &time_h))
+		victim = prev;
+
+	if (cont->vc_pool_link.next == &pool->vp_cont_list)
+		next = d_list_entry(pool->vp_cont_list.next, struct vos_container, vc_pool_link);
+	else
+		next = d_list_entry(cont->vc_pool_link.next, struct vos_container, vc_pool_link);
+
+	if (next == prev)
+		goto out;
+
+	if (vos_dtx_cmt_victim_candidate(next, &time_l, &time_h))
+		victim = next;
+
+out:
+	if (unlikely(victim == NULL && next != NULL)) {
+		do {
+			if (next->vc_pool_link.next == &pool->vp_cont_list)
+				next = d_list_entry(pool->vp_cont_list.next, struct vos_container,
+						    vc_pool_link);
+			else
+				next = d_list_entry(next->vc_pool_link.next, struct vos_container,
+						    vc_pool_link);
+			if (next == prev || next == cont)
+				break;
+
+			if (vos_dtx_cmt_victim_candidate(next, &time_l, &time_h))
+				victim = next;
+		} while (victim == NULL);
+	}
+
+	return victim;
+}
+
+static int
+vos_dtx_reuse_cmt_blob(struct vos_container *cont)
+{
+	struct vos_pool		*pool = cont->vc_pool;
+	struct umem_instance	*umm = vos_pool2umm(pool);
+	struct vos_container	*vcm_cont;
+	struct vos_cont_df	*cur_cont_df;
+	struct vos_cont_df	*vcm_cont_df;
+	struct vos_dtx_blob_df	*vcm_dbd;
+	struct vos_dtx_blob_df	*new_head;
+	struct vos_dtx_blob_df	*cur_tail;
+	struct vos_dtx_cmt_ent_df *dce_df;
+	d_iov_t			 kiov;
+	umem_off_t		 vcm_dbd_off;
+	umem_off_t		 new_head_off;
+	umem_off_t		 cur_tail_off;
+	daos_epoch_t		 epoch = 0;
+	uint32_t		 count = 0;
+	int			 rc;
+	int			 i;
+
+	vcm_cont = vos_dtx_choose_victim(cont);
+	if (unlikely(vcm_cont == NULL)) {
+		D_ERROR("Not enough space to extend committed DTX table for cont " DF_UUID "\n",
+			DP_UUID(cont->vc_id));
+		return -DER_NOSPACE;
+	}
+
+	cur_cont_df = cont->vc_cont_df;
+	vcm_cont_df = vcm_cont->vc_cont_df;
+	vcm_dbd_off = vcm_cont_df->cd_dtx_committed_head;
+	vcm_dbd = umem_off2ptr(umm, vcm_dbd_off);
+	D_ASSERT(vcm_dbd != NULL);
+
+	/* P1: discard related DTX entries for the victim DTX blob. */
+	for (i = 0; i < vcm_dbd->dbd_count; i++) {
+		dce_df = &vcm_dbd->dbd_committed_data[i];
+		d_iov_set(&kiov, &dce_df->dce_xid, sizeof(dce_df->dce_xid));
+		rc = dbtree_delete(vcm_cont->vc_dtx_committed_hdl, BTR_PROBE_EQ, &kiov, NULL);
+		if (rc == 0)
+			count++;
+		if (rc == -DER_NONEXIST)
+			rc = 0;
+		if (unlikely(rc != 0)) {
+			D_ERROR("Failed to discard DTX entry " DF_DTI " when purging committed DTX "
+				"blob on victim " DF_UUID " with dbd_off " UMOFF_PF ": " DF_RC "\n",
+				DP_DTI(&dce_df->dce_xid), DP_UUID(vcm_cont->vc_id),
+				UMOFF_P(vcm_dbd_off), DP_RC(rc));
+			goto out;
+		}
+
+		if (epoch < dce_df->dce_epoch)
+			epoch = dce_df->dce_epoch;
+	}
+
+	/* P2: prepare to delete victim blob from victim container committed DTX table (head). 
*/
+	new_head_off = vcm_dbd->dbd_next;
+	new_head = umem_off2ptr(umm, new_head_off);
+	D_ASSERT(new_head != NULL);
+
+	rc = vos_dtx_add_ptr(pool, &vcm_cont_df->cd_dtx_committed_head,
+			     sizeof(vcm_cont_df->cd_dtx_committed_head));
+	if (rc != 0)
+		goto out;
+
+	rc = vos_dtx_add_ptr(pool, &new_head->dbd_prev, sizeof(new_head->dbd_prev));
+	if (rc != 0)
+		goto out;
+
+	/* P3: prepare to insert victim blob to current container committed DTX table (tail). */
+	cur_tail_off = cur_cont_df->cd_dtx_committed_tail;
+	cur_tail = umem_off2ptr(umm, cur_tail_off);
+
+	if (UMOFF_IS_NULL(cur_cont_df->cd_dtx_committed_head))
+		/* cd_dtx_committed_tail is next to cd_dtx_committed_head */
+		rc = vos_dtx_add_ptr(pool, &cur_cont_df->cd_dtx_committed_head,
+				     sizeof(cur_cont_df->cd_dtx_committed_head) +
+				     sizeof(cur_cont_df->cd_dtx_committed_tail));
+	else
+		rc = vos_dtx_add_ptr(pool, &cur_cont_df->cd_dtx_committed_tail,
+				     sizeof(cur_cont_df->cd_dtx_committed_tail));
+	if (rc != 0)
+		goto out;
+
+	if (cur_tail != NULL) {
+		rc = vos_dtx_add_ptr(pool, &cur_tail->dbd_next, sizeof(cur_tail->dbd_next));
+		if (rc != 0)
+			goto out;
+	}
+
+	/* dbd_next is next to dbd_prev */
+	rc = vos_dtx_add_ptr(pool, &vcm_dbd->dbd_prev,
+			     sizeof(vcm_dbd->dbd_prev) + sizeof(vcm_dbd->dbd_next));
+	if (rc != 0)
+		goto out;
+
+	/* P4: prepare to reset victim DTX blob counter, dbd_index is next to dbd_count. */
+	rc = vos_dtx_add_ptr(pool, &vcm_dbd->dbd_count,
+			     sizeof(vcm_dbd->dbd_count) + sizeof(vcm_dbd->dbd_index));
+	if (rc != 0)
+		goto out;
+
+	/* P5: delete victim blob from victim container committed DTX table (head). */
+	vcm_cont_df->cd_dtx_committed_head = new_head_off;
+	new_head->dbd_prev = UMOFF_NULL;
+
+	/* P6: insert victim blob to current container committed DTX table (tail). */
+	cur_cont_df->cd_dtx_committed_tail = vcm_dbd_off;
+	vcm_dbd->dbd_prev = cur_tail_off;
+	vcm_dbd->dbd_next = UMOFF_NULL;
+	if (UMOFF_IS_NULL(cur_cont_df->cd_dtx_committed_head))
+		cur_cont_df->cd_dtx_committed_head = vcm_dbd_off;
+	if (cur_tail != NULL)
+		cur_tail->dbd_next = vcm_dbd_off;
+
+	/* P7: reset victim DTX blob counter. */
+	vcm_dbd->dbd_count = 0;
+	vcm_dbd->dbd_index = 0;
+
+	/* P8: refresh DTX related metrics. 
*/ + if (count > 0) { + D_ASSERTF(vcm_cont->vc_dtx_committed_count >= count, + "Unexpected committed DTX entries count for " DF_UUID ": %u vs %u\n", + DP_UUID(vcm_cont->vc_id), vcm_cont->vc_dtx_committed_count, count); + + vcm_cont->vc_dtx_committed_count -= count; + pool->vp_dtx_committed_count -= count; + } + +out: + DL_CDEBUG(rc == 0, DLOG_WARN, DLOG_ERR, rc, + "Reused committed DTX blob %p (" UMOFF_PF ") from cont " DF_UUID + " to cont " DF_UUID " under space pressure in pool " DF_UUID, + vcm_dbd, UMOFF_P(vcm_dbd_off), DP_UUID(vcm_cont->vc_id), DP_UUID(cont->vc_id), + DP_UUID(cont->vc_pool->vp_id)); + return rc; +} + +static int +vos_dtx_extend_cmt_table(struct vos_container *cont) +{ + struct umem_instance *umm = vos_cont2umm(cont); + struct vos_cont_df *cont_df = cont->vc_cont_df; + struct vos_dtx_blob_df *dbd = NULL; + struct vos_dtx_blob_df *tail; + umem_off_t dbd_off = UMOFF_NULL; + int rc; + + if (!DAOS_FAIL_CHECK(DAOS_DTX_NOSPACE_NOREFRESH)) + dbd_off = umem_zalloc(umm, DTX_CMT_BLOB_SIZE); + + if (UMOFF_IS_NULL(dbd_off)) + return vos_dtx_reuse_cmt_blob(cont); + + dbd = umem_off2ptr(umm, dbd_off); + tail = umem_off2ptr(umm, cont_df->cd_dtx_committed_tail); + if (tail == NULL) { + D_ASSERT(UMOFF_IS_NULL(cont_df->cd_dtx_committed_head)); + + /* cd_dtx_committed_tail is next to cd_dtx_committed_head */ + rc = vos_dtx_add_ptr(cont->vc_pool, &cont_df->cd_dtx_committed_head, + sizeof(cont_df->cd_dtx_committed_head) + + sizeof(cont_df->cd_dtx_committed_tail)); + if (rc != 0) + goto out; + + cont_df->cd_dtx_committed_head = dbd_off; + } else { + rc = vos_dtx_add_ptr(cont->vc_pool, &tail->dbd_next, sizeof(tail->dbd_next)); + if (rc != 0) + goto out; + + rc = vos_dtx_add_ptr(cont->vc_pool, &cont_df->cd_dtx_committed_tail, + sizeof(cont_df->cd_dtx_committed_tail)); + if (rc != 0) + goto out; + + tail->dbd_next = dbd_off; + dbd->dbd_prev = cont_df->cd_dtx_committed_tail; + } + cont_df->cd_dtx_committed_tail = dbd_off; + + dbd->dbd_magic = DTX_CMT_BLOB_MAGIC; + dbd->dbd_cap = (DTX_CMT_BLOB_SIZE - sizeof(struct vos_dtx_blob_df)) / + sizeof(struct vos_dtx_cmt_ent_df); + dbd->dbd_count = 0; + dbd->dbd_index = 0; + +out: + DL_CDEBUG(rc == 0, DB_IO, DLOG_ERR, rc, + "Allocated DTX committed blob %p (" UMOFF_PF ") for " DF_UUID "/" DF_UUID, dbd, + UMOFF_P(dbd_off), DP_UUID(cont->vc_pool->vp_id), DP_UUID(cont->vc_id)); + return rc; +} + static int vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth) { @@ -2209,19 +2523,17 @@ vos_dtx_commit_internal(struct vos_container *cont, struct dtx_id dtis[], int count, daos_epoch_t epoch, bool keep_act, bool rm_cos[], struct vos_dtx_act_ent **daes, struct vos_dtx_cmt_ent **dces) { - struct vos_cont_df *cont_df = cont->vc_cont_df; - struct umem_instance *umm = vos_cont2umm(cont); - struct vos_dtx_blob_df *dbd; - struct vos_dtx_blob_df *dbd_prev; - umem_off_t dbd_off; - uint64_t cmt_time = daos_wallclock_secs(); - int committed = 0; - int rc = 0; - int p = 0; - int i = 0; - int j; - int k; - bool allocated = false; + struct vos_dtx_blob_df *dbd; + struct vos_cont_df *cont_df = cont->vc_cont_df; + struct umem_instance *umm = vos_cont2umm(cont); + uint64_t cmt_time = daos_wallclock_secs(); + int committed = 0; + int rc = 0; + int p = 0; + int i = 0; + int j; + int k; + bool allocated = false; dbd = umem_off2ptr(umm, cont_df->cd_dtx_committed_tail); if (dbd == NULL) @@ -2263,7 +2575,8 @@ vos_dtx_commit_internal(struct vos_container *cont, struct dtx_id dtis[], goto out; /* Only need to add range for the first partial blob. 
*/ - rc = umem_tx_add_ptr(umm, &dbd->dbd_count, sizeof(dbd->dbd_count)); + rc = + vos_dtx_add_ptr(cont->vc_pool, &dbd->dbd_count, sizeof(dbd->dbd_count)); if (rc != 0) goto out; } @@ -2283,52 +2596,17 @@ vos_dtx_commit_internal(struct vos_container *cont, struct dtx_id dtis[], goto out; new_blob: - dbd_prev = dbd; - /* Need new @dbd */ - dbd_off = umem_zalloc(umm, DTX_CMT_BLOB_SIZE); - if (UMOFF_IS_NULL(dbd_off)) { - D_ERROR("No space to store committed DTX %d "DF_DTI"\n", - count, DP_DTI(&dtis[i])); - D_GOTO(out, rc = -DER_NOSPACE); - } + if (unlikely(allocated)) + D_GOTO(out, rc = -DER_OVERFLOW); - dbd = umem_off2ptr(umm, dbd_off); - dbd->dbd_magic = DTX_CMT_BLOB_MAGIC; - dbd->dbd_cap = (DTX_CMT_BLOB_SIZE - sizeof(struct vos_dtx_blob_df)) / - sizeof(struct vos_dtx_cmt_ent_df); - dbd->dbd_prev = umem_ptr2off(umm, dbd_prev); - - if (dbd_prev == NULL) { - D_ASSERT(UMOFF_IS_NULL(cont_df->cd_dtx_committed_head)); - D_ASSERT(UMOFF_IS_NULL(cont_df->cd_dtx_committed_tail)); - - /* cd_dtx_committed_tail is next to cd_dtx_committed_head */ - rc = umem_tx_add_ptr(umm, &cont_df->cd_dtx_committed_head, - sizeof(cont_df->cd_dtx_committed_head) + - sizeof(cont_df->cd_dtx_committed_tail)); - if (rc != 0) - goto out; - - cont_df->cd_dtx_committed_head = dbd_off; - } else { - rc = umem_tx_add_ptr(umm, &dbd_prev->dbd_next, - sizeof(dbd_prev->dbd_next)); - if (rc != 0) - goto out; - - dbd_prev->dbd_next = dbd_off; + rc = vos_dtx_extend_cmt_table(cont); + if (rc != 0) + goto out; - rc = umem_tx_add_ptr(umm, &cont_df->cd_dtx_committed_tail, - sizeof(cont_df->cd_dtx_committed_tail)); - if (rc != 0) - goto out; - } + allocated = true; - D_DEBUG(DB_IO, "Allocated DTX committed blob %p ("UMOFF_PF") for cont "DF_UUID"\n", - dbd, UMOFF_P(dbd_off), DP_UUID(cont->vc_id)); + dbd = umem_off2ptr(umm, cont_df->cd_dtx_committed_tail); - cont_df->cd_dtx_committed_tail = dbd_off; - allocated = true; goto again; out: @@ -2582,6 +2860,18 @@ dtx_commit_pin(struct vos_container *cont, struct dtx_id dtis[], int count, int return rc; } +static inline int +vos_dtx_begin_with_behavior(struct umem_instance *umm) +{ + int rc; + + rc = umem_tx_begin(umm, NULL); + if (rc == 0) + umem_tx_set_failure_behavior(umm, TX_FAILURE_RETURN); + + return rc; +} + int vos_dtx_commit(daos_handle_t coh, struct dtx_id dtis[], int count, bool keep_act, bool rm_cos[]) { @@ -2616,7 +2906,7 @@ vos_dtx_commit(daos_handle_t coh, struct dtx_id dtis[], int count, bool keep_act count -= pinned; /* Commit multiple DTXs via single local transaction. */ - rc = umem_tx_begin(vos_cont2umm(cont), NULL); + rc = vos_dtx_begin_with_behavior(vos_cont2umm(cont)); if (rc == 0) { committed = vos_dtx_commit_internal(cont, &dtis[idx], pinned, 0, keep_act, rm_cos != NULL ? 
&rm_cos[idx] : NULL, @@ -2687,7 +2977,7 @@ vos_dtx_abort_internal(struct vos_container *cont, struct vos_dtx_act_ent *dae, goto out; umm = vos_cont2umm(cont); - rc = umem_tx_begin(umm, NULL); + rc = vos_dtx_begin_with_behavior(umm); if (rc != 0) goto out; @@ -2975,7 +3265,7 @@ vos_dtx_set_flags_one(struct vos_container *cont, struct dtx_id *dti, uint32_t f dae_df = umem_off2ptr(umm, dae->dae_df_off); D_ASSERT(dae_df != NULL); - rc = umem_tx_add_ptr(umm, &dae_df->dae_flags, sizeof(dae_df->dae_flags)); + rc = vos_dtx_add_ptr(cont->vc_pool, &dae_df->dae_flags, sizeof(dae_df->dae_flags)); if (rc == 0) { dae_df->dae_flags |= flags; DAE_FLAGS(dae) |= flags; @@ -3010,7 +3300,7 @@ vos_dtx_set_flags(daos_handle_t coh, struct dtx_id dtis[], int count, uint32_t f } umm = vos_cont2umm(cont); - rc = umem_tx_begin(umm, NULL); + rc = vos_dtx_begin_with_behavior(umm); if (rc != 0) goto out; @@ -3045,7 +3335,7 @@ dtx_blob_aggregate(struct umem_instance *umm, struct vos_tls *tls, struct vos_co goto out; } - rc = umem_tx_begin(umm, NULL); + rc = vos_dtx_begin_with_behavior(umm); if (unlikely(rc != 0)) { D_ERROR("Failed to TX begin for DTX aggregation " UMOFF_PF ": " DF_RC "\n", UMOFF_P(dbd_off), DP_RC(rc)); @@ -3080,7 +3370,7 @@ dtx_blob_aggregate(struct umem_instance *umm, struct vos_tls *tls, struct vos_co } if (epoch != cont_df->cd_newest_aggregated) { - rc = umem_tx_add_ptr(umm, &cont_df->cd_newest_aggregated, + rc = vos_dtx_add_ptr(cont->vc_pool, &cont_df->cd_newest_aggregated, sizeof(cont_df->cd_newest_aggregated)); if (unlikely(rc != 0)) { D_ERROR("Failed to refresh epoch for DTX aggregation " UMOFF_PF ": " DF_RC @@ -3094,7 +3384,7 @@ dtx_blob_aggregate(struct umem_instance *umm, struct vos_tls *tls, struct vos_co if (dbd->dbd_count - dtx_aggr_count > 0) { size_t buf_len; - rc = umem_tx_add_ptr(umm, &dbd->dbd_committed_data[0], + rc = vos_dtx_add_ptr(cont->vc_pool, &dbd->dbd_committed_data[0], sizeof(dbd->dbd_committed_data[0]) * dbd->dbd_count); if (unlikely(rc != 0)) { D_ERROR("Failed update committed DTX blob " UMOFF_PF ": " DF_RC "\n", @@ -3105,7 +3395,7 @@ dtx_blob_aggregate(struct umem_instance *umm, struct vos_tls *tls, struct vos_co memmove(&dbd->dbd_committed_data[0], &dbd->dbd_committed_data[dtx_aggr_count], buf_len); - rc = umem_tx_add_ptr(umm, &dbd->dbd_count, sizeof(dbd->dbd_count)); + rc = vos_dtx_add_ptr(cont->vc_pool, &dbd->dbd_count, sizeof(dbd->dbd_count)); if (unlikely(rc != 0)) { D_ERROR("Failed update committed DTX count " UMOFF_PF ": " DF_RC "\n", UMOFF_P(dbd_off), DP_RC(rc)); @@ -3122,7 +3412,7 @@ dtx_blob_aggregate(struct umem_instance *umm, struct vos_tls *tls, struct vos_co D_ASSERT(UMOFF_IS_NULL(dbd_prev_off)); D_ASSERT(dbd_off == cont_df->cd_dtx_committed_head); - rc = umem_tx_add_ptr(umm, &cont_df->cd_dtx_committed_head, + rc = vos_dtx_add_ptr(cont->vc_pool, &cont_df->cd_dtx_committed_head, sizeof(cont_df->cd_dtx_committed_head)); if (unlikely(rc != 0)) { D_ERROR("Failed to update head for DTX aggregation " UMOFF_PF ": " DF_RC @@ -3134,7 +3424,7 @@ dtx_blob_aggregate(struct umem_instance *umm, struct vos_tls *tls, struct vos_co if (dbd_next == NULL) { D_ASSERT(dbd_off == cont_df->cd_dtx_committed_tail); - rc = umem_tx_add_ptr(umm, &cont_df->cd_dtx_committed_tail, + rc = vos_dtx_add_ptr(cont->vc_pool, &cont_df->cd_dtx_committed_tail, sizeof(cont_df->cd_dtx_committed_tail)); if (unlikely(rc != 0)) { D_ERROR("Failed to update tail for DTX aggregation " UMOFF_PF @@ -3144,7 +3434,8 @@ dtx_blob_aggregate(struct umem_instance *umm, struct vos_tls *tls, struct vos_co } 
cont_df->cd_dtx_committed_tail = dbd_prev_off; } else { - rc = umem_tx_add_ptr(umm, &dbd_next->dbd_prev, sizeof(dbd_next->dbd_prev)); + rc = vos_dtx_add_ptr(cont->vc_pool, &dbd_next->dbd_prev, + sizeof(dbd_next->dbd_prev)); if (unlikely(rc != 0)) { D_ERROR("Failed to update previous DTXs blob for DTX " "aggregation " UMOFF_PF ": " DF_RC "\n", @@ -3278,32 +3569,12 @@ vos_dtx_stat(daos_handle_t coh, struct dtx_stat *stat, uint32_t flags) stat->dtx_newest_aggregated = cont_df->cd_newest_aggregated; if (!UMOFF_IS_NULL(cont_df->cd_dtx_committed_head)) { - struct umem_instance *umm = vos_cont2umm(cont); - struct vos_dtx_blob_df *dbd; - struct vos_dtx_cmt_ent_df *dce; - int i; - - dbd = umem_off2ptr(umm, cont_df->cd_dtx_committed_head); - - for (i = 0; i < dbd->dbd_count; i++) { - dce = &dbd->dbd_committed_data[i]; - - if (!daos_is_zero_dti(&dce->dce_xid) && - dce->dce_cmt_time != 0) { - stat->dtx_first_cmt_blob_time_up = dce->dce_cmt_time; - break; - } - } - - for (i = dbd->dbd_count - 1; i > 0; i--) { - dce = &dbd->dbd_committed_data[i]; + struct vos_dtx_blob_df *dbd = + umem_off2ptr(vos_cont2umm(cont), cont_df->cd_dtx_committed_head); - if (!daos_is_zero_dti(&dce->dce_xid) && - dce->dce_cmt_time != 0) { - stat->dtx_first_cmt_blob_time_lo = dce->dce_cmt_time; - break; - } - } + stat->dtx_first_cmt_blob_time_up = dbd->dbd_committed_data[0].dce_cmt_time; + stat->dtx_first_cmt_blob_time_lo = + dbd->dbd_committed_data[dbd->dbd_count - 1].dce_cmt_time; } } diff --git a/src/vos/vos_internal.h b/src/vos/vos_internal.h index 9265a2d8fdf..f6b2fc36975 100644 --- a/src/vos/vos_internal.h +++ b/src/vos/vos_internal.h @@ -330,6 +330,10 @@ struct vos_pool { d_list_t vp_gc_link; /** List of open containers with objects in gc pool */ d_list_t vp_gc_cont; + + /** List of open containers */ + d_list_t vp_cont_list; + /** address of durable-format pool in SCM */ struct vos_pool_df *vp_pool_df; /** Dummy data I/O context */ @@ -388,6 +392,10 @@ struct vos_container { d_list_t vc_dtx_unsorted_list; /* The list for the active DTX entries that are re-indexed when open the container. */ d_list_t vc_dtx_reindex_list; + + /* Link into vos_pool::vp_cont_list */ + d_list_t vc_pool_link; + /* The largest epoch difference for re-indexed DTX entries max/min pairs. */ uint64_t vc_dtx_reindex_eph_diff; /* The latest calculated local stable epoch. */ diff --git a/src/vos/vos_pool.c b/src/vos/vos_pool.c index 3d40c89684f..e856a84e004 100644 --- a/src/vos/vos_pool.c +++ b/src/vos/vos_pool.c @@ -1226,6 +1226,7 @@ pool_alloc(uuid_t uuid, struct vos_pool **pool_p) d_uhash_ulink_init(&pool->vp_hlink, &pool_uuid_hops); D_INIT_LIST_HEAD(&pool->vp_gc_link); D_INIT_LIST_HEAD(&pool->vp_gc_cont); + D_INIT_LIST_HEAD(&pool->vp_cont_list); uuid_copy(pool->vp_id, uuid); *pool_p = pool;
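
The space-pressure handling above boils down to one pattern: open the local TX with the failure
behavior set to TX_FAILURE_RETURN, so that a failed snapshot is reported as -DER_NOSPACE instead
of aborting the whole TX, and then retry the snapshot against the pool's preallocated emergency
buffer. The sketch below is illustrative only; it mirrors vos_dtx_begin_with_behavior() and
vos_dtx_add_ptr() from this patch, the wrapper name snapshot_with_emergency_buf() is hypothetical,
and the DAOS_PMEM_BUILD guard is omitted for brevity.

/*
 * Illustrative sketch (not part of the patch): combine the new umem failure-behavior
 * hooks with the preallocated emergency snapshot buffer. The wrapper name
 * snapshot_with_emergency_buf() is hypothetical.
 */
static int
snapshot_with_emergency_buf(struct vos_pool *pool, void *ptr, size_t size)
{
	struct umem_instance	*umm = &pool->vp_umm;
	int			 rc;

	rc = umem_tx_begin(umm, NULL);
	if (rc != 0)
		return rc;

	/* Report snapshot failures to the caller instead of aborting the TX. */
	umem_tx_set_failure_behavior(umm, TX_FAILURE_RETURN);

	rc = umem_tx_add_ptr(umm, ptr, size);
	if (rc == -DER_NOSPACE) {
		struct vos_pool_ext_df *ext_df = umem_off2ptr(umm, pool->vp_pool_df->pd_ext);

		/* Retry the snapshot against the preallocated emergency buffer. */
		if (ext_df != NULL && !UMOFF_IS_NULL(ext_df->ped_emerg_buf)) {
			rc = umem_tx_set_snapbuf(umm, ext_df->ped_emerg_buf,
						 VOS_SNAPBUF_EMERG_SIZE);
			if (rc == 0)
				rc = umem_tx_add_ptr(umm, ptr, size);
		}
	}

	/* The caller would modify *ptr here before closing the TX. */

	return umem_tx_end(umm, rc);
}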