Skip to content

Commit

Permalink
feat: broadcast and expect multiple acks
Browse files Browse the repository at this point in the history
This feature was added in `socket.io@4.5.0`:

```js
io.timeout(1000).emit("some-event", (err, responses) => {
  // ...
});
```

Thanks to this change, it will now work with multiple Socket.IO
servers.

Related:

- socketio/socket.io@8b20457
- #445
- #452
  • Loading branch information
darrachequesne committed May 3, 2022
1 parent 65174f3 commit e4c40cc
Show file tree
Hide file tree
Showing 4 changed files with 2,807 additions and 55 deletions.
137 changes: 133 additions & 4 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ enum RequestType {
REMOTE_DISCONNECT = 4,
REMOTE_FETCH = 5,
SERVER_SIDE_EMIT = 6,
BROADCAST,
BROADCAST_CLIENT_COUNT,
BROADCAST_ACK,
}

interface Request {
Expand All @@ -29,6 +32,11 @@ interface Request {
[other: string]: any;
}

interface AckRequest {
clientCountCallback: (clientCount: number) => void;
ack: (...args: any[]) => void;
}

const isNumeric = (str) => !isNaN(str) && !isNaN(parseFloat(str));

export interface RedisAdapterOptions {
Expand Down Expand Up @@ -84,6 +92,7 @@ export class RedisAdapter extends Adapter {
private readonly requestChannel: string;
private readonly responseChannel: string;
private requests: Map<string, Request> = new Map();
private ackRequests: Map<string, AckRequest> = new Map();

/**
* Adapter constructor.
Expand Down Expand Up @@ -127,7 +136,8 @@ export class RedisAdapter extends Adapter {
[this.requestChannel, this.responseChannel, specificResponseChannel],
(msg, channel) => {
this.onrequest(channel, msg);
}
},
true
);
} else {
this.subClient.psubscribe(this.channel + "*");
Expand Down Expand Up @@ -212,7 +222,12 @@ export class RedisAdapter extends Adapter {
let request;

try {
request = JSON.parse(msg);
// if the buffer starts with a "{" character
if (msg[0] === 0x7b) {
request = JSON.parse(msg.toString());
} else {
request = msgpack.decode(msg);
}
} catch (err) {
debug("ignoring malformed request");
return;
Expand Down Expand Up @@ -379,6 +394,47 @@ export class RedisAdapter extends Adapter {
this.nsp._onServerSideEmit(request.data);
break;

case RequestType.BROADCAST: {
if (this.ackRequests.has(request.requestId)) {
// ignore self
return;
}

const opts = {
rooms: new Set<Room>(request.opts.rooms),
except: new Set<Room>(request.opts.except),
};

super.broadcastWithAck(
request.packet,
opts,
(clientCount) => {
debug("waiting for %d client acknowledgements", clientCount);
this.publishResponse(
request,
JSON.stringify({
type: RequestType.BROADCAST_CLIENT_COUNT,
requestId: request.requestId,
clientCount,
})
);
},
(arg) => {
debug("received acknowledgement with value %j", arg);

this.publishResponse(
request,
msgpack.encode({
type: RequestType.BROADCAST_ACK,
requestId: request.requestId,
packet: arg,
})
);
}
);
break;
}

default:
debug("ignoring unknown request type: %s", request.type);
}
Expand Down Expand Up @@ -407,15 +463,40 @@ export class RedisAdapter extends Adapter {
let response;

try {
response = JSON.parse(msg);
// if the buffer starts with a "{" character
if (msg[0] === 0x7b) {
response = JSON.parse(msg.toString());
} else {
response = msgpack.decode(msg);
}
} catch (err) {
debug("ignoring malformed response");
return;
}

const requestId = response.requestId;

if (!requestId || !this.requests.has(requestId)) {
if (this.ackRequests.has(requestId)) {
const ackRequest = this.ackRequests.get(requestId);

switch (response.type) {
case RequestType.BROADCAST_CLIENT_COUNT: {
ackRequest?.clientCountCallback(response.clientCount);
break;
}

case RequestType.BROADCAST_ACK: {
ackRequest?.ack(response.packet);
break;
}
}
return;
}

if (
!requestId ||
!(this.requests.has(requestId) || this.ackRequests.has(requestId))
) {
debug("ignoring unknown request");
return;
}
Expand Down Expand Up @@ -526,6 +607,50 @@ export class RedisAdapter extends Adapter {
super.broadcast(packet, opts);
}

public broadcastWithAck(
packet: any,
opts: BroadcastOptions,
clientCountCallback: (clientCount: number) => void,
ack: (...args: any[]) => void
) {
packet.nsp = this.nsp.name;

const onlyLocal = opts?.flags?.local;

if (!onlyLocal) {
const requestId = uid2(6);

const rawOpts = {
rooms: [...opts.rooms],
except: [...new Set(opts.except)],
flags: opts.flags,
};

const request = msgpack.encode({
uid: this.uid,
requestId,
type: RequestType.BROADCAST,
packet,
opts: rawOpts,
});

this.pubClient.publish(this.requestChannel, request);

this.ackRequests.set(requestId, {
clientCountCallback,
ack,
});

// we have no way to know at this level whether the server has received an acknowledgement from each client, so we
// will simply clean up the ackRequests map after the given delay
setTimeout(() => {
this.ackRequests.delete(requestId);
}, opts.flags!.timeout);
}

super.broadcastWithAck(packet, opts, clientCountCallback, ack);
}

/**
* Gets a list of sockets by sid.
*
Expand Down Expand Up @@ -955,4 +1080,8 @@ export class RedisAdapter extends Adapter {
});
}
}

serverCount(): Promise<number> {
return this.getNumSub();
}
}
Loading

0 comments on commit e4c40cc

Please sign in to comment.