Skip to content

Commit

Permalink
prov/tcp: Set FI_MULTI_RECV for last completed RX slice
Browse files Browse the repository at this point in the history
When FI_MULTI_RECV is enabled for an RX buffer, the last
completed slice of the RX buffer should have FI_MULTI_RECV
set in the corresponding CQE.

Currently the TCP provider assumes the last slice of the RX
buffer will get completed last and sets the FI_MULTI_RECV flag
when the last slice is used.  Completions can get out of
order with multiple connections and varied payload sizes
using the same RX buffer.  The last slice will not always
complete last.

To make sure the last completed RX buffer slice has
FI_MULTI_RECV set in the CQE, use a reference counter
to track outstanding slices.  Count is initialized to 1
when the RX buffer is first posted and incremented on
every slice allocation.  Upon completion, count is
decremented.  When it reaches 0, FI_MULTI_RECV is set
for that CQE.

v2 - Minimize impact to non-multi-recv SRX code.
v3 - Fix completion path, set FI_MULTI_RECV if mrecv is
     NULL and recv_entry is XNET_MULTI_RECV.

Signed-off-by: Chien Tin Tung <chien.tin.tung@intel.com>
  • Loading branch information
Chien Tin Tung committed Oct 11, 2023
1 parent 041180e commit b3d0724
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 4 deletions.
11 changes: 8 additions & 3 deletions prov/tcp/src/xnet.h
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,10 @@ static inline void xnet_signal_progress(struct xnet_progress *progress)
#define XNET_NEED_CTS BIT(11)
#define XNET_MULTI_RECV FI_MULTI_RECV /* BIT(16) */

/*
 * Shared bookkeeping for a multi-recv RX buffer.  One instance is
 * heap-allocated the first time a slice is carved from the posted buffer
 * and is pointed to by the posted entry and by every slice entry carved
 * from it.
 */
struct xnet_mrecv {
/*
 * Number of outstanding entries referencing this buffer.  Set to 1 when
 * the structure is allocated and incremented for each slice entry.  The
 * completion paths decrement it; the completion that drops it to 0 is
 * reported with FI_MULTI_RECV and frees this structure.
 */
size_t ref_cnt;
};

struct xnet_xfer_entry {
struct slist_entry entry;
void *user_buf;
Expand All @@ -409,9 +413,10 @@ struct xnet_xfer_entry {
struct util_cntr *cntr;
uint64_t tag_seq_no;
uint64_t tag;
struct {
uint64_t ignore;
size_t rts_iov_cnt;
union {
uint64_t ignore;
size_t rts_iov_cnt;
struct xnet_mrecv *mrecv;
};
fi_addr_t src_addr;
uint64_t cq_flags;
Expand Down
18 changes: 18 additions & 0 deletions prov/tcp/src/xnet_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,15 @@ void xnet_report_success(struct xnet_xfer_entry *xfer_entry)
flags = xfer_entry->cq_flags & ~FI_COMPLETION;
if (flags & FI_RECV) {
len = xnet_msg_len(&xfer_entry->hdr);
if (xfer_entry->mrecv) {
xfer_entry->mrecv->ref_cnt--;
if (!xfer_entry->mrecv->ref_cnt) {
flags |= FI_MULTI_RECV;
free(xfer_entry->mrecv);
}
} else if (xfer_entry->ctrl_flags & XNET_MULTI_RECV) {
flags |= FI_MULTI_RECV;
}
xnet_get_cq_info(xfer_entry, &flags, &data, &tag);
} else if (flags & FI_REMOTE_CQ_DATA) {
assert(flags & FI_REMOTE_WRITE);
Expand Down Expand Up @@ -194,6 +203,15 @@ void xnet_report_error(struct xnet_xfer_entry *xfer_entry, int err)

err_entry.flags = xfer_entry->cq_flags & ~FI_COMPLETION;
if (err_entry.flags & FI_RECV) {
if (xfer_entry->mrecv) {
xfer_entry->mrecv->ref_cnt--;
if (!xfer_entry->mrecv->ref_cnt) {
err_entry.flags |= FI_MULTI_RECV;
free(xfer_entry->mrecv);
}
} else if (xfer_entry->ctrl_flags & XNET_MULTI_RECV) {
err_entry.flags |= FI_MULTI_RECV;
}
xnet_get_cq_info(xfer_entry, &err_entry.flags, &err_entry.data,
&err_entry.tag);
} else if (err_entry.flags & FI_REMOTE_CQ_DATA) {
Expand Down
14 changes: 13 additions & 1 deletion prov/tcp/src/xnet_progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -648,11 +648,22 @@ static int xnet_alter_mrecv(struct xnet_ep *ep, struct xnet_xfer_entry *xfer,
if (!recv_entry)
goto complete;

if (!xfer->mrecv) {
xfer->mrecv = calloc(1, sizeof(struct xnet_mrecv));
if (!xfer->mrecv) {
xfer->cq_flags |= FI_MULTI_RECV;
return FI_SUCCESS;
}
xfer->mrecv->ref_cnt = 1;
}

recv_entry->ctrl_flags = XNET_MULTI_RECV;
recv_entry->cq_flags = FI_MSG | FI_RECV;
recv_entry->cntr = xfer->cntr;
recv_entry->cq = xfer->cq;
recv_entry->context = xfer->context;
recv_entry->mrecv = xfer->mrecv;
recv_entry->mrecv->ref_cnt++;

recv_entry->iov_cnt = 1;
recv_entry->user_buf = (char *) xfer->iov[0].iov_base + msg_len;
Expand All @@ -663,7 +674,8 @@ static int xnet_alter_mrecv(struct xnet_ep *ep, struct xnet_xfer_entry *xfer,
return 0;

complete:
xfer->cq_flags |= FI_MULTI_RECV;
if (!xfer->mrecv)
xfer->cq_flags |= FI_MULTI_RECV;
return ret;
}

Expand Down
2 changes: 2 additions & 0 deletions prov/tcp/src/xnet_srx.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ xnet_alloc_srx_xfer(struct xnet_srx *srx)
if (xfer) {
xfer->cntr = srx->cntr;
xfer->cq = srx->cq;
xfer->mrecv = NULL;
}

return xfer;
Expand Down Expand Up @@ -131,6 +132,7 @@ xnet_srx_recv(struct fid_ep *ep_fid, void *buf, size_t len, void *desc,

srx = container_of(ep_fid, struct xnet_srx, rx_fid);


ofi_genlock_lock(xnet_srx2_progress(srx)->active_lock);
recv_entry = xnet_alloc_srx_xfer(srx);
if (!recv_entry) {
Expand Down

0 comments on commit b3d0724

Please sign in to comment.