-
Notifications
You must be signed in to change notification settings - Fork 111
/
CreatorNode.ts
510 lines (456 loc) · 14.6 KB
/
CreatorNode.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
import type { StorageNodeSelectorService } from '@audius/sdk'
import retry from 'async-retry'
import axios, { AxiosRequestConfig } from 'axios'
import FormData from 'form-data'
import {
GatedConditions,
Nullable,
TrackMetadata,
UploadTrackMetadata,
UserMetadata,
Utils,
uuid
} from '../../utils'
import { hashAndSign, sortObjectKeys } from '../../utils/apiSigning'
import {
userSchemaType,
trackSchemaType,
playlistSchemaType,
Schemas
} from '../schemaValidator/SchemaValidator'
import type { MonitoringCallbacks } from '../types'
import type { Web3Manager } from '../web3Manager'
const { wait } = Utils
const MAX_TRACK_TRANSCODE_TIMEOUT = 3600000 // 1 hour
const MAX_IMAGE_RESIZE_TIMEOUT_MS = 5 * 60_000 // 5 minutes
const POLL_STATUS_INTERVAL = 1000 // 1s
type PlaylistTrackId = { time: number; track: number }
type PlaylistContents = {
track_ids: PlaylistTrackId[]
}
export type PlaylistMetadata = {
playlist_contents: PlaylistContents
playlist_id: number
playlist_name: string
playlist_image_sizes_multihash: string
description: string
is_album: boolean
is_private: boolean
is_image_autogenerated: boolean
is_stream_gated: boolean
stream_conditions: Nullable<GatedConditions>
release_date: Nullable<string>
is_scheduled_release: boolean
ddex_app?: string | null
upc?: string | null
}
export type ProgressCB = (
progress:
| {
art: {
upload?: { loaded: number; total: number }
transcode?: { decimal: number }
resize?: undefined
}
}
| {
audio: {
upload?: { loaded: number; total: number }
transcode?: { decimal: number }
resize?: undefined
}
}
) => void
export type CreatorNodeConfig = {
web3Manager: Web3Manager
wallet?: string
userId?: number
// fallback creator node endpoint (to be deprecated)
creatorNodeEndpoint: string
isServer: boolean
schemas: Schemas
// whether or not to include only specified nodes (default null)
passList: Set<string> | null
// whether or not to exclude any nodes (default null)
blockList: Set<string> | null
// callbacks to be invoked with metrics from requests sent to a service
monitoringCallbacks: MonitoringCallbacks
fallbackUrl: string
storageNodeSelector: StorageNodeSelectorService
}
// Currently only supports a single logged-in audius user
export class CreatorNode {
/* Static Utils */
/* -------------- */
web3Manager: Nullable<Web3Manager>
creatorNodeEndpoint: string
isServer: boolean
schemas: Schemas | undefined
passList: Set<string> | null
blockList: Set<string> | null
monitoringCallbacks: MonitoringCallbacks
maxBlockNumber: number
storageNodeSelector: StorageNodeSelectorService
wallet?: string
userId?: number
/**
* Constructs a service class for a creator node
*/
constructor(
web3Manager: Nullable<Web3Manager>,
creatorNodeEndpoint: string,
isServer: boolean,
schemas: Schemas | undefined,
passList: Set<string> | null = null,
blockList: Set<string> | null = null,
monitoringCallbacks: MonitoringCallbacks = {},
storageNodeSelector: StorageNodeSelectorService,
wallet?: string,
userId?: number
) {
this.web3Manager = web3Manager
// This is just 1 endpoint (primary), unlike the creator_node_endpoint field in user metadata
this.creatorNodeEndpoint = creatorNodeEndpoint
this.isServer = isServer
this.schemas = schemas
this.maxBlockNumber = 0
this.passList = passList
this.blockList = blockList
this.monitoringCallbacks = monitoringCallbacks
this.storageNodeSelector = storageNodeSelector
this.wallet = wallet
this.userId = userId
}
async init() {
if (!this.web3Manager) throw new Error('Failed to initialize CreatorNode')
}
// Throws an error upon validation failure
validatePlaylistSchema(metadata: PlaylistMetadata) {
this.schemas?.[playlistSchemaType].validate?.(metadata)
}
// Throws an error upon validation failure
validateUserSchema(metadata: UserMetadata) {
this.schemas?.[userSchemaType].validate?.(metadata)
}
// Throws an error upon validation failure
validateTrackSchema(metadata: Partial<TrackMetadata>) {
this.schemas?.[trackSchemaType].validate?.(metadata)
}
getEndpoint() {
return this.creatorNodeEndpoint
}
/**
* Switch from one creatorNodeEndpoint to another
*/
async setEndpoint(creatorNodeEndpoint: string) {
this.creatorNodeEndpoint = creatorNodeEndpoint
}
async transcodeTrackPreview(metadata: TrackMetadata): Promise<TrackMetadata> {
if (metadata.preview_start_seconds == null) {
throw new Error('No track preview start time specified')
}
if (!metadata.audio_upload_id) {
throw new Error('Missing required audio_upload_id')
}
const updatedMetadata = { ...metadata }
const data = {
previewStartSeconds: metadata.preview_start_seconds.toString()
}
const resp = await this._retry3(
async () => await this.editFileV2(metadata.audio_upload_id!, data),
(e) => {
console.info('Retrying editFileV2', e)
}
)
// Update metadata with new track preview cid
const previewKey = `320_preview|${updatedMetadata.preview_start_seconds}`
updatedMetadata.preview_cid = resp.results[previewKey]
return updatedMetadata
}
async uploadTrackAudioAndCoverArtV2(
trackFile: File,
coverArtFile: File | null,
metadata: UploadTrackMetadata,
onProgress: ProgressCB
): Promise<TrackMetadata> {
const updatedMetadata: TrackMetadata = {
track_cid: '',
preview_cid: null,
audio_upload_id: null,
orig_file_cid: '',
orig_filename: '',
...metadata
}
const audioUploadOpts: { [key: string]: string } = {}
if (updatedMetadata.preview_start_seconds !== null) {
audioUploadOpts.previewStartSeconds =
updatedMetadata.preview_start_seconds.toString()
}
if (metadata.placement_hosts) {
audioUploadOpts.placement_hosts = metadata.placement_hosts
}
// Upload audio and cover art
const promises = [
this._retry3(
async () =>
await this.uploadTrackAudioV2(trackFile, onProgress, audioUploadOpts),
(e) => {
console.info('Retrying uploadTrackAudioV2', e)
}
)
]
if (coverArtFile) {
promises.push(
this._retry3(
async () =>
await this.uploadTrackCoverArtV2(coverArtFile, onProgress),
(e) => {
console.info('Retrying uploadTrackCoverArtV2', e)
}
)
)
}
const [audioResp, coverArtResp] = await Promise.all(promises)
// Update metadata to include uploaded CIDs
updatedMetadata.track_segments = []
updatedMetadata.duration = parseInt(audioResp.probe.format.duration, 10)
updatedMetadata.track_cid = audioResp.results['320']
updatedMetadata.orig_file_cid = audioResp.orig_file_cid
updatedMetadata.orig_filename = audioResp.orig_filename
if (updatedMetadata.preview_start_seconds !== null) {
const previewKey = `320_preview|${updatedMetadata.preview_start_seconds}`
updatedMetadata.preview_cid = audioResp.results[previewKey]
}
if (audioResp.audio_analysis_results != null) {
if ('bpm' in audioResp.audio_analysis_results) {
updatedMetadata.bpm = audioResp.audio_analysis_results.bpm
}
if ('key' in audioResp.audio_analysis_results) {
updatedMetadata.musical_key = audioResp.audio_analysis_results.key
}
}
updatedMetadata.audio_analysis_error_count =
audioResp.audio_analysis_error_count || 0
updatedMetadata.audio_upload_id = audioResp.id
if (coverArtResp) updatedMetadata.cover_art_sizes = coverArtResp.id
return updatedMetadata
}
async uploadTrackAudioV2(
file: File,
onProgress: ProgressCB,
options?: { [key: string]: string }
) {
return await this.uploadFileV2(file, onProgress, 'audio', options)
}
async uploadTrackCoverArtV2(file: File, onProgress: ProgressCB = () => {}) {
return await this.uploadFileV2(file, onProgress, 'img_square')
}
async uploadProfilePictureV2(file: File, onProgress: ProgressCB = () => {}) {
return await this.uploadFileV2(file, onProgress, 'img_square')
}
async uploadCoverPhotoV2(file: File, onProgress: ProgressCB = () => {}) {
return await this.uploadFileV2(file, onProgress, 'img_backdrop')
}
async editFileV2(
uploadId: string,
data: { [key: string]: string },
onProgress?: ProgressCB
) {
const myPrivateKey = this.web3Manager?.getOwnerWalletPrivateKey()
if (!myPrivateKey) {
throw new Error('Missing user private key')
}
// Generate signature
const signatureData = {
upload_id: uploadId,
timestamp: Date.now()
}
const signature = await hashAndSign(
JSON.stringify(sortObjectKeys(signatureData)),
'0x' + myPrivateKey.toString('hex')
)
const signatureEnvelope = {
data: JSON.stringify(signatureData),
signature
}
const headers = {
'X-Request-ID': uuid()
}
const response = await this._makeRequestV2({
method: 'post',
url: `/uploads/${uploadId}`,
data,
params: { signature: JSON.stringify(signatureEnvelope) },
headers
})
// Poll for re-transcoding to complete
return await this.pollProcessingStatusV2(
uploadId,
response.data.template,
onProgress
)
}
async uploadFileV2(
file: File,
onProgress: ProgressCB,
template: 'audio' | 'img_square' | 'img_backdrop',
options?: { [key: string]: string }
) {
const { headers, formData } = this.createFormDataAndUploadHeadersV2(file, {
template,
...options
})
const response = await this._makeRequestV2({
method: 'post',
url: '/uploads',
data: formData,
headers,
onUploadProgress: (progressEvent) => {
const progress = {
upload: { loaded: progressEvent.loaded, total: progressEvent.total }
}
onProgress(
template === 'audio' ? { audio: progress } : { art: progress }
)
}
})
// Covers no response or empty response
if (!response?.data?.length) {
throw new Error('No upload response from storage node')
}
return await this.pollProcessingStatusV2(
response.data[0].id,
template,
onProgress
)
}
/**
* Works for both track transcode and image resize jobs
* @param id ID of the transcode/resize job
* @param maxPollingMs millis to stop polling and error if job is not done
* @returns successful job info, or throws error if job fails / times out
*/
async pollProcessingStatusV2(
id: string,
template: string,
onProgress?: ProgressCB
) {
const start = Date.now()
const maxPollingMs =
template === 'audio'
? MAX_TRACK_TRANSCODE_TIMEOUT
: MAX_IMAGE_RESIZE_TIMEOUT_MS
while (Date.now() - start < maxPollingMs) {
try {
const resp = await this.getProcessingStatusV2(id)
if (template === 'audio' && resp.transcode_progress) {
onProgress?.({
audio: {
transcode: { decimal: resp.transcode_progress }
}
})
}
if (resp?.status === 'done') return resp
if (
resp?.status === 'error' ||
resp?.status === 'error_retranscode_preview'
) {
throw new Error(
`Upload failed: id=${id}, resp=${JSON.stringify(resp)}`
)
}
} catch (e: any) {
// Rethrow if error is "Upload failed" or if status code is 422 (Unprocessable Entity)
if (
e.message?.startsWith('Upload failed') ||
(e.response && e.response?.status === 422)
) {
throw e
}
// Swallow errors caused by failure to establish connection to node so we can retry polling
console.error(`Failed to poll for processing status, ${e}`)
}
await wait(POLL_STATUS_INTERVAL)
}
throw new Error(`Upload took over ${maxPollingMs}ms. id=${id}`)
}
/**
* Gets the task progress given the task type and id associated with the job
* @param id the id of the transcoding or resizing job
* @returns the status, and the success or failed response if the job is complete
*/
async getProcessingStatusV2(id: string) {
const { data } = await this._makeRequestV2({
method: 'get',
url: `/uploads/${id}`
})
return data
}
/* ------- INTERNAL FUNCTIONS ------- */
/**
* Makes an axios request to this.creatorNodeEndpoint
* @return response body
*/
async _makeRequestV2(axiosRequestObj: AxiosRequestConfig) {
// TODO: This might want to have other error handling, request UUIDs, etc...
// But I didn't want to pull in all the chaos and incompatiblity of the old _makeRequest
axiosRequestObj.baseURL = this.creatorNodeEndpoint
try {
return await axios(axiosRequestObj)
} catch (e: any) {
// storageNodeSelector is not always defined (not always passed in to the constructor)
const storageNodes = this.storageNodeSelector.getNodes(this.wallet ?? '')
for (const storageNode of storageNodes) {
try {
axiosRequestObj.baseURL = storageNode
return await axios(axiosRequestObj)
} catch (e) {
// continue
}
}
const requestId = axiosRequestObj.headers['X-Request-ID']
const msg = `Error sending storagev2 request for X-Request-ID=${requestId}, tried all storage nodes: ${e}`
console.error(msg)
throw new Error(msg)
}
}
/**
* Create headers and formData for file upload
* @param file the file to upload
* @returns headers and formData in an object
*/
createFormDataAndUploadHeadersV2(
file: File,
extraFormDataOptions: Record<string, unknown> = {}
) {
// form data is from browser, not imported npm module
const formData = new FormData()
formData.append('files', file, file.name)
Object.keys(extraFormDataOptions).forEach((key) => {
formData.append(key, `${extraFormDataOptions[key]}`)
})
let headers: Record<string, string | null> = {}
if (this.isServer) {
headers = formData.getHeaders()
}
const requestId = uuid()
headers['X-Request-ID'] = requestId
if (this.wallet && this.userId) {
headers['X-User-Wallet-Addr'] = this.wallet
headers['X-User-Id'] = `${this.userId}`
}
return { headers, formData }
}
/**
* Calls fn and then retries once after 500ms, again after 1500ms, and again after 4000ms
*/
async _retry3(fn: () => Promise<any>, onRetry = (_err: any) => {}) {
return await retry(fn, {
minTimeout: 500,
maxTimeout: 4000,
factor: 3,
retries: 3,
onRetry
})
}
}