Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prov/util, shm: fix util srx to adhere to peer api (setting msg_size and flags correctly) #10511

Merged
merged 2 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions prov/shm/src/smr.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,8 @@ int smr_complete_rx(struct smr_ep *ep, void *context, uint32_t op,
uint64_t flags, size_t len, void *buf, int64_t id,
uint64_t tag, uint64_t data);

static inline uint64_t smr_rx_cq_flags(uint32_t op, uint64_t rx_flags,
uint16_t op_flags)
static inline uint64_t smr_rx_cq_flags(uint64_t rx_flags, uint16_t op_flags)
{
rx_flags |= ofi_rx_cq_flags(op);
if (op_flags & SMR_REMOTE_CQ_DATA)
rx_flags |= FI_REMOTE_CQ_DATA;
return rx_flags;
Expand Down
41 changes: 19 additions & 22 deletions prov/shm/src/smr_progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -762,8 +762,8 @@ static int smr_start_common(struct smr_ep *ep, struct smr_cmd *cmd,

if (!pend) {
comp_buf = rx_entry->iov[0].iov_base;
comp_flags = smr_rx_cq_flags(cmd->msg.hdr.op, rx_entry->flags,
cmd->msg.hdr.op_flags);
comp_flags = smr_rx_cq_flags(rx_entry->flags,
cmd->msg.hdr.op_flags);
if (err) {
FI_WARN(&smr_prov, FI_LOG_EP_CTRL,
"error processing op\n");
Expand Down Expand Up @@ -822,8 +822,8 @@ static int smr_copy_saved(struct smr_cmd_ctx *cmd_ctx,
}
assert(!cmd_ctx->sar_entry);

comp_flags = smr_rx_cq_flags(cmd_ctx->cmd.msg.hdr.op,
rx_entry->flags, cmd_ctx->cmd.msg.hdr.op_flags);
comp_flags = smr_rx_cq_flags(rx_entry->flags,
cmd_ctx->cmd.msg.hdr.op_flags);

ret = smr_complete_rx(cmd_ctx->ep, rx_entry->context,
cmd_ctx->cmd.msg.hdr.op, comp_flags,
Expand Down Expand Up @@ -1106,14 +1106,14 @@ static int smr_progress_cmd_rma(struct smr_ep *ep, struct smr_cmd *cmd,
FI_WARN(&smr_prov, FI_LOG_EP_CTRL,
"error processing rma op\n");
ret = smr_write_err_comp(ep->util_ep.rx_cq, NULL,
smr_rx_cq_flags(cmd->msg.hdr.op, 0,
cmd->msg.hdr.op_flags), 0, -err);
smr_rx_cq_flags(0, cmd->msg.hdr.op_flags),
0, -err);
} else {
ret = smr_complete_rx(ep, (void *) cmd->msg.hdr.msg_id,
cmd->msg.hdr.op, smr_rx_cq_flags(cmd->msg.hdr.op,
0, cmd->msg.hdr.op_flags), total_len,
iov_count ? iov[0].iov_base : NULL,
cmd->msg.hdr.id, 0, cmd->msg.hdr.data);
cmd->msg.hdr.op, smr_rx_cq_flags(0,
cmd->msg.hdr.op_flags), total_len,
iov_count ? iov[0].iov_base : NULL,
cmd->msg.hdr.id, 0, cmd->msg.hdr.data);
}
if (ret) {
FI_WARN(&smr_prov, FI_LOG_EP_CTRL,
Expand Down Expand Up @@ -1191,13 +1191,12 @@ static int smr_progress_cmd_atomic(struct smr_ep *ep, struct smr_cmd *cmd,
FI_WARN(&smr_prov, FI_LOG_EP_CTRL,
"error processing atomic op\n");
ret = smr_write_err_comp(ep->util_ep.rx_cq, NULL,
smr_rx_cq_flags(cmd->msg.hdr.op, 0,
cmd->msg.hdr.op_flags), 0, err);
smr_rx_cq_flags(0, cmd->msg.hdr.op_flags),
0, err);
} else {
ret = smr_complete_rx(ep, NULL, cmd->msg.hdr.op,
smr_rx_cq_flags(cmd->msg.hdr.op, 0,
cmd->msg.hdr.op_flags), total_len,
ioc_count ? ioc[0].addr : NULL,
smr_rx_cq_flags(0, cmd->msg.hdr.op_flags),
total_len, ioc_count ? ioc[0].addr : NULL,
cmd->msg.hdr.id, 0, cmd->msg.hdr.data);
}
if (ret) {
Expand Down Expand Up @@ -1304,13 +1303,11 @@ void smr_progress_ipc_list(struct smr_ep *ep)

if (ipc_entry->rx_entry) {
context = ipc_entry->rx_entry->context;
flags = smr_rx_cq_flags(ipc_entry->cmd.msg.hdr.op,
ipc_entry->rx_entry->flags,
flags = smr_rx_cq_flags(ipc_entry->rx_entry->flags,
ipc_entry->cmd.msg.hdr.op_flags);
} else {
context = NULL;
flags = smr_rx_cq_flags(ipc_entry->cmd.msg.hdr.op,
0, ipc_entry->cmd.msg.hdr.op_flags);
flags = smr_rx_cq_flags(0, ipc_entry->cmd.msg.hdr.op_flags);
}

ret = smr_complete_rx(ep, context, ipc_entry->cmd.msg.hdr.op,
Expand Down Expand Up @@ -1422,13 +1419,13 @@ static void smr_progress_sar_list(struct smr_ep *ep)

if (sar_entry->rx_entry) {
comp_ctx = sar_entry->rx_entry->context;
comp_flags = smr_rx_cq_flags(sar_entry->cmd.msg.hdr.op,
comp_flags = smr_rx_cq_flags(
sar_entry->rx_entry->flags,
sar_entry->cmd.msg.hdr.op_flags);
} else {
comp_ctx = NULL;
comp_flags = smr_rx_cq_flags(sar_entry->cmd.msg.hdr.op,
0, sar_entry->cmd.msg.hdr.op_flags);
comp_flags = smr_rx_cq_flags(0,
sar_entry->cmd.msg.hdr.op_flags);
}
ret = smr_complete_rx(ep, comp_ctx,
sar_entry->cmd.msg.hdr.op, comp_flags,
Expand Down
53 changes: 40 additions & 13 deletions prov/util/src/util_srx.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ static void util_init_rx_entry(struct util_rx_entry *entry,
entry->peer_entry.context = context;
entry->peer_entry.tag = tag;
entry->peer_entry.flags = flags;
entry->peer_entry.msg_size = ofi_total_iov_len(iov, count);
}

static struct util_rx_entry *util_get_recv_entry(struct util_srx_ctx *srx,
Expand Down Expand Up @@ -191,6 +192,8 @@ static int util_match_msg(struct fid_peer_srx *srx,
util_entry->peer_entry.srx = srx;
srx_ctx->update_func(srx_ctx, util_entry);
}
util_entry->peer_entry.msg_size = MIN(util_entry->peer_entry.msg_size,
attr->msg_size);
*rx_entry = &util_entry->peer_entry;
return ret;
}
Expand Down Expand Up @@ -268,6 +271,8 @@ static int util_match_tag(struct fid_peer_srx *srx,
ret = -FI_ENOENT;
util_entry->peer_entry.srx = srx;
out:
util_entry->peer_entry.msg_size = MIN(util_entry->peer_entry.msg_size,
attr->msg_size);
*rx_entry = &util_entry->peer_entry;
return ret;
}
Expand Down Expand Up @@ -496,6 +501,33 @@ static struct util_rx_entry *util_search_unexp_msg(struct util_srx_ctx *srx,
return util_search_peer_msg(ofi_array_at(&srx->src_unexp_peers, addr));
}

static bool util_unexp_mrecv(struct util_srx_ctx *srx,
struct util_rx_entry *mrecv_entry,
struct util_rx_entry *rx_entry)
{
mrecv_entry->multi_recv_ref++;
rx_entry->peer_entry.owner_context = mrecv_entry;

rx_entry->peer_entry.iov[0].iov_base =
mrecv_entry->peer_entry.iov->iov_base;
rx_entry->peer_entry.iov->iov_len =
Copy link
Contributor

@shijin-aws shijin-aws Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit strange that we use iov[0] for iov_base while iov-> for iov_len. Is it enforced that multi_recv post has to be 1 iov_count?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, current util code only supports iov_count == 1 for multi_recv. See the assertion in util_generic_mrecv

	assert(flags & FI_MULTI_RECV && iov_count == 1);

MIN(mrecv_entry->peer_entry.iov->iov_len,
rx_entry->peer_entry.msg_size);
*rx_entry->peer_entry.desc = mrecv_entry->peer_entry.desc[0];

rx_entry->peer_entry.count = 1;
rx_entry->peer_entry.addr = mrecv_entry->peer_entry.addr;
rx_entry->peer_entry.context = mrecv_entry->peer_entry.context;
rx_entry->peer_entry.tag = mrecv_entry->peer_entry.tag;
rx_entry->peer_entry.flags |= mrecv_entry->peer_entry.flags &
~FI_MULTI_RECV;
rx_entry->peer_entry.msg_size = rx_entry->peer_entry.iov->iov_len;

return util_adjust_multi_recv(srx, &mrecv_entry->peer_entry,
rx_entry->peer_entry.msg_size);

}

static ssize_t util_generic_mrecv(struct util_srx_ctx *srx,
const struct iovec *iov, void **desc, size_t iov_count,
fi_addr_t addr, void *context, uint64_t flags)
Expand All @@ -510,7 +542,8 @@ static ssize_t util_generic_mrecv(struct util_srx_ctx *srx,

ofi_genlock_lock(srx->lock);
mrecv_entry = util_get_recv_entry(srx, iov, desc, iov_count, addr,
context, 0, 0, flags);
context, 0, 0,
flags | FI_MSG | FI_RECV);
if (!mrecv_entry) {
ret = -FI_ENOMEM;
goto out;
Expand All @@ -520,15 +553,7 @@ static ssize_t util_generic_mrecv(struct util_srx_ctx *srx,

rx_entry = util_search_unexp_msg(srx, addr);
while (rx_entry) {
util_init_rx_entry(rx_entry, mrecv_entry->peer_entry.iov, desc,
iov_count, addr, context, 0,
flags & (~FI_MULTI_RECV));
mrecv_entry->multi_recv_ref++;
rx_entry->peer_entry.owner_context = mrecv_entry;

if (util_adjust_multi_recv(srx, &mrecv_entry->peer_entry,
rx_entry->peer_entry.msg_size))
buf_done = true;
buf_done = util_unexp_mrecv(srx, mrecv_entry, rx_entry);

srx->update_func(srx, rx_entry);
ret = rx_entry->peer_entry.srx->peer_ops->start_msg(
Expand Down Expand Up @@ -695,7 +720,8 @@ ssize_t util_srx_generic_trecv(struct fid_ep *ep_fid, const struct iovec *iov,
assert(queue);
rx_entry = util_get_recv_entry(srx, iov, desc,
iov_count, addr, context, tag,
ignore, flags);
ignore,
flags | FI_TAGGED | FI_RECV);
if (!rx_entry)
ret = -FI_ENOMEM;
else
Expand Down Expand Up @@ -741,10 +767,11 @@ ssize_t util_srx_generic_recv(struct fid_ep *ep_fid, const struct iovec *iov,
ofi_array_at(&srx->src_recv_queues, addr);
assert(queue);
rx_entry = util_get_recv_entry(srx, iov, desc, iov_count, addr,
context, 0, 0, flags);
context, 0, 0,
flags | FI_MSG | FI_RECV);
if (!rx_entry)
ret = -FI_ENOMEM;
else
else
slist_insert_tail((struct slist_entry *)
(&rx_entry->peer_entry), queue);
goto out;
Expand Down