Skip to content

Commit

Permalink
Add fallbacks to SDK upload (#5970)
Browse files Browse the repository at this point in the history
Co-authored-by: Michelle Brier <michelle.brier4@gmail.com>
  • Loading branch information
theoilie and michellebrier authored Oct 26, 2023
1 parent 2c71673 commit 3a176a2
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 30 deletions.
63 changes: 48 additions & 15 deletions packages/libs/src/sdk/services/Storage/Storage.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import axios from 'axios'
import axios, { AxiosRequestConfig, AxiosResponse } from 'axios'

import fetch from 'cross-fetch'
import FormData from 'form-data'

Expand Down Expand Up @@ -122,27 +123,41 @@ export class Storage implements StorageService {
file.name ?? 'blob'
)

const contentNodeEndpoint = await this.storageNodeSelector.getSelectedNode()

if (!contentNodeEndpoint) {
throw new Error('No content node available for upload')
}

// Using axios for now because it supports upload progress,
// and Node doesn't support XmlHttpRequest
const response = await axios({
let response: AxiosResponse<any> | null = null
const request: AxiosRequestConfig = {
method: 'post',
url: `${contentNodeEndpoint}/uploads`,
maxContentLength: Infinity,
data: formData,
headers: formData.getBoundary
? {
'Content-Type': `multipart/form-data; boundary=${formData.getBoundary()}`
}
'Content-Type': `multipart/form-data; boundary=${formData.getBoundary()}`
}
: undefined,
onUploadProgress: (progressEvent) =>
onProgress?.(progressEvent.loaded, progressEvent.total)
})
}

let lastErr
for (
let selectedNode = await this.storageNodeSelector.getSelectedNode();
this.storageNodeSelector.triedSelectingAllNodes();
selectedNode = await this.storageNodeSelector.getSelectedNode(true)
) {
request.url = `${selectedNode!}/uploads`
try {
response = await axios(request)
} catch (e: any) {
lastErr = e // keep trying other nodes
}
}

if (!response) {
const msg = `Error sending storagev2 upload request, tried all healthy storage nodes. Last error: ${lastErr}`
this.logger.error(msg)
throw new Error(msg)
}

return await this.pollProcessingStatus(
response.data[0].id,
Expand Down Expand Up @@ -199,8 +214,26 @@ export class Storage implements StorageService {
* @returns the status, and the success or failed response if the job is complete
*/
private async getProcessingStatus(id: string): Promise<UploadResponse> {
const contentNodeEndpoint = await this.storageNodeSelector.getSelectedNode()
const response = await fetch(`${contentNodeEndpoint}/uploads/${id}`)
return await response.json()
let lastErr
for (
let selectedNode = await this.storageNodeSelector.getSelectedNode();
this.storageNodeSelector.triedSelectingAllNodes();
selectedNode = await this.storageNodeSelector.getSelectedNode(true)
) {
try {
const response = await fetch(`${selectedNode}/uploads/${id}`)
if (response.ok) {
return await response.json()
} else {
lastErr = `HTTP error: ${response.status} ${response.statusText}, ${await response.text()}`
}
} catch (e: any) {
lastErr = e
}
}

const msg = `Error sending storagev2 uploads polling request, tried all healthy storage nodes. Last error: ${lastErr}`
this.logger.error(msg)
throw new Error(msg)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,4 +189,48 @@ describe('StorageNodeSelector', () => {
storageNodeB.endpoint
])
})

it('force reselects successfully', async () => {
const bootstrapNodes = [storageNodeA, storageNodeB]

const storageNodeSelector = new StorageNodeSelector({
bootstrapNodes,
auth,
discoveryNodeSelector,
logger
})

expect(await storageNodeSelector.getSelectedNode()).toEqual(
storageNodeB.endpoint
)

// force reselect
expect(await storageNodeSelector.getSelectedNode(true)).toEqual(
storageNodeA.endpoint
)
})

it('tries selecting all nodes', async () => {
server.use(
rest.get(`${storageNodeA.endpoint}/health_check`, (_req, res, ctx) => {
return res(ctx.status(400))
})
)
server.use(
rest.get(`${storageNodeB.endpoint}/health_check`, (_req, res, ctx) => {
return res(ctx.status(400))
})
)
const bootstrapNodes = [storageNodeA, storageNodeB]

const storageNodeSelector = new StorageNodeSelector({
bootstrapNodes,
auth,
discoveryNodeSelector,
logger
})

expect(await storageNodeSelector.getSelectedNode()).toBe(null)
expect(await storageNodeSelector.triedSelectingAllNodes()).toBe(true)
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ export class StorageNodeSelector implements StorageNodeSelectorService {
private orderedNodes?: string[] // endpoints (lowercase)
private selectedNode?: string | null
private selectedDiscoveryNode?: string | null
private selectionState: 'healthy_only' | 'failed_all'
private readonly discoveryNodeSelector?: DiscoveryNodeSelectorService
private readonly initialDiscoveryFetchPromise: Promise<void>
private resolveInitialDiscoveryFetchPromise: () => void = () => {}
private resolveInitialDiscoveryFetchPromise: () => void = () => { }

constructor(config: StorageNodeSelectorConfig) {
this.config = mergeConfigWithDefaults(
Expand All @@ -43,6 +44,7 @@ export class StorageNodeSelector implements StorageNodeSelectorService {
'[storage-node-selector]'
)
this.nodes = this.config.bootstrapNodes ?? []
this.selectionState = 'healthy_only'

this.discoveryNodeSelector?.addEventListener(
'change',
Expand Down Expand Up @@ -87,11 +89,12 @@ export class StorageNodeSelector implements StorageNodeSelectorService {
}

this.nodes = contentNodes
this.selectionState = 'healthy_only'
this.resolveInitialDiscoveryFetchPromise()
}

public async getSelectedNode() {
if (this.selectedNode) {
public async getSelectedNode(forceReselect = false) {
if (this.selectedNode && !forceReselect) {
return this.selectedNode
}

Expand All @@ -112,19 +115,42 @@ export class StorageNodeSelector implements StorageNodeSelectorService {
return await this.select()
}

public triedSelectingAllNodes() {
return this.selectionState === 'failed_all'
}

public getNodes(cid: string) {
return this.orderNodes(cid)
}

private async select() {
if (!this.orderedNodes) {
this.orderedNodes = await this.orderNodes(
private async select(): Promise<string | null> {
// We've selected all healthy nodes. Restart from the beginning of the ordered list
if (this.selectionState === 'failed_all') {
this.selectionState = 'healthy_only'
}

// Select the next node in rendezvous order from the list of all nodes
this.selectedNode = await this.selectUntilEndOfList() ?? null
this.logger.info('Selected content node', this.selectedNode)

if (!this.selectedNode) {
// We've selected all healthy nodes. Return null and start over next time select() is called
this.logger.info('Selected all healthy nodes. Returning null and starting over next time select() is called')
this.selectionState = 'failed_all'
}

return this.selectedNode
}

private async selectUntilEndOfList(): Promise<Maybe<string>> {
if (!this.orderedNodes?.length) {
this.orderedNodes = this.orderNodes(
(await this.auth.getAddress()).toLowerCase()
)
}

if (this.orderedNodes.length === 0) {
return null
return undefined
}

const currentNodeIndex = this.selectedNode
Expand All @@ -134,19 +160,17 @@ export class StorageNodeSelector implements StorageNodeSelectorService {
let selectedNode: Maybe<string>
let nextNodeIndex = currentNodeIndex

while (!selectedNode) {
nextNodeIndex = (nextNodeIndex + 1) % this.orderedNodes.length
if (nextNodeIndex === currentNodeIndex) break
while (nextNodeIndex !== this.orderedNodes.length - 1) {
nextNodeIndex++
const nextNode = this.orderedNodes[nextNodeIndex]
if (!nextNode) continue
if (!nextNode) continue // should never happen unless this.orderedNodes has falsy values
if (await isNodeHealthy(nextNode)) {
selectedNode = nextNode
break
}
}

this.selectedNode = selectedNode
this.logger.info('Selected content node', this.selectedNode)
return this.selectedNode ?? null
return selectedNode
}

private orderNodes(key: string) {
Expand Down
3 changes: 2 additions & 1 deletion packages/libs/src/sdk/services/StorageNodeSelector/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import type { DiscoveryNodeSelectorService } from '../DiscoveryNodeSelector'
import type { LoggerService } from '../Logger'

export type StorageNodeSelectorService = {
getSelectedNode: () => Promise<string | null>
getSelectedNode: (forceReselect?: boolean) => Promise<string | null>
getNodes: (cid: string) => string[]
triedSelectingAllNodes: () => boolean
}

export type StorageNode = {
Expand Down
1 change: 1 addition & 0 deletions packages/libs/src/services/creatorNode/CreatorNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ export class CreatorNode {
return await axios(axiosRequestObj)
} catch (e: any) {
const wallet = this.userStateManager.getCurrentUser()?.wallet
// storageNodeSelector is not always defined (not always passed in to the constructor)
const storageNodes = this.storageNodeSelector.getNodes(wallet ?? '')

for (const storageNode of storageNodes) {
Expand Down

0 comments on commit 3a176a2

Please sign in to comment.