Skip to content

Commit

Permalink
prov/efa: Make NACK protocol fall back to DC longCTS when DC is reque…
Browse files Browse the repository at this point in the history
…sted

When application requests FI_DELIVERY_COMPLETE, it should fallback
to the DC version of LONG CTS RTMs, as the default LongCTS is not DC.

Signed-off-by: Jessie Yang <jiaxiyan@amazon.com>
  • Loading branch information
jiaxiyan committed Sep 25, 2024
1 parent 256d55e commit b153cd9
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 11 deletions.
4 changes: 2 additions & 2 deletions fabtests/pytest/efa/test_rdm.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ def test_rdm_pingpong(cmdline_args, iteration_type, completion_semantic, memory_

@pytest.mark.functional
@pytest.mark.serial
def test_mr_exhaustion_rdm_pingpong(cmdline_args):
def test_mr_exhaustion_rdm_pingpong(cmdline_args, completion_semantic):
efa_run_client_server_test(cmdline_args, "fi_efa_exhaust_mr_reg_rdm_pingpong", "short",
"transmit_complete", "host_to_host", "all", timeout=1000)
completion_semantic, "host_to_host", "all", timeout=1000)

@pytest.mark.functional
def test_rdm_pingpong_range(cmdline_args, completion_semantic, memory_type_bi_dir, message_size):
Expand Down
6 changes: 4 additions & 2 deletions prov/efa/src/rdm/efa_rdm_ope.c
Original file line number Diff line number Diff line change
Expand Up @@ -1773,6 +1773,8 @@ ssize_t efa_rdm_ope_post_send(struct efa_rdm_ope *ope, int pkt_type)
ssize_t efa_rdm_ope_post_send_fallback(struct efa_rdm_ope *ope,
int pkt_type, ssize_t err)
{
bool delivery_complete_requested = ope->fi_flags & FI_DELIVERY_COMPLETE;

if (err == -FI_ENOMR) {
/* Long read and runting read protocols could fail because of a
* lack of memory registrations. In that case, we retry with
Expand All @@ -1786,15 +1788,15 @@ ssize_t efa_rdm_ope_post_send_fallback(struct efa_rdm_ope *ope,
"protocol because memory registration limit "
"was reached on the sender\n");
return efa_rdm_ope_post_send_or_queue(
ope, EFA_RDM_LONGCTS_MSGRTM_PKT);
ope, delivery_complete_requested ? EFA_RDM_DC_LONGCTS_MSGRTM_PKT : EFA_RDM_LONGCTS_MSGRTM_PKT);
case EFA_RDM_LONGREAD_TAGRTM_PKT:
case EFA_RDM_RUNTREAD_TAGRTM_PKT:
EFA_INFO(FI_LOG_EP_CTRL,
"Sender fallback to long CTS tagged protocol "
"because memory registration limit was "
"reached on the sender\n");
return efa_rdm_ope_post_send_or_queue(
ope, EFA_RDM_LONGCTS_TAGRTM_PKT);
ope, delivery_complete_requested ? EFA_RDM_DC_LONGCTS_TAGRTM_PKT : EFA_RDM_LONGCTS_TAGRTM_PKT);
default:
return err;
}
Expand Down
22 changes: 18 additions & 4 deletions prov/efa/src/rdm/efa_rdm_pke_cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,18 @@ int efa_rdm_pke_fill_data(struct efa_rdm_pke *pkt_entry,
/* The data_offset will be non-zero when the long CTS RTM packet
* is sent to continue a runting read transfer after the
* receiver has run out of memory registrations */
assert((data_offset == 0 || ope->internal_flags & EFA_RDM_OPE_READ_NACK) && data_size == -1);
assert(data_offset == 0 ||
ope->internal_flags & EFA_RDM_OPE_READ_NACK);
assert(data_size == -1);
ret = efa_rdm_pke_init_longcts_msgrtm(pkt_entry, ope);
break;
case EFA_RDM_LONGCTS_TAGRTM_PKT:
/* The data_offset will be non-zero when the long CTS RTM packet
* is sent to continue a runting read transfer after the
* receiver has run out of memory registrations */
assert((data_offset == 0 || ope->internal_flags & EFA_RDM_OPE_READ_NACK) && data_size == -1);
assert(data_offset == 0 ||
ope->internal_flags & EFA_RDM_OPE_READ_NACK);
assert(data_size == -1);
ret = efa_rdm_pke_init_longcts_tagrtm(pkt_entry, ope);
break;
case EFA_RDM_LONGREAD_MSGRTM_PKT:
Expand Down Expand Up @@ -187,11 +191,21 @@ int efa_rdm_pke_fill_data(struct efa_rdm_pke *pkt_entry,
ret = efa_rdm_pke_init_dc_medium_tagrtm(pkt_entry, ope, data_offset, data_size);
break;
case EFA_RDM_DC_LONGCTS_MSGRTM_PKT:
assert(data_offset == 0 && data_size == -1);
/* The data_offset will be non-zero when the DC long CTS RTM packet
* is sent to continue a runting read transfer after the
* receiver has run out of memory registrations */
assert(data_offset == 0 ||
ope->internal_flags & EFA_RDM_OPE_READ_NACK);
assert(data_size == -1);
ret = efa_rdm_pke_init_dc_longcts_msgrtm(pkt_entry, ope);
break;
case EFA_RDM_DC_LONGCTS_TAGRTM_PKT:
assert(data_offset == 0 && data_size == -1);
/* The data_offset will be non-zero when the DC long CTS tagged RTM packet
* is sent to continue a runting read transfer after the
* receiver has run out of memory registrations */
assert(data_offset == 0 ||
ope->internal_flags & EFA_RDM_OPE_READ_NACK);
assert(data_size == -1);
ret = efa_rdm_pke_init_dc_longcts_tagrtm(pkt_entry, ope);
break;
case EFA_RDM_DC_EAGER_RTW_PKT:
Expand Down
18 changes: 15 additions & 3 deletions prov/efa/src/rdm/efa_rdm_pke_nonreq.c
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,7 @@ void efa_rdm_pke_handle_read_nack_recv(struct efa_rdm_pke *pkt_entry)
{
struct efa_rdm_read_nack_hdr *nack_hdr;
struct efa_rdm_ope *txe;
bool delivery_complete_requested;

efa_rdm_ep_domain(pkt_entry->ep)->num_read_msg_in_flight -= 1;

Expand All @@ -700,23 +701,34 @@ void efa_rdm_pke_handle_read_nack_recv(struct efa_rdm_pke *pkt_entry)
efa_rdm_pke_release_rx(pkt_entry);
txe->internal_flags |= EFA_RDM_OPE_READ_NACK;

delivery_complete_requested = txe->fi_flags & FI_DELIVERY_COMPLETE;

if (txe->op == ofi_op_write) {
EFA_INFO(FI_LOG_EP_CTRL,
"Sender fallback to emulated long CTS write "
"protocol because p2p is not available\n");
efa_rdm_ope_post_send_or_queue(txe, EFA_RDM_LONGCTS_RTW_PKT);
efa_rdm_ope_post_send_or_queue(
txe, delivery_complete_requested ?
EFA_RDM_DC_LONGCTS_RTW_PKT :
EFA_RDM_LONGCTS_RTW_PKT);
} else if (txe->op == ofi_op_tagged) {
EFA_INFO(FI_LOG_EP_CTRL,
"Sender fallback to long CTS tagged "
"protocol because memory registration limit "
"was reached on the receiver\n");
efa_rdm_ope_post_send_or_queue(txe, EFA_RDM_LONGCTS_TAGRTM_PKT);
efa_rdm_ope_post_send_or_queue(
txe, delivery_complete_requested ?
EFA_RDM_DC_LONGCTS_TAGRTM_PKT :
EFA_RDM_LONGCTS_TAGRTM_PKT);
} else {
EFA_INFO(FI_LOG_EP_CTRL,
"Sender fallback to long CTS untagged "
"protocol because memory registration limit "
"was reached on the receiver\n");
efa_rdm_ope_post_send_or_queue(txe, EFA_RDM_LONGCTS_MSGRTM_PKT);
efa_rdm_ope_post_send_or_queue(
txe, delivery_complete_requested ?
EFA_RDM_DC_LONGCTS_MSGRTM_PKT :
EFA_RDM_LONGCTS_MSGRTM_PKT);
}
}

Expand Down

0 comments on commit b153cd9

Please sign in to comment.