Skip to content

Commit

Permalink
prov/efa: merge multiple ope queued list
Browse files Browse the repository at this point in the history
Combine ope_queued_rnr_list, ope_queued_ctrl_list
and ope_queued_read_list into one list: ope_queued_list.
An op entry can only be queued for one reason at a time,
so there is no value in maintaining 3 separate lists.

Signed-off-by: Shi Jin <sjina@amazon.com>
  • Loading branch information
shijin-aws committed Jun 24, 2024
1 parent 1fa395d commit 3cfc0bb
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 131 deletions.
152 changes: 60 additions & 92 deletions prov/efa/src/efa_domain.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,7 @@ static int efa_domain_init_rdm(struct efa_domain *efa_domain, struct fi_info *in
efa_env.cq_size);
efa_domain->num_read_msg_in_flight = 0;

dlist_init(&efa_domain->ope_queued_rnr_list);
dlist_init(&efa_domain->ope_queued_ctrl_list);
dlist_init(&efa_domain->ope_queued_read_list);
dlist_init(&efa_domain->ope_queued_list);
dlist_init(&efa_domain->ope_longcts_send_list);
dlist_init(&efa_domain->peer_backoff_list);
dlist_init(&efa_domain->handshake_queued_peer_list);
Expand Down Expand Up @@ -486,72 +484,81 @@ void efa_domain_progress_rdm_peers_and_queues(struct efa_domain *domain)
}

/*
* Resend queued RNR pkts
* Repost pkts for all queued op entries
*/
dlist_foreach_container_safe(&domain->ope_queued_rnr_list,
dlist_foreach_container_safe(&domain->ope_queued_list,
struct efa_rdm_ope,
ope, queued_rnr_entry, tmp) {
ope, queued_entry, tmp) {
peer = efa_rdm_ep_get_peer(ope->ep, ope->addr);
assert(peer);

if (peer->flags & EFA_RDM_PEER_IN_BACKOFF)
if (peer && (peer->flags & EFA_RDM_PEER_IN_BACKOFF))
continue;

assert(ope->internal_flags & EFA_RDM_OPE_QUEUED_RNR);
assert(!dlist_empty(&ope->queued_pkts));
ret = efa_rdm_ep_post_queued_pkts(ope->ep, &ope->queued_pkts);
if (ope->internal_flags & EFA_RDM_OPE_QUEUED_RNR) {
assert(!dlist_empty(&ope->queued_pkts));
ret = efa_rdm_ep_post_queued_pkts(ope->ep, &ope->queued_pkts);

if (ret == -FI_EAGAIN)
break;

if (OFI_UNLIKELY(ret)) {
assert(ope->type == EFA_RDM_RXE || ope->type == EFA_RDM_TXE);
if (ope->type == EFA_RDM_RXE)
efa_rdm_rxe_handle_error(ope, -ret, FI_EFA_ERR_PKT_SEND);
else
efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_PKT_SEND);
return;
}
if (ret == -FI_EAGAIN)
continue;

dlist_remove(&ope->queued_rnr_entry);
ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_RNR;
}
if (OFI_UNLIKELY(ret)) {
assert(ope->type == EFA_RDM_RXE || ope->type == EFA_RDM_TXE);
if (ope->type == EFA_RDM_RXE)
efa_rdm_rxe_handle_error(ope, -ret, FI_EFA_ERR_PKT_SEND);
else
efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_PKT_SEND);
continue;
}

/*
* Send any queued ctrl packets.
*/
dlist_foreach_container_safe(&domain->ope_queued_ctrl_list,
struct efa_rdm_ope,
ope, queued_ctrl_entry, tmp) {
peer = efa_rdm_ep_get_peer(ope->ep, ope->addr);
assert(peer);
dlist_remove(&ope->queued_entry);
ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_RNR;
}

if (peer->flags & EFA_RDM_PEER_IN_BACKOFF)
continue;
if (ope->internal_flags & EFA_RDM_OPE_QUEUED_CTRL) {
ret = efa_rdm_ope_post_send(ope, ope->queued_ctrl_type);
if (ret == -FI_EAGAIN)
continue;

assert(ope->internal_flags & EFA_RDM_OPE_QUEUED_CTRL);
ret = efa_rdm_ope_post_send(ope, ope->queued_ctrl_type);
if (ret == -FI_EAGAIN)
break;
if (OFI_UNLIKELY(ret)) {
assert(ope->type == EFA_RDM_TXE || ope->type == EFA_RDM_RXE);
if (ope->type == EFA_RDM_TXE)
efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_PKT_POST);
else
efa_rdm_rxe_handle_error(ope, -ret, FI_EFA_ERR_PKT_POST);
continue;
}

if (OFI_UNLIKELY(ret)) {
efa_rdm_rxe_handle_error(ope, -ret, FI_EFA_ERR_PKT_POST);
return;
/* it can happen that efa_rdm_ope_post_send() released ope
* (if the ope is rxe and packet type is EOR and inject is used). In
* that case rxe's state has been set to EFA_RDM_OPE_FREE and
* it has been removed from the domain's ope_queued_list, so nothing
* is left to do.
*/
if (ope->state == EFA_RDM_OPE_FREE)
continue;

ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_CTRL;
dlist_remove(&ope->queued_entry);
}

/* it can happen that efa_rdm_ope_post_send() released ope
* (if the ope is rxe and packet type is EOR and inject is used). In
* that case rxe's state has been set to EFA_RDM_OPE_FREE and
* it has been removed from ep->op_queued_entry_list, so nothing
* is left to do.
*/
if (ope->state == EFA_RDM_OPE_FREE)
continue;
if (ope->internal_flags & EFA_RDM_OPE_QUEUED_READ) {
ret = efa_rdm_ope_post_read(ope);
if (ret == -FI_EAGAIN)
continue;

ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_CTRL;
dlist_remove(&ope->queued_ctrl_entry);
}
if (OFI_UNLIKELY(ret)) {
assert(ope->type == EFA_RDM_TXE || ope->type == EFA_RDM_RXE);
if (ope->type == EFA_RDM_TXE)
efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_READ_POST);
else
efa_rdm_rxe_handle_error(ope, -ret, FI_EFA_ERR_READ_POST);
return;
}

ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_READ;
dlist_remove(&ope->queued_entry);
}
}
/*
* Send data packets until window or data queue is exhausted.
*/
Expand Down Expand Up @@ -597,43 +604,4 @@ void efa_domain_progress_rdm_peers_and_queues(struct efa_domain *domain)
}
}
}

/*
* Send remote read requests until finished or an error is encountered
*/
dlist_foreach_container_safe(&domain->ope_queued_read_list, struct efa_rdm_ope,
ope, queued_read_entry, tmp) {
peer = efa_rdm_ep_get_peer(ope->ep, ope->addr);
/*
* Here peer can be NULL, when the read request is a
* local read request. Local read request is used to copy
* data from host memory to device memory on same process.
*/
if (peer && (peer->flags & EFA_RDM_PEER_IN_BACKOFF))
continue;

/*
* The core's TX queue is full so we can't do any
* additional work.
*/
if (ope->ep->efa_outstanding_tx_ops == ope->ep->efa_max_outstanding_tx_ops)
return;

ret = efa_rdm_ope_post_read(ope);
if (ret == -FI_EAGAIN)
break;

if (OFI_UNLIKELY(ret)) {
assert(ope->type == EFA_RDM_TXE || ope->type == EFA_RDM_RXE);
if (ope->type == EFA_RDM_TXE)
efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_READ_POST);
else
efa_rdm_rxe_handle_error(ope, -ret, FI_EFA_ERR_READ_POST);

return;
}

ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_READ;
dlist_remove(&ope->queued_read_entry);
}
}
8 changes: 2 additions & 6 deletions prov/efa/src/efa_domain.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,8 @@ struct efa_domain {
size_t rdm_cq_size;
/* number of rdma-read messages in flight */
uint64_t num_read_msg_in_flight;
/* op entries with queued rnr packets */
struct dlist_entry ope_queued_rnr_list;
/* op entries with queued ctrl packets */
struct dlist_entry ope_queued_ctrl_list;
/* op entries with queued read requests */
struct dlist_entry ope_queued_read_list;
/* queued op entries */
struct dlist_entry ope_queued_list;
/* tx/rx_entries used by long CTS msg/write/read protocol
* which have data to be sent */
struct dlist_entry ope_longcts_send_list;
Expand Down
8 changes: 4 additions & 4 deletions prov/efa/src/rdm/efa_rdm_ep_fiops.c
Original file line number Diff line number Diff line change
Expand Up @@ -755,17 +755,17 @@ bool efa_rdm_ep_has_unfinished_send(struct efa_rdm_ep *efa_rdm_ep)
if (efa_rdm_ep->efa_outstanding_tx_ops > 0)
return true;

dlist_foreach_safe(&efa_rdm_ep_domain(efa_rdm_ep)->ope_queued_rnr_list, entry, tmp) {
dlist_foreach_safe(&efa_rdm_ep_domain(efa_rdm_ep)->ope_queued_list, entry, tmp) {
ope = container_of(entry, struct efa_rdm_ope,
queued_rnr_entry);
queued_entry);
if (ope->ep == efa_rdm_ep) {
return true;
}
}

dlist_foreach_safe(&efa_rdm_ep_domain(efa_rdm_ep)->ope_queued_ctrl_list, entry, tmp) {
dlist_foreach_safe(&efa_rdm_ep_domain(efa_rdm_ep)->ope_queued_list, entry, tmp) {
ope = container_of(entry, struct efa_rdm_ope,
queued_ctrl_entry);
queued_entry);
if (ope->ep == efa_rdm_ep) {
return true;
}
Expand Down
28 changes: 14 additions & 14 deletions prov/efa/src/rdm/efa_rdm_ope.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ void efa_rdm_txe_release(struct efa_rdm_ope *txe)
}

if (txe->internal_flags & EFA_RDM_OPE_QUEUED_RNR)
dlist_remove(&txe->queued_rnr_entry);
dlist_remove(&txe->queued_entry);

if (txe->internal_flags & EFA_RDM_OPE_QUEUED_CTRL)
dlist_remove(&txe->queued_ctrl_entry);
dlist_remove(&txe->queued_entry);

#ifdef ENABLE_EFA_POISONING
efa_rdm_poison_mem_region(txe,
Expand Down Expand Up @@ -177,11 +177,11 @@ void efa_rdm_rxe_release_internal(struct efa_rdm_ope *rxe)
pkt_entry, entry, tmp) {
efa_rdm_pke_release_tx(pkt_entry);
}
dlist_remove(&rxe->queued_rnr_entry);
dlist_remove(&rxe->queued_entry);
}

if (rxe->internal_flags & EFA_RDM_OPE_QUEUED_CTRL)
dlist_remove(&rxe->queued_ctrl_entry);
dlist_remove(&rxe->queued_entry);

#ifdef ENABLE_EFA_POISONING
efa_rdm_poison_mem_region(rxe,
Expand Down Expand Up @@ -588,11 +588,11 @@ void efa_rdm_rxe_handle_error(struct efa_rdm_ope *rxe, int err, int prov_errno)
struct efa_rdm_pke,
pkt_entry, entry, tmp)
efa_rdm_pke_release_tx(pkt_entry);
dlist_remove(&rxe->queued_rnr_entry);
dlist_remove(&rxe->queued_entry);
}

if (rxe->internal_flags & EFA_RDM_OPE_QUEUED_CTRL)
dlist_remove(&rxe->queued_ctrl_entry);
dlist_remove(&rxe->queued_entry);

if (rxe->unexp_pkt) {
efa_rdm_pke_release_rx(rxe->unexp_pkt);
Expand Down Expand Up @@ -685,10 +685,10 @@ void efa_rdm_txe_handle_error(struct efa_rdm_ope *txe, int err, int prov_errno)
}

if (txe->internal_flags & EFA_RDM_OPE_QUEUED_RNR)
dlist_remove(&txe->queued_rnr_entry);
dlist_remove(&txe->queued_entry);

if (txe->internal_flags & EFA_RDM_OPE_QUEUED_CTRL)
dlist_remove(&txe->queued_ctrl_entry);
dlist_remove(&txe->queued_entry);

dlist_foreach_container_safe(&txe->queued_pkts,
struct efa_rdm_pke,
Expand Down Expand Up @@ -1571,8 +1571,8 @@ int efa_rdm_ope_post_remote_read_or_queue(struct efa_rdm_ope *ope)
err = efa_rdm_ope_post_read(ope);
switch (err) {
case -FI_EAGAIN:
dlist_insert_tail(&ope->queued_read_entry,
&efa_rdm_ep_domain(ope->ep)->ope_queued_read_list);
dlist_insert_tail(&ope->queued_entry,
&efa_rdm_ep_domain(ope->ep)->ope_queued_list);
ope->internal_flags |= EFA_RDM_OPE_QUEUED_READ;
err = 0;
break;
Expand Down Expand Up @@ -1807,8 +1807,8 @@ ssize_t efa_rdm_ope_post_send_fallback(struct efa_rdm_ope *ope,
* @brief post packet(s) according to packet type. Queue the post if -FI_EAGAIN is encountered.
*
* This function will call efa_rdm_ope_post_send() to post packet(s) according to packet type.
* If efa_rdm_ope_post_send() returned -FI_EAGAIN, this function will put the txe in efa_rdm_ep's
* queued_ctrl_list. The progress engine will try to post the packet later.
* If efa_rdm_ope_post_send() returned -FI_EAGAIN, this function will put the txe in efa_domain's
* queued_list. The progress engine will try to post the packet later.
*
* This function is mainly used by packet handler to post responsive ctrl packet (such as EOR and CTS).
*
Expand All @@ -1825,8 +1825,8 @@ ssize_t efa_rdm_ope_post_send_or_queue(struct efa_rdm_ope *ope, int pkt_type)
assert(!(ope->internal_flags & EFA_RDM_OPE_QUEUED_RNR));
ope->internal_flags |= EFA_RDM_OPE_QUEUED_CTRL;
ope->queued_ctrl_type = pkt_type;
dlist_insert_tail(&ope->queued_ctrl_entry,
&efa_rdm_ep_domain(ope->ep)->ope_queued_ctrl_list);
dlist_insert_tail(&ope->queued_entry,
&efa_rdm_ep_domain(ope->ep)->ope_queued_list);
err = 0;
}

Expand Down
16 changes: 5 additions & 11 deletions prov/efa/src/rdm/efa_rdm_ope.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,8 @@ struct efa_rdm_ope {
/* ep_entry is linked to tx/rxe_list in efa_rdm_ep */
struct dlist_entry ep_entry;

/* queued_ctrl_entry is linked with tx/rx_queued_ctrl_list in efa_domain */
struct dlist_entry queued_ctrl_entry;

/* queued_read_entry is linked with ope_queued_read_list in efa_domain */
struct dlist_entry queued_read_entry;

/* queued_rnr_entry is linked with tx/rx_queued_rnr_list in efa_domain */
struct dlist_entry queued_rnr_entry;
/* queued_entry is linked with ope_queued_list in efa_domain */
struct dlist_entry queued_entry;

/* Queued packets due to TX queue full or RNR backoff */
struct dlist_entry queued_pkts;
Expand Down Expand Up @@ -218,7 +212,7 @@ void efa_rdm_rxe_release_internal(struct efa_rdm_ope *rxe);
/**
* @brief flag to tell if an ope encouter RNR when sending packets
*
* If an ope has this flag, it is on the ope_queued_list
* of the domain.
*/
*/
#define EFA_RDM_OPE_QUEUED_RNR BIT_ULL(9)
Expand All @@ -242,7 +236,7 @@ void efa_rdm_rxe_release_internal(struct efa_rdm_ope *rxe);
/**
* @brief flag to indicate an ope has queued ctrl packet,
*
* If this flag is on, the op entry is on the ope_queued_list
* of the domain
*/
*/
#define EFA_RDM_OPE_QUEUED_CTRL BIT_ULL(11)
Expand All @@ -264,7 +258,7 @@ void efa_rdm_rxe_release_internal(struct efa_rdm_ope *rxe);
/**
* @brief flag to indicate an ope has queued read requests
*
* When this flag is on, the ope is on the ope_queued_list
* of the domain
*/
*/
#define EFA_RDM_OPE_QUEUED_READ BIT_ULL(12)
Expand Down
8 changes: 4 additions & 4 deletions prov/efa/src/rdm/efa_rdm_pke_cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,8 @@ void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int prov_errno)
efa_rdm_ep_queue_rnr_pkt(ep, &txe->queued_pkts, pkt_entry);
if (!(txe->internal_flags & EFA_RDM_OPE_QUEUED_RNR)) {
txe->internal_flags |= EFA_RDM_OPE_QUEUED_RNR;
dlist_insert_tail(&txe->queued_rnr_entry,
&efa_rdm_ep_domain(ep)->ope_queued_rnr_list);
dlist_insert_tail(&txe->queued_entry,
&efa_rdm_ep_domain(ep)->ope_queued_list);
}
}
} else {
Expand All @@ -496,8 +496,8 @@ void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int prov_errno)
efa_rdm_ep_queue_rnr_pkt(ep, &rxe->queued_pkts, pkt_entry);
if (!(rxe->internal_flags & EFA_RDM_OPE_QUEUED_RNR)) {
rxe->internal_flags |= EFA_RDM_OPE_QUEUED_RNR;
dlist_insert_tail(&rxe->queued_rnr_entry,
&efa_rdm_ep_domain(ep)->ope_queued_rnr_list);
dlist_insert_tail(&rxe->queued_entry,
&efa_rdm_ep_domain(ep)->ope_queued_list);
}
} else {
efa_rdm_rxe_handle_error(pkt_entry->ope, err, prov_errno);
Expand Down

0 comments on commit 3cfc0bb

Please sign in to comment.