Skip to content

Commit

Permalink
DAOS-16721 object: fix coll RPC for obj with sparse layout
Browse files Browse the repository at this point in the history
The old implementation did not correctly calculate some collective
object RPC size, and may cause trouble when need bulk data transfer
for large collective object RPC. It also potentially affects how to
dispatch collective RPCs from leader to other engines.

The patch also addes more sanity check for coll-punch RPC to detect
potential DRAM corruption.

Allow-unstable-test: true

Signed-off-by: Fan Yong <fan.yong@intel.com>
  • Loading branch information
Nasf-Fan committed Oct 22, 2024
1 parent e117a6c commit dc60fc4
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 54 deletions.
6 changes: 5 additions & 1 deletion src/dtx/dtx_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,12 @@ dtx_coll_prep_ult(void *arg)
DP_UUID(cont->sc_uuid), DP_RC(rc));
}

if (dcpa->dcpa_result != 0)
if (dcpa->dcpa_result != 0) {
if (dcpa->dcpa_result != -DER_INPROGRESS && dcpa->dcpa_result != -DER_NONEXIST)
D_ERROR("Failed to load mbs for "DF_DTI", opc %u: "DF_RC"\n",
DP_DTI(&dci->dci_xid), opc, DP_RC(rc));
goto out;
}

dcpa->dcpa_result = dtx_coll_prep(dci->dci_po_uuid, dcpa->dcpa_oid, &dci->dci_xid, mbs, -1,
dci->dci_version, cont->sc_pool->spc_map_version,
Expand Down
5 changes: 4 additions & 1 deletion src/dtx/dtx_rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -974,8 +974,11 @@ dtx_refresh_internal(struct ds_cont_child *cont, int *check_count, d_list_t *che
if (dsp->dsp_mbs == NULL) {
rc = vos_dtx_load_mbs(cont->sc_hdl, &dsp->dsp_xid, NULL, &dsp->dsp_mbs);
if (rc != 0) {
if (rc < 0 && rc != -DER_NONEXIST && for_io)
if (rc < 0 && rc != -DER_NONEXIST && for_io) {
D_ERROR("Failed to load mbs for "DF_DTI": "DF_RC"\n",
DP_DTI(&dsp->dsp_xid), DP_RC(rc));
goto out;
}

drop = true;
goto next;
Expand Down
110 changes: 73 additions & 37 deletions src/object/cli_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ obj_coll_oper_args_init(struct coll_oper_args *coa, struct dc_object *obj, bool
if (coa->coa_sparse == 0)
coa->coa_dct_cap = obj_ranks;
}

/* Temporarily record obj->cob_min_rank for verification. */
coa->coa_min_rank = obj->cob_min_rank;

D_RWLOCK_UNLOCK(&obj->cob_lock);

if (coa->coa_sparse) {
Expand Down Expand Up @@ -236,17 +240,55 @@ obj_coll_oper_args_fini(struct coll_oper_args *coa)
coa->coa_dct_nr = 0;
}

static void
obj_coll_collapse_one(struct coll_oper_args *coa, struct daos_coll_target *dct,
uint32_t *size, bool copy)
{
struct daos_coll_shard *dcs;
uint32_t dct_size;
int i;

/* The size may be over estimated, no matter. */
dct_size = sizeof(*dct) + dct->dct_bitmap_sz +
sizeof(dct->dct_shards[0]) * (dct->dct_max_shard + 1);

for (i = 0; i <= dct->dct_max_shard; i++) {
dcs = &dct->dct_shards[i];
if (dcs->dcs_nr > 1)
dct_size += sizeof(dcs->dcs_buf[0]) * dcs->dcs_nr;
}

if (coa->coa_for_modify)
dct_size += sizeof(dct->dct_tgt_ids[0]) * dct->dct_tgt_nr;

if (coa->coa_max_dct_sz < dct_size)
coa->coa_max_dct_sz = dct_size;

if (copy)
memcpy(&coa->coa_dcts[coa->coa_dct_nr], dct, sizeof(*dct));

coa->coa_dct_nr++;
*size += dct_size;
}

struct obj_coll_tree_args {
struct coll_oper_args *coa;
uint32_t *size;
};

static int
obj_coll_tree_cb(daos_handle_t ih, d_iov_t *key, d_iov_t *val, void *arg)
{
struct coll_oper_args *coa = arg;
struct daos_coll_target *dct = val->iov_buf;
struct obj_coll_tree_args *octa = arg;
struct coll_oper_args *coa = octa->coa;
struct daos_coll_target *dct = val->iov_buf;

D_ASSERTF(coa->coa_dct_nr < coa->coa_dct_cap,
"Too short pre-allcoated dct_array: %u vs %u\n",
coa->coa_dct_nr, coa->coa_dct_cap);
D_ASSERT(dct->dct_bitmap != NULL);

memcpy(&coa->coa_dcts[coa->coa_dct_nr++], dct, sizeof(*dct));
obj_coll_collapse_one(coa, dct, octa->size, true);

/* The following members have been migrated into coa->coa_dcts. */
dct->dct_bitmap = NULL;
Expand All @@ -259,6 +301,7 @@ obj_coll_tree_cb(daos_handle_t ih, d_iov_t *key, d_iov_t *val, void *arg)
static int
obj_coll_collapse_tree(struct coll_oper_args *coa, uint32_t *size)
{
struct obj_coll_tree_args octa;
struct coll_sparse_targets *tree = coa->coa_tree;
int rc = 0;

Expand All @@ -270,7 +313,14 @@ obj_coll_collapse_tree(struct coll_oper_args *coa, uint32_t *size)
D_GOTO(out, rc = -DER_NOMEM);

coa->coa_sparse = 0;
rc = dbtree_iterate(tree->cst_tree_hdl, DAOS_INTENT_DEFAULT, false, obj_coll_tree_cb, coa);
coa->coa_raw_sparse = 1;
coa->coa_dct_nr = 0;
coa->coa_max_dct_sz = 0;

octa.coa = coa;
octa.size = size;
rc = dbtree_iterate(tree->cst_tree_hdl, DAOS_INTENT_DEFAULT, false,
obj_coll_tree_cb, &octa);
if (rc == 0)
D_ASSERTF(coa->coa_dct_nr == coa->coa_dct_cap,
"Something is wrong when prepare coll target array: %u vs %u\n",
Expand All @@ -287,36 +337,13 @@ static int
obj_coll_collapse_array(struct coll_oper_args *coa, uint32_t *size)
{
struct daos_coll_target *dct;
struct daos_coll_shard *dcs;
uint32_t dct_size;
int i;
int j;

for (i = 0, *size = 0, coa->coa_dct_nr = 0; i < coa->coa_dct_cap; i++) {
for (i = 0, *size = 0, coa->coa_dct_nr = 0, coa->coa_max_dct_sz = 0;
i < coa->coa_dct_cap; i++) {
dct = &coa->coa_dcts[i];
if (dct->dct_bitmap != NULL) {
/* The size may be over estimated, no matter. */
dct_size = sizeof(*dct) + dct->dct_bitmap_sz +
sizeof(dct->dct_shards[0]) * (dct->dct_max_shard + 1);

for (j = 0; j <= dct->dct_max_shard; j++) {
dcs = &dct->dct_shards[j];
if (dcs->dcs_nr > 1)
dct_size += sizeof(dcs->dcs_buf[0]) * dcs->dcs_nr;
}

if (coa->coa_for_modify)
dct_size += sizeof(dct->dct_tgt_ids[0]) * dct->dct_tgt_nr;

if (coa->coa_max_dct_sz < dct_size)
coa->coa_max_dct_sz = dct_size;

if (coa->coa_dct_nr < i)
memcpy(&coa->coa_dcts[coa->coa_dct_nr], dct, sizeof(*dct));

coa->coa_dct_nr++;
*size += dct_size;
}
if (dct->dct_bitmap != NULL)
obj_coll_collapse_one(coa, dct, size, coa->coa_dct_nr < i);
}

/* Reset the other dct slots to avoid double free during cleanup. */
Expand Down Expand Up @@ -373,8 +400,9 @@ obj_coll_prep_one(struct coll_oper_args *coa, struct dc_object *obj,

D_RWLOCK_RDLOCK(&obj->cob_lock);

D_ASSERTF(shard->do_target_rank <= obj->cob_max_rank,
"Unexpected shard with rank %u > %u\n", shard->do_target_rank, obj->cob_max_rank);
D_ASSERTF(coa->coa_min_rank == obj->cob_min_rank,
"Object "DF_OID" layout has been changed unexpectedly %u => %u\n",
DP_OID(obj->cob_md.omd_id), coa->coa_min_rank, obj->cob_min_rank);
D_ASSERTF(shard->do_target_rank >= obj->cob_min_rank,
"Unexpected shard with rank %u < %u\n", shard->do_target_rank, obj->cob_min_rank);

Expand Down Expand Up @@ -669,7 +697,6 @@ dc_obj_coll_punch(tse_task_t *task, struct dc_object *obj, struct dtx_epoch *epo
uint32_t tgt_size = 0;
uint32_t mbs_max_size;
uint32_t inline_size;
uint32_t flags = ORF_LEADER;
uint32_t leader = -1;
uint32_t len;
int rc;
Expand Down Expand Up @@ -746,6 +773,12 @@ dc_obj_coll_punch(tse_task_t *task, struct dc_object *obj, struct dtx_epoch *epo
memcpy(dct, &tmp_tgt, sizeof(tmp_tgt));
}

/* 'shard' is on the leader target that is must be the coa->coa_dcts[0]. */
D_ASSERTF(shard->do_target_rank == coa->coa_dcts[0].dct_rank,
"Object "DF_OID" target array corrupted: rank %u vs %ur, nr %u\n",
DP_OID(obj->cob_md.omd_id), shard->do_target_rank,
coa->coa_dcts[0].dct_rank, coa->coa_dct_nr);

rc = dc_obj_coll_punch_mbs(coa, obj, shard->do_target_id, &mbs);
if (rc < 0)
goto out;
Expand All @@ -767,12 +800,14 @@ dc_obj_coll_punch(tse_task_t *task, struct dc_object *obj, struct dtx_epoch *epo
if (rc != 0)
goto out;

auxi->flags = ORF_LEADER;
if (auxi->io_retry) {
flags |= ORF_RESEND;
auxi->flags |= ORF_RESEND;
/* Reset @enqueue_id if resend to new leader. */
if (spa->pa_auxi.target != shard->do_target_id)
spa->pa_auxi.enqueue_id = 0;
} else {
auxi->flags &= ~ORF_RESEND;
spa->pa_auxi.obj_auxi = auxi;
daos_dti_gen(&spa->pa_dti, false);
}
Expand All @@ -781,14 +816,15 @@ dc_obj_coll_punch(tse_task_t *task, struct dc_object *obj, struct dtx_epoch *epo
spa->pa_auxi.shard = shard->do_shard_idx;

if (obj_is_ec(obj))
flags |= ORF_EC;
auxi->flags |= ORF_EC;

mbs_max_size = sizeof(*mbs) + mbs->dm_data_size +
sizeof(coa->coa_targets[0]) * coa->coa_max_shard_nr + coa->coa_max_bitmap_sz;

return dc_obj_shard_coll_punch(shard, spa, mbs, mbs_max_size, cpca.cpca_bulks, tgt_size,
coa->coa_dcts, coa->coa_dct_nr, coa->coa_max_dct_sz, epoch,
args->flags, flags, map_ver, &auxi->map_ver_reply, task);
args->flags, auxi->flags, map_ver,
&auxi->map_ver_reply, task);

out:
if (rc > 0)
Expand Down
7 changes: 4 additions & 3 deletions src/object/cli_shard.c
Original file line number Diff line number Diff line change
Expand Up @@ -1453,9 +1453,10 @@ obj_shard_coll_punch_cb(tse_task_t *task, void *data)

DL_CDEBUG(task->dt_result < 0, DLOG_ERR, DB_IO, task->dt_result,
"DAOS_OBJ_RPC_COLL_PUNCH RPC %p for "DF_UOID" with DTX "
DF_DTI" for task %p, map_ver %u/%u, flags %lx/%x", rpc, DP_UOID(ocpi->ocpi_oid),
DP_DTI(&ocpi->ocpi_xid), task, ocpi->ocpi_map_ver, *cb_args->cpca_ver,
(unsigned long)ocpi->ocpi_api_flags, ocpi->ocpi_flags);
DF_DTI" for task %p, map_ver %u/%u, flags %lx/%x, %s layout",
rpc, DP_UOID(ocpi->ocpi_oid), DP_DTI(&ocpi->ocpi_xid), task, ocpi->ocpi_map_ver,
*cb_args->cpca_ver, (unsigned long)ocpi->ocpi_api_flags, ocpi->ocpi_flags,
cb_args->cpca_shard_args->pa_coa.coa_raw_sparse ? "sparse" : "continuous");

crt_req_decref(rpc);

Expand Down
6 changes: 5 additions & 1 deletion src/object/obj_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,14 @@ struct coll_oper_args {
struct shard_auxi_args coa_auxi;
int coa_dct_nr;
uint32_t coa_dct_cap;
uint32_t coa_max_dct_sz;
union {
uint32_t coa_max_dct_sz;
uint32_t coa_min_rank;
};
uint8_t coa_max_shard_nr;
uint8_t coa_max_bitmap_sz;
uint8_t coa_for_modify:1,
coa_raw_sparse:1,
coa_sparse:1;
uint8_t coa_target_nr;
/*
Expand Down
2 changes: 1 addition & 1 deletion src/object/obj_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ obj_coll_disp_dest(struct obj_coll_disp_cursor *ocdc, struct daos_coll_target *t
* use the "cur_pos" as the relay engine.
*/
pos = rand % (ocdc->tgt_nr - ocdc->cur_pos) + ocdc->cur_pos;
if (pos != ocdc->cur_pos && tgts[pos].dct_rank > dct->dct_rank) {
if (pos > ocdc->cur_pos && tgts[pos].dct_rank > dct->dct_rank) {
memcpy(&tmp, &tgts[pos], sizeof(tmp));
memcpy(&tgts[pos], dct, sizeof(tmp));
memcpy(dct, &tmp, sizeof(tmp));
Expand Down
12 changes: 9 additions & 3 deletions src/object/srv_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,15 @@ obj_coll_punch_prep(struct obj_coll_punch_in *ocpi, struct daos_coll_target *dct
int i;
int j;

/* dcts[0] is for current engine. */
if (dcts[0].dct_bitmap == NULL || dcts[0].dct_bitmap_sz == 0 ||
dcts[0].dct_shards == NULL) {
/* dcts[0] must be for current engine. */
if (unlikely(dcts[0].dct_rank != dss_self_rank())) {
D_ERROR("Invalid targets array: rank %u vs %u, nr %u, flags %x\n",
dcts[0].dct_rank, dss_self_rank(), dct_nr, ocpi->ocpi_flags);
D_GOTO(out, rc = -DER_INVAL);
}

if (unlikely(dcts[0].dct_bitmap == NULL || dcts[0].dct_bitmap_sz == 0 ||
dcts[0].dct_shards == NULL)) {
D_ERROR("Invalid input for current engine: bitmap %s, bitmap_sz %u, shards %s\n",
dcts[0].dct_bitmap == NULL ? "empty" : "non-empty", dcts[0].dct_bitmap_sz,
dcts[0].dct_shards == NULL ? "empty" : "non-empty");
Expand Down
8 changes: 4 additions & 4 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -5605,12 +5605,12 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc)

D_DEBUG(DB_IO, "(%s) handling collective punch RPC %p for obj "
DF_UOID" on XS %u/%u epc "DF_X64" pmv %u, with dti "
DF_DTI", forward width %u, forward depth %u\n",
DF_DTI", forward width %u, forward depth %u, flags %x\n",
(ocpi->ocpi_flags & ORF_LEADER) ? "leader" :
(ocpi->ocpi_tgts.ca_count == 1 ? "non-leader" : "relay-engine"),
rpc, DP_UOID(ocpi->ocpi_oid), dmi->dmi_xs_id, dmi->dmi_tgt_id,
ocpi->ocpi_epoch, ocpi->ocpi_map_ver, DP_DTI(&ocpi->ocpi_xid),
ocpi->ocpi_disp_width, ocpi->ocpi_disp_depth);
ocpi->ocpi_disp_width, ocpi->ocpi_disp_depth, ocpi->ocpi_flags);

D_ASSERT(dmi->dmi_xs_id != 0);

Expand Down Expand Up @@ -5747,13 +5747,13 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc)
DL_CDEBUG(rc != 0 && rc != -DER_INPROGRESS && rc != -DER_TX_RESTART, DLOG_ERR, DB_IO, rc,
"(%s) handled collective punch RPC %p for obj "DF_UOID" on XS %u/%u epc "
DF_X64" pmv %u/%u, with dti "DF_DTI", bulk_tgt_sz %u, bulk_tgt_nr %u, "
"tgt_nr %u, forward width %u, forward depth %u",
"tgt_nr %u, forward width %u, forward depth %u, flags %x",
(ocpi->ocpi_flags & ORF_LEADER) ? "leader" :
(ocpi->ocpi_tgts.ca_count == 1 ? "non-leader" : "relay-engine"), rpc,
DP_UOID(ocpi->ocpi_oid), dmi->dmi_xs_id, dmi->dmi_tgt_id, ocpi->ocpi_epoch,
ocpi->ocpi_map_ver, max_ver, DP_DTI(&ocpi->ocpi_xid), ocpi->ocpi_bulk_tgt_sz,
ocpi->ocpi_bulk_tgt_nr, (unsigned int)ocpi->ocpi_tgts.ca_count,
ocpi->ocpi_disp_width, ocpi->ocpi_disp_depth);
ocpi->ocpi_disp_width, ocpi->ocpi_disp_depth, ocpi->ocpi_flags);

obj_punch_complete(rpc, rc, max_ver);

Expand Down
3 changes: 0 additions & 3 deletions src/vos/vos_dtx.c
Original file line number Diff line number Diff line change
Expand Up @@ -1972,9 +1972,6 @@ vos_dtx_load_mbs(daos_handle_t coh, struct dtx_id *dti, daos_unit_oid_t *oid,
rc = -DER_INPROGRESS;
}

if (rc < 0)
D_ERROR("Failed to load mbs for "DF_DTI": "DF_RC"\n", DP_DTI(dti), DP_RC(rc));

return rc;
}

Expand Down

0 comments on commit dc60fc4

Please sign in to comment.