Skip to content

Commit

Permalink
fix: cancel in-flight block requests when racing brokers (#490)
Browse files Browse the repository at this point in the history
If multiple brokers are configured, when one resolves a block request,
cancel the outstanding requests to free up resources.
  • Loading branch information
achingbrain authored Apr 9, 2024
1 parent 532d6c4 commit 395cd9e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
3 changes: 3 additions & 0 deletions packages/utils/src/utils/networked-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ async function raceBlockRetrievers (cid: CID, blockBrokers: BlockBroker[], hashe
})
)
} finally {
// we have the block from the fastest block retriever, abort any still
// in-flight retrieve attempts
controller.abort()
signal.clear()
}
}
19 changes: 17 additions & 2 deletions packages/utils/test/utils/networked-storage.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { defaultHashers } from '../../src/utils/default-hashers.js'
import { NetworkedStorage } from '../../src/utils/networked-storage.js'
import { createBlock } from '../fixtures/create-block.js'
import type { NetworkedStorageComponents } from '../../src/utils/networked-storage.js'
import type { BlockBroker } from '@helia/interface/blocks'
import type { Blockstore } from 'interface-blockstore'

Expand All @@ -24,6 +25,7 @@ describe('networked-storage', () => {
let blockstore: Blockstore
let bitswap: StubbedInstance<Required<BlockBroker>>
let blocks: Array<{ cid: CID, block: Uint8Array }>
let components: NetworkedStorageComponents

beforeEach(async () => {
blocks = []
Expand All @@ -34,14 +36,15 @@ describe('networked-storage', () => {

blockstore = new MemoryBlockstore()
bitswap = stubInterface()
storage = new NetworkedStorage({
components = {
blockstore,
logger: defaultLogger(),
blockBrokers: [
bitswap
],
hashers: defaultHashers()
})
}
storage = new NetworkedStorage(components)
})

it('gets a block from the blockstore', async () => {
Expand Down Expand Up @@ -196,4 +199,16 @@ describe('networked-storage', () => {
const block = await storage.get(cid)
expect(uint8ArrayToString(block)).to.equal('hello world')
})

it('cancels in-flight block requests when one resolves', async () => {
const slowBroker = stubInterface<Required<BlockBroker>>()
components.blockBrokers.push(slowBroker)

bitswap.retrieve.withArgs(blocks[0].cid).resolves(blocks[0].block)

const block = await storage.get(blocks[0].cid)

expect(block).to.equalBytes(blocks[0].block)
expect(slowBroker.retrieve.getCall(0)).to.have.nested.property('args[1].signal.aborted', true)
})
})

0 comments on commit 395cd9e

Please sign in to comment.