Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 12 additions & 38 deletions src/container/srv_target.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ cont_aggregate_runnable(struct ds_cont_child *cont, struct sched_request *req,
if (ds_pool_is_rebuilding(pool) && !vos_agg) {
D_DEBUG(DB_EPC, DF_CONT ": skip EC aggregation during rebuild %d, %d.\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
atomic_load(&pool->sp_rebuilding), pool->sp_rebuild_scan);
atomic_load(&pool->sp_rebuilding), atomic_load(&pool->sp_rebuild_enum));
return false;
}

Expand Down Expand Up @@ -319,8 +319,7 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb,
daos_epoch_t epoch_max, epoch_min;
daos_epoch_range_t epoch_range;
struct sched_request *req = cont2req(cont, param->ap_vos_agg);
uint64_t hlc = d_hlc_get();
uint64_t change_hlc;
uint64_t hlc = d_hlc_get();
uint64_t interval;
uint64_t snapshots_local[MAX_SNAPSHOT_LOCAL] = { 0 };
uint64_t *snapshots = NULL;
Expand All @@ -329,16 +328,14 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb,
uint32_t flags = 0;
int i, rc = 0;

change_hlc = max(cont->sc_snapshot_delete_hlc,
cont->sc_pool->spc_rebuild_end_hlc);
if (param->ap_full_scan_hlc < change_hlc) {
/* Snapshot has been deleted or rebuild happens since the last
if (param->ap_full_scan_hlc < cont->sc_snapshot_delete_hlc) {
/* Snapshot has been deleted since the last
* aggregation, let's restart from 0.
*/
epoch_min = 0;
flags |= VOS_AGG_FL_FORCE_SCAN;
D_DEBUG(DB_EPC, "change hlc "DF_X64" > full "DF_X64"\n",
change_hlc, param->ap_full_scan_hlc);
D_DEBUG(DB_EPC, "snapshot del hlc " DF_X64 " > full " DF_X64 "\n",
cont->sc_snapshot_delete_hlc, param->ap_full_scan_hlc);
} else {
epoch_min = get_hae(cont, param->ap_vos_agg);
}
Expand Down Expand Up @@ -378,41 +375,18 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb,
D_DEBUG(DB_EPC, "hlc "DF_X64" epoch "DF_X64"/"DF_X64" agg max "DF_X64"\n",
hlc, epoch_max, epoch_min, cont->sc_aggregation_max);

if (cont->sc_snapshots_nr + 1 < MAX_SNAPSHOT_LOCAL) {
snapshots_nr = cont->sc_snapshots_nr;
if (snapshots_nr < MAX_SNAPSHOT_LOCAL) {
snapshots = snapshots_local;
} else {
D_ALLOC(snapshots, (cont->sc_snapshots_nr + 1) *
sizeof(daos_epoch_t));
D_ALLOC(snapshots, snapshots_nr * sizeof(daos_epoch_t));
if (snapshots == NULL)
return -DER_NOMEM;
}

if (cont->sc_pool->spc_rebuild_fence != 0) {
uint64_t rebuild_fence = cont->sc_pool->spc_rebuild_fence;
int j;
int insert_idx;

/* insert rebuild_fence into the snapshot list */
D_DEBUG(DB_EPC, "rebuild fence "DF_X64"\n", rebuild_fence);
for (j = 0, insert_idx = 0; j < cont->sc_snapshots_nr; j++) {
if (cont->sc_snapshots[j] < rebuild_fence) {
snapshots[j] = cont->sc_snapshots[j];
insert_idx++;
} else {
snapshots[j + 1] = cont->sc_snapshots[j];
}
}
snapshots[insert_idx] = rebuild_fence;
snapshots_nr = cont->sc_snapshots_nr + 1;
} else {
/* Since sc_snapshots might be freed by other ULT, let's
* always copy here.
*/
snapshots_nr = cont->sc_snapshots_nr;
if (snapshots_nr > 0)
memcpy(snapshots, cont->sc_snapshots,
snapshots_nr * sizeof(daos_epoch_t));
}
/* Since sc_snapshots might be freed by other ULT, let's always copy here. */
if (snapshots_nr > 0)
memcpy(snapshots, cont->sc_snapshots, snapshots_nr * sizeof(daos_epoch_t));

/* Find highest snapshot less than last aggregated epoch. */
for (i = 0; i < snapshots_nr && snapshots[i] < epoch_min; ++i)
Expand Down
4 changes: 2 additions & 2 deletions src/include/daos_srv/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ struct ds_cont_child {
* VOS aggregation will use this boundary. We will optimize it later.
*/
uint64_t sc_ec_agg_eph_boundary;
/* The current EC aggregate epoch for this xstream */
/* The local EC aggregation epoch for this xstream */
uint64_t sc_ec_agg_eph;
/* Used by ds_cont_eph_report() to query the minimum ec_agg_eph and stable_eph
* from all local VOS.
Expand Down Expand Up @@ -160,7 +160,7 @@ struct ds_cont_child {
/*
 * Aggregation parameters. cont_child_aggregate() reads ap_full_scan_hlc and ap_vos_agg
 * through its `param` argument (presumably a pointer to this struct; confirm).
 *
 * NOTE(review): this text is a rendered diff, so ap_full_scan_hlc is listed twice below
 * (the pre-change and post-change line of the same member, apparently differing only in
 * whitespace). The real struct declares the member once.
 */
struct agg_param {
/* Opaque payload; its concrete type is not visible here (presumably callback-private data; confirm). */
void *ap_data;
/* Container child associated with this aggregation (presumably the one being aggregated; confirm). */
struct ds_cont_child *ap_cont;
/*
 * Compared with sc_snapshot_delete_hlc in cont_child_aggregate(): when this value is
 * smaller, aggregation restarts from epoch 0 with VOS_AGG_FL_FORCE_SCAN set.
 */
daos_epoch_t ap_full_scan_hlc;
daos_epoch_t ap_full_scan_hlc;
/*
 * Passed to cont2req() and get_hae() in cont_child_aggregate(). Presumably true selects
 * VOS aggregation and false selects EC aggregation; confirm.
 */
bool ap_vos_agg;
};

Expand Down
21 changes: 4 additions & 17 deletions src/include/daos_srv/pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,10 @@ struct ds_pool {
* rebuild job.
*/
uint32_t sp_rebuild_gen;
ATOMIC int sp_rebuilding;
ATOMIC int sp_discarding;
/**
someone has already messaged this pool for rebuild scan,
* NB: all xstreams can do lockless-write on it but it's OK
*/
int sp_rebuild_scan;
ATOMIC int sp_rebuilding;
/* someone has already messaged this pool for rebuild object/key enumeration */
ATOMIC int sp_rebuild_enum;

int sp_discard_status;
/** path to ephemeral metrics */
Expand Down Expand Up @@ -178,16 +175,6 @@ struct ds_pool_child {
d_list_t spc_cont_list;
d_list_t spc_srv_cont_hdl; /* Single server cont handle */

/* The current maximum rebuild epoch (0 if there is no rebuild), so
* vos aggregation can not cross this epoch during rebuild to avoid
* interfering rebuild process.
*/
uint64_t spc_rebuild_fence;

/* The HLC when current rebuild ends, which will be used to compare
* with the aggregation full scan start HLC to know whether the
* aggregation needs to be restarted from 0. */
uint64_t spc_rebuild_end_hlc;
uint32_t spc_map_version;
int spc_ref;
ABT_eventual spc_ref_eventual;
Expand Down Expand Up @@ -219,7 +206,7 @@ struct ds_pool_svc_op_val {
/*
 * Tell whether a rebuild is considered to be in progress on @pool.
 *
 * Returns true when the atomic sp_rebuilding counter is positive, or when the pool has
 * been flagged for rebuild scan/enumeration. cont_aggregate_runnable() uses this result
 * to skip EC aggregation while a rebuild is ongoing.
 *
 * NOTE(review): this text is a rendered diff, so two return statements appear. The first
 * is the pre-change line (plain read of sp_rebuild_scan), the second is the post-change
 * line (atomic_load() of sp_rebuild_enum). Only one of them exists in the real source.
 */
static inline bool
ds_pool_is_rebuilding(struct ds_pool *pool)
{
return (atomic_load(&pool->sp_rebuilding) > 0 || pool->sp_rebuild_scan > 0);
return (atomic_load(&pool->sp_rebuilding) > 0 || atomic_load(&pool->sp_rebuild_enum) > 0);
}

/* encode metadata RPC operation key: HLC time first, in network order, for keys sorted by time.
Expand Down
3 changes: 1 addition & 2 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -3439,9 +3439,8 @@ obj_local_enum(struct obj_io_context *ioc, crt_rpc_t *rpc,
if (oei->oei_flags & ORF_FOR_MIGRATION) {
/* just in case ds_pool::sp_rebuilding is not set, pause my local EC aggregation
* by setting this flag.
* NB: it's a lockless write to a shared data structure and it's harmless.
*/
ioc->ioc_coc->sc_pool->spc_pool->sp_rebuild_scan = 1;
atomic_store(&ioc->ioc_coc->sc_pool->spc_pool->sp_rebuild_enum, 1);
flags = DTX_FOR_MIGRATION;
}

Expand Down
10 changes: 3 additions & 7 deletions src/rebuild/rebuild_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,7 @@ struct rebuild_tgt_pool_tracker {
uint64_t rt_stable_epoch;

/* Only used by reclaim job to discard those half-rebuild data */
uint64_t rt_reclaim_epoch;
/* local rebuild epoch mainly to constrain the VOS aggregation
* to make sure aggregation will not cross the epoch
*/
uint64_t rt_rebuild_fence;

uint64_t rt_reclaim_epoch;
uint32_t rt_leader_rank;

/* Global dtx resync version */
Expand Down Expand Up @@ -373,7 +368,8 @@ void
rebuild_tgt_status_check_ult(void *arg);

int
rebuild_tgt_prepare(crt_rpc_t *rpc, struct rebuild_tgt_pool_tracker **p_rpt);
rebuild_tgt_prepare(struct ds_pool *pool, struct rebuild_scan_in *rsi,
struct rebuild_tgt_pool_tracker **p_rpt);

bool
rebuild_status_match(struct rebuild_tgt_pool_tracker *rpt,
Expand Down
50 changes: 32 additions & 18 deletions src/rebuild/scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -1254,13 +1254,22 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc)
struct rebuild_scan_out *rout;
struct rebuild_pool_tls *tls = NULL;
struct rebuild_tgt_pool_tracker *rpt = NULL;
struct ds_pool *pool = NULL;
bool checker = false;
int rc;

rsi = crt_req_get(rpc);
D_ASSERT(rsi != NULL);

D_INFO(DF_RB "\n", DP_RB_RSI(rsi));

rc = ds_pool_lookup(rsi->rsi_pool_uuid, &pool);
if (rc) {
DL_ERROR(rc, DF_RB " cannot find pool", DP_RB_RSI(rsi));
D_GOTO(out_put, rc);
}
atomic_fetch_add(&pool->sp_rebuilding, 1);

/* If PS leader has been changed, and rebuild version is also increased
* due to adding new failure targets for rebuild, let's abort previous
* rebuild.
Expand Down Expand Up @@ -1290,7 +1299,7 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc)
if (rpt != NULL && rpt->rt_rebuild_op == rsi->rsi_rebuild_op) {
if (rpt->rt_global_done) {
D_WARN("previous not cleaned up yet " DF_RBF "\n", DP_RBF_RPT(rpt));
D_GOTO(out, rc = -DER_BUSY);
D_GOTO(out_put, rc = -DER_BUSY);
}

/* Rebuild should never skip the version */
Expand Down Expand Up @@ -1323,7 +1332,7 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc)
* an old or same leader.
*/
if (rsi->rsi_leader_term <= rpt->rt_leader_term)
D_GOTO(out, rc = 0);
D_GOTO(out_put, rc = 0);

if (rpt->rt_leader_rank != rsi->rsi_master_rank) {
D_DEBUG(DB_REBUILD, "new leader existing " DF_RBF "-> req " DF_RBF "\n",
Expand All @@ -1340,7 +1349,7 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc)

rpt->rt_leader_term = rsi->rsi_leader_term;

D_GOTO(out, rc = 0);
D_GOTO(out_put, rc = 0);
} else if (rpt != NULL) {
rpt_put(rpt);
rpt = NULL;
Expand All @@ -1351,43 +1360,48 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc)
rsi->rsi_rebuild_gen);
if (tls != NULL) {
D_WARN("previous not cleaned up yet " DF_RBF, DP_RBF_RSI(rsi));
D_GOTO(out, rc = -DER_BUSY);
D_GOTO(out_delete, rc = -DER_BUSY);
}

if (daos_fail_check(DAOS_REBUILD_TGT_START_FAIL))
D_GOTO(out, rc = -DER_INVAL);
D_GOTO(out_delete, rc = -DER_INVAL);

rc = rebuild_tgt_prepare(rpc, &rpt);
rc = rebuild_tgt_prepare(pool, rsi, &rpt);
if (rc)
D_GOTO(out, rc);
D_GOTO(out_delete, rc);

rpt_get(rpt);
rc = dss_ult_create(rebuild_tgt_status_check_ult, rpt, DSS_XS_SELF,
0, DSS_DEEP_STACK_SZ, NULL);
if (rc) {
rpt_put(rpt);
D_GOTO(out, rc);
D_GOTO(out_delete, rc);
}

atomic_fetch_add(&rpt->rt_pool->sp_rebuilding, 1); /* reset in rebuild_tgt_fini */
checker = true;

rpt_get(rpt);
/* step-3: start scan leader */
rc = dss_ult_create(rebuild_scan_leader, rpt, DSS_XS_SELF, 0, 0, NULL);
if (rc != 0) {
rpt_put(rpt);
D_GOTO(out, rc);
D_GOTO(out_delete, rc);
}

out:
if (tls && tls->rebuild_pool_status == 0 && rc != 0)
tls->rebuild_pool_status = rc;

if (rpt) {
if (rc)
rpt_delete(rpt);
out_delete:
if (rpt && !checker)
rpt_delete(rpt);
out_put:
if (rpt)
rpt_put(rpt);

if (pool) {
if (!checker)
atomic_fetch_sub(&pool->sp_rebuilding, 1);
ds_pool_put(pool);
}
if (rc != 0 && tls && tls->rebuild_pool_status == 0)
tls->rebuild_pool_status = rc;

rout = crt_reply_get(rpc);
rout->rso_status = rc;
rout->rso_stable_epoch = d_hlc_get();
Expand Down
Loading
Loading