diff --git a/mediorum/Dockerfile.unittests b/mediorum/Dockerfile.unittests index 7b2c250e20a..62f6bdc8154 100644 --- a/mediorum/Dockerfile.unittests +++ b/mediorum/Dockerfile.unittests @@ -1,5 +1,7 @@ FROM golang:alpine +RUN apk add ffmpeg + WORKDIR /app COPY . . diff --git a/mediorum/server/replicate.go b/mediorum/server/replicate.go index 5ec38a5c638..605b0d3f825 100644 --- a/mediorum/server/replicate.go +++ b/mediorum/server/replicate.go @@ -8,7 +8,9 @@ import ( "mime/multipart" "net/http" "net/url" + "os" "strings" + "sync" "time" "github.com/AudiusProject/audius-protocol/mediorum/server/signature" @@ -18,6 +20,47 @@ import ( "gocloud.dev/blob" ) +func (ss *MediorumServer) replicateFileParallel(cid string, filePath string) ([]string, error) { + preferred, _ := ss.rendezvousHealthyHosts(cid) + queue := make(chan string, len(preferred)) + for _, p := range preferred { + queue <- p + } + + mu := sync.Mutex{} + results := []string{} + + wg := sync.WaitGroup{} + wg.Add(ss.Config.ReplicationFactor) + + for i := 0; i < ss.Config.ReplicationFactor; i++ { + go func() { + defer wg.Done() + + file, err := os.Open(filePath) + if err != nil { + ss.logger.Error("failed to open file", "filePath", filePath, "err", err) + return + } + defer file.Close() + for peer := range queue { + file.Seek(0, 0) + err := ss.replicateFileToHost(peer, cid, file) + if err == nil { + mu.Lock() + results = append(results, peer) + mu.Unlock() + break + } + } + + }() + } + + wg.Wait() + return results, nil +} + func (ss *MediorumServer) replicateFile(fileName string, file io.ReadSeeker) ([]string, error) { logger := ss.logger.With("task", "replicate", "cid", fileName) diff --git a/mediorum/server/serve_crud.go b/mediorum/server/serve_crud.go index 15b07e38119..57ddb0ebad0 100644 --- a/mediorum/server/serve_crud.go +++ b/mediorum/server/serve_crud.go @@ -5,6 +5,8 @@ import ( "context" "fmt" "net/http" + "os" + "strconv" "time" "github.com/AudiusProject/audius-protocol/mediorum/crudr" @@ -59,6 +61,10 @@ func (ss *MediorumServer) serveCrudPush(c echo.Context) error { return c.String(http.StatusBadRequest, err.Error()) } + if v, _ := strconv.ParseBool(os.Getenv("LOG_CRUD_PUSH")); v { + ss.logger.Info("CRUD_PUSH", "op", op) + } + known := ss.crud.KnownType(op) if !known { return c.String(406, "unknown crudr type") diff --git a/mediorum/server/serve_upload_test.go b/mediorum/server/serve_upload_test.go new file mode 100644 index 00000000000..f900fafbc94 --- /dev/null +++ b/mediorum/server/serve_upload_test.go @@ -0,0 +1,68 @@ +package server + +import ( + "bytes" + "encoding/json" + "io" + "mime/multipart" + "net/http" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestUploadFile(t *testing.T) { + ss := testNetwork[0] + s2 := testNetwork[1] + + var b bytes.Buffer + w := multipart.NewWriter(&b) + + { + fw, err := w.CreateFormField("template") + assert.NoError(t, err) + fw.Write([]byte("audio")) + } + + fw, err := w.CreateFormFile(filesFormFieldName, "beep.wav") + assert.NoError(t, err) + + hit, err := os.Open("testdata/beep.wav") + assert.NoError(t, err) + + io.Copy(fw, hit) + hit.Close() + w.Close() + + resp, err := http.Post(ss.Config.Self.Host+"/uploads", w.FormDataContentType(), &b) + assert.NoError(t, err) + assert.Equal(t, resp.StatusCode, 200) + + dec := json.NewDecoder(resp.Body) + var uploads []Upload + err = dec.Decode(&uploads) + assert.NoError(t, err) + uploadId := uploads[0].ID + + // force sweep (since blob changes SkipBroadcast) + for _, s := range testNetwork { + s.crud.ForceSweep() + } + + // poll for complete + var u2 *Upload + for i := 0; i < 3; i++ { + resp, err := s2.reqClient.R().SetSuccessResult(&u2).Get(s2.Config.Self.Host + "/uploads/" + uploadId) + assert.NoError(t, err) + assert.Equal(t, resp.StatusCode, 200) + if u2.Status == JobStatusDone { + break + } + time.Sleep(time.Second) + } + + assert.Equal(t, u2.TranscodeProgress, 1.0) + assert.Len(t, u2.TranscodedMirrors, ss.Config.ReplicationFactor) +} diff --git a/mediorum/server/test_main_test.go b/mediorum/server/test_main_test.go index 51fd16d9bb0..79819393768 100644 --- a/mediorum/server/test_main_test.go +++ b/mediorum/server/test_main_test.go @@ -64,7 +64,7 @@ func setupTestNetwork(replicationFactor, serverCount int) []*MediorumServer { } func TestMain(m *testing.M) { - testNetwork = setupTestNetwork(5, 5) + testNetwork = setupTestNetwork(5, 9) exitVal := m.Run() // todo: tear down testNetwork diff --git a/mediorum/server/testdata/beep.wav b/mediorum/server/testdata/beep.wav new file mode 100644 index 00000000000..119b95fbe8c Binary files /dev/null and b/mediorum/server/testdata/beep.wav differ diff --git a/mediorum/server/transcode.go b/mediorum/server/transcode.go index 17fe528dc5d..6717c196775 100644 --- a/mediorum/server/transcode.go +++ b/mediorum/server/transcode.go @@ -318,10 +318,12 @@ func (ss *MediorumServer) transcodeAudio(upload *Upload, destPath string, cmd *e fmt.Sscanf(line, "out_time_us=%f", &u) if u > 0 && durationUs > 0 { percent := u / durationUs - // logger.Debug("transcode", "file", fileHash, "progress", percent) - upload.TranscodeProgress = percent - upload.TranscodedAt = time.Now().UTC() - ss.crud.Patch(upload) + + if percent-upload.TranscodeProgress > 0.1 { + upload.TranscodeProgress = percent + upload.TranscodedAt = time.Now().UTC() + ss.crud.Patch(upload) + } } } } @@ -378,7 +380,7 @@ func (ss *MediorumServer) transcodeFullAudio(upload *Upload, temp *os.File, logg return onError(err, upload.Status, "computeFileCID") } resultKey := resultHash - upload.TranscodedMirrors, err = ss.replicateFile(resultHash, dest) + upload.TranscodedMirrors, err = ss.replicateFileParallel(resultHash, destPath) if err != nil { return onError(err, upload.Status, "replicateFile") } @@ -446,7 +448,7 @@ func (ss *MediorumServer) transcodeAudioPreview(upload *Upload, temp *os.File, l return onError(err, upload.Status, "computeFileCID") } resultKey := resultHash - mirrors, err := ss.replicateFile(resultHash, dest) + mirrors, err := ss.replicateFileParallel(resultHash, destPath) if err != nil { return onError(err, upload.Status, "replicating file") } diff --git a/mediorum/server/upload_client.go b/mediorum/server/upload_client.go index 65c5bfda9be..59621ba5307 100644 --- a/mediorum/server/upload_client.go +++ b/mediorum/server/upload_client.go @@ -59,7 +59,7 @@ func (ss *MediorumServer) startUploadScroller() { uploadCursor.After = upload.CreatedAt } - if len(uploads) == 0 { + if len(overwrites) == 0 { continue }