Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(filter): reliability monitor as a separate class to handle reliability logic #2127

Closed
wants to merge 8 commits into from
23 changes: 17 additions & 6 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ export const FilterCodecs = {
};

export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
private handleIncomingMessage: (
pubsubTopic: PubsubTopic,
message: WakuMessage,
peerId: string
) => void = () => {};

public constructor(
private handleIncomingMessage: (
pubsubTopic: PubsubTopic,
wakuMessage: WakuMessage,
peerIdStr: string
) => Promise<void>,
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
Expand All @@ -51,6 +52,16 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
});
}

public set incomingMessageHandler(
handler: (
pubsubTopic: PubsubTopic,
message: WakuMessage,
peerId: string
) => void
) {
this.handleIncomingMessage = handler;
}

public async subscribe(
pubsubTopic: PubsubTopic,
peer: Peer,
Expand Down Expand Up @@ -291,7 +302,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
return;
}

await this.handleIncomingMessage(
this.handleIncomingMessage(
pubsubTopic,
wakuMessage,
connection.remotePeer.toString()
Expand Down
42 changes: 37 additions & 5 deletions packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import type { PeerId } from "@libp2p/interface";
import type { Peer, PeerId } from "@libp2p/interface";
import { WakuMessage } from "@waku/proto";

import type { IDecodedMessage, IDecoder } from "./message.js";
import type { ContentTopic, ThisOrThat } from "./misc.js";
import type {
ContentTopic,
PeerIdStr,
PubsubTopic,
ThisOrThat
} from "./misc.js";
import type {
Callback,
IBaseProtocolCore,
Expand All @@ -12,6 +18,11 @@ import type {
} from "./protocols.js";
import type { IReceiver } from "./receiver.js";

export type SubscriptionCallback<T extends IDecodedMessage> = {
decoders: IDecoder<T>[];
callback: Callback<T>;
};

export type SubscribeOptions = {
keepAlive?: number;
pingsBeforePeerRenewed?: number;
Expand All @@ -21,17 +32,18 @@ export type SubscribeOptions = {
export type IFilter = IReceiver & IBaseProtocolCore;

export interface ISubscriptionSDK {
readonly pubsubTopic: PubsubTopic;

subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options?: SubscribeOptions
): Promise<SDKProtocolResult>;

unsubscribe(contentTopics: ContentTopic[]): Promise<SDKProtocolResult>;

ping(peerId?: PeerId): Promise<SDKProtocolResult>;

unsubscribeAll(): Promise<SDKProtocolResult>;

renewAndSubscribePeer(peerId: PeerId): Promise<Peer | undefined>;
}

export type IFilterSDK = IReceiver &
Expand All @@ -42,6 +54,26 @@ export type IFilterSDK = IReceiver &
protocolUseOptions?: ProtocolUseOptions,
subscribeOptions?: SubscribeOptions
): Promise<SubscribeResult>;

activeSubscriptions: Map<PubsubTopic, ISubscriptionSDK>;

setIncomingMessageHandler(
handler: (
pubsubTopic: ContentTopic,
message: WakuMessage,
peerIdStr: PeerIdStr
) => void
): void;
handleIncomingMessage: (
pubsubTopic: ContentTopic,
message: WakuMessage,
peerIdStr: PeerIdStr
) => void;
readonly defaultHandleIncomingMessage: (
pubsubTopic: ContentTopic,
message: WakuMessage,
peerIdStr: PeerIdStr
) => void;
};

export type SubscribeResult = SubscriptionSuccess | SubscriptionError;
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export {
createLibp2pAndUpdateOptions
} from "./create/index.js";
export { wakuLightPush } from "./protocols/light_push.js";
export { wakuFilter } from "./protocols/filter.js";
export { wakuFilter } from "./protocols/filter/index.js";
export { wakuStore } from "./protocols/store.js";

export * as waku from "@waku/core";
Expand Down
Loading
Loading