Skip to content

Commit

Permalink
Limits the parallel open exchanges to 5
Browse files Browse the repository at this point in the history
  • Loading branch information
Apollon77 committed Sep 14, 2024
1 parent 0104bc3 commit 38b12f2
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ The main work (all changes without a GitHub username in brackets in the below li
- BREAKING: The "ByteArray" type is removed, replaced with native-JS Uint8Array and a small collection of utility functions in the "Bytes" namespace
- The Matter object model previously exported as @project-chip/matter.js/model now resides in @project-chip/matter.js-model

- matter.js-protocol:
- Limits the number of parallel exchanges to 5

- matter.js-nodejs:
- Node specialization is moved to matter.js-nodejs. matter-node.js remains as a compatibility import.
- BREAKING: The previously deprecated re-exports in matter-node.js from matter.js are removed.
Expand Down
23 changes: 20 additions & 3 deletions packages/matter.js/src/protocol/ExchangeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import { SecureChannelMessenger } from "./securechannel/SecureChannelMessenger.j

const logger = Logger.get("ExchangeManager");

const MAXIMUM_CONCURRENT_EXCHANGES_PER_SESSION = 5;

export class ChannelNotConnectedError extends MatterError {}

export class MessageChannel<ContextT extends SessionContext> implements Channel<Message> {
Expand Down Expand Up @@ -284,9 +286,6 @@ export class ExchangeManager<ContextT extends SessionContext> {
);
}
}

// TODO A node SHOULD limit itself to a maximum of 5 concurrent exchanges over a unicast session. This is
// to prevent a node from exhausting the message counter window of the peer node.
}
}

Expand Down Expand Up @@ -358,6 +357,24 @@ export class ExchangeManager<ContextT extends SessionContext> {
#addExchange(exchangeIndex: number, exchange: MessageExchange<ContextT>) {
exchange.closed.on(() => this.deleteExchange(exchangeIndex));
this.exchanges.set(exchangeIndex, exchange);

// A node SHOULD limit itself to a maximum of 5 concurrent exchanges over a unicast session. This is
// to prevent a node from exhausting the message counter window of the peer node.
// TODO Make sure Group sessions are handled differently
this.#cleanupSessionExchanges(exchange.session.id);
}

#cleanupSessionExchanges(sessionId: number) {
const sessionExchanges = Array.from(this.exchanges.values()).filter(
exchange => exchange.session.id === sessionId && !exchange.isClosing,
);
if (sessionExchanges.length <= MAXIMUM_CONCURRENT_EXCHANGES_PER_SESSION) {
return;
}
// let's use the first entry in the Map as the oldest exchange and close it
const exchangeToClose = sessionExchanges[0];
logger.debug(`Closing oldest exchange ${exchangeToClose.id} for session ${sessionId}`);
exchangeToClose.close().catch(error => logger.error("Error closing exchange", error)); // TODO Promise??
}
}

Expand Down
11 changes: 11 additions & 0 deletions packages/matter.js/src/protocol/MessageExchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ export class MessageExchange<ContextT extends SessionContext> {
#retransmissionTimer: Timer | undefined;
#retransmissionCounter = 0;
#closeTimer: Timer | undefined;
#isClosing = false;
#timedInteractionTimer: Timer | undefined;

readonly #peerSessionId: number;
Expand Down Expand Up @@ -221,6 +222,14 @@ export class MessageExchange<ContextT extends SessionContext> {
return this.#closed;
}

get isClosing() {
return this.#isClosing;
}

get id() {
return this.#exchangeId;
}

/**
* Max Payload size of the exchange which bases on the maximum payload size of the channel reduced by Matter
* protocol overhead.
Expand Down Expand Up @@ -613,6 +622,7 @@ export class MessageExchange<ContextT extends SessionContext> {

async close() {
if (this.#closeTimer !== undefined) return; // close was already called
this.#isClosing = true;

if (this.#receivedMessageToAck !== undefined) {
this.#receivedMessageAckTimer.stop();
Expand Down Expand Up @@ -643,6 +653,7 @@ export class MessageExchange<ContextT extends SessionContext> {
}

private async closeInternal() {
this.#isClosing = true;
this.#retransmissionTimer?.stop();
this.#closeTimer?.stop();
this.#timedInteractionTimer?.stop();
Expand Down

0 comments on commit 38b12f2

Please sign in to comment.