Skip to content

Commit

Permalink
Add fallbacks to SDK upload
Browse files Browse the repository at this point in the history
  • Loading branch information
theoilie authored and michellebrier committed Oct 24, 2023
1 parent d5b7c69 commit 96276a5
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 66 deletions.
56 changes: 41 additions & 15 deletions packages/libs/src/sdk/services/Storage/Storage.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import axios from 'axios'
import FormData from 'form-data'
import axios, { AxiosRequestConfig, AxiosResponse } from 'axios'

import fetch from 'cross-fetch'
import FormData from 'form-data'

Expand Down Expand Up @@ -122,27 +124,37 @@ export class Storage implements StorageService {
file.name ?? 'blob'
)

const contentNodeEndpoint = await this.storageNodeSelector.getSelectedNode()

if (!contentNodeEndpoint) {
throw new Error('No content node available for upload')
}

// Using axios for now because it supports upload progress,
// and Node doesn't support XmlHttpRequest
const response = await axios({
let response: AxiosResponse<any> | null = null
const request: AxiosRequestConfig = {
method: 'post',
url: `${contentNodeEndpoint}/uploads`,
maxContentLength: Infinity,
data: formData,
headers: formData.getBoundary
? {
'Content-Type': `multipart/form-data; boundary=${formData.getBoundary()}`
}
'Content-Type': `multipart/form-data; boundary=${formData.getBoundary()}`
}
: undefined,
onUploadProgress: (progressEvent) =>
onProgress?.(progressEvent.loaded, progressEvent.total)
})
}

let lastErr
for (let selectedNode = await this.storageNodeSelector.getSelectedNode(); this.storageNodeSelector.triedSelectingAllNodes(); selectedNode = await this.storageNodeSelector.getSelectedNode(true)) {
request.url = `${selectedNode!}/uploads`
try {
response = await axios(request)
} catch (e: any) {
lastErr = e // keep trying other nodes
}
}

if (!response) {
const msg = `Error sending storagev2 upload request, tried all healthy storage nodes. Last error: ${lastErr}`
this.logger.error(msg)
throw new Error(msg)
}

return await this.pollProcessingStatus(
response.data[0].id,
Expand Down Expand Up @@ -199,8 +211,22 @@ export class Storage implements StorageService {
* @returns the status, and the success or failed response if the job is complete
*/
private async getProcessingStatus(id: string): Promise<UploadResponse> {
const contentNodeEndpoint = await this.storageNodeSelector.getSelectedNode()
const response = await fetch(`${contentNodeEndpoint}/uploads/${id}`)
return await response.json()
let lastErr
for (let selectedNode = await this.storageNodeSelector.getSelectedNode(); this.storageNodeSelector.triedSelectingAllNodes(); selectedNode = await this.storageNodeSelector.getSelectedNode(true)) {
try {
const response = await fetch(`${selectedNode}/uploads/${id}`)
if (response.ok) {
return await response.json()
} else {
lastErr = `HTTP error: ${response.status} ${response.statusText}, ${await response.text()}`
}
} catch (e: any) {
lastErr = e
}
}

const msg = `Error sending storagev2 uploads polling request, tried all healthy storage nodes. Last error: ${lastErr}`
this.logger.error(msg)
throw new Error(msg)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ export class StorageNodeSelector implements StorageNodeSelectorService {
private orderedNodes?: string[] // endpoints (lowercase)
private selectedNode?: string | null
private selectedDiscoveryNode?: string | null
private selectionState: 'healthy_only' | 'failed_all'
private readonly discoveryNodeSelector?: DiscoveryNodeSelectorService
private readonly initialDiscoveryFetchPromise: Promise<void>
private resolveInitialDiscoveryFetchPromise: () => void = () => {}
private resolveInitialDiscoveryFetchPromise: () => void = () => { }

constructor(config: StorageNodeSelectorConfig) {
this.config = mergeConfigWithDefaults(
Expand All @@ -43,6 +44,7 @@ export class StorageNodeSelector implements StorageNodeSelectorService {
'[storage-node-selector]'
)
this.nodes = this.config.bootstrapNodes ?? []
this.selectionState = 'healthy_only'

this.discoveryNodeSelector?.addEventListener(
'change',
Expand Down Expand Up @@ -87,11 +89,12 @@ export class StorageNodeSelector implements StorageNodeSelectorService {
}

this.nodes = contentNodes
this.selectionState = 'healthy_only'
this.resolveInitialDiscoveryFetchPromise()
}

public async getSelectedNode() {
if (this.selectedNode) {
public async getSelectedNode(forceReselect = false) {
if (this.selectedNode && !forceReselect) {
return this.selectedNode
}

Expand All @@ -112,19 +115,42 @@ export class StorageNodeSelector implements StorageNodeSelectorService {
return await this.select()
}

public triedSelectingAllNodes() {
return this.selectionState === 'failed_all'
}

public getNodes(cid: string) {
return this.orderNodes(cid)
}

private async select() {
if (!this.orderedNodes) {
this.orderedNodes = await this.orderNodes(
private async select(): Promise<string | null> {
// We've selected all healthy nodes. Restart from the beginning of the ordered list
if (this.selectionState === 'failed_all') {
this.selectionState = 'healthy_only'
}

// Select the next node in rendezvous order from the list of all nodes
this.selectedNode = await this.selectUntilEndOfList() ?? null
this.logger.info('Selected content node', this.selectedNode)

if (!this.selectedNode) {
// We've selected all healthy nodes. Return null and start over next time select() is called
this.logger.info('Selected all healthy nodes. Returning null and starting over next time select() is called')
this.selectionState = 'failed_all'
}

return this.selectedNode
}

private async selectUntilEndOfList(): Promise<Maybe<string>> {
if (!this.orderedNodes?.length) {
this.orderedNodes = this.orderNodes(
(await this.auth.getAddress()).toLowerCase()
)
}

if (this.orderedNodes.length === 0) {
return null
return undefined
}

const currentNodeIndex = this.selectedNode
Expand All @@ -134,19 +160,17 @@ export class StorageNodeSelector implements StorageNodeSelectorService {
let selectedNode: Maybe<string>
let nextNodeIndex = currentNodeIndex

while (!selectedNode) {
nextNodeIndex = (nextNodeIndex + 1) % this.orderedNodes.length
if (nextNodeIndex === currentNodeIndex) break
while (nextNodeIndex !== this.orderedNodes.length - 1) {
nextNodeIndex++
const nextNode = this.orderedNodes[nextNodeIndex]
if (!nextNode) continue
if (!nextNode) continue // should never happen unless this.orderedNodes has falsy values
if (await isNodeHealthy(nextNode)) {
selectedNode = nextNode
break
}
}

this.selectedNode = selectedNode
this.logger.info('Selected content node', this.selectedNode)
return this.selectedNode ?? null
return selectedNode
}

private orderNodes(key: string) {
Expand Down
3 changes: 2 additions & 1 deletion packages/libs/src/sdk/services/StorageNodeSelector/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import type { DiscoveryNodeSelectorService } from '../DiscoveryNodeSelector'
import type { LoggerService } from '../Logger'

export type StorageNodeSelectorService = {
getSelectedNode: () => Promise<string | null>
getSelectedNode: (forceReselect?: boolean) => Promise<string | null>
getNodes: (cid: string) => string[]
triedSelectingAllNodes: () => boolean
}

export type StorageNode = {
Expand Down
62 changes: 26 additions & 36 deletions packages/libs/src/services/creatorNode/CreatorNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,19 @@ export type PlaylistMetadata = {
export type ProgressCB = (
progress:
| {
art: {
upload?: { loaded: number; total: number }
transcode?: { decimal: number }
resize?: undefined
}
art: {
upload?: { loaded: number; total: number }
transcode?: { decimal: number }
resize?: undefined
}
}
| {
audio: {
upload?: { loaded: number; total: number }
transcode?: { decimal: number }
resize?: undefined
}
audio: {
upload?: { loaded: number; total: number }
transcode?: { decimal: number }
resize?: undefined
}
}
) => void

export type CreatorNodeConfig = {
Expand Down Expand Up @@ -236,15 +236,15 @@ export class CreatorNode {
return await this.uploadFileV2(file, onProgress, 'audio', options)
}

async uploadTrackCoverArtV2(file: File, onProgress: ProgressCB = () => {}) {
async uploadTrackCoverArtV2(file: File, onProgress: ProgressCB = () => { }) {
return await this.uploadFileV2(file, onProgress, 'img_square')
}

async uploadProfilePictureV2(file: File, onProgress: ProgressCB = () => {}) {
async uploadProfilePictureV2(file: File, onProgress: ProgressCB = () => { }) {
return await this.uploadFileV2(file, onProgress, 'img_square')
}

async uploadCoverPhotoV2(file: File, onProgress: ProgressCB = () => {}) {
async uploadCoverPhotoV2(file: File, onProgress: ProgressCB = () => { }) {
return await this.uploadFileV2(file, onProgress, 'img_backdrop')
}

Expand Down Expand Up @@ -392,33 +392,23 @@ export class CreatorNode {
/* ------- INTERNAL FUNCTIONS ------- */

/**
* Makes an axios request to this.creatorNodeEndpoint
* Makes an axios request to each storage node sequentially until 1 succeeds or all fail
* @return response body
*/
async _makeRequestV2(axiosRequestObj: AxiosRequestConfig) {
// TODO: This might want to have other error handling, request UUIDs, etc...
// But I didn't want to pull in all the chaos and incompatiblity of the old _makeRequest
axiosRequestObj.baseURL = this.creatorNodeEndpoint
try {
return await axios(axiosRequestObj)
} catch (e: any) {
const wallet = this.userStateManager.getCurrentUser()?.wallet
const storageNodes = this.storageNodeSelector.getNodes(wallet ?? '')

for (const storageNode of storageNodes) {
try {
axiosRequestObj.baseURL = storageNode
return await axios(axiosRequestObj)
} catch (e) {
// continue
}
let lastErr
for (let selectedNode = await this.storageNodeSelector.getSelectedNode(); this.storageNodeSelector.triedSelectingAllNodes(); selectedNode = await this.storageNodeSelector.getSelectedNode(true)) {
axiosRequestObj.baseURL = selectedNode!
try {
return await axios(axiosRequestObj)
} catch (e: any) {
lastErr = e // keep trying other nodes
}

const requestId = axiosRequestObj.headers['X-Request-ID']
const msg = `Error sending storagev2 request for X-Request-ID=${requestId}, tried all storage nodes: ${e}`
console.error(msg)
throw new Error(msg)
}
const requestId = axiosRequestObj.headers['X-Request-ID']
const msg = `Error sending storagev2 request for X-Request-ID=${requestId}, tried all healthy storage nodes. Last error: ${lastErr}`
console.error(msg)
throw new Error(msg)
}

/**
Expand Down Expand Up @@ -457,7 +447,7 @@ export class CreatorNode {
/**
* Calls fn and then retries once after 500ms, again after 1500ms, and again after 4000ms
*/
async _retry3(fn: () => Promise<any>, onRetry = (_err: any) => {}) {
async _retry3(fn: () => Promise<any>, onRetry = (_err: any) => { }) {
return await retry(fn, {
minTimeout: 500,
maxTimeout: 4000,
Expand Down

0 comments on commit 96276a5

Please sign in to comment.