Skip to content

Commit

Permalink
chore: cleanup ReliabilityMonitor
Browse files Browse the repository at this point in the history
  • Loading branch information
danisharora099 committed Aug 28, 2024
1 parent 2b61774 commit f36812f
Showing 1 changed file with 33 additions and 43 deletions.
76 changes: 33 additions & 43 deletions packages/sdk/src/protocols/filter/reliability_monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,17 @@ import { Logger } from "@waku/utils";

type ReceivedMessageHashes = {
all: Set<string>;
nodes: {
[peerId: PeerIdStr]: Set<string>;
};
nodes: Record<PeerIdStr, Set<string>>;
};

const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3;

const log = new Logger("sdk:filter:reliability_monitor");

export class ReliabilityMonitor {
public receivedMessagesHashStr: string[] = [];
public receivedMessagesHashes: ReceivedMessageHashes;
public missedMessagesByPeer: Map<string, number> = new Map();
public maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
private receivedMessagesHashes: ReceivedMessageHashes;
private missedMessagesByPeer: Map<PeerIdStr, number> = new Map();
private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;

public constructor(
private getPeers: () => Peer[],
Expand All @@ -29,46 +26,37 @@ export class ReliabilityMonitor {

this.receivedMessagesHashes = {
all: new Set(),
nodes: {
...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()]))
}
nodes: Object.fromEntries(
allPeerIdStr.map((peerId) => [peerId, new Set()])
)
};
allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0));
}

public setMaxMissedMessagesThreshold(value: number | undefined): void {
if (value === undefined) {
return;
if (value !== undefined) {
this.maxMissedMessagesThreshold = value;
}
this.maxMissedMessagesThreshold = value;
}

public get messageHashes(): string[] {
return [...this.receivedMessagesHashes.all];
}

public addMessage(
message: WakuMessage,
pubsubTopic: PubsubTopic,
peerIdStr?: string
peerIdStr?: PeerIdStr
): boolean {
const hashedMessageStr = messageHashStr(
pubsubTopic,
message as IProtoMessage
);

const isNewMessage = !this.receivedMessagesHashes.all.has(hashedMessageStr);
this.receivedMessagesHashes.all.add(hashedMessageStr);

if (peerIdStr) {
this.receivedMessagesHashes.nodes[peerIdStr].add(hashedMessageStr);
}

if (this.receivedMessagesHashStr.includes(hashedMessageStr)) {
return true;
} else {
this.receivedMessagesHashStr.push(hashedMessageStr);
return false;
}
return !isNewMessage;
}

public async validateMessage(): Promise<void> {
Expand All @@ -79,36 +67,38 @@ export class ReliabilityMonitor {
if (!hashes.has(hash)) {
this.incrementMissedMessageCount(peerIdStr);
if (this.shouldRenewPeer(peerIdStr)) {
log.info(
`Peer ${peerIdStr} has missed too many messages, renewing.`
);
const peerId = this.getPeers().find(
(p) => p.id.toString() === peerIdStr
)?.id;
if (!peerId) {
log.error(
`Unexpected Error: Peer ${peerIdStr} not found in connected peers.`
);
continue;
}
try {
await this.renewAndSubscribePeer(peerId);
} catch (error) {
log.error(`Failed to renew peer ${peerIdStr}: ${error}`);
}
await this.renewPeer(peerIdStr);
}
}
}
}
}

private incrementMissedMessageCount(peerIdStr: string): void {
private incrementMissedMessageCount(peerIdStr: PeerIdStr): void {
const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0;
this.missedMessagesByPeer.set(peerIdStr, currentCount + 1);
}

private shouldRenewPeer(peerIdStr: string): boolean {
private shouldRenewPeer(peerIdStr: PeerIdStr): boolean {
const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0;
return missedMessages > this.maxMissedMessagesThreshold;
}

private async renewPeer(peerIdStr: PeerIdStr): Promise<void> {
log.info(`Peer ${peerIdStr} has missed too many messages, renewing.`);
const peerId = this.getPeers().find(
(p) => p.id.toString() === peerIdStr
)?.id;
if (!peerId) {
log.error(
`Unexpected Error: Peer ${peerIdStr} not found in connected peers.`
);
return;
}
try {
await this.renewAndSubscribePeer(peerId);
} catch (error) {
log.error(`Failed to renew peer ${peerIdStr}: ${error}`);
}
}
}

0 comments on commit f36812f

Please sign in to comment.