Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: two explicit queues for session work #538

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
32 changes: 32 additions & 0 deletions packages/block-brokers/test/trustless-gateway-sessions.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,36 @@ describe('trustless-gateway sessions', () => {
await expect(session.retrieve(cid)).to.eventually.deep.equal(block)
expect(queryProviderSpy.callCount).to.equal(1)
})

it('should ignore duplicate providers when unable to retrieve a block', async () => {
const session = createTrustlessGatewaySession(components, {
allowInsecure: true,
allowLocal: true
})

// changed the CID to end in `aa` instead of `aq`
const cid = CID.parse('bafkreiefnkxuhnq3536qo2i2w3tazvifek4mbbzb6zlq3ouhprjce5c3aa')

const queryProviderSpy = Sinon.spy(session, 'queryProvider')
const findNewProvidersSpy = Sinon.spy(session, 'findNewProviders')
const hasProviderSpy = Sinon.spy(session, 'hasProvider')

const prov = {
id: await createEd25519PeerId(),
multiaddrs: [
uriToMultiaddr(process.env.TRUSTLESS_GATEWAY ?? '')
]
}

components.routing.findProviders.callsFake(async function * () {
yield prov
})

await expect(session.retrieve(cid)).to.eventually.be.rejected()
expect(hasProviderSpy.callCount).to.be.greaterThanOrEqual(2)
expect(hasProviderSpy.getCall(0).returnValue).to.be.false()
expect(hasProviderSpy.getCall(1).returnValue).to.be.true()
expect(findNewProvidersSpy.callCount).to.be.greaterThanOrEqual(2)
expect(queryProviderSpy.callCount).to.equal(1)
})
})
227 changes: 142 additions & 85 deletions packages/utils/src/abstract-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
private readonly maxProviders: number
public readonly providers: Provider[]
private readonly evictionFilter: BloomFilter
findProviderQueue: Queue<void, AbortOptions>
queryProviderQueue: Queue<Uint8Array, { provider: Provider, priority?: number } & AbortOptions>

constructor (components: AbstractSessionComponents, init: AbstractCreateSessionOptions) {
super()
Expand All @@ -45,6 +47,12 @@
this.maxProviders = init.maxProviders ?? DEFAULT_SESSION_MAX_PROVIDERS
this.providers = []
this.evictionFilter = BloomFilter.create(this.maxProviders)
this.findProviderQueue = new Queue({
concurrency: 1
})
this.queryProviderQueue = new Queue({
concurrency: this.maxProviders
})
}

async retrieve (cid: CID, options: BlockRetrievalOptions<RetrieveBlockProgressEvents> = {}): Promise<Uint8Array> {
Expand All @@ -56,10 +64,18 @@
this.log('join existing request for %c', cid)
return existingJob
}

let foundBlock = false
const deferred: DeferredPromise<Uint8Array> = pDefer()
this.requests.set(cidStr, deferred.promise)

const peerAddedToSessionListener = (event: CustomEvent<Provider>): void => {
this.log('peer added to session...')
this.addQueryProviderJob(cid, event.detail, options)
}

// add new session peers to query as they are discovered
this.addEventListener('provider', peerAddedToSessionListener)

if (this.providers.length === 0) {
let first = false

Expand All @@ -74,113 +90,139 @@
if (first) {
this.log('found initial session peers for %c', cid)
}
} else {
/**
* Query all existing providers.
*
* This is really only used when querying for subsequent blocks in the same
* session. e.g. You create the session for CID bafy1234, and then you want
* to retrieve bafy5678 from the same session. This call makes sure that we
* initially query the existing providers for the new CID before we start
* finding new providers.
*/
void Promise.all([...this.providers].map(async (provider) => {
this.log('querying existing provider %o', this.toEvictionKey(provider))
return this.addQueryProviderJob(cid, provider, options)
}))

Check warning on line 106 in packages/utils/src/abstract-session.ts

View check run for this annotation

Codecov / codecov/patch

packages/utils/src/abstract-session.ts#L94-L106

Added lines #L94 - L106 were not covered by tests
}

let foundBlock = false
let findProvidersErrored = false
this.findProviderQueue.addEventListener('failure', (evt) => {
this.log.error('error finding new providers for %c', cid, evt.detail.error)

// this queue manages outgoing requests - as new peers are added to the
// session they will be added to the queue so we can request the current
// block from multiple peers as they are discovered
const queue = new Queue<Uint8Array, { provider: Provider, priority?: number } & AbortOptions>({
concurrency: this.maxProviders
findProvidersErrored = true
if (foundBlock) {
// we found the block, so we can ignore this error
return
}

Check warning on line 117 in packages/utils/src/abstract-session.ts

View check run for this annotation

Codecov / codecov/patch

packages/utils/src/abstract-session.ts#L115-L117

Added lines #L115 - L117 were not covered by tests
if (['ERR_INSUFFICIENT_PROVIDERS_FOUND'].includes((evt.detail.error as CodeError).code)) {
this.log.error('insufficient providers found for %c', cid)
if (this.queryProviderQueue.running === 0) {
// only reject if we're not currently querying any providers
deferred.reject(evt.detail.error)
}
}
})
queue.addEventListener('error', () => {})
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if adding this empty error handler is necessary?

queue.addEventListener('failure', (evt) => {

this.findProviderQueue.addEventListener('idle', () => {
this.log.trace('findProviderQueue idle')
if (options.signal?.aborted === true && !foundBlock) {
deferred.reject(new CodeError(options.signal.reason, 'ABORT_ERR'))
return
}

if (foundBlock || findProvidersErrored || options.signal?.aborted === true) {
return
}
// continuously find new providers while we haven't found the block and signal is not aborted
this.addFindProviderJob(cid, options)
})

this.queryProviderQueue.addEventListener('failure', (evt) => {
this.log.error('error querying provider %o, evicting from session', evt.detail.job.options.provider, evt.detail.error)
this.evict(evt.detail.job.options.provider)
})
queue.addEventListener('success', (evt) => {
// peer has sent block, return it to the caller

this.queryProviderQueue.addEventListener('success', (event) => {
this.log.trace('queryProviderQueue success')
foundBlock = true
deferred.resolve(evt.detail.result)
// this.findProviderQueue.clear()
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved
deferred.resolve(event.detail.result)
})
queue.addEventListener('idle', () => {
if (foundBlock || options.signal?.aborted === true) {
// we either found the block or the user gave up

this.queryProviderQueue.addEventListener('idle', () => {
this.log.trace('queryProviderQueue is idle')
if (foundBlock) {
return
}

// find more session peers and retry
Promise.resolve()
.then(async () => {
this.log('no session peers had block for for %c, finding new providers', cid)

// evict this.minProviders random providers to make room for more
for (let i = 0; i < this.minProviders; i++) {
if (this.providers.length === 0) {
break
}

const provider = this.providers[Math.floor(Math.random() * this.providers.length)]
this.evict(provider)
}

// find new providers for the CID
await this.findProviders(cid, this.minProviders, options)

// keep trying until the abort signal fires
this.log('found new providers re-retrieving %c', cid)
this.requests.delete(cidStr)
deferred.resolve(await this.retrieve(cid, options))
})
.catch(err => {
this.log.error('could not find new providers for %c', cid, err)
deferred.reject(err)
})
if (options.signal?.aborted === true) {
// if the signal was aborted, we should reject the request
deferred.reject(options.signal.reason)
return

Check warning on line 161 in packages/utils/src/abstract-session.ts

View check run for this annotation

Codecov / codecov/patch

packages/utils/src/abstract-session.ts#L159-L161

Added lines #L159 - L161 were not covered by tests
}
// we're done querying found providers.. if we can't find new providers, we should reject
if (findProvidersErrored) {
deferred.reject(new CodeError('Done querying all found providers and unable to retrieve the block', 'ERR_NO_PROVIDERS_HAD_BLOCK'))
return

Check warning on line 166 in packages/utils/src/abstract-session.ts

View check run for this annotation

Codecov / codecov/patch

packages/utils/src/abstract-session.ts#L165-L166

Added lines #L165 - L166 were not covered by tests
}
// otherwise, we're still waiting for more providers to query
this.log('waiting for more providers to query')
// if this.findProviders is not running, start it
if (this.findProviderQueue.running === 0) {
this.addFindProviderJob(cid, options)
}
})

const peerAddedToSessionListener = (event: CustomEvent<Provider>): void => {
queue.add(async () => {
return this.queryProvider(cid, event.detail, options)
}, {
provider: event.detail
})
.catch(err => {
if (options.signal?.aborted === true) {
// skip logging error if signal was aborted because abort can happen
// on success (e.g. another session found the block)
return
}

this.log.error('error retrieving session block for %c', cid, err)
})
try {
// this.intialPeerSearchComplete = this.findProviders(cid, this.minProviders, options)
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved
return await deferred.promise
} finally {
this.log('finally block, cleaning up session')
this.removeEventListener('provider', peerAddedToSessionListener)
this.findProviderQueue.clear()
this.queryProviderQueue.clear()
this.requests.delete(cidStr)
}
}

// add new session peers to query as they are discovered
this.addEventListener('provider', peerAddedToSessionListener)

// query each session peer directly
Promise.all([...this.providers].map(async (provider) => {
return queue.add(async () => {
return this.queryProvider(cid, provider, options)
}, {
provider
})
}))
addFindProviderJob (cid: CID, options: AbortOptions): any {
return this.findProviderQueue.add(async () => {
await this.findProviders(cid, this.minProviders, options)
}, { signal: options.signal })
.catch(err => {
if (options.signal?.aborted === true) {
// skip logging error if signal was aborted because abort can happen
// on success (e.g. another session found the block)
// skip logging error if signal was aborted because abort can happen
// on success (e.g. another session found the block)
return
}

this.log.error('error retrieving session block for %c', cid, err)
this.log.error('could not find new providers for %c', cid, err)
})
}

try {
return await deferred.promise
} finally {
this.removeEventListener('provider', peerAddedToSessionListener)
queue.clear()
this.requests.delete(cidStr)
}
addQueryProviderJob (cid: CID, provider: Provider, options: AbortOptions): any {
return this.queryProviderQueue.add(async () => {
return this.queryProvider(cid, provider, options)
}, {
provider,
signal: options.signal
}).catch(err => {
if (options.signal?.aborted === true) {
// skip logging error if signal was aborted because abort can happen
// on success (e.g. another session found the block)
return
}

Check warning on line 213 in packages/utils/src/abstract-session.ts

View check run for this annotation

Codecov / codecov/patch

packages/utils/src/abstract-session.ts#L210-L213

Added lines #L210 - L213 were not covered by tests
this.log.error('error retrieving session block for %c', cid, err)
})
}

evict (provider: Provider): void {
this.evictionFilter.add(this.toEvictionKey(provider))
this.log('provider added to evictionFilter')
const index = this.providers.findIndex(prov => this.equals(prov, provider))
this.log('index of provider in this.providers: %d', index)

if (index === -1) {
this.log('tried to evict provider, but it was not in this.providers')

Check warning on line 225 in packages/utils/src/abstract-session.ts

View check run for this annotation

Codecov / codecov/patch

packages/utils/src/abstract-session.ts#L225

Added line #L225 was not covered by tests
return
}

Expand All @@ -193,7 +235,7 @@

hasProvider (provider: Provider): boolean {
// dedupe existing gateways
if (this.providers.find(prov => this.equals(prov, provider)) != null) {
if (this.providers.some(prov => this.equals(prov, provider))) {
return true
}

Expand All @@ -205,7 +247,14 @@
return false
}

/**
* @param cid - The CID of the block to find providers for
* @param count - The number of providers to find
* @param options - AbortOptions
* @returns
*/
private async findProviders (cid: CID, count: number, options: AbortOptions): Promise<void> {
this.log('findProviders called')
const deferred: DeferredPromise<void> = pDefer()
let found = 0

Expand All @@ -216,23 +265,31 @@
this.log('finding %d-%d new provider(s) for %c', count, this.maxProviders, cid)

for await (const provider of this.findNewProviders(cid, options)) {
if (found === this.maxProviders || options.signal?.aborted === true) {
this.log('found new provider %o', this.toEvictionKey(provider))
// options.signal?.throwIfAborted()
if (this.providers.length === this.maxProviders || options.signal?.aborted === true) {
// if (this.providers.length === this.maxProviders) {

Check warning on line 271 in packages/utils/src/abstract-session.ts

View check run for this annotation

Codecov / codecov/patch

packages/utils/src/abstract-session.ts#L271

Added line #L271 was not covered by tests
break
}

if (this.hasProvider(provider)) {
this.log('ignoring duplicate provider')
continue
} else {
this.log('provider is not a duplicate')
}

this.log('found %d/%d new providers', found, this.maxProviders)
found++
this.log('found %d/%d new providers, (total=%d)', found, this.maxProviders, found + this.providers.length)
// this.providers.push(provider)
// this.providerMap.set(this.toEvictionKey(provider), provider)
this.providers.push(provider)

// let the new peer join current queries
this.safeDispatchEvent('provider', {
detail: provider
})

found++
this.log('emitted provider event')

if (found === count) {
this.log('session is ready')
Expand Down
2 changes: 1 addition & 1 deletion packages/utils/test/abstract-session.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ describe('abstract-session', () => {

await expect(session.retrieve(cid)).to.eventually.be.rejected()

expect(session.findNewProviders).to.have.property('callCount', 2)
expect(session.findNewProviders).to.have.property('callCount', 3)
expect(session.queryProvider).to.have.property('callCount', 1)
})
})
Loading