Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store): allow using a specific node #2192

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
18 changes: 17 additions & 1 deletion packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { Peer, Stream } from "@libp2p/interface";
import type {
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
IBaseProtocolCore,
Libp2pComponents,
PeerIdStr,
PubsubTopic
} from "@waku/interfaces";
import { Logger } from "@waku/utils";
Expand Down Expand Up @@ -75,15 +76,30 @@ export class BaseProtocol implements IBaseProtocolCore {
public async getPeers(
{
numPeers,
maxBootstrapPeers
maxBootstrapPeers,
peerIdStr
}: {
numPeers: number;
maxBootstrapPeers: number;
peerIdStr?: PeerIdStr;
} = {
maxBootstrapPeers: 0,
numPeers: 0
}
): Promise<Peer[]> {
if (peerIdStr) {
const peer = (await this.connectedPeers()).find(
(p) => p.id.toString() === peerIdStr
);
if (peer) {
return [peer];
}
this.log.warn(
`Passed node to use for ${this.multicodec} not found: ${peerIdStr}. Attempting to use random peers.`
);
return this.getPeers({ numPeers, maxBootstrapPeers });
}

// Retrieve all connected peers that support the protocol & shard (if configured)
const allAvailableConnectedPeers = await this.connectedPeers();

Expand Down
7 changes: 7 additions & 0 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ export type ProtocolCreateOptions = {
* List of peers to use to bootstrap the node. Ignored if defaultBootstrap is set to true.
*/
bootstrapPeers?: string[];
/**
* List of nodes' multiaddrs as strings to use for each protocol. If not specified, random nodes will be used.
* This should be used only if you know what you are doing.
*/
nodeToUse?: {
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
store?: string;
};
};

export type Callback<T extends IDecodedMessage> = (
Expand Down
15 changes: 11 additions & 4 deletions packages/sdk/src/protocols/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ const log = new Logger("waku:store:sdk");
export class Store extends BaseProtocolSDK implements IStore {
public readonly protocol: StoreCore;

public constructor(connectionManager: ConnectionManager, libp2p: Libp2p) {
public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
private readonly peerIdStrToUse?: string
) {
super(
new StoreCore(connectionManager.configuredPubsubTopics, libp2p),
connectionManager,
Expand Down Expand Up @@ -61,9 +65,11 @@ export class Store extends BaseProtocolSDK implements IStore {
const peer = (
await this.protocol.getPeers({
numPeers: this.numPeersToUse,
maxBootstrapPeers: 1
maxBootstrapPeers: 1,
peerIdStr: this.peerIdStrToUse
})
)[0];

if (!peer) {
log.error("No peers available to query");
throw new Error("No peers available to query");
Expand Down Expand Up @@ -237,9 +243,10 @@ export class Store extends BaseProtocolSDK implements IStore {
* @returns A function that takes a Libp2p instance and returns a StoreSDK instance.
*/
export function wakuStore(
connectionManager: ConnectionManager
connectionManager: ConnectionManager,
peerIdStrToUse?: string
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
): (libp2p: Libp2p) => IStore {
return (libp2p: Libp2p) => {
return new Store(connectionManager, libp2p);
return new Store(connectionManager, libp2p, peerIdStrToUse);
};
}
24 changes: 22 additions & 2 deletions packages/sdk/src/waku/waku.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Stream } from "@libp2p/interface";
import { isPeerId, PeerId } from "@libp2p/interface";
import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import { ConnectionManager, getHealthManager } from "@waku/core";
import { ConnectionManager, getHealthManager, StoreCodec } from "@waku/core";
import type {
IFilter,
IHealthManager,
Expand All @@ -10,6 +10,7 @@ import type {
IStore,
IWaku,
Libp2p,
PeerIdStr,
ProtocolCreateOptions,
PubsubTopic
} from "@waku/interfaces";
Expand Down Expand Up @@ -106,7 +107,14 @@ export class WakuNode implements IWaku {
this.health = getHealthManager();

if (protocolsEnabled.store) {
const store = wakuStore(this.connectionManager);
let peerIdStr: PeerIdStr | undefined;
if (options.nodeToUse?.store) {
this.dialMultiaddr(options.nodeToUse.store, StoreCodec).catch((e) => {
log.error("Failed to dial store peer", e);
});
}

const store = wakuStore(this.connectionManager, peerIdStr);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is wrong, peerIdStr does nothing here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used to initialise Store with the protocol to use since we need to get the peer when we send a store request. Changed the peerIdStrToUse to options: Partial<StoreProtocolOptions>

this.store = store(libp2p);
}

Expand Down Expand Up @@ -224,6 +232,18 @@ export class WakuNode implements IWaku {
return this.connectionManager.isConnected();
}

private async dialMultiaddr(
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
multiaddrStr: string,
protocol: string
): Promise<PeerIdStr> {
const ma = multiaddr(multiaddrStr);
if (!ma.getPeerId()) {
throw new Error("Failed to dial multiaddr: missing peer ID");
}
await this.libp2p.dialProtocol(ma, [protocol]);
return ma.getPeerId()!;
}

private mapToPeerIdOrMultiaddr(
peerId: PeerId | MultiaddrInput
): PeerId | Multiaddr {
Expand Down
Loading