Skip to content

Commit

Permalink
persist bpm if succeeds even if key fails. clean up. use audio analyz…
Browse files Browse the repository at this point in the history
…ed at timestamp for timeout check
  • Loading branch information
michellebrier committed May 29, 2024
1 parent 35ea40e commit 35fea74
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 25 deletions.
51 changes: 28 additions & 23 deletions mediorum/server/audio_analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ func (ss *MediorumServer) findMissedAnalysisJobs(work chan *Upload, myHost strin

logger := ss.logger.With("upload_id", upload.ID, "upload_status", upload.Status, "my_rank", myRank)

// allow a 1 minute timeout period for audio analysis after transcoding completes
if time.Since(upload.TranscodedAt) > time.Minute {
// allow a 1 minute timeout period for audio analysis.
// upload.AudioAnalyzedAt is set in transcode.go after successfully transcoding a new audio upload,
// or by the /uploads/:id/analyze endpoint when triggering a re-analysis.
if time.Since(upload.AudioAnalyzedAt) > time.Minute {
// mark analysis as timed out and the upload as done.
// failed or timed out analyses do not block uploads.
ss.logger.Warn("audio analysis timed out", "upload", upload.ID)
Expand All @@ -122,22 +124,22 @@ func (ss *MediorumServer) findMissedAnalysisJobs(work chan *Upload, myHost strin
if myRank == 2 {
// no recent update?
timedOut = upload.Status == JobStatusBusyAudioAnalysis &&
time.Since(upload.TranscodedAt) > time.Second*20
time.Since(upload.AudioAnalyzedAt) > time.Second*20

// never started?
neverStarted = upload.Status == JobStatusAudioAnalysis &&
time.Since(upload.TranscodedAt) > time.Second*20
time.Since(upload.AudioAnalyzedAt) > time.Second*20
}

// for #3 rank worker:
if myRank == 3 {
// no recent update?
timedOut = upload.Status == JobStatusBusyAudioAnalysis &&
time.Since(upload.TranscodedAt) > time.Second*40
time.Since(upload.AudioAnalyzedAt) > time.Second*40

// never started?
neverStarted = upload.Status == JobStatusAudioAnalysis &&
time.Since(upload.TranscodedAt) > time.Second*40
time.Since(upload.AudioAnalyzedAt) > time.Second*40
}

if timedOut {
Expand All @@ -164,13 +166,13 @@ func (ss *MediorumServer) startAudioAnalysisWorker(n int, work chan *Upload) {

func (ss *MediorumServer) analyzeAudio(upload *Upload) error {
upload.AudioAnalyzedBy = ss.Config.Self.Host
upload.AudioAnalyzedAt = time.Now().UTC()
upload.Status = JobStatusBusyAudioAnalysis
ss.crud.Update(upload)
ctx := context.Background()

onError := func(err error) error {
upload.AudioAnalysisError = err.Error()
upload.AudioAnalyzedAt = time.Now().UTC()
upload.AudioAnalysisStatus = JobStatusError
// failed analyses do not block uploads
upload.Status = JobStatusDone
Expand Down Expand Up @@ -229,36 +231,39 @@ func (ss *MediorumServer) analyzeAudio(upload *Upload) error {
err = convertToWav(inputFile, wavFile)
if err != nil {
logger.Error("failed to convert MP3 to WAV", "err", err)
return err
return onError(fmt.Errorf("failed to convert MP3 to WAV: %w", err))
}
}

// analyze musical key
musicalKey, err := ss.analyzeKey(wavFile)
if err != nil {
logger.Error("failed to analyze key", "err", err)
return onError(err)
}
if musicalKey == "" || musicalKey == "Unknown" {
err := fmt.Errorf("unexpected output: %s", musicalKey)
logger.Error("failed to analyze key", "err", err)
return onError(err)
}

// analyze BPM
bpmFloat, err := ss.analyzeBPM(wavFile)
if err != nil {
logger.Error("failed to analyze BPM", "err", err)
return onError(err)
return onError(fmt.Errorf("failed to analyze BPM: %w", err))
}
bpm := strconv.FormatFloat(bpmFloat, 'f', 1, 64)

// success
upload.AudioAnalysisResults = map[string]string{
"key": musicalKey,
"bpm": bpm,
}
upload.AudioAnalysisError = ""
ss.crud.Update(upload)

// analyze musical key
musicalKey, err := ss.analyzeKey(wavFile)
if err != nil {
logger.Error("failed to analyze key", "err", err)
return onError(fmt.Errorf("failed to analyze key: %w", err))
}
if musicalKey == "" || musicalKey == "Unknown" {
err := fmt.Errorf("unexpected output: %s", musicalKey)
logger.Error("failed to analyze key", "err", err)
return onError(fmt.Errorf("failed to analyze key: %w", err))
}
upload.AudioAnalysisResults["key"] = musicalKey
upload.AudioAnalysisError = ""

// success
upload.AudioAnalyzedAt = time.Now().UTC()
upload.AudioAnalysisStatus = JobStatusDone
upload.Status = JobStatusDone
Expand Down
9 changes: 7 additions & 2 deletions mediorum/server/serve_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,17 @@ func (ss *MediorumServer) analyzeUpload(c echo.Context) error {
return err
}

if upload.Template == "audio" && upload.Status == JobStatusDone && upload.AudioAnalysisResults == nil {
upload.UpdatedAt = time.Now().UTC()
if upload.Template == "audio" && upload.Status == JobStatusDone && upload.AudioAnalysisStatus != JobStatusDone {
upload.AudioAnalyzedAt = time.Now().UTC()
upload.AudioAnalysisStatus = ""
upload.AudioAnalysisError = ""
upload.Status = JobStatusAudioAnalysis
err = ss.crud.Update(upload)
if err != nil {
ss.logger.Warn("update upload failed", "err", err)
return c.JSON(500, map[string]string{
"message": "Failed to trigger audio analysis",
})
}
}

Expand Down
1 change: 1 addition & 0 deletions mediorum/server/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ func (ss *MediorumServer) transcode(upload *Upload) error {
}
// analyze audio for new full audio uploads
nextJobStatus = JobStatusAudioAnalysis
upload.AudioAnalyzedAt = time.Now().UTC()
}
}

Expand Down

0 comments on commit 35fea74

Please sign in to comment.