RDMA/rxe: Protect QP state with qp->state_lock
Currently the rxe driver makes little effort to keep changes to the qp
state (which includes qp->attr.qp_state, qp->attr.sq_draining and
qp->valid) atomic with respect to the different client threads and IO
threads. In particular, a common template is for an RDMA application to
call ib_modify_qp() to move a qp to the ERR state and then wait until all
the packet and work queues have drained before calling ib_destroy_qp().
None of these state changes are protected by locks to ensure that they
execute atomically and that the required memory barriers are present.
This has been observed to lead to incorrect behavior around qp cleanup.

This patch continues the work of the previous patches in this series and
adds locking code around qp state changes and lookups.

Link: https://lore.kernel.org/r/20230405042611.6467-5-rpearsonhpe@gmail.com
Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
Signed-off-by: Jason Gunthorpe <jgg@nvidia.com>
Bob Pearson authored and jgunthorpe committed Apr 17, 2023
1 parent 7b560b8 commit f605f26
Showing 7 changed files with 317 additions and 218 deletions.
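The hunks below repeatedly apply one pattern: every read or write of qp->valid, qp->attr.qp_state (via qp_state()) and qp->attr.sq_draining is done while holding qp->state_lock, so that a concurrent modify or destroy cannot change the state between the check and the action that depends on it. A minimal sketch of that pattern is shown here, assuming the rxe-internal definitions of struct rxe_qp and qp_state(); the helper name qp_is_usable() is illustrative only and does not exist in the driver.

/* Sketch of the locking pattern applied throughout this patch.
 * qp_is_usable() is an illustrative name, not a real rxe helper.
 */
static bool qp_is_usable(struct rxe_qp *qp)
{
	bool usable;

	spin_lock_bh(&qp->state_lock);
	/* qp->valid and qp->attr.qp_state are only stable while
	 * qp->state_lock is held; taking the lock on both the reader
	 * and writer side also supplies the needed memory barriers.
	 */
	usable = qp->valid && qp_state(qp) != IB_QPS_ERR &&
		 qp_state(qp) != IB_QPS_RESET;
	spin_unlock_bh(&qp->state_lock);

	return usable;
}

As in the rest of the driver, the _bh lock variants are used, presumably because the completer and responder tasks can run in softirq (tasklet) context.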
48 changes: 30 additions & 18 deletions drivers/infiniband/sw/rxe/rxe_comp.c
@@ -118,10 +118,12 @@ void retransmit_timer(struct timer_list *t)

rxe_dbg_qp(qp, "retransmit timer fired\n");

spin_lock_bh(&qp->state_lock);
if (qp->valid) {
qp->comp.timeout = 1;
rxe_sched_task(&qp->comp.task);
}
spin_unlock_bh(&qp->state_lock);
}

void rxe_comp_queue_pkt(struct rxe_qp *qp, struct sk_buff *skb)
@@ -479,9 +481,8 @@ static void do_complete(struct rxe_qp *qp, struct rxe_send_wqe *wqe)

static void comp_check_sq_drain_done(struct rxe_qp *qp)
{
spin_lock_bh(&qp->state_lock);
if (unlikely(qp_state(qp) == IB_QPS_SQD)) {
/* state_lock used by requester & completer */
spin_lock_bh(&qp->state_lock);
if (qp->attr.sq_draining && qp->comp.psn == qp->req.psn) {
qp->attr.sq_draining = 0;
spin_unlock_bh(&qp->state_lock);
@@ -497,8 +498,8 @@ static void comp_check_sq_drain_done(struct rxe_qp *qp)
}
return;
}
spin_unlock_bh(&qp->state_lock);
}
spin_unlock_bh(&qp->state_lock);
}

static inline enum comp_state complete_ack(struct rxe_qp *qp,
@@ -614,6 +615,26 @@ static void free_pkt(struct rxe_pkt_info *pkt)
ib_device_put(dev);
}

/* reset the retry timer if
* - QP is type RC
* - there is a packet sent by the requester that
* might be acked (we still might get spurious
* timeouts but try to keep them as few as possible)
* - the timeout parameter is set
* - the QP is alive
*/
static void reset_retry_timer(struct rxe_qp *qp)
{
if (qp_type(qp) == IB_QPT_RC && qp->qp_timeout_jiffies) {
spin_lock_bh(&qp->state_lock);
if (qp_state(qp) >= IB_QPS_RTS &&
psn_compare(qp->req.psn, qp->comp.psn) > 0)
mod_timer(&qp->retrans_timer,
jiffies + qp->qp_timeout_jiffies);
spin_unlock_bh(&qp->state_lock);
}
}

int rxe_completer(struct rxe_qp *qp)
{
struct rxe_dev *rxe = to_rdev(qp->ibqp.device);
@@ -623,14 +644,17 @@ int rxe_completer(struct rxe_qp *qp)
enum comp_state state;
int ret;

spin_lock_bh(&qp->state_lock);
if (!qp->valid || qp_state(qp) == IB_QPS_ERR ||
qp_state(qp) == IB_QPS_RESET) {
bool notify = qp->valid && (qp_state(qp) == IB_QPS_ERR);

drain_resp_pkts(qp);
flush_send_queue(qp, notify);
spin_unlock_bh(&qp->state_lock);
goto exit;
}
spin_unlock_bh(&qp->state_lock);

if (qp->comp.timeout) {
qp->comp.timeout_retry = 1;
@@ -718,20 +742,7 @@ int rxe_completer(struct rxe_qp *qp)
break;
}

/* re reset the timeout counter if
* (1) QP is type RC
* (2) the QP is alive
* (3) there is a packet sent by the requester that
* might be acked (we still might get spurious
* timeouts but try to keep them as few as possible)
* (4) the timeout parameter is set
*/
if ((qp_type(qp) == IB_QPT_RC) &&
(qp_state(qp) >= IB_QPS_RTS) &&
(psn_compare(qp->req.psn, qp->comp.psn) > 0) &&
qp->qp_timeout_jiffies)
mod_timer(&qp->retrans_timer,
jiffies + qp->qp_timeout_jiffies);
reset_retry_timer(qp);
goto exit;

case COMPST_ERROR_RETRY:
@@ -793,6 +804,7 @@ int rxe_completer(struct rxe_qp *qp)
*/
qp->req.wait_for_rnr_timer = 1;
rxe_dbg_qp(qp, "set rnr nak timer\n");
// TODO who protects from destroy_qp??
mod_timer(&qp->rnr_nak_timer,
jiffies + rnrnak_jiffies(aeth_syn(pkt)
& ~AETH_TYPE_MASK));
3 changes: 3 additions & 0 deletions drivers/infiniband/sw/rxe/rxe_net.c
@@ -413,11 +413,14 @@ int rxe_xmit_packet(struct rxe_qp *qp, struct rxe_pkt_info *pkt,
int is_request = pkt->mask & RXE_REQ_MASK;
struct rxe_dev *rxe = to_rdev(qp->ibqp.device);

spin_lock_bh(&qp->state_lock);
if ((is_request && (qp_state(qp) < IB_QPS_RTS)) ||
(!is_request && (qp_state(qp) < IB_QPS_RTR))) {
spin_unlock_bh(&qp->state_lock);
rxe_dbg_qp(qp, "Packet dropped. QP is not in ready state\n");
goto drop;
}
spin_unlock_bh(&qp->state_lock);

rxe_icrc_generate(skb, pkt);

153 changes: 85 additions & 68 deletions drivers/infiniband/sw/rxe/rxe_qp.c
@@ -325,8 +325,10 @@ int rxe_qp_from_init(struct rxe_dev *rxe, struct rxe_qp *qp, struct rxe_pd *pd,
if (err)
goto err2;

spin_lock_bh(&qp->state_lock);
qp->attr.qp_state = IB_QPS_RESET;
qp->valid = 1;
spin_unlock_bh(&qp->state_lock);

return 0;

@@ -377,27 +379,9 @@ int rxe_qp_to_init(struct rxe_qp *qp, struct ib_qp_init_attr *init)
return 0;
}

/* called by the modify qp verb, this routine checks all the parameters before
* making any changes
*/
int rxe_qp_chk_attr(struct rxe_dev *rxe, struct rxe_qp *qp,
struct ib_qp_attr *attr, int mask)
{
enum ib_qp_state cur_state = (mask & IB_QP_CUR_STATE) ?
attr->cur_qp_state : qp->attr.qp_state;
enum ib_qp_state new_state = (mask & IB_QP_STATE) ?
attr->qp_state : cur_state;

if (!ib_modify_qp_is_ok(cur_state, new_state, qp_type(qp), mask)) {
rxe_dbg_qp(qp, "invalid mask or state\n");
goto err1;
}

if (mask & IB_QP_STATE && cur_state == IB_QPS_SQD) {
if (qp->attr.sq_draining && new_state != IB_QPS_ERR)
goto err1;
}

if (mask & IB_QP_PORT) {
if (!rdma_is_port_valid(&rxe->ib_dev, attr->port_num)) {
rxe_dbg_qp(qp, "invalid port %d\n", attr->port_num);
@@ -508,22 +492,96 @@ static void rxe_qp_reset(struct rxe_qp *qp)
/* move the qp to the error state */
void rxe_qp_error(struct rxe_qp *qp)
{
spin_lock_bh(&qp->state_lock);
qp->attr.qp_state = IB_QPS_ERR;

/* drain work and packet queues */
rxe_sched_task(&qp->resp.task);
rxe_sched_task(&qp->comp.task);
rxe_sched_task(&qp->req.task);
spin_unlock_bh(&qp->state_lock);
}

static void rxe_qp_sqd(struct rxe_qp *qp, struct ib_qp_attr *attr,
int mask)
{
spin_lock_bh(&qp->state_lock);
qp->attr.sq_draining = 1;
rxe_sched_task(&qp->comp.task);
rxe_sched_task(&qp->req.task);
spin_unlock_bh(&qp->state_lock);
}

/* caller should hold qp->state_lock */
static int __qp_chk_state(struct rxe_qp *qp, struct ib_qp_attr *attr,
int mask)
{
enum ib_qp_state cur_state;
enum ib_qp_state new_state;

cur_state = (mask & IB_QP_CUR_STATE) ?
attr->cur_qp_state : qp->attr.qp_state;
new_state = (mask & IB_QP_STATE) ?
attr->qp_state : cur_state;

if (!ib_modify_qp_is_ok(cur_state, new_state, qp_type(qp), mask))
return -EINVAL;

if (mask & IB_QP_STATE && cur_state == IB_QPS_SQD) {
if (qp->attr.sq_draining && new_state != IB_QPS_ERR)
return -EINVAL;
}

return 0;
}

static const char *const qps2str[] = {
[IB_QPS_RESET] = "RESET",
[IB_QPS_INIT] = "INIT",
[IB_QPS_RTR] = "RTR",
[IB_QPS_RTS] = "RTS",
[IB_QPS_SQD] = "SQD",
[IB_QPS_SQE] = "SQE",
[IB_QPS_ERR] = "ERR",
};

/* called by the modify qp verb */
int rxe_qp_from_attr(struct rxe_qp *qp, struct ib_qp_attr *attr, int mask,
struct ib_udata *udata)
{
enum ib_qp_state cur_state = (mask & IB_QP_CUR_STATE) ?
attr->cur_qp_state : qp->attr.qp_state;
int err;

if (mask & IB_QP_CUR_STATE)
qp->attr.cur_qp_state = attr->qp_state;

if (mask & IB_QP_STATE) {
spin_lock_bh(&qp->state_lock);
err = __qp_chk_state(qp, attr, mask);
if (!err) {
qp->attr.qp_state = attr->qp_state;
rxe_dbg_qp(qp, "state -> %s\n",
qps2str[attr->qp_state]);
}
spin_unlock_bh(&qp->state_lock);

if (err)
return err;

switch (attr->qp_state) {
case IB_QPS_RESET:
rxe_qp_reset(qp);
break;
case IB_QPS_SQD:
rxe_qp_sqd(qp, attr, mask);
break;
case IB_QPS_ERR:
rxe_qp_error(qp);
break;
default:
break;
}
}

if (mask & IB_QP_MAX_QP_RD_ATOMIC) {
int max_rd_atomic = attr->max_rd_atomic ?
roundup_pow_of_two(attr->max_rd_atomic) : 0;
@@ -545,9 +603,6 @@ int rxe_qp_from_attr(struct rxe_qp *qp, struct ib_qp_attr *attr, int mask,
return err;
}

if (mask & IB_QP_CUR_STATE)
qp->attr.cur_qp_state = attr->qp_state;

if (mask & IB_QP_EN_SQD_ASYNC_NOTIFY)
qp->attr.en_sqd_async_notify = attr->en_sqd_async_notify;

@@ -627,48 +682,6 @@ int rxe_qp_from_attr(struct rxe_qp *qp, struct ib_qp_attr *attr, int mask,
if (mask & IB_QP_DEST_QPN)
qp->attr.dest_qp_num = attr->dest_qp_num;

if (mask & IB_QP_STATE) {
qp->attr.qp_state = attr->qp_state;

switch (attr->qp_state) {
case IB_QPS_RESET:
rxe_dbg_qp(qp, "state -> RESET\n");
rxe_qp_reset(qp);
break;

case IB_QPS_INIT:
rxe_dbg_qp(qp, "state -> INIT\n");
break;

case IB_QPS_RTR:
rxe_dbg_qp(qp, "state -> RTR\n");
break;

case IB_QPS_RTS:
rxe_dbg_qp(qp, "state -> RTS\n");
break;

case IB_QPS_SQD:
rxe_dbg_qp(qp, "state -> SQD\n");
if (cur_state != IB_QPS_SQD) {
qp->attr.sq_draining = 1;
rxe_sched_task(&qp->comp.task);
rxe_sched_task(&qp->req.task);
}
break;

case IB_QPS_SQE:
rxe_dbg_qp(qp, "state -> SQE !!?\n");
/* Not possible from modify_qp. */
break;

case IB_QPS_ERR:
rxe_dbg_qp(qp, "state -> ERR\n");
rxe_qp_error(qp);
break;
}
}

return 0;
}

@@ -695,10 +708,12 @@ int rxe_qp_to_attr(struct rxe_qp *qp, struct ib_qp_attr *attr, int mask)
/* Applications that get this state typically spin on it.
* Yield the processor
*/
if (qp->attr.sq_draining)
spin_lock_bh(&qp->state_lock);
if (qp->attr.sq_draining) {
spin_unlock_bh(&qp->state_lock);
cond_resched();

rxe_dbg_qp(qp, "attr->sq_draining = %d\n", attr->sq_draining);
}
spin_unlock_bh(&qp->state_lock);

return 0;
}
@@ -722,7 +737,9 @@ static void rxe_qp_do_cleanup(struct work_struct *work)
{
struct rxe_qp *qp = container_of(work, typeof(*qp), cleanup_work.work);

spin_lock_bh(&qp->state_lock);
qp->valid = 0;
spin_unlock_bh(&qp->state_lock);
qp->qp_timeout_jiffies = 0;

if (qp_type(qp) == IB_QPT_RC) {
10 changes: 8 additions & 2 deletions drivers/infiniband/sw/rxe/rxe_recv.c
@@ -38,13 +38,19 @@ static int check_type_state(struct rxe_dev *rxe, struct rxe_pkt_info *pkt,
return -EINVAL;
}

spin_lock_bh(&qp->state_lock);
if (pkt->mask & RXE_REQ_MASK) {
if (unlikely(qp_state(qp) < IB_QPS_RTR))
if (unlikely(qp_state(qp) < IB_QPS_RTR)) {
spin_unlock_bh(&qp->state_lock);
return -EINVAL;
}
} else {
if (unlikely(qp_state(qp) < IB_QPS_RTS))
if (unlikely(qp_state(qp) < IB_QPS_RTS)) {
spin_unlock_bh(&qp->state_lock);
return -EINVAL;
}
}
spin_unlock_bh(&qp->state_lock);

return 0;
}
(Diffs for the three remaining changed files are not shown here.)
