diff --git a/src/container/srv_target.c b/src/container/srv_target.c index 064b6a271b5..b2b9e28f230 100644 --- a/src/container/srv_target.c +++ b/src/container/srv_target.c @@ -210,7 +210,7 @@ cont_aggregate_runnable(struct ds_cont_child *cont, struct sched_request *req, if (ds_pool_is_rebuilding(pool) && !vos_agg) { D_DEBUG(DB_EPC, DF_CONT ": skip EC aggregation during rebuild %d, %d.\n", DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), - atomic_load(&pool->sp_rebuilding), pool->sp_rebuild_scan); + atomic_load(&pool->sp_rebuilding), atomic_load(&pool->sp_rebuild_enum)); return false; } @@ -319,8 +319,7 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb, daos_epoch_t epoch_max, epoch_min; daos_epoch_range_t epoch_range; struct sched_request *req = cont2req(cont, param->ap_vos_agg); - uint64_t hlc = d_hlc_get(); - uint64_t change_hlc; + uint64_t hlc = d_hlc_get(); uint64_t interval; uint64_t snapshots_local[MAX_SNAPSHOT_LOCAL] = { 0 }; uint64_t *snapshots = NULL; @@ -329,16 +328,14 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb, uint32_t flags = 0; int i, rc = 0; - change_hlc = max(cont->sc_snapshot_delete_hlc, - cont->sc_pool->spc_rebuild_end_hlc); - if (param->ap_full_scan_hlc < change_hlc) { - /* Snapshot has been deleted or rebuild happens since the last + if (param->ap_full_scan_hlc < cont->sc_snapshot_delete_hlc) { + /* Snapshot has been deleted since the last * aggregation, let's restart from 0. */ epoch_min = 0; flags |= VOS_AGG_FL_FORCE_SCAN; - D_DEBUG(DB_EPC, "change hlc "DF_X64" > full "DF_X64"\n", - change_hlc, param->ap_full_scan_hlc); + D_DEBUG(DB_EPC, "snapshot del hlc " DF_X64 " > full " DF_X64 "\n", + cont->sc_snapshot_delete_hlc, param->ap_full_scan_hlc); } else { epoch_min = get_hae(cont, param->ap_vos_agg); } @@ -378,41 +375,18 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb, D_DEBUG(DB_EPC, "hlc "DF_X64" epoch "DF_X64"/"DF_X64" agg max "DF_X64"\n", hlc, epoch_max, epoch_min, cont->sc_aggregation_max); - if (cont->sc_snapshots_nr + 1 < MAX_SNAPSHOT_LOCAL) { + snapshots_nr = cont->sc_snapshots_nr; + if (snapshots_nr < MAX_SNAPSHOT_LOCAL) { snapshots = snapshots_local; } else { - D_ALLOC(snapshots, (cont->sc_snapshots_nr + 1) * - sizeof(daos_epoch_t)); + D_ALLOC(snapshots, snapshots_nr * sizeof(daos_epoch_t)); if (snapshots == NULL) return -DER_NOMEM; } - if (cont->sc_pool->spc_rebuild_fence != 0) { - uint64_t rebuild_fence = cont->sc_pool->spc_rebuild_fence; - int j; - int insert_idx; - - /* insert rebuild_fetch into the snapshot list */ - D_DEBUG(DB_EPC, "rebuild fence "DF_X64"\n", rebuild_fence); - for (j = 0, insert_idx = 0; j < cont->sc_snapshots_nr; j++) { - if (cont->sc_snapshots[j] < rebuild_fence) { - snapshots[j] = cont->sc_snapshots[j]; - insert_idx++; - } else { - snapshots[j + 1] = cont->sc_snapshots[j]; - } - } - snapshots[insert_idx] = rebuild_fence; - snapshots_nr = cont->sc_snapshots_nr + 1; - } else { - /* Since sc_snapshots might be freed by other ULT, let's - * always copy here. - */ - snapshots_nr = cont->sc_snapshots_nr; - if (snapshots_nr > 0) - memcpy(snapshots, cont->sc_snapshots, - snapshots_nr * sizeof(daos_epoch_t)); - } + /* Since sc_snapshots might be freed by other ULT, let's always copy here. */ + if (snapshots_nr > 0) + memcpy(snapshots, cont->sc_snapshots, snapshots_nr * sizeof(daos_epoch_t)); /* Find highest snapshot less than last aggregated epoch. */ for (i = 0; i < snapshots_nr && snapshots[i] < epoch_min; ++i) diff --git a/src/include/daos_srv/container.h b/src/include/daos_srv/container.h index 0480e6603b6..f7c84d807e1 100644 --- a/src/include/daos_srv/container.h +++ b/src/include/daos_srv/container.h @@ -129,7 +129,7 @@ struct ds_cont_child { * VOS aggregation will use this boundary. We will optimize it later. */ uint64_t sc_ec_agg_eph_boundary; - /* The current EC aggregate epoch for this xstream */ + /* The local EC aggregation epoch for this xstream */ uint64_t sc_ec_agg_eph; /* Used by ds_cont_eph_report() to query the minimum ec_agg_eph and stable_eph * from all local VOS. @@ -160,7 +160,7 @@ struct ds_cont_child { struct agg_param { void *ap_data; struct ds_cont_child *ap_cont; - daos_epoch_t ap_full_scan_hlc; + daos_epoch_t ap_full_scan_hlc; bool ap_vos_agg; }; diff --git a/src/include/daos_srv/pool.h b/src/include/daos_srv/pool.h index 3fbaae93810..cb1fe8d1eb5 100644 --- a/src/include/daos_srv/pool.h +++ b/src/include/daos_srv/pool.h @@ -94,13 +94,10 @@ struct ds_pool { * rebuild job. */ uint32_t sp_rebuild_gen; - ATOMIC int sp_rebuilding; ATOMIC int sp_discarding; - /** - * someone has already messaged this pool to for rebuild scan, - * NB: all xstreams can do lockless-write on it but it's OK - */ - int sp_rebuild_scan; + ATOMIC int sp_rebuilding; + /* someone has already messaged this pool to for rebuild object/key enumeration */ + ATOMIC int sp_rebuild_enum; int sp_discard_status; /** path to ephemeral metrics */ @@ -178,16 +175,6 @@ struct ds_pool_child { d_list_t spc_cont_list; d_list_t spc_srv_cont_hdl; /* Single server cont handle */ - /* The current maxim rebuild epoch, (0 if there is no rebuild), so - * vos aggregation can not cross this epoch during rebuild to avoid - * interfering rebuild process. - */ - uint64_t spc_rebuild_fence; - - /* The HLC when current rebuild ends, which will be used to compare - * with the aggregation full scan start HLC to know whether the - * aggregation needs to be restarted from 0. */ - uint64_t spc_rebuild_end_hlc; uint32_t spc_map_version; int spc_ref; ABT_eventual spc_ref_eventual; @@ -219,7 +206,7 @@ struct ds_pool_svc_op_val { static inline bool ds_pool_is_rebuilding(struct ds_pool *pool) { - return (atomic_load(&pool->sp_rebuilding) > 0 || pool->sp_rebuild_scan > 0); + return (atomic_load(&pool->sp_rebuilding) > 0 || atomic_load(&pool->sp_rebuild_enum) > 0); } /* encode metadata RPC operation key: HLC time first, in network order, for keys sorted by time. diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c index 29083dd02da..bd53e95995c 100644 --- a/src/object/srv_obj.c +++ b/src/object/srv_obj.c @@ -3439,9 +3439,8 @@ obj_local_enum(struct obj_io_context *ioc, crt_rpc_t *rpc, if (oei->oei_flags & ORF_FOR_MIGRATION) { /* just in case ds_pool::sp_rebuilding is not set, pause my local EC aggregation * by setting this flag. - * NB: it's a lockess write to shared data structure and it's harmless. */ - ioc->ioc_coc->sc_pool->spc_pool->sp_rebuild_scan = 1; + atomic_store(&ioc->ioc_coc->sc_pool->spc_pool->sp_rebuild_enum, 1); flags = DTX_FOR_MIGRATION; } diff --git a/src/rebuild/rebuild_internal.h b/src/rebuild/rebuild_internal.h index d593eead83a..6bcc3f0ab37 100644 --- a/src/rebuild/rebuild_internal.h +++ b/src/rebuild/rebuild_internal.h @@ -77,12 +77,7 @@ struct rebuild_tgt_pool_tracker { uint64_t rt_stable_epoch; /* Only used by reclaim job to discard those half-rebuild data */ - uint64_t rt_reclaim_epoch; - /* local rebuild epoch mainly to constrain the VOS aggregation - * to make sure aggregation will not cross the epoch - */ - uint64_t rt_rebuild_fence; - + uint64_t rt_reclaim_epoch; uint32_t rt_leader_rank; /* Global dtx resync version */ @@ -373,7 +368,8 @@ void rebuild_tgt_status_check_ult(void *arg); int -rebuild_tgt_prepare(crt_rpc_t *rpc, struct rebuild_tgt_pool_tracker **p_rpt); +rebuild_tgt_prepare(struct ds_pool *pool, struct rebuild_scan_in *rsi, + struct rebuild_tgt_pool_tracker **p_rpt); bool rebuild_status_match(struct rebuild_tgt_pool_tracker *rpt, diff --git a/src/rebuild/scan.c b/src/rebuild/scan.c index 96f568c0af4..f989e5102cb 100644 --- a/src/rebuild/scan.c +++ b/src/rebuild/scan.c @@ -1254,6 +1254,8 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc) struct rebuild_scan_out *rout; struct rebuild_pool_tls *tls = NULL; struct rebuild_tgt_pool_tracker *rpt = NULL; + struct ds_pool *pool = NULL; + bool checker = false; int rc; rsi = crt_req_get(rpc); @@ -1261,6 +1263,13 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc) D_INFO(DF_RB "\n", DP_RB_RSI(rsi)); + rc = ds_pool_lookup(rsi->rsi_pool_uuid, &pool); + if (rc) { + DL_ERROR(rc, DF_RB " cannot find pool", DP_RB_RSI(rsi)); + D_GOTO(out_put, rc); + } + atomic_fetch_add(&pool->sp_rebuilding, 1); + /* If PS leader has been changed, and rebuild version is also increased * due to adding new failure targets for rebuild, let's abort previous * rebuild. @@ -1290,7 +1299,7 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc) if (rpt != NULL && rpt->rt_rebuild_op == rsi->rsi_rebuild_op) { if (rpt->rt_global_done) { D_WARN("previous not cleaned up yet " DF_RBF "\n", DP_RBF_RPT(rpt)); - D_GOTO(out, rc = -DER_BUSY); + D_GOTO(out_put, rc = -DER_BUSY); } /* Rebuild should never skip the version */ @@ -1323,7 +1332,7 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc) * an old or same leader. */ if (rsi->rsi_leader_term <= rpt->rt_leader_term) - D_GOTO(out, rc = 0); + D_GOTO(out_put, rc = 0); if (rpt->rt_leader_rank != rsi->rsi_master_rank) { D_DEBUG(DB_REBUILD, "new leader existing " DF_RBF "-> req " DF_RBF "\n", @@ -1340,7 +1349,7 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc) rpt->rt_leader_term = rsi->rsi_leader_term; - D_GOTO(out, rc = 0); + D_GOTO(out_put, rc = 0); } else if (rpt != NULL) { rpt_put(rpt); rpt = NULL; @@ -1351,43 +1360,48 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc) rsi->rsi_rebuild_gen); if (tls != NULL) { D_WARN("previous not cleaned up yet " DF_RBF, DP_RBF_RSI(rsi)); - D_GOTO(out, rc = -DER_BUSY); + D_GOTO(out_delete, rc = -DER_BUSY); } if (daos_fail_check(DAOS_REBUILD_TGT_START_FAIL)) - D_GOTO(out, rc = -DER_INVAL); + D_GOTO(out_delete, rc = -DER_INVAL); - rc = rebuild_tgt_prepare(rpc, &rpt); + rc = rebuild_tgt_prepare(pool, rsi, &rpt); if (rc) - D_GOTO(out, rc); + D_GOTO(out_delete, rc); rpt_get(rpt); rc = dss_ult_create(rebuild_tgt_status_check_ult, rpt, DSS_XS_SELF, 0, DSS_DEEP_STACK_SZ, NULL); if (rc) { rpt_put(rpt); - D_GOTO(out, rc); + D_GOTO(out_delete, rc); } - - atomic_fetch_add(&rpt->rt_pool->sp_rebuilding, 1); /* reset in rebuild_tgt_fini */ + checker = true; rpt_get(rpt); /* step-3: start scan leader */ rc = dss_ult_create(rebuild_scan_leader, rpt, DSS_XS_SELF, 0, 0, NULL); if (rc != 0) { rpt_put(rpt); - D_GOTO(out, rc); + D_GOTO(out_delete, rc); } -out: - if (tls && tls->rebuild_pool_status == 0 && rc != 0) - tls->rebuild_pool_status = rc; - - if (rpt) { - if (rc) - rpt_delete(rpt); +out_delete: + if (rpt && !checker) + rpt_delete(rpt); +out_put: + if (rpt) rpt_put(rpt); + + if (pool) { + if (!checker) + atomic_fetch_sub(&pool->sp_rebuilding, 1); + ds_pool_put(pool); } + if (rc != 0 && tls && tls->rebuild_pool_status == 0) + tls->rebuild_pool_status = rc; + rout = crt_reply_get(rpc); rout->rso_status = rc; rout->rso_stable_epoch = d_hlc_get(); diff --git a/src/rebuild/srv.c b/src/rebuild/srv.c index b47504ca30b..61eb444911d 100644 --- a/src/rebuild/srv.c +++ b/src/rebuild/srv.c @@ -375,6 +375,8 @@ static void rpt_insert(struct rebuild_tgt_pool_tracker *rpt) { D_ASSERT(dss_get_module_info()->dmi_xs_id == 0); + + rpt_get(rpt); ABT_rwlock_wrlock(rebuild_gst.rg_ttl_rwlock); d_list_add(&rpt->rt_list, &rebuild_gst.rg_tgt_tracker_list); ABT_rwlock_unlock(rebuild_gst.rg_ttl_rwlock); @@ -384,9 +386,13 @@ void rpt_delete(struct rebuild_tgt_pool_tracker *rpt) { D_ASSERT(dss_get_module_info()->dmi_xs_id == 0); + D_ASSERT(!d_list_empty(&rpt->rt_list)); + ABT_rwlock_wrlock(rebuild_gst.rg_ttl_rwlock); d_list_del_init(&rpt->rt_list); ABT_rwlock_unlock(rebuild_gst.rg_ttl_rwlock); + + rpt_put(rpt); } struct rebuild_tgt_pool_tracker * @@ -1546,7 +1552,7 @@ rpt_put(struct rebuild_tgt_pool_tracker *rpt) D_ASSERT(rpt->rt_refcount >= 0); D_DEBUG(DB_REBUILD, DF_RB ": rpt %p ref %d finishing %d\n", DP_RB_RPT(rpt), rpt, rpt->rt_refcount, rpt->rt_finishing); - if (rpt->rt_refcount == 1 && rpt->rt_finishing) + if (rpt->rt_finishing) ABT_cond_signal(rpt->rt_fini_cond); zombie = (rpt->rt_refcount == 0); ABT_mutex_unlock(rpt->rt_lock); @@ -2870,22 +2876,8 @@ rebuild_fini_one(void *arg) if (dpc == NULL) return 0; - /* Reset rebuild epoch, then reset the aggregation epoch, so - * it can aggregate the rebuild epoch. - */ - D_ASSERT(rpt->rt_rebuild_fence != 0); - if (rpt->rt_rebuild_fence == dpc->spc_rebuild_fence) { - dpc->spc_rebuild_fence = 0; - dpc->spc_rebuild_end_hlc = d_hlc_get(); - D_DEBUG(DB_REBUILD, DF_RB ": Reset aggregation end hlc " DF_U64 "\n", - DP_RB_RPT(rpt), dpc->spc_rebuild_end_hlc); - } else { - D_DEBUG(DB_REBUILD, - DF_RB ": pool is still being rebuilt rt_rebuild_fence " DF_U64 - " spc_rebuild_fence " DF_U64 "\n", - DP_RB_RPT(rpt), rpt->rt_rebuild_fence, dpc->spc_rebuild_fence); - } - + D_DEBUG(DB_REBUILD, DF_RB ": rebuild fini for stable epoch " DF_U64 "\n", DP_RB_RPT(rpt), + rpt->rt_stable_epoch); ds_pool_child_put(dpc); return 0; } @@ -2901,21 +2893,21 @@ rebuild_tgt_fini(struct rebuild_tgt_pool_tracker *rpt) D_ASSERT(atomic_load(&rpt->rt_pool->sp_rebuilding) > 0); atomic_fetch_sub(&rpt->rt_pool->sp_rebuilding, 1); - rpt->rt_pool->sp_rebuild_scan = 0; + + atomic_store(&rpt->rt_pool->sp_rebuild_enum, 0); ABT_mutex_lock(rpt->rt_lock); ABT_cond_signal(rpt->rt_global_dtx_wait_cond); D_ASSERT(rpt->rt_refcount > 0); rpt->rt_finishing = 1; /* Wait until all ult/tasks finish and release the rpt. - * NB: Because rebuild_tgt_fini will be only called in - * rebuild_tgt_status_check_ult, which will make sure when - * rt_refcount reaches to 1, either all rebuild is done or - * all ult/task has been aborted by rt_abort, i.e. no new - * ULT/task will be created after this check. So it is safe - * to destroy the rpt after this. + * NB: Because rebuild_tgt_fini will be only called in rebuild_tgt_status_check_ult, + * which will make sure when rt_refcount reaches to 2 (one by check ULT, the other by + * track list), either all rebuild is done or all ult/task has been aborted by rt_abort, + * i.e. no new ULT/task will be created after this check. So it is safe to destroy + * the rpt after this. */ - if (rpt->rt_refcount > 1) + while (rpt->rt_refcount > 2) ABT_cond_wait(rpt->rt_fini_cond, rpt->rt_lock); ABT_mutex_unlock(rpt->rt_lock); @@ -2934,7 +2926,6 @@ rebuild_tgt_fini(struct rebuild_tgt_pool_tracker *rpt) /* No one should access rpt after rebuild_fini_one. */ D_INFO(DF_RB " Finalized rebuild\n", DP_RB_RPT(rpt)); rpt_delete(rpt); - rpt_put(rpt); } void @@ -3093,8 +3084,8 @@ rebuild_tgt_status_check_ult(void *arg) sched_req_put(rpt->rt_ult); rpt->rt_ult = NULL; out: - rpt_put(rpt); rebuild_tgt_fini(rpt); + rpt_put(rpt); } /** @@ -3128,13 +3119,8 @@ rebuild_prepare_one(void *data) D_ASSERT(dss_get_module_info()->dmi_xs_id != 0); - /* Set the rebuild epoch per VOS container, so VOS aggregation will not - * cross the epoch to cause problem. - */ - D_ASSERT(rpt->rt_rebuild_fence != 0); - dpc->spc_rebuild_fence = rpt->rt_rebuild_fence; - D_DEBUG(DB_REBUILD, DF_RB " open local container " DF_UUID " rebuild eph " DF_X64 "\n", - DP_RB_RPT(rpt), DP_UUID(rpt->rt_coh_uuid), rpt->rt_rebuild_fence); + D_DEBUG(DB_REBUILD, DF_RB " open local container " DF_UUID " stable eph " DF_X64 "\n", + DP_RB_RPT(rpt), DP_UUID(rpt->rt_coh_uuid), rpt->rt_stable_epoch); put: ds_pool_child_put(dpc); @@ -3196,10 +3182,9 @@ rpt_create(struct ds_pool *pool, uint32_t master_rank, uint32_t pm_ver, * each target get the scan rpc from the master. */ int -rebuild_tgt_prepare(crt_rpc_t *rpc, struct rebuild_tgt_pool_tracker **p_rpt) +rebuild_tgt_prepare(struct ds_pool *pool, struct rebuild_scan_in *rsi, + struct rebuild_tgt_pool_tracker **p_rpt) { - struct rebuild_scan_in *rsi = crt_req_get(rpc); - struct ds_pool *pool; struct rebuild_tgt_pool_tracker *rpt = NULL; struct rebuild_pool_tls *pool_tls; daos_prop_t prop = { 0 }; @@ -3209,12 +3194,6 @@ rebuild_tgt_prepare(crt_rpc_t *rpc, struct rebuild_tgt_pool_tracker **p_rpt) D_DEBUG(DB_REBUILD, DF_RB " prepare rebuild\n", DP_RB_RSI(rsi)); - rc = ds_pool_lookup(rsi->rsi_pool_uuid, &pool); - if (rc) { - DL_ERROR(rc, DF_RB " cannot find pool", DP_RB_RSI(rsi)); - return rc; - } - if (ds_pool_get_version(pool) < rsi->rsi_rebuild_ver) { D_INFO(DF_RB " map %u < rsi_rebuild_ver %u\n", DP_RB_RSI(rsi), ds_pool_get_version(pool), rsi->rsi_rebuild_ver); @@ -3249,7 +3228,6 @@ rebuild_tgt_prepare(crt_rpc_t *rpc, struct rebuild_tgt_pool_tracker **p_rpt) /* Let's add the rpt to the tracker list before IV fetch, which might yield, * to make sure the new coming request can find the rpt in the list. */ - rpt_get(rpt); rpt_insert(rpt); rc = ds_pool_iv_srv_hdl_fetch(pool, &rpt->rt_poh_uuid, &rpt->rt_coh_uuid); @@ -3276,31 +3254,24 @@ rebuild_tgt_prepare(crt_rpc_t *rpc, struct rebuild_tgt_pool_tracker **p_rpt) if (pool_tls == NULL) D_GOTO(out, rc = -DER_NOMEM); - rpt->rt_rebuild_fence = d_hlc_get(); - rc = ds_pool_task_collective(rpt->rt_pool_uuid, - PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT, - rebuild_prepare_one, rpt, 0); + rc = ds_pool_task_collective(rpt->rt_pool_uuid, + PO_COMP_ST_NEW | PO_COMP_ST_DOWN | PO_COMP_ST_DOWNOUT, + rebuild_prepare_one, rpt, 0); if (rc) { - rpt->rt_rebuild_fence = 0; rebuild_pool_tls_destroy(pool_tls); D_GOTO(out, rc); } ABT_mutex_lock(rpt->rt_lock); + ds_pool_get(pool); rpt->rt_pool = pool; /* pin it */ ABT_mutex_unlock(rpt->rt_lock); *p_rpt = rpt; out: - if (rc) { - if (rpt) { - if (!d_list_empty(&rpt->rt_list)) { - rpt_delete(rpt); - rpt_put(rpt); - } - rpt_put(rpt); - } - ds_pool_put(pool); + if (rc && rpt) { + rpt_delete(rpt); + rpt_put(rpt); } daos_prop_fini(&prop);