diff --git a/packages/bitswap/src/bitswap.ts b/packages/bitswap/src/bitswap.ts index 5d2bf58f..65db4b06 100644 --- a/packages/bitswap/src/bitswap.ts +++ b/packages/bitswap/src/bitswap.ts @@ -117,7 +117,10 @@ export class Bitswap implements BitswapInterface { * Sends notifications about the arrival of a block */ async notify (cid: CID, block: Uint8Array, options: ProgressOptions & AbortOptions = {}): Promise { - await this.peerWantLists.receivedBlock(cid, options) + await Promise.all([ + this.peerWantLists.receivedBlock(cid, options), + this.wantList.receivedBlock(cid, options) + ]) } getWantlist (): WantListEntry[] { diff --git a/packages/bitswap/src/network.ts b/packages/bitswap/src/network.ts index 17dc36fb..1352966e 100644 --- a/packages/bitswap/src/network.ts +++ b/packages/bitswap/src/network.ts @@ -14,13 +14,13 @@ import { CID } from 'multiformats/cid' import { CustomProgressEvent } from 'progress-events' import { raceEvent } from 'race-event' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' -import { BITSWAP_120, DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS, DEFAULT_MAX_PROVIDERS_PER_REQUEST, DEFAULT_MESSAGE_RECEIVE_TIMEOUT, DEFAULT_MESSAGE_SEND_TIMEOUT, DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS } from './constants.js' +import { BITSWAP_120, DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS, DEFAULT_MAX_PROVIDERS_PER_REQUEST, DEFAULT_MESSAGE_RECEIVE_TIMEOUT, DEFAULT_MESSAGE_SEND_CONCURRENCY, DEFAULT_MESSAGE_SEND_TIMEOUT, DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS } from './constants.js' import { BitswapMessage } from './pb/message.js' import type { WantOptions } from './bitswap.js' import type { MultihashHasherLoader } from './index.js' import type { Block, BlockPresence, WantlistEntry } from './pb/message.js' import type { Provider, Routing } from '@helia/interface/routing' -import type { Libp2p, AbortOptions, Connection, PeerId, IncomingStreamData, Topology, MetricGroup, ComponentLogger, Metrics, IdentifyResult } from '@libp2p/interface' +import type { Libp2p, AbortOptions, Connection, PeerId, IncomingStreamData, Topology, ComponentLogger, IdentifyResult, Counter } from '@libp2p/interface' import type { Logger } from '@libp2p/logger' import type { ProgressEvent, ProgressOptions } from 'progress-events' @@ -79,7 +79,6 @@ export interface NetworkComponents { routing: Routing logger: ComponentLogger libp2p: Libp2p - metrics?: Metrics } export interface BitswapMessageEventDetail { @@ -107,7 +106,7 @@ export class Network extends TypedEventEmitter { private readonly maxOutboundStreams: number private readonly messageReceiveTimeout: number private registrarIds: string[] - private readonly metrics?: { blocksSent: MetricGroup, dataSent: MetricGroup } + private readonly metrics: { blocksSent?: Counter, dataSent?: Counter } private readonly sendQueue: PeerQueue private readonly messageSendTimeout: number private readonly runOnTransientConnections: boolean @@ -129,17 +128,14 @@ export class Network extends TypedEventEmitter { this.messageReceiveTimeout = init.messageReceiveTimeout ?? DEFAULT_MESSAGE_RECEIVE_TIMEOUT this.messageSendTimeout = init.messageSendTimeout ?? DEFAULT_MESSAGE_SEND_TIMEOUT this.runOnTransientConnections = init.runOnTransientConnections ?? DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS - - if (components.metrics != null) { - this.metrics = { - blocksSent: components.metrics?.registerMetricGroup('ipfs_bitswap_sent_blocks'), - dataSent: components.metrics?.registerMetricGroup('ipfs_bitswap_sent_data_bytes') - } + this.metrics = { + blocksSent: components.libp2p.metrics?.registerCounter('ipfs_bitswap_sent_blocks_total'), + dataSent: components.libp2p.metrics?.registerCounter('ipfs_bitswap_sent_data_bytes_total') } this.sendQueue = new PeerQueue({ - concurrency: init.messageSendConcurrency, - metrics: components.metrics, + concurrency: init.messageSendConcurrency ?? DEFAULT_MESSAGE_SEND_CONCURRENCY, + metrics: components.libp2p.metrics, metricName: 'ipfs_bitswap_message_send_queue' }) this.sendQueue.addEventListener('error', (evt) => { @@ -302,9 +298,9 @@ export class Network extends TypedEventEmitter { */ async findAndConnect (cid: CID, options?: WantOptions): Promise { await drain( - take( - map(this.findProviders(cid, options), async provider => this.connectTo(provider.id, options)), - options?.maxProviders ?? DEFAULT_MAX_PROVIDERS_PER_REQUEST + map( + take(this.findProviders(cid, options), options?.maxProviders ?? DEFAULT_MAX_PROVIDERS_PER_REQUEST), + async provider => this.connectTo(provider.id, options) ) ) .catch(err => { @@ -335,9 +331,11 @@ export class Network extends TypedEventEmitter { setMaxListeners(Infinity, signal) try { - const existingJob = this.sendQueue.find(peerId) + const existingJob = this.sendQueue.queue.find(job => { + return peerId.equals(job.options.peerId) && job.status === 'queued' + }) - if (existingJob?.status === 'queued') { + if (existingJob != null) { // merge messages instead of adding new job existingJob.options.message = mergeMessages(existingJob.options.message, message) @@ -371,7 +369,7 @@ export class Network extends TypedEventEmitter { stream.abort(err) } - this._updateSentStats(peerId, message.blocks) + this._updateSentStats(message.blocks) }, { peerId, signal, @@ -417,23 +415,15 @@ export class Network extends TypedEventEmitter { return connection } - _updateSentStats (peerId: PeerId, blocks: Block[] = []): void { - if (this.metrics != null) { - let bytes = 0 - - for (const block of blocks.values()) { - bytes += block.data.byteLength - } + _updateSentStats (blocks: Block[] = []): void { + let bytes = 0 - this.metrics.dataSent.increment({ - global: bytes, - [peerId.toString()]: bytes - }) - this.metrics.blocksSent.increment({ - global: blocks.length, - [peerId.toString()]: blocks.length - }) + for (const block of blocks.values()) { + bytes += block.data.byteLength } + + this.metrics.dataSent?.increment(bytes) + this.metrics.blocksSent?.increment(blocks.length) } } diff --git a/packages/bitswap/src/peer-want-lists/index.ts b/packages/bitswap/src/peer-want-lists/index.ts index bcd3b8fc..4d1b316b 100644 --- a/packages/bitswap/src/peer-want-lists/index.ts +++ b/packages/bitswap/src/peer-want-lists/index.ts @@ -6,7 +6,7 @@ import { Ledger } from './ledger.js' import type { BitswapNotifyProgressEvents, WantListEntry } from '../index.js' import type { Network } from '../network.js' import type { BitswapMessage } from '../pb/message.js' -import type { ComponentLogger, Logger, Metrics, PeerId } from '@libp2p/interface' +import type { ComponentLogger, Libp2p, Logger, PeerId } from '@libp2p/interface' import type { PeerMap } from '@libp2p/peer-collections' import type { Blockstore } from 'interface-blockstore' import type { AbortOptions } from 'it-length-prefixed-stream' @@ -19,7 +19,7 @@ export interface PeerWantListsInit { export interface PeerWantListsComponents { blockstore: Blockstore network: Network - metrics?: Metrics + libp2p: Libp2p logger: ComponentLogger } @@ -46,7 +46,7 @@ export class PeerWantLists { this.ledgerMap = trackedPeerMap({ name: 'ipfs_bitswap_ledger_map', - metrics: components.metrics + metrics: components.libp2p.metrics }) this.network.addEventListener('bitswap:message', (evt) => { diff --git a/packages/bitswap/src/session.ts b/packages/bitswap/src/session.ts index d9f7c691..245f8e65 100644 --- a/packages/bitswap/src/session.ts +++ b/packages/bitswap/src/session.ts @@ -89,6 +89,11 @@ class BitswapSession implements BitswapSessionInterface { void Promise.resolve() .then(async () => { for await (const peerId of source) { + if (found === this.maxProviders) { + this.queue.clear() + break + } + // eslint-disable-next-line no-loop-func await this.queue.add(async () => { try { diff --git a/packages/bitswap/src/stats.ts b/packages/bitswap/src/stats.ts index 9853dab0..c50fd787 100644 --- a/packages/bitswap/src/stats.ts +++ b/packages/bitswap/src/stats.ts @@ -1,7 +1,7 @@ -import type { MetricGroup, Metrics, PeerId } from '@libp2p/interface' +import type { Libp2p, MetricGroup, PeerId } from '@libp2p/interface' export interface StatsComponents { - metrics?: Metrics + libp2p: Libp2p } export class Stats { @@ -11,10 +11,10 @@ export class Stats { private readonly duplicateDataReceived?: MetricGroup constructor (components: StatsComponents) { - this.blocksReceived = components.metrics?.registerMetricGroup('ipfs_bitswap_received_blocks') - this.duplicateBlocksReceived = components.metrics?.registerMetricGroup('ipfs_bitswap_duplicate_received_blocks') - this.dataReceived = components.metrics?.registerMetricGroup('ipfs_bitswap_data_received_bytes') - this.duplicateDataReceived = components.metrics?.registerMetricGroup('ipfs_bitswap_duplicate_data_received_bytes') + this.blocksReceived = components.libp2p.metrics?.registerMetricGroup('ipfs_bitswap_received_blocks') + this.duplicateBlocksReceived = components.libp2p.metrics?.registerMetricGroup('ipfs_bitswap_duplicate_received_blocks') + this.dataReceived = components.libp2p.metrics?.registerMetricGroup('ipfs_bitswap_data_received_bytes') + this.duplicateDataReceived = components.libp2p.metrics?.registerMetricGroup('ipfs_bitswap_duplicate_data_received_bytes') } updateBlocksReceived (count: number = 1, peerId?: PeerId): void { diff --git a/packages/bitswap/src/want-list.ts b/packages/bitswap/src/want-list.ts index 5cd3d632..05b89a1d 100644 --- a/packages/bitswap/src/want-list.ts +++ b/packages/bitswap/src/want-list.ts @@ -1,4 +1,4 @@ -import { AbortError } from '@libp2p/interface' +import { TypedEventEmitter } from '@libp2p/interface' import { trackedPeerMap, PeerSet } from '@libp2p/peer-collections' import { trackedMap } from '@libp2p/utils/tracked-map' import all from 'it-all' @@ -8,14 +8,16 @@ import { pipe } from 'it-pipe' import { CID } from 'multiformats/cid' import { sha256 } from 'multiformats/hashes/sha2' import pDefer from 'p-defer' +import { raceEvent } from 'race-event' +import { equals as uint8ArrayEquals } from 'uint8arrays/equals' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { DEFAULT_MESSAGE_SEND_DELAY } from './constants.js' import { BlockPresenceType, WantType } from './pb/message.js' import vd from './utils/varint-decoder.js' -import type { MultihashHasherLoader } from './index.js' +import type { BitswapNotifyProgressEvents, MultihashHasherLoader } from './index.js' import type { BitswapNetworkWantProgressEvents, Network } from './network.js' import type { BitswapMessage } from './pb/message.js' -import type { ComponentLogger, Metrics, PeerId, Startable, AbortOptions } from '@libp2p/interface' +import type { ComponentLogger, PeerId, Startable, AbortOptions, Libp2p, TypedEventTarget } from '@libp2p/interface' import type { Logger } from '@libp2p/logger' import type { PeerMap } from '@libp2p/peer-collections' import type { DeferredPromise } from 'p-defer' @@ -24,7 +26,7 @@ import type { ProgressOptions } from 'progress-events' export interface WantListComponents { network: Network logger: ComponentLogger - metrics?: Metrics + libp2p: Libp2p } export interface WantListInit { @@ -63,16 +65,6 @@ export interface WantListEntry { * If this set has members, the want will only be sent to these peers */ session: PeerSet - - /** - * Promises returned from `.wantBlock` for this block - */ - blockWantListeners: Array> - - /** - * Promises returned from `.wantPresence` for this block - */ - blockPresenceListeners: Array> } export interface WantOptions extends AbortOptions, ProgressOptions { @@ -108,7 +100,12 @@ export interface WantHaveResult { export type WantPresenceResult = WantDontHaveResult | WantHaveResult -export class WantList implements Startable { +export interface WantListEvents { + block: CustomEvent + presence: CustomEvent +} + +export class WantList extends TypedEventEmitter implements Startable, TypedEventTarget { /** * Tracks what CIDs we've previously sent to which peers */ @@ -119,15 +116,18 @@ export class WantList implements Startable { private readonly sendMessagesDelay: number private sendMessagesTimeout?: ReturnType private readonly hashLoader?: MultihashHasherLoader + private sendingMessages?: DeferredPromise constructor (components: WantListComponents, init: WantListInit = {}) { + super() + this.peers = trackedPeerMap({ name: 'ipfs_bitswap_peers', - metrics: components.metrics + metrics: components.libp2p.metrics }) this.wants = trackedMap({ name: 'ipfs_bitswap_wantlist', - metrics: components.metrics + metrics: components.libp2p.metrics }) this.network = components.network this.sendMessagesDelay = init.sendMessagesDelay ?? DEFAULT_MESSAGE_SEND_DELAY @@ -164,9 +164,7 @@ export class WantList implements Startable { priority: options.priority ?? 1, wantType: options.wantType ?? WantType.WantBlock, cancel: false, - sendDontHave: true, - blockWantListeners: [], - blockPresenceListeners: [] + sendDontHave: true } if (options.peerId != null) { @@ -198,30 +196,41 @@ export class WantList implements Startable { } } - // add a promise that will be resolved or rejected when the response arrives - let deferred: DeferredPromise - - if (options.wantType === WantType.WantBlock) { - const p = deferred = pDefer() + // broadcast changes + await this.sendMessagesDebounced() - entry.blockWantListeners.push(p) - } else { - const p = deferred = pDefer() + try { + if (options.wantType === WantType.WantBlock) { + const event = await raceEvent>(this, 'block', options?.signal, { + filter: (event) => { + return uint8ArrayEquals(cid.multihash.digest, event.detail.cid.multihash.digest) + }, + errorMessage: 'Want was aborted' + }) - entry.blockPresenceListeners.push(p) - } + return event.detail + } - // reject the promise if the want is rejected - const abortListener = (): void => { - this.log('want for %c was aborted, cancelling want', cid) + const event = await raceEvent>(this, 'presence', options?.signal, { + filter: (event) => { + return uint8ArrayEquals(cid.multihash.digest, event.detail.cid.multihash.digest) + }, + errorMessage: 'Want was aborted' + }) - if (entry != null) { + return event.detail + } finally { + if (options.signal?.aborted === true) { + this.log('want for %c was aborted, cancelling want', cid) entry.cancel = true + // broadcast changes + await this.sendMessagesDebounced() } - - deferred.reject(new AbortError('Want was aborted')) } - options.signal?.addEventListener('abort', abortListener) + } + + private async sendMessagesDebounced (): Promise { + await this.sendingMessages?.promise // broadcast changes clearTimeout(this.sendMessagesTimeout) @@ -231,77 +240,70 @@ export class WantList implements Startable { this.log('error sending messages to peers', err) }) }, this.sendMessagesDelay) - - try { - return await deferred.promise - } finally { - // remove listener - options.signal?.removeEventListener('abort', abortListener) - // remove deferred promise - if (options.wantType === WantType.WantBlock) { - entry.blockWantListeners = entry.blockWantListeners.filter(recipient => recipient !== deferred) - } else { - entry.blockPresenceListeners = entry.blockPresenceListeners.filter(recipient => recipient !== deferred) - } - } } private async sendMessages (): Promise { - for (const [peerId, sentWants] of this.peers) { - const sent = new Set() - const message: Partial = { - wantlist: { - full: false, - entries: pipe( - this.wants.entries(), - (source) => filter(source, ([key, entry]) => { - // skip session-only wants - if (entry.session.size > 0 && !entry.session.has(peerId)) { - return false - } - - const sentPreviously = sentWants.has(key) - - // don't cancel if we've not sent it to them before - if (entry.cancel) { - return sentPreviously - } - - // only send if we've not sent it to them before - return !sentPreviously - }), - (source) => map(source, ([key, entry]) => { - sent.add(key) - - return { - cid: entry.cid.bytes, - priority: entry.priority, - wantType: entry.wantType, - cancel: entry.cancel, - sendDontHave: entry.sendDontHave - } - }), - (source) => all(source) - ) + this.sendingMessages = pDefer() + + await Promise.all( + [...this.peers.entries()].map(async ([peerId, sentWants]) => { + const sent = new Set() + const message: Partial = { + wantlist: { + full: false, + entries: pipe( + this.wants.entries(), + (source) => filter(source, ([key, entry]) => { + // skip session-only wants + if (entry.session.size > 0 && !entry.session.has(peerId)) { + return false + } + + const sentPreviously = sentWants.has(key) + + // don't cancel if we've not sent it to them before + if (entry.cancel) { + return sentPreviously + } + + // only send if we've not sent it to them before + return !sentPreviously + }), + (source) => map(source, ([key, entry]) => { + sent.add(key) + + return { + cid: entry.cid.bytes, + priority: entry.priority, + wantType: entry.wantType, + cancel: entry.cancel, + sendDontHave: entry.sendDontHave + } + }), + (source) => all(source) + ) + } } - } - if (message.wantlist?.entries.length === 0) { - return - } + if (message.wantlist?.entries.length === 0) { + return + } - // add message to send queue - try { - await this.network.sendMessage(peerId, message) + // add message to send queue + try { + await this.network.sendMessage(peerId, message) - // update list of messages sent to remote - for (const key of sent) { - sentWants.add(key) + // update list of messages sent to remote + for (const key of sent) { + sentWants.add(key) + } + } catch (err: any) { + this.log.error('error sending full wantlist to new peer', err) } - } catch (err: any) { - this.log.error('error sending full wantlist to new peer', err) - } - } + }) + ).catch(err => { + this.log.error('error sending messages', err) + }) // queued all message sends, remove cancelled wants from wantlist and sent // wants @@ -314,6 +316,8 @@ export class WantList implements Startable { } } } + + this.sendingMessages.resolve() } has (cid: CID): boolean { @@ -325,31 +329,30 @@ export class WantList implements Startable { * Add a CID to the wantlist */ async wantPresence (cid: CID, options: WantOptions = {}): Promise { - if (options.peerId != null && this.peers.get(options.peerId) == null) { - const cidStr = uint8ArrayToString(cid.multihash.bytes, 'base64') + if (options.peerId != null) { + const peer = options.peerId - try { - // if we don't have them as a peer, add them - this.peers.set(options.peerId, new Set([cidStr])) + // sending WantHave directly to peer + await this.network.sendMessage(options.peerId, { + wantlist: { + full: false, + entries: [{ + cid: cid.bytes, + sendDontHave: true, + wantType: WantType.WantHave, + priority: 1 + }] + } + }) - // sending WantHave directly to peer - await this.network.sendMessage(options.peerId, { - wantlist: { - full: false, - entries: [{ - cid: cid.bytes, - sendDontHave: true, - wantType: WantType.WantHave, - priority: 1 - }] - } - }) - } catch (err) { - // sending failed, remove them as a peer - this.peers.delete(options.peerId) + // wait for peer response + const event = await raceEvent>(this, 'presence', options.signal, { + filter: (event) => { + return peer.equals(event.detail.sender) && uint8ArrayEquals(cid.multihash.digest, event.detail.cid.multihash.digest) + } + }) - throw err - } + return event.detail } return this.addEntry(cid, { @@ -368,15 +371,29 @@ export class WantList implements Startable { }) } + /** + * Invoked when a block has been received from an external source + */ + async receivedBlock (cid: CID, options: ProgressOptions & AbortOptions): Promise { + const cidStr = uint8ArrayToString(cid.multihash.bytes, 'base64') + + const entry = this.wants.get(cidStr) + + if (entry == null) { + return + } + + entry.cancel = true + + await this.sendMessagesDebounced() + } + /** * Invoked when a message is received from a bitswap peer */ private async receiveMessage (sender: PeerId, message: BitswapMessage): Promise { this.log('received message from %p', sender) - - // blocks received - const blockResults: WantBlockResult[] = [] - const presenceResults: WantPresenceResult[] = [] + let blocksCancelled = false // process blocks for (const block of message.blocks) { @@ -384,7 +401,6 @@ export class WantList implements Startable { continue } - this.log('received block') const values = vd(block.prefix) const cidVersion = values[0] const multicodec = values[1] @@ -403,66 +419,55 @@ export class WantList implements Startable { this.log('received block from %p for %c', sender, cid) - blockResults.push({ - sender, - cid, - block: block.data + this.safeDispatchEvent('block', { + detail: { + sender, + cid, + block: block.data + } }) - presenceResults.push({ - sender, - cid, - has: true - }) - } - - // process block presences - for (const { cid: cidBytes, type } of message.blockPresences) { - const cid = CID.decode(cidBytes) - - this.log('received %s from %p for %c', type, sender, cid) - - presenceResults.push({ - sender, - cid, - has: type === BlockPresenceType.HaveBlock + this.safeDispatchEvent('presence', { + detail: { + sender, + cid, + has: true, + block: block.data + } }) - } - for (const result of blockResults) { - const cidStr = uint8ArrayToString(result.cid.multihash.bytes, 'base64') + const cidStr = uint8ArrayToString(cid.multihash.bytes, 'base64') const entry = this.wants.get(cidStr) if (entry == null) { return } - const recipients = entry.blockWantListeners - entry.blockWantListeners = [] - recipients.forEach((p) => { - p.resolve(result) - }) - // since we received the block, flip the cancel flag to send cancels to // any peers on the next message sending iteration, this will remove it // from the internal want list entry.cancel = true + blocksCancelled = true } - for (const result of presenceResults) { - const cidStr = uint8ArrayToString(result.cid.multihash.bytes, 'base64') - const entry = this.wants.get(cidStr) + // process block presences + for (const { cid: cidBytes, type } of message.blockPresences) { + const cid = CID.decode(cidBytes) - if (entry == null) { - return - } + this.log('received %s from %p for %c', type, sender, cid) - const recipients = entry.blockPresenceListeners - entry.blockPresenceListeners = [] - recipients.forEach((p) => { - p.resolve(result) + this.safeDispatchEvent('presence', { + detail: { + sender, + cid, + has: type === BlockPresenceType.HaveBlock + } }) } + + if (blocksCancelled) { + await this.sendMessagesDebounced() + } } /** diff --git a/packages/bitswap/test/bitswap.spec.ts b/packages/bitswap/test/bitswap.spec.ts index f8a4a630..8a0e4d8c 100644 --- a/packages/bitswap/test/bitswap.spec.ts +++ b/packages/bitswap/test/bitswap.spec.ts @@ -12,6 +12,7 @@ import { duplexPair } from 'it-pair/duplex' import { pbStream } from 'it-protobuf-stream' import { CID } from 'multiformats/cid' import { sha256 } from 'multiformats/hashes/sha2' +import pDefer from 'p-defer' import pWaitFor from 'p-wait-for' import Sinon from 'sinon' import { stubInterface } from 'sinon-ts' @@ -46,7 +47,9 @@ describe('bitswap', () => { peerId: await createEd25519PeerId(), routing: stubInterface(), blockstore: new MemoryBlockstore(), - libp2p: stubInterface() + libp2p: stubInterface({ + metrics: undefined + }) } bitswap = new Bitswap({ @@ -91,8 +94,7 @@ describe('bitswap', () => { id: await createEd25519PeerId(), multiaddrs: [ multiaddr(`/ip4/4${i}.4${i}.4${i}.4${i}/tcp/${1234 + i}`) - ], - protocols: ['transport-bitswap'] + ] } }) ) @@ -198,8 +200,7 @@ describe('bitswap', () => { id: await createEd25519PeerId(), multiaddrs: [ multiaddr('/ip4/41.41.41.41/tcp/1234') - ], - protocols: ['transport-bitswap'] + ] }] components.routing.findProviders.withArgs(cid).returns((async function * () { @@ -226,8 +227,7 @@ describe('bitswap', () => { id: await createEd25519PeerId(), multiaddrs: [ multiaddr('/ip4/41.41.41.41/tcp/1234') - ], - protocols: ['transport-bitswap'] + ] }] components.routing.findProviders.withArgs(cid).returns((async function * () { @@ -283,10 +283,26 @@ describe('bitswap', () => { describe('want', () => { it('should want a block that is available on the network', async () => { const remotePeer = await createEd25519PeerId() - const findProvsSpy = Sinon.spy(bitswap.network, 'findAndConnect') + const findProvsSpy = bitswap.network.findAndConnect = Sinon.stub() + findProvsSpy.resolves() + + // add peer + bitswap.wantList.peers.set(remotePeer, new Set()) + + // wait for message send to peer + const sentMessages = pDefer() + + bitswap.network.sendMessage = async (peerId) => { + if (remotePeer.equals(peerId)) { + sentMessages.resolve() + } + } const p = bitswap.want(cid) + // wait for message send to peer + await sentMessages.promise + // provider sends message bitswap.network.safeDispatchEvent('bitswap:message', { detail: { @@ -334,8 +350,26 @@ describe('bitswap', () => { const remotePeer = await createEd25519PeerId() expect(bitswap.getWantlist()).to.be.empty() + const findProvsSpy = bitswap.network.findAndConnect = Sinon.stub() + findProvsSpy.resolves() + + // add peer + bitswap.wantList.peers.set(remotePeer, new Set()) + + // wait for message send to peer + const sentMessages = pDefer() + + bitswap.network.sendMessage = async (peerId) => { + if (remotePeer.equals(peerId)) { + sentMessages.resolve() + } + } + const p = bitswap.want(cid) + // wait for message send to peer + await sentMessages.promise + expect(bitswap.getWantlist().map(w => w.cid)).to.include(cid) // provider sends message diff --git a/packages/bitswap/test/network.spec.ts b/packages/bitswap/test/network.spec.ts index ad958e4a..261ea8b9 100644 --- a/packages/bitswap/test/network.spec.ts +++ b/packages/bitswap/test/network.spec.ts @@ -34,7 +34,8 @@ describe('network', () => { components = { routing: stubInterface(), libp2p: stubInterface({ - getConnections: () => [] + getConnections: () => [], + metrics: undefined }) } diff --git a/packages/bitswap/test/peer-want-list.spec.ts b/packages/bitswap/test/peer-want-list.spec.ts index 2ed3e0ed..dafa487b 100644 --- a/packages/bitswap/test/peer-want-list.spec.ts +++ b/packages/bitswap/test/peer-want-list.spec.ts @@ -20,6 +20,7 @@ interface PeerWantListsComponentStubs { peerId: PeerId blockstore: Blockstore network: Network + libp2p: Libp2p logger: ComponentLogger } @@ -30,18 +31,21 @@ describe('peer-want-lists', () => { beforeEach(async () => { const logger = defaultLogger() + const libp2p = stubInterface({ + getConnections: () => [], + metrics: undefined + }) network = new Network({ routing: stubInterface(), logger, - libp2p: stubInterface({ - getConnections: () => [] - }) + libp2p }) components = { peerId: await createEd25519PeerId(), blockstore: new MemoryBlockstore(), network, + libp2p, logger: defaultLogger() } diff --git a/packages/bitswap/test/stats.spec.ts b/packages/bitswap/test/stats.spec.ts index bd895295..923f6dfa 100644 --- a/packages/bitswap/test/stats.spec.ts +++ b/packages/bitswap/test/stats.spec.ts @@ -2,10 +2,10 @@ import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { expect } from 'aegir/chai' import { stubInterface, type StubbedInstance } from 'sinon-ts' import { Stats } from '../src/stats.js' -import type { MetricGroup, Metrics } from '@libp2p/interface' +import type { Libp2p, MetricGroup, Metrics } from '@libp2p/interface' interface StubbedStatsComponents { - metrics: StubbedInstance + libp2p: StubbedInstance } describe('stats', () => { @@ -15,13 +15,15 @@ describe('stats', () => { beforeEach(() => { components = { - metrics: stubInterface() + libp2p: stubInterface({ + metrics: stubInterface() + }) } metricGroup = stubInterface() // @ts-expect-error tsc does not select correct method overload sig - components.metrics.registerMetricGroup.returns(metricGroup) + components.libp2p.metrics?.registerMetricGroup.returns(metricGroup) stats = new Stats(components) }) diff --git a/packages/bitswap/test/want-list.spec.ts b/packages/bitswap/test/want-list.spec.ts index 333fe6df..e096e265 100644 --- a/packages/bitswap/test/want-list.spec.ts +++ b/packages/bitswap/test/want-list.spec.ts @@ -7,9 +7,11 @@ import { stubInterface, type StubbedInstance } from 'sinon-ts' import { type Network } from '../src/network.js' import { WantType } from '../src/pb/message.js' import { WantList } from '../src/want-list.js' +import type { Libp2p } from '@libp2p/interface' interface StubbedWantListComponents { network: StubbedInstance + libp2p: StubbedInstance } describe('wantlist', () => { @@ -18,7 +20,10 @@ describe('wantlist', () => { beforeEach(() => { components = { - network: stubInterface() + network: stubInterface(), + libp2p: stubInterface({ + metrics: undefined + }) } wantList = new WantList({