From 0cc5477187bf95cab2595d5b11867ad3580527d7 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Thu, 22 Aug 2024 15:59:34 +0530 Subject: [PATCH 1/6] chore(filter): move reliability logic into a separate class --- packages/sdk/src/protocols/filter.ts | 226 ++++++++++++++++----------- 1 file changed, 131 insertions(+), 95 deletions(-) diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index 18f1dc7521..1b09ab49f4 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -57,40 +57,77 @@ const DEFAULT_KEEP_ALIVE = 30 * 1000; const DEFAULT_SUBSCRIBE_OPTIONS = { keepAlive: DEFAULT_KEEP_ALIVE }; -export class SubscriptionManager implements ISubscriptionSDK { + +export class ReliabilityMonitor { private readonly receivedMessagesHashStr: string[] = []; - private keepAliveTimer: number | null = null; private readonly receivedMessagesHashes: ReceivedMessageHashes; - private peerFailures: Map = new Map(); private missedMessagesByPeer: Map = new Map(); - private maxPingFailures: number = DEFAULT_MAX_PINGS; private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; - private subscriptionCallbacks: Map< - ContentTopic, - SubscriptionCallback - >; - public constructor( - private readonly pubsubTopic: PubsubTopic, - private protocol: FilterCore, private getPeers: () => Peer[], - private readonly renewPeer: (peerToDisconnect: PeerId) => Promise + private renewAndSubscribePeer: (peerId: PeerId) => Promise ) { - this.pubsubTopic = pubsubTopic; - this.subscriptionCallbacks = new Map(); const allPeerIdStr = this.getPeers().map((p) => p.id.toString()); + this.receivedMessagesHashes = { all: new Set(), nodes: { ...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()])) } }; + allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); } - public get messageHashes(): string[] { - return [...this.receivedMessagesHashes.all]; + public async validateMessageForReliability( + pubsubTopic: PubsubTopic, + message: WakuMessage, + peerIdStr: PeerIdStr + ): Promise<{ alreadyReceived: boolean }> { + const hashedMessageStr = messageHashStr( + pubsubTopic, + message as IProtoMessage + ); + this.addHash(hashedMessageStr, peerIdStr); + + if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { + log.info("Message already received, skipping"); + return { alreadyReceived: true }; + } + + this.receivedMessagesHashStr.push(hashedMessageStr); + + for (const hash of this.receivedMessagesHashes.all) { + for (const [peerIdStr, hashes] of Object.entries( + this.receivedMessagesHashes.nodes + )) { + if (!hashes.has(hash)) { + this.incrementMissedMessageCount(peerIdStr); + if (this.shouldRenewPeer(peerIdStr)) { + log.info( + `Peer ${peerIdStr} has missed too many messages, renewing.` + ); + const peerId = this.getPeers().find( + (p) => p.id.toString() === peerIdStr + )?.id; + if (!peerId) { + log.error( + `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` + ); + continue; + } + try { + await this.renewAndSubscribePeer(peerId); + } catch (error) { + log.error(`Failed to renew peer ${peerIdStr}: ${error}`); + } + } + } + } + } + + return { alreadyReceived: false }; } private addHash(hash: string, peerIdStr?: string): void { @@ -101,6 +138,54 @@ export class SubscriptionManager implements ISubscriptionSDK { } } + private incrementMissedMessageCount(peerIdStr: string): void { + const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; + this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); + } + + private shouldRenewPeer(peerIdStr: string): boolean { + const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0; + return missedMessages > this.maxMissedMessagesThreshold; + } + + public resetPeerStats(peerIdStr: string): void { + this.missedMessagesByPeer.set(peerIdStr, 0); + this.receivedMessagesHashes.nodes[peerIdStr] = new Set(); + } + + public removePeerStats(peerIdStr: string): void { + this.missedMessagesByPeer.delete(peerIdStr); + delete this.receivedMessagesHashes.nodes[peerIdStr]; + } +} + +export class SubscriptionManager implements ISubscriptionSDK { + private keepAliveTimer: number | null = null; + private peerFailures: Map = new Map(); + private maxPingFailures: number = DEFAULT_MAX_PINGS; + + private reliabilityMonitor: ReliabilityMonitor; + + private subscriptionCallbacks: Map< + ContentTopic, + SubscriptionCallback + >; + + public constructor( + private readonly pubsubTopic: PubsubTopic, + private protocol: FilterCore, + private getPeers: () => Peer[], + private readonly renewPeer: (peerToDisconnect: PeerId) => Promise + ) { + this.pubsubTopic = pubsubTopic; + this.subscriptionCallbacks = new Map(); + + this.reliabilityMonitor = new ReliabilityMonitor( + getPeers, + this.renewAndSubscribePeer.bind(this) + ); + } + public async subscribe( decoders: IDecoder | IDecoder[], callback: Callback, @@ -108,9 +193,6 @@ export class SubscriptionManager implements ISubscriptionSDK { ): Promise { this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE; this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS; - this.maxMissedMessagesThreshold = - options.maxMissedMessagesThreshold || - DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; @@ -218,54 +300,20 @@ export class SubscriptionManager implements ISubscriptionSDK { return finalResult; } - private async validateMessage(): Promise { - for (const hash of this.receivedMessagesHashes.all) { - for (const [peerIdStr, hashes] of Object.entries( - this.receivedMessagesHashes.nodes - )) { - if (!hashes.has(hash)) { - this.incrementMissedMessageCount(peerIdStr); - if (this.shouldRenewPeer(peerIdStr)) { - log.info( - `Peer ${peerIdStr} has missed too many messages, renewing.` - ); - const peerId = this.getPeers().find( - (p) => p.id.toString() === peerIdStr - )?.id; - if (!peerId) { - log.error( - `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` - ); - continue; - } - try { - await this.renewAndSubscribePeer(peerId); - } catch (error) { - log.error(`Failed to renew peer ${peerIdStr}: ${error}`); - } - } - } - } - } - } - public async processIncomingMessage( message: WakuMessage, peerIdStr: PeerIdStr ): Promise { - const hashedMessageStr = messageHashStr( - this.pubsubTopic, - message as IProtoMessage - ); - - this.addHash(hashedMessageStr, peerIdStr); - void this.validateMessage(); + const { alreadyReceived } = + await this.reliabilityMonitor.validateMessageForReliability( + this.pubsubTopic, + message, + peerIdStr + ); - if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { - log.info("Message already received, skipping"); + if (alreadyReceived) { return; } - this.receivedMessagesHashStr.push(hashedMessageStr); const { contentTopic } = message; const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic); @@ -307,6 +355,29 @@ export class SubscriptionManager implements ISubscriptionSDK { return result; } + private async renewAndSubscribePeer( + peerId: PeerId + ): Promise { + try { + const newPeer = await this.renewPeer(peerId); + await this.protocol.subscribe( + this.pubsubTopic, + newPeer, + Array.from(this.subscriptionCallbacks.keys()) + ); + + this.reliabilityMonitor.resetPeerStats(newPeer.id.toString()); + + return newPeer; + } catch (error) { + log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`); + return; + } finally { + this.peerFailures.delete(peerId.toString()); + this.reliabilityMonitor.removePeerStats(peerId.toString()); + } + } + private async pingSpecificPeer(peerId: PeerId): Promise { const peer = this.getPeers().find((p) => p.id.equals(peerId)); if (!peer) { @@ -353,31 +424,6 @@ export class SubscriptionManager implements ISubscriptionSDK { } } - private async renewAndSubscribePeer( - peerId: PeerId - ): Promise { - try { - const newPeer = await this.renewPeer(peerId); - await this.protocol.subscribe( - this.pubsubTopic, - newPeer, - Array.from(this.subscriptionCallbacks.keys()) - ); - - this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set(); - this.missedMessagesByPeer.set(newPeer.id.toString(), 0); - - return newPeer; - } catch (error) { - log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`); - return; - } finally { - this.peerFailures.delete(peerId.toString()); - this.missedMessagesByPeer.delete(peerId.toString()); - delete this.receivedMessagesHashes.nodes[peerId.toString()]; - } - } - private startKeepAlivePings(options: SubscribeOptions): void { const { keepAlive } = options; if (this.keepAliveTimer) { @@ -402,16 +448,6 @@ export class SubscriptionManager implements ISubscriptionSDK { clearInterval(this.keepAliveTimer); this.keepAliveTimer = null; } - - private incrementMissedMessageCount(peerIdStr: string): void { - const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; - this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); - } - - private shouldRenewPeer(peerIdStr: string): boolean { - const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0; - return missedMessages > this.maxMissedMessagesThreshold; - } } class FilterSDK extends BaseProtocolSDK implements IFilterSDK { From d990c7e1b4dd017984090177cd0ac5e82e2d1d7f Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Thu, 22 Aug 2024 16:01:57 +0530 Subject: [PATCH 2/6] chore: cleanup ReliabilityMonitor function to follow SRP --- packages/sdk/src/protocols/filter.ts | 90 ++++++++++++++++------------ 1 file changed, 51 insertions(+), 39 deletions(-) diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index 1b09ab49f4..140cbfb4cb 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -80,61 +80,56 @@ export class ReliabilityMonitor { allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); } - public async validateMessageForReliability( + public async processMessage( pubsubTopic: PubsubTopic, message: WakuMessage, peerIdStr: PeerIdStr - ): Promise<{ alreadyReceived: boolean }> { + ): Promise { const hashedMessageStr = messageHashStr( pubsubTopic, message as IProtoMessage ); - this.addHash(hashedMessageStr, peerIdStr); - if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { + if (this.isMessageAlreadyReceived(hashedMessageStr)) { log.info("Message already received, skipping"); - return { alreadyReceived: true }; + return true; } - this.receivedMessagesHashStr.push(hashedMessageStr); + this.addMessageHash(hashedMessageStr, peerIdStr); + await this.checkMissedMessages(); + return false; + } + + private isMessageAlreadyReceived(hashedMessageStr: string): boolean { + return this.receivedMessagesHashStr.includes(hashedMessageStr); + } + + private addMessageHash(hash: string, peerIdStr?: string): void { + this.receivedMessagesHashStr.push(hash); + this.receivedMessagesHashes.all.add(hash); + + if (peerIdStr) { + this.receivedMessagesHashes.nodes[peerIdStr].add(hash); + } + } + + private async checkMissedMessages(): Promise { for (const hash of this.receivedMessagesHashes.all) { for (const [peerIdStr, hashes] of Object.entries( this.receivedMessagesHashes.nodes )) { if (!hashes.has(hash)) { - this.incrementMissedMessageCount(peerIdStr); - if (this.shouldRenewPeer(peerIdStr)) { - log.info( - `Peer ${peerIdStr} has missed too many messages, renewing.` - ); - const peerId = this.getPeers().find( - (p) => p.id.toString() === peerIdStr - )?.id; - if (!peerId) { - log.error( - `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` - ); - continue; - } - try { - await this.renewAndSubscribePeer(peerId); - } catch (error) { - log.error(`Failed to renew peer ${peerIdStr}: ${error}`); - } - } + await this.handleMissedMessage(peerIdStr); } } } - - return { alreadyReceived: false }; } - private addHash(hash: string, peerIdStr?: string): void { - this.receivedMessagesHashes.all.add(hash); - - if (peerIdStr) { - this.receivedMessagesHashes.nodes[peerIdStr].add(hash); + private async handleMissedMessage(peerIdStr: string): Promise { + this.incrementMissedMessageCount(peerIdStr); + if (this.shouldRenewPeer(peerIdStr)) { + await this.renewPeer(peerIdStr); } } @@ -148,6 +143,24 @@ export class ReliabilityMonitor { return missedMessages > this.maxMissedMessagesThreshold; } + private async renewPeer(peerIdStr: string): Promise { + log.info(`Peer ${peerIdStr} has missed too many messages, renewing.`); + const peerId = this.getPeers().find( + (p) => p.id.toString() === peerIdStr + )?.id; + if (!peerId) { + log.error( + `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` + ); + return; + } + try { + await this.renewAndSubscribePeer(peerId); + } catch (error) { + log.error(`Failed to renew peer ${peerIdStr}: ${error}`); + } + } + public resetPeerStats(peerIdStr: string): void { this.missedMessagesByPeer.set(peerIdStr, 0); this.receivedMessagesHashes.nodes[peerIdStr] = new Set(); @@ -304,12 +317,11 @@ export class SubscriptionManager implements ISubscriptionSDK { message: WakuMessage, peerIdStr: PeerIdStr ): Promise { - const { alreadyReceived } = - await this.reliabilityMonitor.validateMessageForReliability( - this.pubsubTopic, - message, - peerIdStr - ); + const alreadyReceived = await this.reliabilityMonitor.processMessage( + this.pubsubTopic, + message, + peerIdStr + ); if (alreadyReceived) { return; From 6894822b54d7929477bcfecd36b7e57a82ad1291 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Thu, 22 Aug 2024 16:03:43 +0530 Subject: [PATCH 3/6] chore: move ReliabilityMonitor into a separate file --- packages/sdk/src/index.ts | 2 +- .../protocols/{filter.ts => filter/index.ts} | 127 +---------------- .../protocols/filter/reliability_monitor.ts | 129 ++++++++++++++++++ packages/sdk/src/waku.ts | 2 +- 4 files changed, 134 insertions(+), 126 deletions(-) rename packages/sdk/src/protocols/{filter.ts => filter/index.ts} (83%) create mode 100644 packages/sdk/src/protocols/filter/reliability_monitor.ts diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index 2bbdfc0d30..fdff4f4a8a 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -11,7 +11,7 @@ export * from "./waku.js"; export { createLightNode, defaultLibp2p } from "./create/index.js"; export { wakuLightPush } from "./protocols/light_push.js"; -export { wakuFilter } from "./protocols/filter.js"; +export { wakuFilter } from "./protocols/filter/index.js"; export { wakuStore } from "./protocols/store.js"; export * as waku from "@waku/core"; diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter/index.ts similarity index 83% rename from packages/sdk/src/protocols/filter.ts rename to packages/sdk/src/protocols/filter/index.ts index 140cbfb4cb..b8d8a8ecbc 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -24,7 +24,6 @@ import { SubscribeResult, type Unsubscribe } from "@waku/interfaces"; -import { messageHashStr } from "@waku/message-hash"; import { WakuMessage } from "@waku/proto"; import { ensurePubsubTopicIsConfigured, @@ -34,144 +33,24 @@ import { toAsyncIterator } from "@waku/utils"; -import { BaseProtocolSDK } from "./base_protocol.js"; +import { BaseProtocolSDK } from "../base_protocol.js"; + +import { ReliabilityMonitor } from "./reliability_monitor.js"; type SubscriptionCallback = { decoders: IDecoder[]; callback: Callback; }; -type ReceivedMessageHashes = { - all: Set; - nodes: { - [peerId: PeerIdStr]: Set; - }; -}; - const log = new Logger("sdk:filter"); const DEFAULT_MAX_PINGS = 3; -const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; const DEFAULT_KEEP_ALIVE = 30 * 1000; const DEFAULT_SUBSCRIBE_OPTIONS = { keepAlive: DEFAULT_KEEP_ALIVE }; -export class ReliabilityMonitor { - private readonly receivedMessagesHashStr: string[] = []; - private readonly receivedMessagesHashes: ReceivedMessageHashes; - private missedMessagesByPeer: Map = new Map(); - private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; - - public constructor( - private getPeers: () => Peer[], - private renewAndSubscribePeer: (peerId: PeerId) => Promise - ) { - const allPeerIdStr = this.getPeers().map((p) => p.id.toString()); - - this.receivedMessagesHashes = { - all: new Set(), - nodes: { - ...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()])) - } - }; - - allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); - } - - public async processMessage( - pubsubTopic: PubsubTopic, - message: WakuMessage, - peerIdStr: PeerIdStr - ): Promise { - const hashedMessageStr = messageHashStr( - pubsubTopic, - message as IProtoMessage - ); - - if (this.isMessageAlreadyReceived(hashedMessageStr)) { - log.info("Message already received, skipping"); - return true; - } - - this.addMessageHash(hashedMessageStr, peerIdStr); - await this.checkMissedMessages(); - - return false; - } - - private isMessageAlreadyReceived(hashedMessageStr: string): boolean { - return this.receivedMessagesHashStr.includes(hashedMessageStr); - } - - private addMessageHash(hash: string, peerIdStr?: string): void { - this.receivedMessagesHashStr.push(hash); - this.receivedMessagesHashes.all.add(hash); - - if (peerIdStr) { - this.receivedMessagesHashes.nodes[peerIdStr].add(hash); - } - } - - private async checkMissedMessages(): Promise { - for (const hash of this.receivedMessagesHashes.all) { - for (const [peerIdStr, hashes] of Object.entries( - this.receivedMessagesHashes.nodes - )) { - if (!hashes.has(hash)) { - await this.handleMissedMessage(peerIdStr); - } - } - } - } - - private async handleMissedMessage(peerIdStr: string): Promise { - this.incrementMissedMessageCount(peerIdStr); - if (this.shouldRenewPeer(peerIdStr)) { - await this.renewPeer(peerIdStr); - } - } - - private incrementMissedMessageCount(peerIdStr: string): void { - const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; - this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); - } - - private shouldRenewPeer(peerIdStr: string): boolean { - const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0; - return missedMessages > this.maxMissedMessagesThreshold; - } - - private async renewPeer(peerIdStr: string): Promise { - log.info(`Peer ${peerIdStr} has missed too many messages, renewing.`); - const peerId = this.getPeers().find( - (p) => p.id.toString() === peerIdStr - )?.id; - if (!peerId) { - log.error( - `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` - ); - return; - } - try { - await this.renewAndSubscribePeer(peerId); - } catch (error) { - log.error(`Failed to renew peer ${peerIdStr}: ${error}`); - } - } - - public resetPeerStats(peerIdStr: string): void { - this.missedMessagesByPeer.set(peerIdStr, 0); - this.receivedMessagesHashes.nodes[peerIdStr] = new Set(); - } - - public removePeerStats(peerIdStr: string): void { - this.missedMessagesByPeer.delete(peerIdStr); - delete this.receivedMessagesHashes.nodes[peerIdStr]; - } -} - export class SubscriptionManager implements ISubscriptionSDK { private keepAliveTimer: number | null = null; private peerFailures: Map = new Map(); diff --git a/packages/sdk/src/protocols/filter/reliability_monitor.ts b/packages/sdk/src/protocols/filter/reliability_monitor.ts new file mode 100644 index 0000000000..168da5c024 --- /dev/null +++ b/packages/sdk/src/protocols/filter/reliability_monitor.ts @@ -0,0 +1,129 @@ +import type { Peer, PeerId } from "@libp2p/interface"; +import { IProtoMessage, PeerIdStr, PubsubTopic } from "@waku/interfaces"; +import { messageHashStr } from "@waku/message-hash"; +import { WakuMessage } from "@waku/proto"; +import { Logger } from "@waku/utils"; + +type ReceivedMessageHashes = { + all: Set; + nodes: { + [peerId: PeerIdStr]: Set; + }; +}; +const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; + +const log = new Logger("reliability-monitor"); + +export class ReliabilityMonitor { + private readonly receivedMessagesHashStr: string[] = []; + private readonly receivedMessagesHashes: ReceivedMessageHashes; + private missedMessagesByPeer: Map = new Map(); + private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; + + public constructor( + private getPeers: () => Peer[], + private renewAndSubscribePeer: (peerId: PeerId) => Promise + ) { + const allPeerIdStr = this.getPeers().map((p) => p.id.toString()); + + this.receivedMessagesHashes = { + all: new Set(), + nodes: { + ...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()])) + } + }; + + allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); + } + + public async processMessage( + pubsubTopic: PubsubTopic, + message: WakuMessage, + peerIdStr: PeerIdStr + ): Promise { + const hashedMessageStr = messageHashStr( + pubsubTopic, + message as IProtoMessage + ); + + if (this.isMessageAlreadyReceived(hashedMessageStr)) { + log.info("Message already received, skipping"); + return true; + } + + this.addMessageHash(hashedMessageStr, peerIdStr); + await this.checkMissedMessages(); + + return false; + } + + private isMessageAlreadyReceived(hashedMessageStr: string): boolean { + return this.receivedMessagesHashStr.includes(hashedMessageStr); + } + + private addMessageHash(hash: string, peerIdStr?: string): void { + this.receivedMessagesHashStr.push(hash); + this.receivedMessagesHashes.all.add(hash); + + if (peerIdStr) { + this.receivedMessagesHashes.nodes[peerIdStr].add(hash); + } + } + + private async checkMissedMessages(): Promise { + for (const hash of this.receivedMessagesHashes.all) { + for (const [peerIdStr, hashes] of Object.entries( + this.receivedMessagesHashes.nodes + )) { + if (!hashes.has(hash)) { + await this.handleMissedMessage(peerIdStr); + } + } + } + } + + private async handleMissedMessage(peerIdStr: string): Promise { + this.incrementMissedMessageCount(peerIdStr); + if (this.shouldRenewPeer(peerIdStr)) { + await this.renewPeer(peerIdStr); + } + } + + private incrementMissedMessageCount(peerIdStr: string): void { + const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; + this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); + } + + private shouldRenewPeer(peerIdStr: string): boolean { + const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0; + return missedMessages > this.maxMissedMessagesThreshold; + } + + private async renewPeer(peerIdStr: string): Promise { + log.info(`Peer ${peerIdStr} has missed too many messages, renewing.`); + const peerId = this.getPeers().find( + (p) => p.id.toString() === peerIdStr + )?.id; + if (!peerId) { + log.error( + `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` + ); + return; + } + try { + await this.renewAndSubscribePeer(peerId); + } catch (error) { + log.error(`Failed to renew peer ${peerIdStr}: ${error}`); + } + } + + public resetPeerStats(peerIdStr: string): void { + this.missedMessagesByPeer.set(peerIdStr, 0); + this.receivedMessagesHashes.nodes[peerIdStr] = new Set(); + } + + public removePeerStats(peerIdStr: string): void { + this.missedMessagesByPeer.delete(peerIdStr); + delete this.receivedMessagesHashes.nodes[peerIdStr]; + } +} diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index ae79a71849..266c62494b 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -17,7 +17,7 @@ import { Protocols } from "@waku/interfaces"; import { wakuRelay } from "@waku/relay"; import { Logger } from "@waku/utils"; -import { wakuFilter } from "./protocols/filter.js"; +import { wakuFilter } from "./protocols/filter/index.js"; import { wakuLightPush } from "./protocols/light_push.js"; import { wakuStore } from "./protocols/store.js"; From 5d5ccbe59191aebf360405764ed5c2daa4f5180c Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Thu, 22 Aug 2024 16:07:23 +0530 Subject: [PATCH 4/6] chore: move SubscriptionManager into a separate file --- .../sdk/src/protocols/filter/constants.ts | 4 + packages/sdk/src/protocols/filter/index.ts | 342 +---------------- .../protocols/filter/subscription_manager.ts | 344 ++++++++++++++++++ 3 files changed, 350 insertions(+), 340 deletions(-) create mode 100644 packages/sdk/src/protocols/filter/constants.ts create mode 100644 packages/sdk/src/protocols/filter/subscription_manager.ts diff --git a/packages/sdk/src/protocols/filter/constants.ts b/packages/sdk/src/protocols/filter/constants.ts new file mode 100644 index 0000000000..9cdfed9c6b --- /dev/null +++ b/packages/sdk/src/protocols/filter/constants.ts @@ -0,0 +1,4 @@ +export const DEFAULT_KEEP_ALIVE = 30 * 1000; +export const DEFAULT_SUBSCRIBE_OPTIONS = { + keepAlive: DEFAULT_KEEP_ALIVE +}; diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index b8d8a8ecbc..d87cc97468 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -1,30 +1,21 @@ -import type { Peer } from "@libp2p/interface"; -import type { PeerId } from "@libp2p/interface"; import { ConnectionManager, FilterCore } from "@waku/core"; import { type Callback, - type ContentTopic, - type CoreProtocolResult, type CreateSubscriptionResult, type IAsyncIterator, type IDecodedMessage, type IDecoder, type IFilterSDK, - type IProtoMessage, - type ISubscriptionSDK, type Libp2p, NetworkConfig, - type PeerIdStr, type ProtocolCreateOptions, ProtocolError, type ProtocolUseOptions, type PubsubTopic, - type SDKProtocolResult, type SubscribeOptions, SubscribeResult, type Unsubscribe } from "@waku/interfaces"; -import { WakuMessage } from "@waku/proto"; import { ensurePubsubTopicIsConfigured, groupByContentTopic, @@ -35,312 +26,11 @@ import { import { BaseProtocolSDK } from "../base_protocol.js"; -import { ReliabilityMonitor } from "./reliability_monitor.js"; - -type SubscriptionCallback = { - decoders: IDecoder[]; - callback: Callback; -}; +import { DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js"; +import { SubscriptionManager } from "./subscription_manager.js"; const log = new Logger("sdk:filter"); -const DEFAULT_MAX_PINGS = 3; -const DEFAULT_KEEP_ALIVE = 30 * 1000; - -const DEFAULT_SUBSCRIBE_OPTIONS = { - keepAlive: DEFAULT_KEEP_ALIVE -}; - -export class SubscriptionManager implements ISubscriptionSDK { - private keepAliveTimer: number | null = null; - private peerFailures: Map = new Map(); - private maxPingFailures: number = DEFAULT_MAX_PINGS; - - private reliabilityMonitor: ReliabilityMonitor; - - private subscriptionCallbacks: Map< - ContentTopic, - SubscriptionCallback - >; - - public constructor( - private readonly pubsubTopic: PubsubTopic, - private protocol: FilterCore, - private getPeers: () => Peer[], - private readonly renewPeer: (peerToDisconnect: PeerId) => Promise - ) { - this.pubsubTopic = pubsubTopic; - this.subscriptionCallbacks = new Map(); - - this.reliabilityMonitor = new ReliabilityMonitor( - getPeers, - this.renewAndSubscribePeer.bind(this) - ); - } - - public async subscribe( - decoders: IDecoder | IDecoder[], - callback: Callback, - options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS - ): Promise { - this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE; - this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS; - - const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; - - // check that all decoders are configured for the same pubsub topic as this subscription - for (const decoder of decodersArray) { - if (decoder.pubsubTopic !== this.pubsubTopic) { - return { - failures: [ - { - error: ProtocolError.TOPIC_DECODER_MISMATCH - } - ], - successes: [] - }; - } - } - - const decodersGroupedByCT = groupByContentTopic(decodersArray); - const contentTopics = Array.from(decodersGroupedByCT.keys()); - - const promises = this.getPeers().map(async (peer) => - this.protocol.subscribe(this.pubsubTopic, peer, contentTopics) - ); - - const results = await Promise.allSettled(promises); - - const finalResult = this.handleResult(results, "subscribe"); - - // Save the callback functions by content topics so they - // can easily be removed (reciprocally replaced) if `unsubscribe` (reciprocally `subscribe`) - // is called for those content topics - decodersGroupedByCT.forEach((decoders, contentTopic) => { - // Cast the type because a given `subscriptionCallbacks` map may hold - // Decoder that decode to different implementations of `IDecodedMessage` - const subscriptionCallback = { - decoders, - callback - } as unknown as SubscriptionCallback; - - // The callback and decoder may override previous values, this is on - // purpose as the user may call `subscribe` to refresh the subscription - this.subscriptionCallbacks.set(contentTopic, subscriptionCallback); - }); - - if (options.keepAlive) { - this.startKeepAlivePings(options); - } - - return finalResult; - } - - public async unsubscribe( - contentTopics: ContentTopic[] - ): Promise { - const promises = this.getPeers().map(async (peer) => { - const response = await this.protocol.unsubscribe( - this.pubsubTopic, - peer, - contentTopics - ); - - contentTopics.forEach((contentTopic: string) => { - this.subscriptionCallbacks.delete(contentTopic); - }); - - return response; - }); - - const results = await Promise.allSettled(promises); - const finalResult = this.handleResult(results, "unsubscribe"); - - if (this.subscriptionCallbacks.size === 0) { - if (this.keepAliveTimer) { - this.stopKeepAlivePings(); - } - } - - return finalResult; - } - - public async ping(peerId?: PeerId): Promise { - const peers = peerId ? [peerId] : this.getPeers().map((peer) => peer.id); - - const promises = peers.map((peerId) => this.pingSpecificPeer(peerId)); - const results = await Promise.allSettled(promises); - - return this.handleResult(results, "ping"); - } - - public async unsubscribeAll(): Promise { - const promises = this.getPeers().map(async (peer) => - this.protocol.unsubscribeAll(this.pubsubTopic, peer) - ); - - const results = await Promise.allSettled(promises); - - this.subscriptionCallbacks.clear(); - - const finalResult = this.handleResult(results, "unsubscribeAll"); - - if (this.keepAliveTimer) { - this.stopKeepAlivePings(); - } - - return finalResult; - } - - public async processIncomingMessage( - message: WakuMessage, - peerIdStr: PeerIdStr - ): Promise { - const alreadyReceived = await this.reliabilityMonitor.processMessage( - this.pubsubTopic, - message, - peerIdStr - ); - - if (alreadyReceived) { - return; - } - - const { contentTopic } = message; - const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic); - if (!subscriptionCallback) { - log.error("No subscription callback available for ", contentTopic); - return; - } - log.info( - "Processing message with content topic ", - contentTopic, - " on pubsub topic ", - this.pubsubTopic - ); - await pushMessage(subscriptionCallback, this.pubsubTopic, message); - } - - private handleResult( - results: PromiseSettledResult[], - type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll" - ): SDKProtocolResult { - const result: SDKProtocolResult = { failures: [], successes: [] }; - - for (const promiseResult of results) { - if (promiseResult.status === "rejected") { - log.error( - `Failed to resolve ${type} promise successfully: `, - promiseResult.reason - ); - result.failures.push({ error: ProtocolError.GENERIC_FAIL }); - } else { - const coreResult = promiseResult.value; - if (coreResult.failure) { - result.failures.push(coreResult.failure); - } else { - result.successes.push(coreResult.success); - } - } - } - return result; - } - - private async renewAndSubscribePeer( - peerId: PeerId - ): Promise { - try { - const newPeer = await this.renewPeer(peerId); - await this.protocol.subscribe( - this.pubsubTopic, - newPeer, - Array.from(this.subscriptionCallbacks.keys()) - ); - - this.reliabilityMonitor.resetPeerStats(newPeer.id.toString()); - - return newPeer; - } catch (error) { - log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`); - return; - } finally { - this.peerFailures.delete(peerId.toString()); - this.reliabilityMonitor.removePeerStats(peerId.toString()); - } - } - - private async pingSpecificPeer(peerId: PeerId): Promise { - const peer = this.getPeers().find((p) => p.id.equals(peerId)); - if (!peer) { - return { - success: null, - failure: { - peerId, - error: ProtocolError.NO_PEER_AVAILABLE - } - }; - } - - try { - const result = await this.protocol.ping(peer); - if (result.failure) { - await this.handlePeerFailure(peerId); - } else { - this.peerFailures.delete(peerId.toString()); - } - return result; - } catch (error) { - await this.handlePeerFailure(peerId); - return { - success: null, - failure: { - peerId, - error: ProtocolError.GENERIC_FAIL - } - }; - } - } - - private async handlePeerFailure(peerId: PeerId): Promise { - const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1; - this.peerFailures.set(peerId.toString(), failures); - - if (failures > this.maxPingFailures) { - try { - await this.renewAndSubscribePeer(peerId); - this.peerFailures.delete(peerId.toString()); - } catch (error) { - log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`); - } - } - } - - private startKeepAlivePings(options: SubscribeOptions): void { - const { keepAlive } = options; - if (this.keepAliveTimer) { - log.info("Recurring pings already set up."); - return; - } - - this.keepAliveTimer = setInterval(() => { - void this.ping().catch((error) => { - log.error("Error in keep-alive ping cycle:", error); - }); - }, keepAlive) as unknown as number; - } - - private stopKeepAlivePings(): void { - if (!this.keepAliveTimer) { - log.info("Already stopped recurring pings."); - return; - } - - log.info("Stopping recurring pings."); - clearInterval(this.keepAliveTimer); - this.keepAliveTimer = null; - } -} - class FilterSDK extends BaseProtocolSDK implements IFilterSDK { public readonly protocol: FilterCore; @@ -613,31 +303,3 @@ export function wakuFilter( ): (libp2p: Libp2p) => IFilterSDK { return (libp2p: Libp2p) => new FilterSDK(connectionManager, libp2p, init); } - -async function pushMessage( - subscriptionCallback: SubscriptionCallback, - pubsubTopic: PubsubTopic, - message: WakuMessage -): Promise { - const { decoders, callback } = subscriptionCallback; - - const { contentTopic } = message; - if (!contentTopic) { - log.warn("Message has no content topic, skipping"); - return; - } - - try { - const decodePromises = decoders.map((dec) => - dec - .fromProtoObj(pubsubTopic, message as IProtoMessage) - .then((decoded) => decoded || Promise.reject("Decoding failed")) - ); - - const decodedMessage = await Promise.any(decodePromises); - - await callback(decodedMessage); - } catch (e) { - log.error("Error decoding message", e); - } -} diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts new file mode 100644 index 0000000000..aafedb36e4 --- /dev/null +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -0,0 +1,344 @@ +import type { Peer, PeerId } from "@libp2p/interface"; +import { FilterCore } from "@waku/core"; +import { + type Callback, + type ContentTopic, + type CoreProtocolResult, + type IDecodedMessage, + type IDecoder, + IProtoMessage, + type ISubscriptionSDK, + type PeerIdStr, + ProtocolError, + type PubsubTopic, + type SDKProtocolResult, + type SubscribeOptions, + SubscriptionCallback +} from "@waku/interfaces"; +import { WakuMessage } from "@waku/proto"; +import { groupByContentTopic, Logger } from "@waku/utils"; + +import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants"; +import { ReliabilityMonitor } from "./reliability_monitor"; + +const DEFAULT_MAX_PINGS = 3; + +const log = new Logger("subscription-manager"); + +export class SubscriptionManager implements ISubscriptionSDK { + private keepAliveTimer: number | null = null; + private peerFailures: Map = new Map(); + private maxPingFailures: number = DEFAULT_MAX_PINGS; + + private reliabilityMonitor: ReliabilityMonitor; + + private subscriptionCallbacks: Map< + ContentTopic, + SubscriptionCallback + >; + + public constructor( + private readonly pubsubTopic: PubsubTopic, + private protocol: FilterCore, + private getPeers: () => Peer[], + private readonly renewPeer: (peerToDisconnect: PeerId) => Promise + ) { + this.pubsubTopic = pubsubTopic; + this.subscriptionCallbacks = new Map(); + + this.reliabilityMonitor = new ReliabilityMonitor( + getPeers, + this.renewAndSubscribePeer.bind(this) + ); + } + + public async subscribe( + decoders: IDecoder | IDecoder[], + callback: Callback, + options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS + ): Promise { + this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE; + this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS; + + const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; + + // check that all decoders are configured for the same pubsub topic as this subscription + for (const decoder of decodersArray) { + if (decoder.pubsubTopic !== this.pubsubTopic) { + return { + failures: [ + { + error: ProtocolError.TOPIC_DECODER_MISMATCH + } + ], + successes: [] + }; + } + } + + const decodersGroupedByCT = groupByContentTopic(decodersArray); + const contentTopics = Array.from(decodersGroupedByCT.keys()); + + const promises = this.getPeers().map(async (peer) => + this.protocol.subscribe(this.pubsubTopic, peer, contentTopics) + ); + + const results = await Promise.allSettled(promises); + + const finalResult = this.handleResult(results, "subscribe"); + + // Save the callback functions by content topics so they + // can easily be removed (reciprocally replaced) if `unsubscribe` (reciprocally `subscribe`) + // is called for those content topics + decodersGroupedByCT.forEach((decoders, contentTopic) => { + // Cast the type because a given `subscriptionCallbacks` map may hold + // Decoder that decode to different implementations of `IDecodedMessage` + const subscriptionCallback = { + decoders, + callback + } as unknown as SubscriptionCallback; + + // The callback and decoder may override previous values, this is on + // purpose as the user may call `subscribe` to refresh the subscription + this.subscriptionCallbacks.set(contentTopic, subscriptionCallback); + }); + + if (options.keepAlive) { + this.startKeepAlivePings(options); + } + + return finalResult; + } + + public async unsubscribe( + contentTopics: ContentTopic[] + ): Promise { + const promises = this.getPeers().map(async (peer) => { + const response = await this.protocol.unsubscribe( + this.pubsubTopic, + peer, + contentTopics + ); + + contentTopics.forEach((contentTopic: string) => { + this.subscriptionCallbacks.delete(contentTopic); + }); + + return response; + }); + + const results = await Promise.allSettled(promises); + const finalResult = this.handleResult(results, "unsubscribe"); + + if (this.subscriptionCallbacks.size === 0) { + if (this.keepAliveTimer) { + this.stopKeepAlivePings(); + } + } + + return finalResult; + } + + public async ping(peerId?: PeerId): Promise { + const peers = peerId ? [peerId] : this.getPeers().map((peer) => peer.id); + + const promises = peers.map((peerId) => this.pingSpecificPeer(peerId)); + const results = await Promise.allSettled(promises); + + return this.handleResult(results, "ping"); + } + + public async unsubscribeAll(): Promise { + const promises = this.getPeers().map(async (peer) => + this.protocol.unsubscribeAll(this.pubsubTopic, peer) + ); + + const results = await Promise.allSettled(promises); + + this.subscriptionCallbacks.clear(); + + const finalResult = this.handleResult(results, "unsubscribeAll"); + + if (this.keepAliveTimer) { + this.stopKeepAlivePings(); + } + + return finalResult; + } + + public async processIncomingMessage( + message: WakuMessage, + peerIdStr: PeerIdStr + ): Promise { + const alreadyReceived = await this.reliabilityMonitor.processMessage( + this.pubsubTopic, + message, + peerIdStr + ); + + if (alreadyReceived) { + return; + } + + const { contentTopic } = message; + const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic); + if (!subscriptionCallback) { + log.error("No subscription callback available for ", contentTopic); + return; + } + log.info( + "Processing message with content topic ", + contentTopic, + " on pubsub topic ", + this.pubsubTopic + ); + await pushMessage(subscriptionCallback, this.pubsubTopic, message); + } + + private handleResult( + results: PromiseSettledResult[], + type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll" + ): SDKProtocolResult { + const result: SDKProtocolResult = { failures: [], successes: [] }; + + for (const promiseResult of results) { + if (promiseResult.status === "rejected") { + log.error( + `Failed to resolve ${type} promise successfully: `, + promiseResult.reason + ); + result.failures.push({ error: ProtocolError.GENERIC_FAIL }); + } else { + const coreResult = promiseResult.value; + if (coreResult.failure) { + result.failures.push(coreResult.failure); + } else { + result.successes.push(coreResult.success); + } + } + } + return result; + } + + private async renewAndSubscribePeer( + peerId: PeerId + ): Promise { + try { + const newPeer = await this.renewPeer(peerId); + await this.protocol.subscribe( + this.pubsubTopic, + newPeer, + Array.from(this.subscriptionCallbacks.keys()) + ); + + this.reliabilityMonitor.resetPeerStats(newPeer.id.toString()); + + return newPeer; + } catch (error) { + log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`); + return; + } finally { + this.peerFailures.delete(peerId.toString()); + this.reliabilityMonitor.removePeerStats(peerId.toString()); + } + } + + private async pingSpecificPeer(peerId: PeerId): Promise { + const peer = this.getPeers().find((p) => p.id.equals(peerId)); + if (!peer) { + return { + success: null, + failure: { + peerId, + error: ProtocolError.NO_PEER_AVAILABLE + } + }; + } + + try { + const result = await this.protocol.ping(peer); + if (result.failure) { + await this.handlePeerFailure(peerId); + } else { + this.peerFailures.delete(peerId.toString()); + } + return result; + } catch (error) { + await this.handlePeerFailure(peerId); + return { + success: null, + failure: { + peerId, + error: ProtocolError.GENERIC_FAIL + } + }; + } + } + + private async handlePeerFailure(peerId: PeerId): Promise { + const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1; + this.peerFailures.set(peerId.toString(), failures); + + if (failures > this.maxPingFailures) { + try { + await this.renewAndSubscribePeer(peerId); + this.peerFailures.delete(peerId.toString()); + } catch (error) { + log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`); + } + } + } + + private startKeepAlivePings(options: SubscribeOptions): void { + const { keepAlive } = options; + if (this.keepAliveTimer) { + log.info("Recurring pings already set up."); + return; + } + + this.keepAliveTimer = setInterval(() => { + void this.ping().catch((error) => { + log.error("Error in keep-alive ping cycle:", error); + }); + }, keepAlive) as unknown as number; + } + + private stopKeepAlivePings(): void { + if (!this.keepAliveTimer) { + log.info("Already stopped recurring pings."); + return; + } + + log.info("Stopping recurring pings."); + clearInterval(this.keepAliveTimer); + this.keepAliveTimer = null; + } +} + +async function pushMessage( + subscriptionCallback: SubscriptionCallback, + pubsubTopic: PubsubTopic, + message: WakuMessage +): Promise { + const { decoders, callback } = subscriptionCallback; + + const { contentTopic } = message; + if (!contentTopic) { + log.warn("Message has no content topic, skipping"); + return; + } + + try { + const decodePromises = decoders.map((dec) => + dec + .fromProtoObj(pubsubTopic, message as IProtoMessage) + .then((decoded) => decoded || Promise.reject("Decoding failed")) + ); + + const decodedMessage = await Promise.any(decodePromises); + + await callback(decodedMessage); + } catch (e) { + log.error("Error decoding message", e); + } +} From f7eede4ff953800e072b215b5c141360ef0bbdf8 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Thu, 22 Aug 2024 16:39:54 +0530 Subject: [PATCH 5/6] chore: update type --- packages/interfaces/src/filter.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 67946a02a9..c5e2ab42ec 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -18,6 +18,11 @@ export type SubscribeOptions = { maxMissedMessagesThreshold?: number; }; +export type SubscriptionCallback = { + decoders: IDecoder[]; + callback: Callback; +}; + export type IFilter = IReceiver & IBaseProtocolCore; export interface ISubscriptionSDK { From 4d20c67c1fcda9f5c95d6372cc69e363150ee8a1 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Tue, 27 Aug 2024 16:46:08 +0530 Subject: [PATCH 6/6] chore: bind context to function --- packages/sdk/src/protocols/filter/subscription_manager.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index aafedb36e4..7e76271a7f 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -47,7 +47,7 @@ export class SubscriptionManager implements ISubscriptionSDK { this.subscriptionCallbacks = new Map(); this.reliabilityMonitor = new ReliabilityMonitor( - getPeers, + getPeers.bind(this), this.renewAndSubscribePeer.bind(this) ); }