Skip to content

Commit

Permalink
RDMA: Support user keepalive command (#916)
Browse files Browse the repository at this point in the history
If the client side crashes by any issue or exits normally, the kernel
will try to disconnect RDMA QPs. Then the kernel of server side receives
CM packets, valkey-server handles CM disconnected event and close
connection.

However, there is a lack of keepalive mechanism from RDMA transport
layer. Once the kernel of client side crashes, the server side will not
be notified. To avoid this issue, valkey server sents Keepaliv command
periodically to detect any dead QPs.

An example of mlx-cx5:

```
 # RDMA: CQ handle error status: transport retry counter exceeded[0xc], opcode : 0x0
 # RDMA: CQ handle error status: transport retry counter exceeded[0xc], opcode : 0x0
 # RDMA: CQ handle error status: Work Request Flushed Error[0x5], opcode : 0x0
 # RDMA: CQ handle error status: Work Request Flushed Error[0x5], opcode : 0x0
 # RDMA: CQ handle error status: Work Request Flushed Error[0x5], opcode : 0x0
 # RDMA: CQ handle error status: Work Request Flushed Error[0x5], opcode : 0x0
```

Signed-off-by: zhenwei pi <pizhenwei@bytedance.com>
  • Loading branch information
pizhenwei authored and madolson committed Sep 3, 2024
1 parent f3b93e2 commit f143b66
Showing 1 changed file with 38 additions and 9 deletions.
47 changes: 38 additions & 9 deletions src/rdma.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ typedef enum ValkeyRdmaOpcode {
#define VALKEY_RDMA_MAX_RX_SIZE (16 * 1024 * 1024)
#define VALKEY_RDMA_SYNCIO_RES 10
#define VALKEY_RDMA_INVALID_OPCODE 0xffff
#define VALKEY_RDMA_KEEPALIVE_MS 3000

typedef struct rdma_connection {
connection c;
Expand All @@ -94,6 +95,7 @@ typedef struct RdmaContext {
connection *conn;
char *ip;
int port;
long long keepalive_te; /* RDMA has no transport layer keepalive */
struct ibv_pd *pd;
struct rdma_event_channel *cm_channel;
struct ibv_comp_channel *comp_channel;
Expand Down Expand Up @@ -405,7 +407,7 @@ static int rdmaSendCommand(RdmaContext *ctx, struct rdma_cm_id *cm_id, ValkeyRdm
}

static int connRdmaRegisterRx(RdmaContext *ctx, struct rdma_cm_id *cm_id) {
ValkeyRdmaCmd cmd;
ValkeyRdmaCmd cmd = {0};

cmd.memory.opcode = htons(RegisterXferMemory);
cmd.memory.addr = htonu64((uint64_t)ctx->rx.addr);
Expand All @@ -419,7 +421,7 @@ static int connRdmaRegisterRx(RdmaContext *ctx, struct rdma_cm_id *cm_id) {
}

static int connRdmaGetFeature(RdmaContext *ctx, struct rdma_cm_id *cm_id, ValkeyRdmaCmd *cmd) {
ValkeyRdmaCmd _cmd;
ValkeyRdmaCmd _cmd = {0};

_cmd.feature.opcode = htons(GetServerFeature);
_cmd.feature.select = cmd->feature.select;
Expand Down Expand Up @@ -447,12 +449,13 @@ static int rdmaHandleEstablished(struct rdma_cm_event *ev) {
return C_OK;
}

static int rdmaHandleDisconnect(struct rdma_cm_event *ev) {
static int rdmaHandleDisconnect(aeEventLoop *el, struct rdma_cm_event *ev) {
struct rdma_cm_id *cm_id = ev->id;
RdmaContext *ctx = cm_id->context;
connection *conn = ctx->conn;
rdma_connection *rdma_conn = (rdma_connection *)conn;

aeDeleteTimeEvent(el, ctx->keepalive_te);
conn->state = CONN_STATE_CLOSED;

/* we can't close connection now, let's mark this connection as closed state */
Expand Down Expand Up @@ -669,7 +672,27 @@ static void connRdmaEventHandler(struct aeEventLoop *el, int fd, void *clientDat
}
}

static int rdmaHandleConnect(char *err, struct rdma_cm_event *ev, char *ip, size_t ip_len, int *port) {
static int rdmaKeepaliveTimeProc(struct aeEventLoop *el, long long id, void *clientData) {
struct rdma_cm_id *cm_id = clientData;
RdmaContext *ctx = cm_id->context;
connection *conn = ctx->conn;
ValkeyRdmaCmd cmd = {0};

UNUSED(el);
UNUSED(id);
if (conn->state != CONN_STATE_CONNECTED) {
return AE_NOMORE;
}

cmd.keepalive.opcode = htons(Keepalive);
if (rdmaSendCommand(ctx, cm_id, &cmd) != C_OK) {
return AE_NOMORE;
}

return VALKEY_RDMA_KEEPALIVE_MS;
}

static int rdmaHandleConnect(aeEventLoop *el, char *err, struct rdma_cm_event *ev, char *ip, size_t ip_len, int *port) {
int ret = C_OK;
struct rdma_cm_id *cm_id = ev->id;
struct sockaddr_storage caddr;
Expand All @@ -694,6 +717,11 @@ static int rdmaHandleConnect(char *err, struct rdma_cm_event *ev, char *ip, size
ctx = zcalloc(sizeof(RdmaContext));
ctx->ip = zstrdup(ip);
ctx->port = *port;
ctx->keepalive_te = aeCreateTimeEvent(el, VALKEY_RDMA_KEEPALIVE_MS, rdmaKeepaliveTimeProc, cm_id, NULL);
if (ctx->keepalive_te == AE_ERR) {
return C_ERR;
}

cm_id->context = ctx;
if (rdmaCreateResource(ctx, cm_id) == C_ERR) {
goto reject;
Expand Down Expand Up @@ -732,7 +760,8 @@ static rdma_listener *rdmaFdToListener(connListener *listener, int fd) {
* 1, handle RDMA_CM_EVENT_CONNECT_REQUEST and return CM fd on success
* 2, handle RDMA_CM_EVENT_ESTABLISHED and return C_OK on success
*/
static int rdmaAccept(connListener *listener, char *err, int fd, char *ip, size_t ip_len, int *port, void **priv) {
static int
rdmaAccept(aeEventLoop *el, connListener *listener, char *err, int fd, char *ip, size_t ip_len, int *port, void **priv) {
struct rdma_cm_event *ev;
enum rdma_cm_event_type ev_type;
int ret = C_OK;
Expand All @@ -755,7 +784,7 @@ static int rdmaAccept(connListener *listener, char *err, int fd, char *ip, size_
ev_type = ev->event;
switch (ev_type) {
case RDMA_CM_EVENT_CONNECT_REQUEST:
ret = rdmaHandleConnect(err, ev, ip, ip_len, port);
ret = rdmaHandleConnect(el, err, ev, ip, ip_len, port);
if (ret == C_OK) {
RdmaContext *ctx = (RdmaContext *)ev->id->context;
*priv = ev->id;
Expand All @@ -773,7 +802,7 @@ static int rdmaAccept(connListener *listener, char *err, int fd, char *ip, size_
case RDMA_CM_EVENT_ADDR_CHANGE:
case RDMA_CM_EVENT_DISCONNECTED:
case RDMA_CM_EVENT_TIMEWAIT_EXIT:
rdmaHandleDisconnect(ev);
rdmaHandleDisconnect(el, ev);
ret = C_OK;
break;

Expand Down Expand Up @@ -804,7 +833,7 @@ static void connRdmaAcceptHandler(aeEventLoop *el, int fd, void *privdata, int m
UNUSED(mask);

while (max--) {
cfd = rdmaAccept(listener, server.neterr, fd, cip, sizeof(cip), &cport, &connpriv);
cfd = rdmaAccept(el, listener, server.neterr, fd, cip, sizeof(cip), &cport, &connpriv);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK) serverLog(LL_WARNING, "RDMA Accepting client connection: %s", server.neterr);
return;
Expand Down Expand Up @@ -951,7 +980,7 @@ static void rdmaCMeventHandler(struct aeEventLoop *el, int fd, void *clientData,
case RDMA_CM_EVENT_TIMEWAIT_EXIT:
case RDMA_CM_EVENT_CONNECT_REQUEST:
case RDMA_CM_EVENT_ADDR_CHANGE:
case RDMA_CM_EVENT_DISCONNECTED: rdmaHandleDisconnect(ev); break;
case RDMA_CM_EVENT_DISCONNECTED: rdmaHandleDisconnect(el, ev); break;

case RDMA_CM_EVENT_MULTICAST_JOIN:
case RDMA_CM_EVENT_MULTICAST_ERROR:
Expand Down

0 comments on commit f143b66

Please sign in to comment.