Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 91 additions & 82 deletions src/object/srv_obj_migrate.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#define MIGRATE_DEFAULT_MAX_ULT 4096
#define ENV_MIGRATE_ULT_CNT "D_MIGRATE_ULT_CNT"
struct migrate_one {
struct iter_obj_arg *mo_obj_arg;
daos_key_t mo_dkey;
uint64_t mo_dkey_hash;
uuid_t mo_pool_uuid;
Expand Down Expand Up @@ -116,6 +117,9 @@ struct iter_obj_arg {
uuid_t pool_uuid;
uuid_t cont_uuid;
daos_unit_oid_t oid;
daos_handle_t ioa_oh;
int ioa_obj_ref;
struct daos_oclass_attr ioa_oca;
daos_epoch_t epoch;
daos_epoch_t punched_epoch;
unsigned int shard;
Expand All @@ -126,6 +130,28 @@ struct iter_obj_arg {
uint32_t generation;
};

static void
migrate_obj_get(struct iter_obj_arg *arg)
{
arg->ioa_obj_ref++;
}

static void
migrate_obj_put(struct iter_obj_arg *arg)
{
D_ASSERTF(arg->ioa_obj_ref > 0, DF_CONT " obj " DF_UOID " bad ioa_obj_ref %d\n",
DP_CONT(arg->pool_uuid, arg->cont_uuid), DP_UOID(arg->oid), arg->ioa_obj_ref);
arg->ioa_obj_ref--;
if (arg->ioa_obj_ref == 0) {
if (daos_handle_is_valid(arg->ioa_oh)) {
dsc_obj_close(arg->ioa_oh);
arg->ioa_oh = DAOS_HDL_INVAL;
}
D_FREE(arg->snaps);
D_FREE(arg);
}
}

static int
obj_tree_destory_cb(daos_handle_t ih, d_iov_t *key_iov,
d_iov_t *val_iov, void *data)
Expand Down Expand Up @@ -1619,48 +1645,29 @@ static int
migrate_dkey(struct migrate_pool_tls *tls, struct migrate_one *mrone,
daos_size_t data_size)
{
struct ds_cont_child *cont = NULL;
struct cont_props props;
daos_handle_t coh = DAOS_HDL_INVAL;
struct ds_cont_child *cont = NULL;
daos_handle_t oh = DAOS_HDL_INVAL;
int rc;

D_ASSERT(dss_get_module_info()->dmi_xs_id != 0);
rc = migrate_get_cont_child(tls, mrone->mo_cont_uuid, &cont, true);
if (rc || cont == NULL)
D_GOTO(cont_put, rc);

rc = dsc_pool_open(tls->mpt_pool_uuid, tls->mpt_poh_uuid, 0,
NULL, tls->mpt_pool->spc_pool->sp_map,
&tls->mpt_svc_list, &tls->mpt_pool_hdl);
if (rc)
D_GOTO(cont_put, rc);

/* Open client dc handle used to read the remote object data */
rc = migrate_cont_open(tls, mrone->mo_cont_uuid, 0, &coh);
if (rc)
D_GOTO(cont_put, rc);
D_GOTO(out, rc);

/* Open the remote object */
rc = dsc_obj_open(coh, mrone->mo_oid.id_pub, DAOS_OO_RO, &oh);
if (rc)
D_GOTO(cont_put, rc);
D_ASSERTF(mrone->mo_obj_arg->ioa_obj_ref > 0,
DF_RB ": oid " DF_UOID ", bad ioa_obj_ref %d\n", DP_RB_MPT(tls),
DP_UOID(mrone->mo_oid), mrone->mo_obj_arg->ioa_obj_ref);
D_ASSERT(daos_handle_is_valid(mrone->mo_obj_arg->ioa_oh));
oh = mrone->mo_obj_arg->ioa_oh;
mrone->mo_oca = mrone->mo_obj_arg->ioa_oca;

if (DAOS_FAIL_CHECK(DAOS_REBUILD_TGT_NOSPACE))
D_GOTO(obj_close, rc = -DER_NOSPACE);
D_GOTO(out, rc = -DER_NOSPACE);

if (DAOS_FAIL_CHECK(DAOS_REBUILD_NO_REBUILD)) {
D_DEBUG(DB_REBUILD, DF_UUID" disable rebuild\n",
DP_UUID(tls->mpt_pool_uuid));
D_GOTO(obj_close, rc);
}

dsc_cont_get_props(coh, &props);
rc = dsc_obj_id2oc_attr(mrone->mo_oid.id_pub, &props, &mrone->mo_oca);
if (rc) {
D_ERROR("Unknown object class: %u\n",
daos_obj_id2class(mrone->mo_oid.id_pub));
D_GOTO(obj_close, rc);
D_GOTO(out, rc);
}

/* punch the object */
Expand All @@ -1672,21 +1679,21 @@ migrate_dkey(struct migrate_pool_tls *tls, struct migrate_one *mrone,
if (rc) {
D_ERROR(DF_UOID" punch obj failed: "DF_RC"\n",
DP_UOID(mrone->mo_oid), DP_RC(rc));
D_GOTO(obj_close, rc);
D_GOTO(out, rc);
}
}

rc = migrate_punch(tls, mrone, cont);
if (rc)
D_GOTO(obj_close, rc);
D_GOTO(out, rc);

if (data_size == 0) {
D_DEBUG(DB_REBUILD, "empty mrone %p\n", mrone);
D_GOTO(obj_close, rc);
D_GOTO(out, rc);
}

if (DAOS_FAIL_CHECK(DAOS_REBUILD_UPDATE_FAIL))
D_GOTO(obj_close, rc = -DER_INVAL);
D_GOTO(out, rc = -DER_INVAL);

if (mrone->mo_iods[0].iod_type == DAOS_IOD_SINGLE)
rc = migrate_fetch_update_single(mrone, oh, cont);
Expand All @@ -1702,9 +1709,8 @@ migrate_dkey(struct migrate_pool_tls *tls, struct migrate_one *mrone,

tls->mpt_rec_count += mrone->mo_rec_num;
tls->mpt_size += mrone->mo_size;
obj_close:
dsc_obj_close(oh);
cont_put:

out:
if (cont != NULL)
ds_cont_child_put(cont);
return rc;
Expand Down Expand Up @@ -1835,13 +1841,16 @@ static void
migrate_one_ult(void *arg)
{
struct migrate_one *mrone = arg;
struct iter_obj_arg *obj_arg;
struct migrate_pool_tls *tls;
daos_size_t data_size;
int rc = 0;

while (daos_fail_check(DAOS_REBUILD_TGT_REBUILD_HANG))
dss_sleep(0);

obj_arg = mrone->mo_obj_arg;

tls = migrate_pool_tls_lookup(mrone->mo_pool_uuid,
mrone->mo_pool_tls_version, mrone->mo_generation);
if (tls == NULL || tls->mpt_fini) {
Expand Down Expand Up @@ -1898,6 +1907,7 @@ migrate_one_ult(void *arg)
tls->mpt_fini = 1;
}
out:
migrate_obj_put(obj_arg);
migrate_one_destroy(mrone);
if (tls != NULL) {
migrate_tgt_exit(tls, DKEY_ULT);
Expand Down Expand Up @@ -2666,10 +2676,13 @@ migrate_start_ult(struct enum_unpack_arg *unpack_arg)
rc = migrate_tgt_enter(tls, DKEY_ULT, NULL);
if (rc)
break;
migrate_obj_get(arg);
mrone->mo_obj_arg = arg;
d_list_del_init(&mrone->mo_list);
rc = dss_ult_create(migrate_one_ult, mrone, DSS_XS_VOS,
arg->tgt_idx, MIGRATE_STACK_SIZE, NULL);
if (rc) {
migrate_obj_put(arg);
migrate_tgt_exit(tls, DKEY_ULT);
migrate_one_destroy(mrone);
break;
Expand Down Expand Up @@ -2700,13 +2713,10 @@ migrate_one_epoch_object(daos_epoch_range_t *epr, struct migrate_pool_tls *tls,
daos_key_desc_t kds[KDS_NUM] = {0};
d_iov_t csum = {0};
d_iov_t *p_csum;
uint8_t stack_csum_buf[CSUM_BUF_SIZE] = {0};
struct cont_props props;
uint8_t stack_csum_buf[CSUM_BUF_SIZE] = {0};
struct enum_unpack_arg unpack_arg = { 0 };
d_iov_t iov = { 0 };
d_sg_list_t sgl = { 0 };
daos_handle_t coh = DAOS_HDL_INVAL;
daos_handle_t oh = DAOS_HDL_INVAL;
d_sg_list_t sgl = {0};
uint32_t minimum_nr;
uint32_t enum_flags;
uint32_t num;
Expand All @@ -2724,46 +2734,16 @@ migrate_one_epoch_object(daos_epoch_range_t *epr, struct migrate_pool_tls *tls,
}

D_ASSERT(dss_get_module_info()->dmi_xs_id != 0);

rc = dsc_pool_open(tls->mpt_pool_uuid, tls->mpt_poh_uuid, 0,
NULL, tls->mpt_pool->spc_pool->sp_map,
&tls->mpt_svc_list, &tls->mpt_pool_hdl);
if (rc) {
D_ERROR("dsc_pool_open failed: "DF_RC"\n", DP_RC(rc));
D_GOTO(out, rc);
}

rc = migrate_cont_open(tls, arg->cont_uuid, 0, &coh);
if (rc) {
D_ERROR("migrate_cont_open failed: "DF_RC"\n", DP_RC(rc));
D_GOTO(out, rc);
}

/* Only open with RW flag, reintegrating flag will be set, which is needed
* during unpack_cb to check if parity shard alive.
*/
rc = dsc_obj_open(coh, arg->oid.id_pub, DAOS_OO_RO, &oh);
if (rc) {
D_ERROR("dsc_obj_open failed: "DF_RC"\n", DP_RC(rc));
D_GOTO(out, rc);
}

D_ASSERT(daos_handle_is_valid(arg->ioa_oh));
unpack_arg.arg = arg;
unpack_arg.epr = *epr;
unpack_arg.oh = oh;
unpack_arg.oh = arg->ioa_oh;
unpack_arg.version = tls->mpt_version;
D_INIT_LIST_HEAD(&unpack_arg.merge_list);
unpack_arg.oc_attr = arg->ioa_oca;
buf = stack_buf;
buf_len = ITER_BUF_SIZE;

dsc_cont_get_props(coh, &props);
rc = dsc_obj_id2oc_attr(arg->oid.id_pub, &props, &unpack_arg.oc_attr);
if (rc) {
D_ERROR("Unknown object class: %u\n",
daos_obj_id2class(arg->oid.id_pub));
D_GOTO(out_obj, rc);
}

memset(&anchor, 0, sizeof(anchor));
memset(&akey_anchor, 0, sizeof(akey_anchor));
memset(&dkey_anchor, 0, sizeof(dkey_anchor));
Expand Down Expand Up @@ -2814,9 +2794,8 @@ migrate_one_epoch_object(daos_epoch_range_t *epr, struct migrate_pool_tls *tls,

daos_anchor_set_flags(&dkey_anchor, enum_flags);
num = KDS_NUM;
rc = dsc_obj_list_obj(oh, epr, NULL, NULL, NULL,
&num, kds, &sgl, &anchor,
&dkey_anchor, &akey_anchor, p_csum);
rc = dsc_obj_list_obj(arg->ioa_oh, epr, NULL, NULL, NULL, &num, kds, &sgl, &anchor,
&dkey_anchor, &akey_anchor, p_csum);

if (rc == -DER_KEY2BIG) {
D_DEBUG(DB_REBUILD, "migrate obj "DF_UOID" got "
Expand Down Expand Up @@ -2951,9 +2930,7 @@ migrate_one_epoch_object(daos_epoch_range_t *epr, struct migrate_pool_tls *tls,

if (csum.iov_buf != NULL && csum.iov_buf != stack_csum_buf)
D_FREE(csum.iov_buf);
out_obj:
dsc_obj_close(oh);
out:

D_DEBUG(DB_REBUILD, "obj "DF_UOID" for shard %u eph "
DF_U64"-"DF_U64": "DF_RC"\n", DP_UOID(arg->oid), arg->shard,
epr->epr_lo, epr->epr_hi, DP_RC(rc));
Expand Down Expand Up @@ -3061,9 +3038,13 @@ migrate_obj_ult(void *data)
struct iter_obj_arg *arg = data;
struct migrate_pool_tls *tls = NULL;
daos_epoch_range_t epr;
daos_handle_t coh = DAOS_HDL_INVAL;
struct cont_props props;
int i;
int rc = 0;

migrate_obj_get(arg);

tls = migrate_pool_tls_lookup(arg->pool_uuid, arg->version, arg->generation);
if (tls == NULL || tls->mpt_fini) {
D_WARN("some one abort the rebuild "DF_UUID"\n",
Expand Down Expand Up @@ -3092,6 +3073,33 @@ migrate_obj_ult(void *data)
}
}

rc = dsc_pool_open(tls->mpt_pool_uuid, tls->mpt_poh_uuid, 0, NULL,
tls->mpt_pool->spc_pool->sp_map, &tls->mpt_svc_list, &tls->mpt_pool_hdl);
if (rc) {
DL_ERROR(rc, DF_RB ": dsc_pool_open failed", DP_RB_MPT(tls));
D_GOTO(out, rc);
}

rc = migrate_cont_open(tls, arg->cont_uuid, 0, &coh);
if (rc) {
DL_ERROR(rc, DF_RB ": migrate_cont_open failed", DP_RB_MPT(tls));
D_GOTO(out, rc);
}

rc = dsc_obj_open(coh, arg->oid.id_pub, DAOS_OO_RO, &arg->ioa_oh);
if (rc) {
DL_ERROR(rc, DF_RB ": dsc_obj_open failed", DP_RB_MPT(tls));
D_GOTO(out, rc);
}

dsc_cont_get_props(coh, &props);
rc = dsc_obj_id2oc_attr(arg->oid.id_pub, &props, &arg->ioa_oca);
if (rc) {
DL_ERROR(rc, DF_RB ": unknown object class: %u", DP_RB_MPT(tls),
daos_obj_id2class(arg->oid.id_pub));
D_GOTO(out, rc);
}

for (i = 0; i < arg->snap_cnt; i++) {
epr.epr_lo = i > 0 ? arg->snaps[i - 1] + 1 : 0;
epr.epr_hi = arg->snaps[i];
Expand Down Expand Up @@ -3153,8 +3161,7 @@ migrate_obj_ult(void *data)
if (tls != NULL)
migrate_tgt_exit(tls, OBJ_ULT);

D_FREE(arg->snaps);
D_FREE(arg);
migrate_obj_put(arg);
migrate_pool_tls_put(tls);
}

Expand Down Expand Up @@ -3185,6 +3192,8 @@ migrate_one_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_e
return -DER_NOMEM;

obj_arg->oid = oid;
obj_arg->ioa_oh = DAOS_HDL_INVAL;
obj_arg->ioa_obj_ref = 0;
obj_arg->epoch = eph;
obj_arg->shard = shard;
obj_arg->punched_epoch = punched_eph;
Expand Down
Loading