From d200fc45c01f4d8d9c36975fa7527e55aecad655 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 27 Mar 2023 10:35:09 +0100 Subject: [PATCH 1/2] fix: invoke onProgress callback with DHT queries during routing Allow passing an `onProgress` callback to the peer/content routers that can receive DHT query events. Experimental for now. --- package.json | 3 +- src/dht/dht-content-routing.ts | 26 ++++++++++++--- src/dht/dht-peer-routing.ts | 17 ++++++++-- test/content-routing/content-routing.node.ts | 33 ++++++++++++++++++++ 4 files changed, 70 insertions(+), 9 deletions(-) diff --git a/package.json b/package.json index bf69387669..f401621664 100644 --- a/package.json +++ b/package.json @@ -143,12 +143,12 @@ "it-drain": "^2.0.0", "it-filter": "^2.0.0", "it-first": "^2.0.0", - "it-parallel": "^3.0.0", "it-handshake": "^4.1.2", "it-length-prefixed": "^8.0.2", "it-map": "^2.0.0", "it-merge": "^2.0.0", "it-pair": "^2.0.2", + "it-parallel": "^3.0.0", "it-pb-stream": "^3.2.0", "it-pipe": "^2.0.3", "it-stream-types": "^1.0.4", @@ -159,6 +159,7 @@ "p-queue": "^7.3.4", "p-retry": "^5.0.0", "private-ip": "^3.0.0", + "progress-events": "^1.0.0", "protons-runtime": "^5.0.0", "rate-limiter-flexible": "^2.3.11", "retimer": "^3.0.0", diff --git a/src/dht/dht-content-routing.ts b/src/dht/dht-content-routing.ts index 4c10d5bcf0..a9f07d6919 100644 --- a/src/dht/dht-content-routing.ts +++ b/src/dht/dht-content-routing.ts @@ -1,10 +1,20 @@ import drain from 'it-drain' import { CodeError } from '@libp2p/interfaces/errors' -import type { DHT } from '@libp2p/interface-dht' +import type { DHT, QueryEvent } from '@libp2p/interface-dht' import type { ContentRouting } from '@libp2p/interface-content-routing' import type { CID } from 'multiformats/cid' import type { AbortOptions } from '@libp2p/interfaces' import type { PeerInfo } from '@libp2p/interface-peer-info' +import { CustomProgressEvent, type ProgressEvent, type ProgressOptions } from 'progress-events' + +export type FindProvidersProgressEvents = + ProgressEvent<'content-routing:find-providers:dht:event', QueryEvent> + +export type PutProgressEvents = + ProgressEvent<'content-routing:put:dht:event', QueryEvent> + +export type GetProgressEvents = + ProgressEvent<'content-routing:get:dht:event', QueryEvent> /** * Wrapper class to convert events into returned values @@ -20,20 +30,26 @@ export class DHTContentRouting implements ContentRouting { await drain(this.dht.provide(cid)) } - async * findProviders (cid: CID, options: AbortOptions = {}): AsyncGenerator { + async * findProviders (cid: CID, options: AbortOptions & ProgressOptions = {}): AsyncGenerator { for await (const event of this.dht.findProviders(cid, options)) { + options.onProgress?.(new CustomProgressEvent('content-routing:find-providers:dht:event', event)) + if (event.name === 'PROVIDER') { yield * event.providers } } } - async put (key: Uint8Array, value: Uint8Array, options?: AbortOptions): Promise { - await drain(this.dht.put(key, value, options)) + async put (key: Uint8Array, value: Uint8Array, options: AbortOptions & ProgressOptions = {}): Promise { + for await (const event of this.dht.put(key, value, options)) { + options.onProgress?.(new CustomProgressEvent('content-routing:put:dht:event', event)) + } } - async get (key: Uint8Array, options?: AbortOptions): Promise { + async get (key: Uint8Array, options: AbortOptions & ProgressOptions = {}): Promise { for await (const event of this.dht.get(key, options)) { + options.onProgress?.(new CustomProgressEvent('content-routing:get:dht:event', event)) + if (event.name === 'VALUE') { return event.value } diff --git a/src/dht/dht-peer-routing.ts b/src/dht/dht-peer-routing.ts index 61ff4f4820..bb1ebc4f00 100644 --- a/src/dht/dht-peer-routing.ts +++ b/src/dht/dht-peer-routing.ts @@ -1,10 +1,17 @@ import { CodeError } from '@libp2p/interfaces/errors' import { messages, codes } from '../errors.js' import type { PeerRouting } from '@libp2p/interface-peer-routing' -import type { DHT } from '@libp2p/interface-dht' +import type { DHT, QueryEvent } from '@libp2p/interface-dht' import type { PeerId } from '@libp2p/interface-peer-id' import type { AbortOptions } from '@libp2p/interfaces' import type { PeerInfo } from '@libp2p/interface-peer-info' +import { CustomProgressEvent, type ProgressEvent, type ProgressOptions } from 'progress-events' + +export type FindPeerProgressEvents = + ProgressEvent<'peer-routing:find-peer:dht:event', QueryEvent> + +export type GetClosestPeerProgressEvents = + ProgressEvent<'peer-routing:get-closest-peer:dht:event', QueryEvent> /** * Wrapper class to convert events into returned values @@ -16,8 +23,10 @@ export class DHTPeerRouting implements PeerRouting { this.dht = dht } - async findPeer (peerId: PeerId, options: AbortOptions = {}): Promise { + async findPeer (peerId: PeerId, options: AbortOptions & ProgressOptions = {}): Promise { for await (const event of this.dht.findPeer(peerId, options)) { + options.onProgress?.(new CustomProgressEvent('peer-routing:find-peer:dht:event', event)) + if (event.name === 'FINAL_PEER') { return event.peer } @@ -26,8 +35,10 @@ export class DHTPeerRouting implements PeerRouting { throw new CodeError(messages.NOT_FOUND, codes.ERR_NOT_FOUND) } - async * getClosestPeers (key: Uint8Array, options: AbortOptions = {}): AsyncIterable { + async * getClosestPeers (key: Uint8Array, options: AbortOptions & ProgressOptions = {}): AsyncIterable { for await (const event of this.dht.getClosestPeers(key, options)) { + options.onProgress?.(new CustomProgressEvent('peer-routing:get-closest-peer:dht:event', event)) + if (event.name === 'FINAL_PEER') { yield event.peer } diff --git a/test/content-routing/content-routing.node.ts b/test/content-routing/content-routing.node.ts index 8c8ee224f8..ee78bae199 100644 --- a/test/content-routing/content-routing.node.ts +++ b/test/content-routing/content-routing.node.ts @@ -115,6 +115,39 @@ describe('content-routing', () => { return await deferred.promise }) + + it('should call progress events', async () => { + const deferred = pDefer() + + if (nodes[0].dht == null) { + throw new Error('DHT was not configured') + } + + sinon.stub(nodes[0].dht, 'findProviders').callsFake(async function * () { + yield { + from: nodes[0].peerId, + type: 0, + name: 'PROVIDER', + providers: [{ + id: nodes[0].peerId, + multiaddrs: [], + protocols: [] + }] + } + deferred.resolve() + }) + + const onProgress = sinon.stub() + + await drain(nodes[0].contentRouting.findProviders(CID.parse('QmU621oD8AhHw6t25vVyfYKmL9VV3PTgc52FngEhTGACFB'), { + // @ts-expect-error - can be removed when the content routing interface is updated to include progress events + onProgress + })) + + await deferred.promise + + expect(onProgress.called).to.be.true() + }) }) describe('via delegate router', () => { From e1671d37020ac6b93fe37c8b48d05ffd457249c0 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 28 Mar 2023 16:14:28 +0100 Subject: [PATCH 2/2] chore: add libp2p prefix --- src/dht/dht-content-routing.ts | 29 +++++++++++++++++++---------- src/dht/dht-peer-routing.ts | 13 ++++++++----- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/src/dht/dht-content-routing.ts b/src/dht/dht-content-routing.ts index a9f07d6919..23164207df 100644 --- a/src/dht/dht-content-routing.ts +++ b/src/dht/dht-content-routing.ts @@ -1,4 +1,3 @@ -import drain from 'it-drain' import { CodeError } from '@libp2p/interfaces/errors' import type { DHT, QueryEvent } from '@libp2p/interface-dht' import type { ContentRouting } from '@libp2p/interface-content-routing' @@ -7,32 +6,42 @@ import type { AbortOptions } from '@libp2p/interfaces' import type { PeerInfo } from '@libp2p/interface-peer-info' import { CustomProgressEvent, type ProgressEvent, type ProgressOptions } from 'progress-events' +export type ProvideProgressEvents = + ProgressEvent<'libp2p:content-routing:provide:dht:event', QueryEvent> + export type FindProvidersProgressEvents = - ProgressEvent<'content-routing:find-providers:dht:event', QueryEvent> + ProgressEvent<'libp2p:content-routing:find-providers:dht:event', QueryEvent> export type PutProgressEvents = - ProgressEvent<'content-routing:put:dht:event', QueryEvent> + ProgressEvent<'libp2p:content-routing:put:dht:event', QueryEvent> export type GetProgressEvents = - ProgressEvent<'content-routing:get:dht:event', QueryEvent> + ProgressEvent<'libp2p:content-routing:get:dht:event', QueryEvent> /** * Wrapper class to convert events into returned values */ -export class DHTContentRouting implements ContentRouting { +export class DHTContentRouting implements ContentRouting< + ProvideProgressEvents, + FindProvidersProgressEvents, + PutProgressEvents, + GetProgressEvents +> { private readonly dht: DHT constructor (dht: DHT) { this.dht = dht } - async provide (cid: CID): Promise { - await drain(this.dht.provide(cid)) + async provide (cid: CID, options: AbortOptions & ProgressOptions = {}): Promise { + for await (const event of this.dht.provide(cid, options)) { + options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:provide:dht:event', event)) + } } async * findProviders (cid: CID, options: AbortOptions & ProgressOptions = {}): AsyncGenerator { for await (const event of this.dht.findProviders(cid, options)) { - options.onProgress?.(new CustomProgressEvent('content-routing:find-providers:dht:event', event)) + options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:find-providers:dht:event', event)) if (event.name === 'PROVIDER') { yield * event.providers @@ -42,13 +51,13 @@ export class DHTContentRouting implements ContentRouting { async put (key: Uint8Array, value: Uint8Array, options: AbortOptions & ProgressOptions = {}): Promise { for await (const event of this.dht.put(key, value, options)) { - options.onProgress?.(new CustomProgressEvent('content-routing:put:dht:event', event)) + options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:put:dht:event', event)) } } async get (key: Uint8Array, options: AbortOptions & ProgressOptions = {}): Promise { for await (const event of this.dht.get(key, options)) { - options.onProgress?.(new CustomProgressEvent('content-routing:get:dht:event', event)) + options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:get:dht:event', event)) if (event.name === 'VALUE') { return event.value diff --git a/src/dht/dht-peer-routing.ts b/src/dht/dht-peer-routing.ts index bb1ebc4f00..536dea4830 100644 --- a/src/dht/dht-peer-routing.ts +++ b/src/dht/dht-peer-routing.ts @@ -8,15 +8,18 @@ import type { PeerInfo } from '@libp2p/interface-peer-info' import { CustomProgressEvent, type ProgressEvent, type ProgressOptions } from 'progress-events' export type FindPeerProgressEvents = - ProgressEvent<'peer-routing:find-peer:dht:event', QueryEvent> + ProgressEvent<'libp2p:peer-routing:find-peer:dht:event', QueryEvent> export type GetClosestPeerProgressEvents = - ProgressEvent<'peer-routing:get-closest-peer:dht:event', QueryEvent> + ProgressEvent<'libp2p:peer-routing:get-closest-peer:dht:event', QueryEvent> /** * Wrapper class to convert events into returned values */ -export class DHTPeerRouting implements PeerRouting { +export class DHTPeerRouting implements PeerRouting< + FindPeerProgressEvents, + GetClosestPeerProgressEvents +> { private readonly dht: DHT constructor (dht: DHT) { @@ -25,7 +28,7 @@ export class DHTPeerRouting implements PeerRouting { async findPeer (peerId: PeerId, options: AbortOptions & ProgressOptions = {}): Promise { for await (const event of this.dht.findPeer(peerId, options)) { - options.onProgress?.(new CustomProgressEvent('peer-routing:find-peer:dht:event', event)) + options.onProgress?.(new CustomProgressEvent('libp2p:peer-routing:find-peer:dht:event', event)) if (event.name === 'FINAL_PEER') { return event.peer @@ -37,7 +40,7 @@ export class DHTPeerRouting implements PeerRouting { async * getClosestPeers (key: Uint8Array, options: AbortOptions & ProgressOptions = {}): AsyncIterable { for await (const event of this.dht.getClosestPeers(key, options)) { - options.onProgress?.(new CustomProgressEvent('peer-routing:get-closest-peer:dht:event', event)) + options.onProgress?.(new CustomProgressEvent('libp2p:peer-routing:get-closest-peer:dht:event', event)) if (event.name === 'FINAL_PEER') { yield event.peer