Skip to content

Commit

Permalink
DAOS-14261 engine: Add dss_chore for I/O forwarding (#13372) (#14158)
Browse files Browse the repository at this point in the history
As requested by the Jira ticket, add a new I/O forwarding mechanism,
dss_chore, to avoid creating a ULT for every forwarding task.

  - Forwarding of object I/O and DTX RPCs is converted to chores.

  - Cancelation is not implemented, because the I/O forwarding tasks
    themselves do not support cancelation yet.

  - In certain engine configurations, some xstreams do not need to
    initialize dx_chore_queue. This is left to future work.

Signed-off-by: Li Wei <wei.g.li@intel.com>
  • Loading branch information
jolivier23 authored Apr 19, 2024
1 parent 7da45c7 commit dfd7211
Show file tree
Hide file tree
Showing 7 changed files with 453 additions and 87 deletions.
88 changes: 53 additions & 35 deletions src/dtx/dtx_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1914,10 +1914,16 @@ dtx_handle_resend(daos_handle_t coh, struct dtx_id *dti,
*/
#define DTX_EXEC_STEP_LENGTH DTX_THRESHOLD_COUNT

struct dtx_ult_arg {
struct dtx_chore {
struct dss_chore chore;
dtx_sub_func_t func;
void *func_arg;
struct dtx_leader_handle *dlh;

/* Chore-internal state variables */
uint32_t i;
uint32_t j;
uint32_t k;
};

static void
Expand Down Expand Up @@ -1970,28 +1976,42 @@ dtx_sub_comp_cb(struct dtx_leader_handle *dlh, int idx, int rc)
idx, tgt->st_rank, tgt->st_tgt_idx, tgt->st_flags, rc);
}

static void
dtx_leader_exec_ops_ult(void *arg)
static enum dss_chore_status
dtx_leader_exec_ops_chore(struct dss_chore *chore, bool is_reentrance)
{
struct dtx_ult_arg *ult_arg = arg;
struct dtx_leader_handle *dlh = ult_arg->dlh;
struct dtx_chore *dtx_chore = container_of(chore, struct dtx_chore, chore);
struct dtx_leader_handle *dlh = dtx_chore->dlh;
struct dtx_sub_status *sub;
struct daos_shard_tgt *tgt;
uint32_t i;
uint32_t j;
uint32_t k;
int rc = 0;

for (i = dlh->dlh_forward_idx, j = 0, k = 0; j < dlh->dlh_forward_cnt; i++, j++) {
sub = &dlh->dlh_subs[i];
/*
* If this is the first entrance, initialize the chore-internal state
* variables.
*/
if (is_reentrance) {
D_DEBUG(DB_TRACE, "%p: resume: i=%u j=%u k=%u forward_cnt=%u\n", chore,
dtx_chore->i, dtx_chore->j, dtx_chore->k, dlh->dlh_forward_cnt);
dtx_chore->i++;
dtx_chore->j++;
} else {
D_DEBUG(DB_TRACE, "%p: initialize: forward_idx=%u forward_cnt=%u\n", chore,
dlh->dlh_forward_idx, dlh->dlh_forward_cnt);
dtx_chore->i = dlh->dlh_forward_idx;
dtx_chore->j = 0;
dtx_chore->k = 0;
}

for (; dtx_chore->j < dlh->dlh_forward_cnt; dtx_chore->i++, dtx_chore->j++) {
sub = &dlh->dlh_subs[dtx_chore->i];
tgt = &sub->dss_tgt;

if (dlh->dlh_normal_sub_done == 0) {
sub->dss_result = 0;
sub->dss_comp = 0;

if (unlikely(tgt->st_flags & DTF_DELAY_FORWARD)) {
dtx_sub_comp_cb(dlh, i, 0);
dtx_sub_comp_cb(dlh, dtx_chore->i, 0);
continue;
}
} else {
Expand All @@ -2003,33 +2023,35 @@ dtx_leader_exec_ops_ult(void *arg)
}

if (tgt->st_rank == DAOS_TGT_IGNORE ||
(i == daos_fail_value_get() && DAOS_FAIL_CHECK(DAOS_DTX_SKIP_PREPARE))) {
(dtx_chore->i == daos_fail_value_get() &&
DAOS_FAIL_CHECK(DAOS_DTX_SKIP_PREPARE))) {
if (dlh->dlh_normal_sub_done == 0 || tgt->st_flags & DTF_DELAY_FORWARD)
dtx_sub_comp_cb(dlh, i, 0);
dtx_sub_comp_cb(dlh, dtx_chore->i, 0);
continue;
}

rc = ult_arg->func(dlh, ult_arg->func_arg, i, dtx_sub_comp_cb);
rc = dtx_chore->func(dlh, dtx_chore->func_arg, dtx_chore->i, dtx_sub_comp_cb);
if (rc != 0) {
if (sub->dss_comp == 0)
dtx_sub_comp_cb(dlh, i, rc);
dtx_sub_comp_cb(dlh, dtx_chore->i, rc);
break;
}

/* Yield to avoid holding CPU for too long time. */
if ((++k) % DTX_RPC_YIELD_THD == 0)
ABT_thread_yield();
if (++(dtx_chore->k) % DTX_RPC_YIELD_THD == 0)
return DSS_CHORE_YIELD;
}

if (rc != 0) {
for (i++, j++; j < dlh->dlh_forward_cnt; i++, j++) {
sub = &dlh->dlh_subs[i];
for (dtx_chore->i++, dtx_chore->j++; dtx_chore->j < dlh->dlh_forward_cnt;
dtx_chore->i++, dtx_chore->j++) {
sub = &dlh->dlh_subs[dtx_chore->i];
tgt = &sub->dss_tgt;

if (dlh->dlh_normal_sub_done == 0 || tgt->st_flags & DTF_DELAY_FORWARD) {
sub->dss_result = 0;
sub->dss_comp = 0;
dtx_sub_comp_cb(dlh, i, 0);
dtx_sub_comp_cb(dlh, dtx_chore->i, 0);
}
}
}
Expand All @@ -2039,6 +2061,8 @@ dtx_leader_exec_ops_ult(void *arg)
D_ASSERTF(rc == ABT_SUCCESS, "ABT_future_set failed [%u, %u), for delay %s: %d\n",
dlh->dlh_forward_idx, dlh->dlh_forward_idx + dlh->dlh_forward_cnt,
dlh->dlh_normal_sub_done == 1 ? "yes" : "no", rc);

return DSS_CHORE_DONE;
}

/**
Expand All @@ -2048,15 +2072,15 @@ int
dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
dtx_agg_cb_t agg_cb, int allow_failure, void *func_arg)
{
struct dtx_ult_arg ult_arg;
struct dtx_chore dtx_chore;
int sub_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt;
int rc = 0;
int local_rc = 0;
int remote_rc = 0;

ult_arg.func = func;
ult_arg.func_arg = func_arg;
ult_arg.dlh = dlh;
dtx_chore.func = func;
dtx_chore.func_arg = func_arg;
dtx_chore.dlh = dlh;

dlh->dlh_result = 0;
dlh->dlh_allow_failure = allow_failure;
Expand Down Expand Up @@ -2092,15 +2116,10 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
D_GOTO(out, rc = dss_abterr2der(rc));
}

/*
* NOTE: Ideally, we probably should create ULT for each shard, but for performance
* reasons, let's only create one for all remote targets for now.
*/
rc = dss_ult_create(dtx_leader_exec_ops_ult, &ult_arg, DSS_XS_IOFW,
dss_get_module_info()->dmi_tgt_id, DSS_DEEP_STACK_SZ, NULL);
rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore);
if (rc != 0) {
D_ERROR("ult create failed [%u, %u] (2): "DF_RC"\n",
dlh->dlh_forward_idx, dlh->dlh_forward_cnt, DP_RC(rc));
DL_ERROR(rc, "chore create failed [%u, %u] (2)", dlh->dlh_forward_idx,
dlh->dlh_forward_cnt);
ABT_future_free(&dlh->dlh_future);
goto out;
}
Expand Down Expand Up @@ -2168,10 +2187,9 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
/* The ones without DELAY flag will be skipped when scan the targets array. */
dlh->dlh_forward_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt;

rc = dss_ult_create(dtx_leader_exec_ops_ult, &ult_arg, DSS_XS_IOFW,
dss_get_module_info()->dmi_tgt_id, DSS_DEEP_STACK_SZ, NULL);
rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore);
if (rc != 0) {
D_ERROR("ult create failed (4): "DF_RC"\n", DP_RC(rc));
DL_ERROR(rc, "chore create failed (4)");
ABT_future_free(&dlh->dlh_future);
goto out;
}
Expand Down
Loading

0 comments on commit dfd7211

Please sign in to comment.