diff --git a/creator-node/src/utils/fileHasher.js b/creator-node/src/utils/fileHasher.js index 96dc43fad7d..4c3aaf0cd68 100644 --- a/creator-node/src/utils/fileHasher.js +++ b/creator-node/src/utils/fileHasher.js @@ -179,19 +179,7 @@ export const fileHasher = { */ async generateNonImageCid(content, logger = console) { const buffer = await fileHasher.convertToBuffer(content, logger) - - const startHashing = process.hrtime.bigint() - const cid = await fileHasher.hashNonImages(buffer) - - const hashDurationMs = fileHasher.convertNanosToMillis( - process.hrtime.bigint() - startHashing - ) - - logger.debug( - `[fileHasher - generateNonImageCid()] CID=${cid} hashDurationMs=${hashDurationMs}ms` - ) - - return cid + return fileHasher.hashNonImages(buffer) }, /** @@ -200,17 +188,7 @@ export const fileHasher = { * @param {Object?} logger * @returns {HashedImage[]} only hash responses with the structure [{path: , cid: , size: }] */ - async generateImageCids(content, logger = console) { - const startHashing = process.hrtime.bigint() - const hashedImages = await fileHasher.hashImages(content) - const hashDurationMs = fileHasher.convertNanosToMillis( - process.hrtime.bigint() - startHashing - ) - - const hashedImagesStr = JSON.stringify(hashedImages) - logger.debug( - `[fileHasher - generateImageCids()] hashedImages=${hashedImagesStr} hashImagesDurationMs=${hashDurationMs}ms` - ) - return hashedImages + async generateImageCids(content, _ = console) { + return fileHasher.hashImages(content) } } diff --git a/creator-node/src/utils/fsUtils.ts b/creator-node/src/utils/fsUtils.ts index 4a8e879ed4c..380384acd56 100644 --- a/creator-node/src/utils/fsUtils.ts +++ b/creator-node/src/utils/fsUtils.ts @@ -91,9 +91,6 @@ export async function validateStateForImageDirCIDAndReturnFileUUID( if (!imageDirCID) { return null } - req.logger.debug( - `Beginning validateStateForImageDirCIDAndReturnFileUUID for imageDirCID ${imageDirCID}` - ) // Ensure db row exists for dirCID const dirFile = await models.File.findOne({ @@ -139,9 +136,6 @@ export async function validateStateForImageDirCIDAndReturnFileUUID( }) ) - req.logger.debug( - `Completed validateStateForImageDirCIDAndReturnFileUUID for imageDirCID ${imageDirCID}` - ) return dirFile.fileUUID } @@ -262,7 +256,6 @@ export function computeFilePathInDir(dirName: string, fileName: string) { const parentDirPath = computeFilePath(dirName) const absolutePath = path.join(parentDirPath, fileName) - genericLogger.debug(`File path computed, absolutePath=${absolutePath}`) return absolutePath } @@ -279,7 +272,6 @@ export async function computeFilePathInDirAndEnsureItExists( const parentDirPath = computeFilePath(dirName) await ensureDirPathExists(parentDirPath) const absolutePath = path.join(parentDirPath, fileName) - genericLogger.debug(`File path computed, absolutePath=${absolutePath}`) return absolutePath } diff --git a/discovery-provider/src/queries/get_tracks.py b/discovery-provider/src/queries/get_tracks.py index 858cb66f884..5f830b90c33 100644 --- a/discovery-provider/src/queries/get_tracks.py +++ b/discovery-provider/src/queries/get_tracks.py @@ -163,9 +163,10 @@ def _get_tracks(session, args): sort_fn( coalesce( func.to_timestamp_safe( - TrackWithAggregates.release_date, "Dy Mon DD YYYY HH24:MI:SS GMTTZHTZM" + TrackWithAggregates.release_date, + "Dy Mon DD YYYY HH24:MI:SS GMTTZHTZM", ), - TrackWithAggregates.created_at + TrackWithAggregates.created_at, ) ) ) @@ -204,7 +205,8 @@ def _get_tracks(session, args): coalesce( # This func is defined in alembic migrations 
func.to_timestamp_safe(
-                        TrackWithAggregates.release_date, "Dy Mon DD YYYY HH24:MI:SS GMTTZHTZM"
+                        TrackWithAggregates.release_date,
+                        "Dy Mon DD YYYY HH24:MI:SS GMTTZHTZM",
                    ),
                    TrackWithAggregates.created_at,
                ).desc()
diff --git a/libs/src/api/Track.ts b/libs/src/api/Track.ts
index e313ef9a3c6..bce12a9ef88 100644
--- a/libs/src/api/Track.ts
+++ b/libs/src/api/Track.ts
@@ -394,84 +394,48 @@ export class Track extends Base {
     metadata: TrackMetadata,
     onProgress: () => void
   ) {
-    this.REQUIRES(Services.CREATOR_NODE) // TODO: Change to storage node
+    // Validate inputs
+    this.REQUIRES(Services.CREATOR_NODE)
     this.FILE_IS_VALID(trackFile)
-
-    try {
-      if (coverArtFile) this.FILE_IS_VALID(coverArtFile)
-
-      this.IS_OBJECT(metadata)
-
-      const ownerId = this.userStateManager.getCurrentUserId()
-      if (!ownerId) {
-        return {
-          error: 'No users loaded for this wallet'
-        }
+    if (coverArtFile) this.FILE_IS_VALID(coverArtFile)
+    this.IS_OBJECT(metadata)
+    const ownerId = this.userStateManager.getCurrentUserId()
+    if (!ownerId) {
+      return {
+        error: 'No users loaded for this wallet'
       }
+    }
+    metadata.owner_id = ownerId
+    this._validateTrackMetadata(metadata)

-      metadata.owner_id = ownerId
-      this._validateTrackMetadata(metadata)
-
-      // Upload metadata
-      const {
-        metadataMultihash,
-        // metadataFileUUID,
-        // transcodedTrackUUID,
-        transcodedTrackCID
-      } = await retry(
-        async () => {
-          return await this.creatorNode.uploadTrackContentV2(
-            trackFile,
-            coverArtFile,
-            metadata,
-            onProgress
-          )
-          // return this.creatorNode.uploadTrackContent(
-          //   trackFile,
-          //   coverArtFile,
-          //   metadata,
-          //   onProgress
-          // )
-        },
-        {
-          // Retry function 3x
-          // 1st retry delay = 500ms, 2nd = 1500ms, 3rd...nth retry = 4000 ms (capped)
-          minTimeout: 500,
-          maxTimeout: 4000,
-          factor: 3,
-          retries: 3,
-          onRetry: (err) => {
-            if (err) {
-              console.log('uploadTrackContentV2 retry error: ', err)
-            }
-          }
-        }
-      )
+    // Upload track audio and cover art to storage node
+    const updatedMetadata = await this.creatorNode.uploadTrackAudioAndCoverArtV2(
+      trackFile,
+      coverArtFile,
+      metadata,
+      onProgress
+    )

-      // Write metadata to chain
-      // TODO: Make discovery index by reading its own db instead of hitting CN /ipfs
-      const trackId = await this._generateTrackId()
-      const response = await this.contracts.EntityManagerClient!.manageEntity(
-        ownerId,
-        EntityManagerClient.EntityType.TRACK,
-        trackId,
-        EntityManagerClient.Action.CREATE,
-        metadataMultihash
-      )
-      const txReceipt = response.txReceipt
+    // Write metadata to chain
+    const trackId = await this._generateTrackId()
+    // TODO: Uncomment once we can index full metadata (not metadata hash)
+    // TODO: DON'T UNCOMMENT ON STAGE OR PROD UNTIL THEN, OR ELSE INDEXING WILL STALL!!
+    // const response = await this.contracts.EntityManagerClient!.manageEntity(
+    //   ownerId,
+    //   EntityManagerClient.EntityType.TRACK,
+    //   trackId,
+    //   EntityManagerClient.Action.CREATE,
+    //   JSON.stringify(updatedMetadata) // TODO: @michelle: would this work, or does it need a different EntityType that accepts full metadata instead of a metadata hash?
+    // )
+    // const txReceipt = response.txReceipt
+    const txReceipt = { blockHash: 'UNCOMMENT ABOVE', blockNumber: 1 }

-      return {
-        blockHash: txReceipt.blockHash,
-        blockNumber: txReceipt.blockNumber,
-        trackId,
-        transcodedTrackCID,
-        error: false
-      }
-    } catch (e) {
-      return {
-        error: (e as Error).message,
-        stack: (e as Error).stack
-      }
+    return {
+      blockHash: txReceipt.blockHash,
+      blockNumber: txReceipt.blockNumber,
+      trackId,
+      transcodedTrackCID: updatedMetadata.track_cid,
+      error: false
     }
   }
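Review note: a minimal caller-side sketch of the rewritten flow above. The enclosing method's name sits outside the hunk (assumed here to be `uploadTrackV2`), and `libs` stands in for an initialized libs instance, so everything below is illustrative rather than part of the patch:

```ts
// Sketch, assuming the hunk above is Track.uploadTrackV2(trackFile, coverArtFile, metadata, onProgress).
// The v2 path returns the transcoded CID via updatedMetadata.track_cid instead of a
// metadataMultihash, and the chain write is stubbed until indexing supports full metadata.
const result = await libs.Track.uploadTrackV2(
  trackFile, // audio File
  coverArtFile, // image File
  metadata, // TrackMetadata; owner_id is filled in from the current user
  () => console.log('progress tick') // onProgress is typed () => void here
)
if (result.error) {
  console.error(result.error)
} else {
  console.log(result.trackId, result.transcodedTrackCID, result.blockNumber)
}
```

Note that `result.blockHash` and `result.blockNumber` are placeholders (`'UNCOMMENT ABOVE'`, `1`) until the EntityManager call is re-enabled.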
diff --git a/libs/src/services/creatorNode/CreatorNode.ts b/libs/src/services/creatorNode/CreatorNode.ts
index d0554a23ebd..7dcc08f0bfc 100644
--- a/libs/src/services/creatorNode/CreatorNode.ts
+++ b/libs/src/services/creatorNode/CreatorNode.ts
@@ -15,6 +15,7 @@ import type { MonitoringCallbacks } from '../types'
 const { wait } = Utils

 const MAX_TRACK_TRANSCODE_TIMEOUT = 3600000 // 1 hour
+const MAX_IMAGE_RESIZE_TIMEOUT_MS = 5 * 60_000 // 5 minutes
 const POLL_STATUS_INTERVAL = 3000 // 3s
 const BROWSER_SESSION_REFRESH_TIMEOUT = 604800000 // 1 week

@@ -392,52 +393,94 @@ export class CreatorNode {
     return { ...metadataResp, ...trackContentResp }
   }

-  // TODO: Move this to SDK and have AudiusBackend.ts in client call this instead of calling libs here: https://github.com/AudiusProject/audius-client/blob/main/packages/common/src/services/audius-backend/AudiusBackend.ts#L1235-L1240
-  // TODO: Need to work out something to show progress
-  async uploadTrackContentV2(
+  async uploadTrackAudioAndCoverArtV2(
     trackFile: File,
-    // coverArtFile: File,
-    // metadata: TrackMetadata,
-    _: File,
-    __: TrackMetadata,
-    onProgress: ProgressCB = () => {}
+    coverArtFile: File,
+    metadata: TrackMetadata,
+    onProgress: ProgressCB = () => {}
   ) {
-    // Upload track file
-    const trackFileFormData = new FormData()
-    trackFileFormData.append('template', 'audio')
-    trackFileFormData.append('files', trackFile, {
-      contentType: 'audio',
-      filename: trackFile.name
-    })
-    const trackFileUploadResponse = await axios({
+    const updatedMetadata = { ...metadata }
+
+    // Upload audio and cover art
+    const [audioResp, coverArtResp] = await Promise.all([
+      this._retry3(
+        async () => this.uploadTrackAudioV2(trackFile, onProgress),
+        (e) => console.log('Retrying uploadTrackAudioV2', e)
+      ),
+      this._retry3(
+        async () => this.uploadTrackCoverArtV2(coverArtFile, onProgress),
+        (e) => console.log('Retrying uploadTrackCoverArtV2', e)
+      )
+    ])
+
+    // Update metadata to include uploaded CIDs
+    // TODO: Make sure discovery and elsewhere accept 0-length array. 
Some checks in CN currently fail if there's not at least 1 valid segment + updatedMetadata.track_segments = [] + updatedMetadata.track_cid = audioResp.results["320"] + if (updatedMetadata.download?.is_downloadable) { + updatedMetadata.download.cid = updatedMetadata.track_cid + } + updatedMetadata.cover_art_sizes = coverArtResp.id + + return updatedMetadata + } + + async uploadTrackAudioV2(file: File, onProgress: ProgressCB) { + return this.uploadFileV2(file, onProgress, 'audio') + } + + async uploadTrackCoverArtV2(file: File, onProgress: ProgressCB) { + return this.uploadFileV2(file, onProgress, 'img_square') + } + + async uploadFileV2(file: File, onProgress: ProgressCB, template: 'audio' | 'img_square' | 'img_backdrop') { + const formData = new FormData() + formData.append('template', template) + formData.append('files', file, file.name) + const response = await this._makeRequestV2({ method: 'post', - url: this.creatorNodeEndpoint + '/mediorum/uploads', - data: trackFileFormData, - headers: { - 'Content-Type': 'multipart/form-data' - }, - onUploadProgress: (progressEvent) => { - onProgress(progressEvent.loaded, progressEvent.total) - } + url: '/mediorum/uploads', + data: formData, + headers: { 'Content-Type': 'multipart/form-data' }, + onUploadProgress: (progressEvent) => onProgress(progressEvent.loaded, progressEvent.total) }) - const trackFileUploadResponseData = trackFileUploadResponse.data - console.log( - 'trackFileUploadResponseData for v2 upload', - JSON.stringify(trackFileUploadResponseData) + return this.pollProcessingStatusV2( + response.data[0].id, + template === 'audio' ? MAX_TRACK_TRANSCODE_TIMEOUT : MAX_IMAGE_RESIZE_TIMEOUT_MS ) + } + + /** + * Works for both track transcode and image resize jobs + * @param id ID of the transcode/resize job + * @param maxPollingMs millis to stop polling and error if job is not done + * @returns successful job info, or throws error if job fails / times out + */ + async pollProcessingStatusV2(id: string, maxPollingMs: number) { + const start = Date.now() + while (Date.now() - start < maxPollingMs) { + try { + const resp = await this.getProcessingStatusV2(id) + if (resp?.status === 'done') return resp + if (resp?.status === 'error') { + throw new Error(`Upload failed: id=${id}, resp=${JSON.stringify(resp)}`) + } + } catch (e) { + // Swallow errors caused by failure to establish connection to node so we can retry polling + console.error(`Failed to poll for processing status, ${e}`) + } - return { - metadataMultihash: '', - metadataFileUUID: '', - transcodedTrackUUID: '', - transcodedTrackCID: '' + await wait(POLL_STATUS_INTERVAL) } - // TODO: Upload coverArtFile (same /storage/api/v1/file endpoint) + throw new Error(`Upload took over ${maxPollingMs}ms. id=${id}`) + } - // TODO: Need metadata to have track_cid, download.cid (if download.is_downloadable), and cover_art_sizes. 
I think we can stop doing the track_segments thing
-    // TODO: Write metadata to EntityManager for both audio and cover art files, and probably return the tx receipt here
-    // TODO: Make discovery stop calling /ipfs/ and instead read from its own db where it already indexes this metadata
+  /**
+   * Gets the task progress given the task type and id associated with the job
+   * @param id the id of the transcoding or resizing job
+   * @returns the status, and the success or failed response if the job is complete
+   */
+  async getProcessingStatusV2(id: string) {
+    const { data } = await this._makeRequestV2({
+      method: 'get',
+      url: `/mediorum/uploads/${id}`
+    })
+    return data
   }

   /**
@@ -716,6 +759,19 @@ export class CreatorNode {

   /* ------- INTERNAL FUNCTIONS ------- */

+  /**
+   * Makes an axios request to the connected creator node
+   * @return response body
+   */
+  async _makeRequestV2(axiosRequestObj: AxiosRequestConfig) {
+    // TODO: This might want to have other error handling, request UUIDs, etc.,
+    // but I didn't want to pull in all the chaos and incompatibility of the old _makeRequest
+    axiosRequestObj.baseURL = this.creatorNodeEndpoint
+    return axios(axiosRequestObj)
+  }
+
   /**
    * Signs up a creator node user with a wallet address
    * @param walletAddress
@@ -1184,4 +1240,19 @@ export class CreatorNode {
       throw e
     }
   }
+
+  /**
+   * Calls fn and then retries once after 500ms, again after 1500ms, and again after 4000ms
+   */
+  async _retry3(fn: () => Promise<any>, onRetry = (_err: any) => {}) {
+    return retry(fn, {
+      minTimeout: 500,
+      maxTimeout: 4000,
+      factor: 3,
+      retries: 3,
+      onRetry
+    })
+  }
 }
diff --git a/libs/src/utils/fileHasher.ts b/libs/src/utils/fileHasher.ts
index f99c986da95..a577d2bc854 100644
--- a/libs/src/utils/fileHasher.ts
+++ b/libs/src/utils/fileHasher.ts
@@ -4,7 +4,6 @@ import {
   UserImporterOptions
 } from 'ipfs-unixfs-importer'
 import fs from 'fs'
-import { hrtime } from 'process'
 import { promisify } from 'util'
 import { Stream } from 'stream'
 import type { Blockstore, Options } from 'interface-blockstore'
@@ -238,19 +237,7 @@ export const fileHasher = {
     logger: any = console
   ): Promise<string> {
     const buffer = await fileHasher.convertToBuffer(content, logger)
-
-    const startHashing: bigint = hrtime.bigint()
-    const cid = await fileHasher.hashNonImages(buffer)
-
-    const hashDurationMs = fileHasher.convertNanosToMillis(
-      hrtime.bigint() - startHashing
-    )
-
-    logger.debug(
-      `[fileHasher - generateNonImageCid()] CID=${cid} hashDurationMs=${hashDurationMs}ms`
-    )
-
-    return cid
+    return fileHasher.hashNonImages(buffer)
   },

   /**
@@ -261,18 +248,8 @@ export const fileHasher = {
    */
   async generateImageCids(
     content: ImportCandidate,
-    logger: any = console
+    _: any = console
   ): Promise<HashedImage[]> {
-    const startHashing: bigint = hrtime.bigint()
-    const hashedImages: HashedImage[] = await fileHasher.hashImages(content)
-    const hashDurationMs = fileHasher.convertNanosToMillis(
-      hrtime.bigint() - startHashing
-    )
-
-    const hashedImagesStr = JSON.stringify(hashedImages)
-    logger.debug(
-      `[fileHasher - generateImageCids()] hashedImages=${hashedImagesStr} hashImagesDurationMs=${hashDurationMs}ms`
-    )
-    return hashedImages
+    return fileHasher.hashImages(content)
   }
 }
diff --git a/mediorum/go.mod b/mediorum/go.mod
index f1f873e0de1..c02b4868788 100644
--- a/mediorum/go.mod
+++ b/mediorum/go.mod
@@ -28,9 +28,10 @@ require (
	github.com/golang/protobuf v1.5.2 // indirect
	github.com/googleapis/gax-go/v2 v2.7.0 // indirect
	github.com/ipfs/go-cid v0.3.2 // indirect
+	github.com/ipfs/go-ipfs-util v0.0.2 // 
indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect - github.com/klauspost/cpuid/v2 v2.0.4 // indirect + github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/labstack/gommon v0.4.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.17 // indirect @@ -41,9 +42,10 @@ require ( github.com/multiformats/go-base32 v0.0.3 // indirect github.com/multiformats/go-base36 v0.1.0 // indirect github.com/multiformats/go-multibase v0.0.3 // indirect - github.com/multiformats/go-multihash v0.0.15 // indirect + github.com/multiformats/go-multihash v0.2.1 // indirect github.com/multiformats/go-varint v0.0.6 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect go.opencensus.io v0.24.0 // indirect @@ -61,4 +63,5 @@ require ( google.golang.org/protobuf v1.28.1 // indirect gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + lukechampine.com/blake3 v1.1.6 // indirect ) diff --git a/mediorum/go.sum b/mediorum/go.sum index 907b7dfda0e..8a8ef56ba5f 100644 --- a/mediorum/go.sum +++ b/mediorum/go.sum @@ -1199,6 +1199,8 @@ github.com/intel/goresctrl v0.2.0/go.mod h1:+CZdzouYFn5EsxgqAQTEzMfwKwuc0fVdMrT9 github.com/ionos-cloud/sdk-go/v6 v6.1.3/go.mod h1:Ox3W0iiEz0GHnfY9e5LmAxwklsxguuNFEUSu0gVRTME= github.com/ipfs/go-cid v0.3.2 h1:OGgOd+JCFM+y1DjWPmVH+2/4POtpDzwcr7VgnB7mZXc= github.com/ipfs/go-cid v0.3.2/go.mod h1:gQ8pKqT/sUxGY+tIwy1RPpAojYu7jAyCp5Tz1svoupw= +github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8= +github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdrtMecczcsQ= github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA= github.com/j-keck/arping v1.0.2/go.mod h1:aJbELhR92bSk7tp79AWM/ftfc90EfEi2bQJrbBFOsPw= github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= @@ -1282,6 +1284,8 @@ github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47e github.com/klauspost/compress v1.15.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/cpuid/v2 v2.0.4 h1:g0I61F2K2DjRHz1cnxlkNSBIaePVoJIjjnHui8QHbiw= github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM= github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -1373,6 +1377,7 @@ github.com/miekg/dns v1.1.50/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7Xn github.com/miekg/pkcs11 v1.0.3/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= +github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= 
github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible/go.mod h1:8AuVvqP/mXw1px98n46wfvcGfQ4ci2FwoAjKYxuo3Z4= @@ -1413,6 +1418,7 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJ github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= +github.com/mr-tron/base58 v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/mrunalp/fileutils v0.5.0/go.mod h1:M1WthSahJixYnrXQl/DFQuteStB1weuxD2QJNHXfbSQ= @@ -1422,8 +1428,12 @@ github.com/multiformats/go-base36 v0.1.0 h1:JR6TyF7JjGd3m6FbLU2cOxhC0Li8z8dLNGQ8 github.com/multiformats/go-base36 v0.1.0/go.mod h1:kFGE83c6s80PklsHO9sRn2NCoffoRdUUOENyW/Vv6sM= github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk= github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc= +github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= github.com/multiformats/go-multihash v0.0.15 h1:hWOPdrNqDjwHDx82vsYGSDZNyktOJJ2dzZJzFkOV1jM= github.com/multiformats/go-multihash v0.0.15/go.mod h1:D6aZrWNLFTV/ynMpKsNtB40mJzmCl4jb1alC0OvHiHg= +github.com/multiformats/go-multihash v0.2.1 h1:aem8ZT0VA2nCHHk7bPJ1BjUbHNciqZC/d16Vve9l108= +github.com/multiformats/go-multihash v0.2.1/go.mod h1:WxoMcYG85AZVQUyRyo9s4wULvW5qrI9vb2Lt6evduFc= +github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/multiformats/go-varint v0.0.6 h1:gk85QWKxh3TazbLxED/NlDVv8+q+ReFJk7Y2W/KhfNY= github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= @@ -1654,6 +1664,8 @@ github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4k github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= github.com/spf13/afero v1.3.3/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4= @@ -2734,6 +2746,8 @@ k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/ k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20211116205334-6203023598ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= 
+lukechampine.com/blake3 v1.1.6 h1:H3cROdztr7RCfoaTpGZFQsrqvweFLrqS73j7L7cmR5c= +lukechampine.com/blake3 v1.1.6/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA= nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= diff --git a/mediorum/server/db.go b/mediorum/server/db.go index 91055293d27..7bf4d36bc4b 100644 --- a/mediorum/server/db.go +++ b/mediorum/server/db.go @@ -1,7 +1,6 @@ package server import ( - "encoding/json" "mediorum/crudr" "time" @@ -15,16 +14,12 @@ type Blob struct { CreatedAt time.Time `json:"created_at"` } -type JsonCid struct { - Cid string `json:"key" gorm:"primaryKey;not null;default:null"` - Data json.RawMessage `json:"data"` -} - type Upload struct { ID string `json:"id"` // base32 file hash Template string `json:"template"` OrigFileName string `json:"orig_filename"` + OrigFileCID string `json:"orig_file_cid"` FFProbe *FFProbeResult `json:"probe" gorm:"serializer:json"` Error string `json:"error,omitempty"` Mirrors []string `json:"mirrors" gorm:"serializer:json"` @@ -86,12 +81,12 @@ func dbMustDial(dbPath string) *gorm.DB { func dbMigrate(crud *crudr.Crudr) { // Migrate the schema - err := crud.DB.AutoMigrate(&Blob{}, &Upload{}, &ServerHealth{}, &LogLine{}, &JsonCid{}) + err := crud.DB.AutoMigrate(&Blob{}, &Upload{}, &ServerHealth{}, &LogLine{}) if err != nil { panic(err) } // register any models to be managed by crudr - crud.RegisterModels(&LogLine{}, &Blob{}, &Upload{}, &ServerHealth{}, &JsonCid{}) + crud.RegisterModels(&LogLine{}, &Blob{}, &Upload{}, &ServerHealth{}) } diff --git a/mediorum/server/serve_blob.go b/mediorum/server/serve_blob.go index eb6699ef092..98816c43a4e 100644 --- a/mediorum/server/serve_blob.go +++ b/mediorum/server/serve_blob.go @@ -78,9 +78,15 @@ func (ss *MediorumServer) postBlob(c echo.Context) error { } defer inp.Close() - // this allows caller to write any key - // a safer option might be to hash everything all the time - err = ss.replicateToMyBucket(upload.Filename, inp) + cid, err := computeFileCID(inp) + if err != nil { + return err + } + if cid != upload.Filename { + ss.logger.Warn("postBlob CID mismatch", "filename", upload.Filename, "cid", cid) + } + + err = ss.replicateToMyBucket(cid, inp) if err != nil { ss.logger.Info("accept ERR", "file", upload.Filename, "err", err) } diff --git a/mediorum/server/serve_metadata_cids.go b/mediorum/server/serve_metadata_cids.go deleted file mode 100644 index 5e106690697..00000000000 --- a/mediorum/server/serve_metadata_cids.go +++ /dev/null @@ -1,39 +0,0 @@ -package server - -import ( - "io" - - "github.com/ipfs/go-cid" - "github.com/labstack/echo/v4" -) - -func (ss *MediorumServer) postMetadataCid(c echo.Context) error { - // read the json - j, err := io.ReadAll(c.Request().Body) - if err != nil { - return c.JSON(400, map[string]string{ - "error": "bad json" + err.Error(), - }) - } - - // compute the cid - builder := &cid.V0Builder{} - cid, err := builder.Sum(j) - if err != nil { - return err - } - - record := &JsonCid{ - Cid: cid.String(), - Data: j, - } - err = ss.crud.Create(record) - if err != nil { - return err - } - - return c.JSON(200, map[string]string{ - "metadataMultihash": cid.String(), - // metadataFileUUID: - }) -} diff --git a/mediorum/server/serve_metadata_test.go b/mediorum/server/serve_metadata_test.go deleted file mode 100644 index c347811b8bb..00000000000 --- 
a/mediorum/server/serve_metadata_test.go +++ /dev/null @@ -1,66 +0,0 @@ -package server - -import ( - "bytes" - "fmt" - "io" - "net/http" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -// add table for json blobs -// add some routes to match content-node - -// test that sends track json to server 1 -// read it from server 2 - -func TestJsonStuff(t *testing.T) { - t.Skip("skipping metadata CID tests...") - - // replicationFactor := 3 - servers := testNetwork - - s1 := servers[0] - s2 := servers[1] - - // let's make a POST request - stuff := []byte(`{"day": "friday"}`) - resp, err := http.Post(apiPath(s1.Config.Self.Host, "tracks/metadata"), "application/json", bytes.NewReader(stuff)) - assert.NoError(t, err) - assert.Equal(t, resp.StatusCode, 200) - - txt, _ := io.ReadAll(resp.Body) - fmt.Println(string(txt)) - - time.Sleep(time.Millisecond * 100) - - // let's check that s2 has it - var c int64 - err = s2.crud.DB.Model(&JsonCid{}).Count(&c).Error - assert.NoError(t, err) - assert.Equal(t, int64(1), c) - - // dumb load test... how long it take? - - // log.Println("creating 10k stuff") - // for i := 0; i < 10000; i++ { - // err := s1.crud.Create(&JsonCid{ - // Cid: fmt.Sprintf("thing_%d", i), - // Data: []byte(`{"day": "friday"}`), - // }) - // assert.NoError(t, err) - // } - // log.Println("done") - - // time.Sleep(time.Millisecond * 500) - - // // j := &JsonCid{} - // var c int64 - // err := s2.crud.DB.Model(&JsonCid{}).Count(&c).Error - // assert.NoError(t, err) - // // assert.Equal(t, j.Cid, "abc123") - // assert.Equal(t, c, 10000) -} diff --git a/mediorum/server/serve_upload.go b/mediorum/server/serve_upload.go index 55331c612dc..92b37629dec 100644 --- a/mediorum/server/serve_upload.go +++ b/mediorum/server/serve_upload.go @@ -10,7 +10,9 @@ import ( "sync" "time" + "github.com/ipfs/go-cid" "github.com/labstack/echo/v4" + "github.com/multiformats/go-multihash" ) var ( @@ -67,13 +69,19 @@ func (ss *MediorumServer) postUpload(c echo.Context) error { } uploads[idx] = upload - fileHash, err := hashFileUpload(formFile) + randomID, err := randomHash() + if err != nil { + upload.Error = err.Error() + return + } + formFileCID, err := computeFileHeaderCID(formFile) if err != nil { upload.Error = err.Error() return } - upload.ID = fileHash + upload.ID = randomID + upload.OrigFileCID = formFileCID upload.FFProbe, _ = ffprobeUpload(formFile) // mirror to n peers @@ -83,13 +91,13 @@ func (ss *MediorumServer) postUpload(c echo.Context) error { return } - upload.Mirrors, err = ss.replicateFile(fileHash, file) + upload.Mirrors, err = ss.replicateFile(formFileCID, file) if err != nil { upload.Error = err.Error() return } - ss.logger.Info("mirrored", "name", formFile.Filename, "hash", fileHash, "mirrors", upload.Mirrors) + ss.logger.Info("mirrored", "name", formFile.Filename, "randomID", randomID, "formFileCID", formFileCID, "mirrors", upload.Mirrors) err = ss.crud.Create(upload) if err != nil { @@ -106,20 +114,27 @@ func (ss *MediorumServer) postUpload(c echo.Context) error { return c.JSON(200, uploads) } -func hashFileUpload(upload *multipart.FileHeader) (string, error) { - // for testing... 
want to be able to upload same stuff repeatedly - return randomHash() +func computeFileHeaderCID(fh *multipart.FileHeader) (string, error) { + f, err := fh.Open() + if err != nil { + return "", err + } + defer f.Close() + return computeFileCID(f) +} - file, err := upload.Open() +func computeFileCID(f io.ReadSeeker) (string, error) { + defer f.Seek(0, 0) + builder := cid.V1Builder{} + hash, err := multihash.SumStream(f, multihash.SHA2_256, -1) if err != nil { return "", err } - hash := sha1.New() - if _, err := io.Copy(hash, file); err != nil { + cid, err := builder.Sum(hash) + if err != nil { return "", err } - fileHash := base32.StdEncoding.EncodeToString(hash.Sum(nil)) - return fileHash, nil + return cid.String(), nil } func randomHash() (string, error) { diff --git a/mediorum/server/server.go b/mediorum/server/server.go index 953cf040a16..17cce744813 100644 --- a/mediorum/server/server.go +++ b/mediorum/server/server.go @@ -162,9 +162,6 @@ func New(config MediorumConfig) (*MediorumServer, error) { basePath.GET("/debug/uploads", ss.dumpUploads) basePath.GET("/debug/ls", ss.getLs) - // JSON CID stuff - // basePath.POST("/tracks/metadata", ss.postMetadataCid) - // internal internalApi := basePath.Group("/internal") diff --git a/mediorum/server/transcode.go b/mediorum/server/transcode.go index 00f4ca9c246..4b822f218ea 100644 --- a/mediorum/server/transcode.go +++ b/mediorum/server/transcode.go @@ -180,7 +180,7 @@ func (ss *MediorumServer) transcode(upload *Upload) error { upload.Status = JobStatusBusy ss.crud.Update(upload) - fileHash := upload.ID + fileHash := upload.OrigFileCID logger := ss.logger.New("template", upload.Template, "hash", fileHash) onError := func(err error, info ...string) error { @@ -204,17 +204,18 @@ func (ss *MediorumServer) transcode(upload *Upload) error { for _, targetBox := range squares { temp.Seek(0, 0) out, w, h := Resized(".jpg", temp, targetBox, targetBox, "fill") - outName := fmt.Sprintf("%s_%d", fileHash, targetBox) - mirrors, err := ss.replicateFile(outName, out) + resultHash, err := computeFileCID(out) + if err != nil { + return onError(err, "computeFileCID") + } + mirrors, err := ss.replicateFile(resultHash, out) if err != nil { return onError(err, "replicate") } - logger.Debug("did square", "w", w, "h", h, "key", outName, "mirrors", mirrors) + logger.Debug("did square", "w", w, "h", h, "key", resultHash, "mirrors", mirrors) - // for now use this made up result key - // in future... will be SHA of transcode result variantName := fmt.Sprintf("%dx%[1]d", targetBox) - upload.TranscodeResults[variantName] = outName + upload.TranscodeResults[variantName] = resultHash } case JobTemplateImgBackdrop: @@ -223,17 +224,18 @@ func (ss *MediorumServer) transcode(upload *Upload) error { for _, targetWidth := range widths { temp.Seek(0, 0) out, w, h := Resized(".jpg", temp, targetWidth, AUTO, "fill") - outName := fmt.Sprintf("%s_%d", fileHash, targetWidth) - mirrors, err := ss.replicateFile(outName, out) + resultHash, err := computeFileCID(out) + if err != nil { + return onError(err, "computeFileCID") + } + mirrors, err := ss.replicateFile(resultHash, out) if err != nil { return onError(err, "replicate") } - logger.Debug("did backdrop", "w", w, "h", h, "key", outName, "mirrors", mirrors) + logger.Debug("did backdrop", "w", w, "h", h, "key", resultHash, "mirrors", mirrors) - // for now use this made up result key - // in future... 
will be SHA of transcode result variantName := fmt.Sprintf("%dwide", targetWidth) - upload.TranscodeResults[variantName] = outName + upload.TranscodeResults[variantName] = resultHash } case JobTemplateAudio, "": @@ -249,6 +251,8 @@ func (ss *MediorumServer) transcode(upload *Upload) error { "-i", srcPath, "-b:a", "320k", "-f", "mp3", + "-metadata", "fileName="+upload.OrigFileName, + "-metadata", "uuid="+upload.ID, "-progress", "pipe:2", destPath) @@ -297,14 +301,16 @@ func (ss *MediorumServer) transcode(upload *Upload) error { // replicate to peers // attempt to forward to an assigned node - resultKey := fileHash + "_320.mp3" - mirrors, err := ss.replicateFile(resultKey, dest) + resultHash, err := computeFileCID(dest) + if err != nil { + return onError(err, "computeFileCID") + } + resultKey := resultHash + mirrors, err := ss.replicateFile(resultHash, dest) if err != nil { return onError(err) } - // for now use this made up result key - // in future... will be SHA of transcode result upload.TranscodeResults["320"] = resultKey ss.logger.Info("transcode done", "mirrors", mirrors)
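Reviewer note on how the pieces above fit together: a client POSTs a file to `/mediorum/uploads`, gets back an `Upload` whose `id` is now a random hash while `orig_file_cid` and each transcode/resize result are content-addressed CIDs, then polls `GET /mediorum/uploads/{id}` until the job finishes. A standalone sketch of that loop (the routes, the `status`/`results`/`error` fields, and the 3s interval come from the diff; the `endpoint` placeholder and a browser-style `FormData`/`File` environment are assumptions):

```ts
import axios from 'axios'

// Sketch only: mirrors CreatorNode.uploadFileV2 + pollProcessingStatusV2
// without the rest of libs. Not part of this patch.
async function uploadAndWait(
  endpoint: string, // placeholder mediorum node URL
  file: File,
  template: 'audio' | 'img_square' | 'img_backdrop'
) {
  const formData = new FormData()
  formData.append('template', template)
  formData.append('files', file, file.name)

  // The server responds with one Upload per posted file
  const { data } = await axios.post(`${endpoint}/mediorum/uploads`, formData, {
    headers: { 'Content-Type': 'multipart/form-data' }
  })
  const id: string = data[0].id

  // Poll every 3s (POLL_STATUS_INTERVAL) until the job is done or errored
  for (;;) {
    const { data: job } = await axios.get(`${endpoint}/mediorum/uploads/${id}`)
    if (job.status === 'done') return job // e.g. job.results['320'] is the transcoded audio CID
    if (job.status === 'error') throw new Error(job.error)
    await new Promise((resolve) => setTimeout(resolve, 3000))
  }
}
```

`uploadTrackAudioAndCoverArtV2` runs this flow twice in parallel (audio plus cover art, each wrapped in `_retry3`) and stitches the resulting CIDs into `track_cid`, `download.cid`, and `cover_art_sizes`.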
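On the storage side, every stored key is now a CID that a receiving node can recompute and verify (see the mismatch warning in `postBlob` and `computeFileCID` in `serve_upload.go`). For intuition, a client-side analogue using the JS `multiformats` package might look like the sketch below; note it builds a CIDv1 over raw bytes with sha2-256 and is not guaranteed to reproduce mediorum's exact builder settings:

```ts
import { CID } from 'multiformats/cid'
import * as raw from 'multiformats/codecs/raw'
import { sha256 } from 'multiformats/hashes/sha2'

// Sketch: compute a CIDv1 (raw codec, sha2-256) over file bytes, then
// compare it to the key a node claims to be serving.
async function verifyBytesAgainstCid(bytes: Uint8Array, expected: string) {
  const digest = await sha256.digest(bytes)
  const actual = CID.create(1, raw.code, digest).toString()
  if (actual !== expected) {
    // mediorum logs a warning on mismatch rather than rejecting outright
    console.warn(`CID mismatch: expected=${expected} actual=${actual}`)
  }
  return actual === expected
}
```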