Skip to content

Commit

Permalink
DAOS-16721 object: fix coll RPC for obj with sparse layout - b26
Browse files Browse the repository at this point in the history
The old implementation did not correctly calculate some collective
object RPC size, and may cause trouble when need bulk data transfer
for large collective object RPC. It also potentially affects how to
dispatch collective RPCs from leader to other engines.

The patch also addes more sanity check for coll-punch RPC to detect
potential DRAM corruption.

Allow-unstable-test: true

Signed-off-by: Fan Yong <fan.yong@intel.com>
  • Loading branch information
Nasf-Fan committed Oct 22, 2024
1 parent b913d3e commit 18e7ab6
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 53 deletions.
6 changes: 5 additions & 1 deletion src/dtx/dtx_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,12 @@ dtx_coll_prep_ult(void *arg)
DP_UUID(cont->sc_uuid), DP_RC(rc));
}

if (dcpa->dcpa_result != 0)
if (dcpa->dcpa_result != 0) {
if (dcpa->dcpa_result != -DER_INPROGRESS && dcpa->dcpa_result != -DER_NONEXIST)
D_ERROR("Failed to load mbs for "DF_DTI", opc %u: "DF_RC"\n",
DP_DTI(&dci->dci_xid), opc, DP_RC(rc));
goto out;
}

dcpa->dcpa_result = dtx_coll_prep(dci->dci_po_uuid, dcpa->dcpa_oid, &dci->dci_xid, mbs, -1,
dci->dci_version, cont->sc_pool->spc_map_version,
Expand Down
5 changes: 4 additions & 1 deletion src/dtx/dtx_rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -976,8 +976,11 @@ dtx_refresh_internal(struct ds_cont_child *cont, int *check_count, d_list_t *che
if (dsp->dsp_mbs == NULL) {
rc = vos_dtx_load_mbs(cont->sc_hdl, &dsp->dsp_xid, NULL, &dsp->dsp_mbs);
if (rc != 0) {
if (rc < 0 && rc != -DER_NONEXIST && for_io)
if (rc < 0 && rc != -DER_NONEXIST && for_io) {
D_ERROR("Failed to load mbs for "DF_DTI": "DF_RC"\n",
DP_DTI(&dsp->dsp_xid), DP_RC(rc));
goto out;
}

drop = true;
goto next;
Expand Down
114 changes: 78 additions & 36 deletions src/object/cli_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ obj_coll_oper_args_init(struct coll_oper_args *coa, struct dc_object *obj, bool
if (coa->coa_sparse == 0)
coa->coa_dct_cap = obj_ranks;
}

/* Temporarily save obj->cob_min_rank for verification during obj_coll_prep_one. */
coa->coa_min_rank = obj->cob_min_rank;

D_RWLOCK_UNLOCK(&obj->cob_lock);

if (coa->coa_sparse) {
Expand Down Expand Up @@ -208,7 +212,6 @@ obj_coll_oper_args_init(struct coll_oper_args *coa, struct dc_object *obj, bool
coa->coa_dct_nr = -1;
}

coa->coa_max_dct_sz = 0;
coa->coa_max_shard_nr = 0;
coa->coa_max_bitmap_sz = 0;
coa->coa_target_nr = 0;
Expand Down Expand Up @@ -236,17 +239,55 @@ obj_coll_oper_args_fini(struct coll_oper_args *coa)
coa->coa_dct_nr = 0;
}

static void
obj_coll_collapse_one(struct coll_oper_args *coa, struct daos_coll_target *dct,
uint32_t *size, bool copy)
{
struct daos_coll_shard *dcs;
uint32_t dct_size;
int i;

/* The size may be over estimated, no matter. */
dct_size = sizeof(*dct) + dct->dct_bitmap_sz +
sizeof(dct->dct_shards[0]) * (dct->dct_max_shard + 1);

for (i = 0; i <= dct->dct_max_shard; i++) {
dcs = &dct->dct_shards[i];
if (dcs->dcs_nr > 1)
dct_size += sizeof(dcs->dcs_buf[0]) * dcs->dcs_nr;
}

if (coa->coa_for_modify)
dct_size += sizeof(dct->dct_tgt_ids[0]) * dct->dct_tgt_nr;

if (coa->coa_max_dct_sz < dct_size)
coa->coa_max_dct_sz = dct_size;

if (copy)
memcpy(&coa->coa_dcts[coa->coa_dct_nr], dct, sizeof(*dct));

coa->coa_dct_nr++;
*size += dct_size;
}

struct obj_coll_tree_args {
struct coll_oper_args *coa;
uint32_t *size;
};

static int
obj_coll_tree_cb(daos_handle_t ih, d_iov_t *key, d_iov_t *val, void *arg)
{
struct coll_oper_args *coa = arg;
struct daos_coll_target *dct = val->iov_buf;
struct obj_coll_tree_args *octa = arg;
struct coll_oper_args *coa = octa->coa;
struct daos_coll_target *dct = val->iov_buf;

D_ASSERTF(coa->coa_dct_nr < coa->coa_dct_cap,
"Too short pre-allcoated dct_array: %u vs %u\n",
coa->coa_dct_nr, coa->coa_dct_cap);
D_ASSERT(dct->dct_bitmap != NULL);

memcpy(&coa->coa_dcts[coa->coa_dct_nr++], dct, sizeof(*dct));
obj_coll_collapse_one(coa, dct, octa->size, true);

/* The following members have been migrated into coa->coa_dcts. */
dct->dct_bitmap = NULL;
Expand All @@ -259,6 +300,7 @@ obj_coll_tree_cb(daos_handle_t ih, d_iov_t *key, d_iov_t *val, void *arg)
static int
obj_coll_collapse_tree(struct coll_oper_args *coa, uint32_t *size)
{
struct obj_coll_tree_args octa;
struct coll_sparse_targets *tree = coa->coa_tree;
int rc = 0;

Expand All @@ -270,7 +312,14 @@ obj_coll_collapse_tree(struct coll_oper_args *coa, uint32_t *size)
D_GOTO(out, rc = -DER_NOMEM);

coa->coa_sparse = 0;
rc = dbtree_iterate(tree->cst_tree_hdl, DAOS_INTENT_DEFAULT, false, obj_coll_tree_cb, coa);
coa->coa_raw_sparse = 1;
coa->coa_dct_nr = 0;
coa->coa_max_dct_sz = 0;

octa.coa = coa;
octa.size = size;
rc = dbtree_iterate(tree->cst_tree_hdl, DAOS_INTENT_DEFAULT, false,
obj_coll_tree_cb, &octa);
if (rc == 0)
D_ASSERTF(coa->coa_dct_nr == coa->coa_dct_cap,
"Something is wrong when prepare coll target array: %u vs %u\n",
Expand All @@ -287,35 +336,19 @@ static int
obj_coll_collapse_array(struct coll_oper_args *coa, uint32_t *size)
{
struct daos_coll_target *dct;
struct daos_coll_shard *dcs;
uint32_t dct_size;
int i;
int j;

for (i = 0, *size = 0, coa->coa_dct_nr = 0; i < coa->coa_dct_cap; i++) {
for (i = 0, *size = 0, coa->coa_dct_nr = 0, coa->coa_max_dct_sz = 0;
i < coa->coa_dct_cap; i++) {
dct = &coa->coa_dcts[i];
if (dct->dct_bitmap != NULL) {
/* The size may be over estimated, no matter. */
dct_size = sizeof(*dct) + dct->dct_bitmap_sz +
sizeof(dct->dct_shards[0]) * (dct->dct_max_shard + 1);

for (j = 0; j <= dct->dct_max_shard; j++) {
dcs = &dct->dct_shards[j];
if (dcs->dcs_nr > 1)
dct_size += sizeof(dcs->dcs_buf[0]) * dcs->dcs_nr;
}

if (coa->coa_for_modify)
dct_size += sizeof(dct->dct_tgt_ids[0]) * dct->dct_tgt_nr;

if (coa->coa_max_dct_sz < dct_size)
coa->coa_max_dct_sz = dct_size;

if (coa->coa_dct_nr < i)
memcpy(&coa->coa_dcts[coa->coa_dct_nr], dct, sizeof(*dct));

coa->coa_dct_nr++;
*size += dct_size;
if (coa->coa_dct_nr < i && coa->coa_dct_nr > 0)
D_ASSERTF(coa->coa_dcts[coa->coa_dct_nr - 1].dct_rank <
dct->dct_rank,
"Unsorted ranks in target array %u vs %u\n",
coa->coa_dcts[coa->coa_dct_nr - 1].dct_rank,
dct->dct_rank);
obj_coll_collapse_one(coa, dct, size, coa->coa_dct_nr < i);
}
}

Expand Down Expand Up @@ -373,8 +406,9 @@ obj_coll_prep_one(struct coll_oper_args *coa, struct dc_object *obj,

D_RWLOCK_RDLOCK(&obj->cob_lock);

D_ASSERTF(shard->do_target_rank <= obj->cob_max_rank,
"Unexpected shard with rank %u > %u\n", shard->do_target_rank, obj->cob_max_rank);
D_ASSERTF(coa->coa_min_rank == obj->cob_min_rank,
"Object "DF_OID" layout has been changed unexpectedly %u => %u, idx %u, ver %u\n",
DP_OID(obj->cob_md.omd_id), coa->coa_min_rank, obj->cob_min_rank, idx, map_ver);
D_ASSERTF(shard->do_target_rank >= obj->cob_min_rank,
"Unexpected shard with rank %u < %u\n", shard->do_target_rank, obj->cob_min_rank);

Expand Down Expand Up @@ -669,7 +703,6 @@ dc_obj_coll_punch(tse_task_t *task, struct dc_object *obj, struct dtx_epoch *epo
uint32_t tgt_size = 0;
uint32_t mbs_max_size;
uint32_t inline_size;
uint32_t flags = ORF_LEADER;
uint32_t leader = -1;
uint32_t len;
int rc;
Expand Down Expand Up @@ -746,6 +779,12 @@ dc_obj_coll_punch(tse_task_t *task, struct dc_object *obj, struct dtx_epoch *epo
memcpy(dct, &tmp_tgt, sizeof(tmp_tgt));
}

/* 'shard' is on the leader target that is must be the coa->coa_dcts[0]. */
D_ASSERTF(shard->do_target_rank == coa->coa_dcts[0].dct_rank,
"Object "DF_OID" target array corrupted: rank %u vs %ur, nr %u\n",
DP_OID(obj->cob_md.omd_id), shard->do_target_rank,
coa->coa_dcts[0].dct_rank, coa->coa_dct_nr);

rc = dc_obj_coll_punch_mbs(coa, obj, shard->do_target_id, &mbs);
if (rc < 0)
goto out;
Expand All @@ -767,12 +806,14 @@ dc_obj_coll_punch(tse_task_t *task, struct dc_object *obj, struct dtx_epoch *epo
if (rc != 0)
goto out;

auxi->flags = ORF_LEADER;
if (auxi->io_retry) {
flags |= ORF_RESEND;
auxi->flags |= ORF_RESEND;
/* Reset @enqueue_id if resend to new leader. */
if (spa->pa_auxi.target != shard->do_target_id)
spa->pa_auxi.enqueue_id = 0;
} else {
auxi->flags &= ~ORF_RESEND;
spa->pa_auxi.obj_auxi = auxi;
daos_dti_gen(&spa->pa_dti, false);
}
Expand All @@ -781,14 +822,15 @@ dc_obj_coll_punch(tse_task_t *task, struct dc_object *obj, struct dtx_epoch *epo
spa->pa_auxi.shard = shard->do_shard_idx;

if (obj_is_ec(obj))
flags |= ORF_EC;
auxi->flags |= ORF_EC;

mbs_max_size = sizeof(*mbs) + mbs->dm_data_size +
sizeof(coa->coa_targets[0]) * coa->coa_max_shard_nr + coa->coa_max_bitmap_sz;

return dc_obj_shard_coll_punch(shard, spa, mbs, mbs_max_size, cpca.cpca_bulks, tgt_size,
coa->coa_dcts, coa->coa_dct_nr, coa->coa_max_dct_sz, epoch,
args->flags, flags, map_ver, &auxi->map_ver_reply, task);
args->flags, auxi->flags, map_ver,
&auxi->map_ver_reply, task);

out:
if (rc > 0)
Expand Down
7 changes: 4 additions & 3 deletions src/object/cli_shard.c
Original file line number Diff line number Diff line change
Expand Up @@ -1453,9 +1453,10 @@ obj_shard_coll_punch_cb(tse_task_t *task, void *data)

DL_CDEBUG(task->dt_result < 0, DLOG_ERR, DB_IO, task->dt_result,
"DAOS_OBJ_RPC_COLL_PUNCH RPC %p for "DF_UOID" with DTX "
DF_DTI" for task %p, map_ver %u/%u, flags %lx/%x", rpc, DP_UOID(ocpi->ocpi_oid),
DP_DTI(&ocpi->ocpi_xid), task, ocpi->ocpi_map_ver, *cb_args->cpca_ver,
(unsigned long)ocpi->ocpi_api_flags, ocpi->ocpi_flags);
DF_DTI" for task %p, map_ver %u/%u, flags %lx/%x, %s layout",
rpc, DP_UOID(ocpi->ocpi_oid), DP_DTI(&ocpi->ocpi_xid), task, ocpi->ocpi_map_ver,
*cb_args->cpca_ver, (unsigned long)ocpi->ocpi_api_flags, ocpi->ocpi_flags,
cb_args->cpca_shard_args->pa_coa.coa_raw_sparse ? "sparse" : "continuous");

crt_req_decref(rpc);

Expand Down
7 changes: 6 additions & 1 deletion src/object/obj_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,15 @@ struct coll_oper_args {
struct shard_auxi_args coa_auxi;
int coa_dct_nr;
uint32_t coa_dct_cap;
uint32_t coa_max_dct_sz;
union {
uint32_t coa_max_dct_sz;
/* Temporarily save obj->cob_min_rank for verification during obj_coll_prep_one. */
uint32_t coa_min_rank;
};
uint8_t coa_max_shard_nr;
uint8_t coa_max_bitmap_sz;
uint8_t coa_for_modify:1,
coa_raw_sparse:1,
coa_sparse:1;
uint8_t coa_target_nr;
/*
Expand Down
2 changes: 1 addition & 1 deletion src/object/obj_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ obj_coll_disp_dest(struct obj_coll_disp_cursor *ocdc, struct daos_coll_target *t
* use the "cur_pos" as the relay engine.
*/
pos = rand % (ocdc->tgt_nr - ocdc->cur_pos) + ocdc->cur_pos;
if (pos != ocdc->cur_pos && tgts[pos].dct_rank > dct->dct_rank) {
if (pos > ocdc->cur_pos && tgts[pos].dct_rank > dct->dct_rank) {
memcpy(&tmp, &tgts[pos], sizeof(tmp));
memcpy(&tgts[pos], dct, sizeof(tmp));
memcpy(dct, &tmp, sizeof(tmp));
Expand Down
17 changes: 14 additions & 3 deletions src/object/srv_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,15 @@ obj_coll_punch_prep(struct obj_coll_punch_in *ocpi, struct daos_coll_target *dct
int i;
int j;

/* dcts[0] is for current engine. */
if (dcts[0].dct_bitmap == NULL || dcts[0].dct_bitmap_sz == 0 ||
dcts[0].dct_shards == NULL) {
/* dcts[0] must be for current engine. */
if (unlikely(dcts[0].dct_rank != dss_self_rank())) {
D_ERROR("Invalid targets array: rank %u vs %u, nr %u, flags %x\n",
dcts[0].dct_rank, dss_self_rank(), dct_nr, ocpi->ocpi_flags);
D_GOTO(out, rc = -DER_INVAL);
}

if (unlikely(dcts[0].dct_bitmap == NULL || dcts[0].dct_bitmap_sz == 0 ||
dcts[0].dct_shards == NULL)) {
D_ERROR("Invalid input for current engine: bitmap %s, bitmap_sz %u, shards %s\n",
dcts[0].dct_bitmap == NULL ? "empty" : "non-empty", dcts[0].dct_bitmap_sz,
dcts[0].dct_shards == NULL ? "empty" : "non-empty");
Expand Down Expand Up @@ -302,6 +308,11 @@ obj_coll_punch_prep(struct obj_coll_punch_in *ocpi, struct daos_coll_target *dct

/* Set i = 1 to skip leader_rank. */
for (i = 1; i < dct_nr; i++) {
if (unlikely(dcts[0].dct_rank == dcts[i].dct_rank)) {
D_ERROR("Duplicated leader rank %u at %d\n", dcts[0].dct_rank, i);
D_GOTO(out, rc = -DER_INVAL);
}

dce->dce_ranks->rl_ranks[i - 1] = dcts[i].dct_rank;
if (max_rank < dcts[i].dct_rank)
max_rank = dcts[i].dct_rank;
Expand Down
8 changes: 4 additions & 4 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -5605,12 +5605,12 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc)

D_DEBUG(DB_IO, "(%s) handling collective punch RPC %p for obj "
DF_UOID" on XS %u/%u epc "DF_X64" pmv %u, with dti "
DF_DTI", forward width %u, forward depth %u\n",
DF_DTI", forward width %u, forward depth %u, flags %x\n",
(ocpi->ocpi_flags & ORF_LEADER) ? "leader" :
(ocpi->ocpi_tgts.ca_count == 1 ? "non-leader" : "relay-engine"),
rpc, DP_UOID(ocpi->ocpi_oid), dmi->dmi_xs_id, dmi->dmi_tgt_id,
ocpi->ocpi_epoch, ocpi->ocpi_map_ver, DP_DTI(&ocpi->ocpi_xid),
ocpi->ocpi_disp_width, ocpi->ocpi_disp_depth);
ocpi->ocpi_disp_width, ocpi->ocpi_disp_depth, ocpi->ocpi_flags);

D_ASSERT(dmi->dmi_xs_id != 0);

Expand Down Expand Up @@ -5747,13 +5747,13 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc)
DL_CDEBUG(rc != 0 && rc != -DER_INPROGRESS && rc != -DER_TX_RESTART, DLOG_ERR, DB_IO, rc,
"(%s) handled collective punch RPC %p for obj "DF_UOID" on XS %u/%u epc "
DF_X64" pmv %u/%u, with dti "DF_DTI", bulk_tgt_sz %u, bulk_tgt_nr %u, "
"tgt_nr %u, forward width %u, forward depth %u",
"tgt_nr %u, forward width %u, forward depth %u, flags %x",
(ocpi->ocpi_flags & ORF_LEADER) ? "leader" :
(ocpi->ocpi_tgts.ca_count == 1 ? "non-leader" : "relay-engine"), rpc,
DP_UOID(ocpi->ocpi_oid), dmi->dmi_xs_id, dmi->dmi_tgt_id, ocpi->ocpi_epoch,
ocpi->ocpi_map_ver, max_ver, DP_DTI(&ocpi->ocpi_xid), ocpi->ocpi_bulk_tgt_sz,
ocpi->ocpi_bulk_tgt_nr, (unsigned int)ocpi->ocpi_tgts.ca_count,
ocpi->ocpi_disp_width, ocpi->ocpi_disp_depth);
ocpi->ocpi_disp_width, ocpi->ocpi_disp_depth, ocpi->ocpi_flags);

obj_punch_complete(rpc, rc, max_ver);

Expand Down
3 changes: 0 additions & 3 deletions src/vos/vos_dtx.c
Original file line number Diff line number Diff line change
Expand Up @@ -1976,9 +1976,6 @@ vos_dtx_load_mbs(daos_handle_t coh, struct dtx_id *dti, daos_unit_oid_t *oid,
rc = -DER_INPROGRESS;
}

if (rc < 0)
D_ERROR("Failed to load mbs for "DF_DTI": "DF_RC"\n", DP_DTI(dti), DP_RC(rc));

return rc;
}

Expand Down

0 comments on commit 18e7ab6

Please sign in to comment.