[PROTO-1784] Improve existing ddex UI for local dev (#8235)
theoilie authored Apr 25, 2024
1 parent a065720 commit a95a703
Showing 35 changed files with 1,457 additions and 1,082 deletions.
46 changes: 46 additions & 0 deletions packages/ddex/ingester/.air.toml
@@ -0,0 +1,46 @@
root = "."
testdata_dir = "testdata"
tmp_dir = "tmp"

[build]
args_bin = []
bin = "./tmp/main"
cmd = "go build -o ./tmp/main ./cmd/"
delay = 1000
exclude_dir = ["assets", "tmp", "vendor", "testdata"]
exclude_file = []
exclude_regex = ["_test.go"]
exclude_unchanged = false
follow_symlink = false
full_bin = ""
include_dir = []
include_ext = ["go", "tpl", "tmpl", "html"]
include_file = []
kill_delay = "5s"
log = "build-errors.log"
poll = false
poll_interval = 0
post_cmd = []
pre_cmd = []
rerun = false
rerun_delay = 500
send_interrupt = true
stop_on_error = false

[color]
app = ""
build = "yellow"
main = "magenta"
runner = "green"
watcher = "cyan"

[log]
main_only = false
time = false

[misc]
clean_on_exit = true

[screen]
clear_on_rebuild = false
keep_scroll = true
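
For context on why this config sets `send_interrupt = true` with a 5s `kill_delay`: air interrupts the old binary on every rebuild, so the ingester is expected to exit promptly on SIGINT. A minimal sketch of that contract using only the standard library (illustrative, not the ingester's actual main):

```go
package main

import (
	"context"
	"log"
	"os/signal"
	"syscall"
)

func main() {
	// air sends SIGINT on each rebuild (send_interrupt = true) and only
	// force-kills after kill_delay (5s), so treat SIGINT as a graceful
	// shutdown signal and exit well within that window.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// ... start crawler/parser goroutines that select on ctx.Done() ...

	<-ctx.Done() // block until air (or the user) interrupts the process
	log.Println("shutting down before air's kill_delay expires")
}
```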
3 changes: 2 additions & 1 deletion packages/ddex/ingester/README.md
@@ -5,7 +5,8 @@ Crawls and parses new DDEX uploads.
### Local Dev
1. Make sure the DDEX dependencies are running: `audius-compose up --ddex-deps`
2. (Optional) See the webapp README to start that server and go through the OAuth flow with a staging user
- 3. Parse a file: `IS_DEV=true AWS_ENDPOINT=http://ingress:4566 DDEX_CHOREOGRAPHY=ERNReleaseByRelease IS_DEV=true AWS_ENDPOINT=http://ingress:4566 DDEX_CHOREOGRAPHY=ERNBatched go run cmd/main.go ./e2e_test/fixtures/batch/fuga/20240305090456555 --wipe`
+ 3. Parse a file: `IS_DEV=true AWS_ENDPOINT=http://ingress:4566 DDEX_CHOREOGRAPHY=ERNBatched go run cmd/main.go ./e2e_test/fixtures/batch/fuga/20240305090456555 --wipe`
+ 4. Alternatively, run `IS_DEV=true AWS_ENDPOINT=http://ingress:4566 DDEX_CHOREOGRAPHY=ERNBatched air` to run the server with hot reloading. Then run the webapp (see its README) and use its "Re-process All" button to test code changes against files you sync using the instructions below (see the sketch after this list).
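
As a reading aid for the environment variables in steps 3–4: `IS_DEV` gates dev-only behavior such as `--wipe`, `AWS_ENDPOINT` points the S3 client at localstack, and `DDEX_CHOREOGRAPHY` must be one of the two choreography constants. A hedged sketch of that contract (`mustGetChoreography` here is illustrative; the real helper is `common.MustGetChoreography`):

```go
package main

import (
	"log"
	"os"
)

// mustGetChoreography mirrors the DDEX_CHOREOGRAPHY validation implied by
// constants.ERNReleaseByRelease and constants.ERNBatched.
func mustGetChoreography() string {
	v := os.Getenv("DDEX_CHOREOGRAPHY")
	if v != "ERNReleaseByRelease" && v != "ERNBatched" {
		log.Fatalf("DDEX_CHOREOGRAPHY must be ERNReleaseByRelease or ERNBatched, got %q", v)
	}
	return v
}

func main() {
	isDev := os.Getenv("IS_DEV") == "true"   // enables --wipe and test-delivery uploads
	awsEndpoint := os.Getenv("AWS_ENDPOINT") // http://ingress:4566 (localstack) in local dev
	log.Println("IS_DEV:", isDev, "AWS_ENDPOINT:", awsEndpoint, "choreography:", mustGetChoreography())
}
```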


### Getting files
72 changes: 60 additions & 12 deletions packages/ddex/ingester/cmd/main.go
@@ -5,13 +5,15 @@ import (
"fmt"
"ingester/artistutils"
"ingester/common"
"ingester/constants"
"log"
"log/slog"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"

"ingester/crawler"
"ingester/parser"
@@ -53,6 +55,11 @@ func main() {

// Optionally wipe all state except for OAuthed users
if os.Getenv("IS_DEV") == "true" && len(os.Args) > 2 && os.Args[2] == "--wipe" {
+ if ingester.S3Client.Endpoint != "http://ingress:4566" && ingester.S3Client.Endpoint != "http://localhost:4566" {
+ logger.Error("not honoring the --wipe flag because the AWS bucket is not localstack")
+ return
+ }
+
if err := wipeBucket(ingester.S3Client, ingester.RawBucket); err != nil {
logger.Error("Error creating raw bucket", "err", err)
}
@@ -67,19 +74,14 @@ func main() {
logger.Error("Error wiping crawler_cursor collection", "err", err)
}
if result, err := ingester.DeliveriesColl.DeleteMany(ingester.Ctx, filter); err == nil {
log.Printf("Deleted %d deliveries\n", result.DeletedCount)
log.Printf("Deleted %d deliveries documents\n", result.DeletedCount)
} else {
logger.Error("Error wiping deliveries collection", "err", err)
}
if result, err := ingester.PendingReleasesColl.DeleteMany(ingester.Ctx, filter); err == nil {
log.Printf("Deleted %d pending_releases\n", result.DeletedCount)
} else {
logger.Error("Error wiping pending_releases collection", "err", err)
}
if result, err := ingester.PublishedReleasesColl.DeleteMany(ingester.Ctx, filter); err == nil {
log.Printf("Deleted %d published_releases\n", result.DeletedCount)
if result, err := ingester.ReleasesColl.DeleteMany(ingester.Ctx, filter); err == nil {
log.Printf("Deleted %d releases documents\n", result.DeletedCount)
} else {
logger.Error("Error wiping published_releases collection", "err", err)
logger.Error("Error wiping releases collection", "err", err)
}
}

@@ -88,8 +90,50 @@
log.Fatal(err)
}

- // Start the crawler and parser
- go crawler.CrawlThenParse(ingester, parser.NewParser(ingester).ProcessDelivery)
+ // Crawl and parse each new delivery that gets put into S3
+ p := parser.NewParser(ingester)
+ go crawler.CrawlThenParse(ingester, p.ProcessDelivery)

+ // Re-parse releases (UI sets release_status to "awaiting_parse" to re-parse)
+ go func() {
+ ticker := time.NewTicker(1 * time.Second)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ cursor, err := ingester.ReleasesColl.Find(ingester.Ctx, bson.M{"release_status": constants.ReleaseStatusAwaitingParse})
+ if err != nil {
+ logger.Error("Error querying releases", "err", err)
+ continue
+ }
+
+ for cursor.Next(ingester.Ctx) {
+ var release common.Release
+ err := cursor.Decode(&release)
+ if err != nil {
+ logger.Error("Error unmarshalling release", "err", err)
+ continue
+ }
+
+ logger.Info("Re-parsing release", "release_id", release.ReleaseID)
+ if ok := p.ParseRelease(&release); !ok {
+ logger.Error("Failed to parse release in an unexpected way (couldn't update status)", "release_id", release.ReleaseID)
+ }
+ }
+
+ // Close the cursor and check for errors
+ if err := cursor.Close(ctx); err != nil {
+ logger.Error("Error closing cursor", "err", err)
+ }
+ if err := cursor.Err(); err != nil {
+ logger.Error("Error during cursor iteration", "err", err)
+ }
+ }
+ }
+ }()

// Test the ingester with a delivery if provided
if os.Getenv("IS_DEV") == "true" && len(os.Args) > 1 {
@@ -103,7 +147,7 @@

testDeliveryPath := os.Args[1]
testDeliveryURL := uploadTestDelivery(ingester, testDeliveryPath)
logger.Info("Uploaded test delivery", "url", testDeliveryURL)
logger.Info("Uploaded test delivery", "local path", testDeliveryPath, "url", testDeliveryURL)
}

<-ctx.Done() // Wait until the context is canceled
@@ -134,6 +178,10 @@ func createBucket(s3Client *s3.S3, bucket string) error {
}

func wipeBucket(s3Client *s3.S3, bucketName string) error {
+ if s3Client.Endpoint != "http://ingress:4566" && s3Client.Endpoint != "http://localhost:4566" {
+ return fmt.Errorf("cannot wipe bucket '%s' because endpoint is not localstack", bucketName)
+ }
+
listParams := &s3.ListObjectsV2Input{
Bucket: aws.String(bucketName),
}
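
The polling loop added above re-parses any release whose `release_status` is `awaiting_parse`, which is how the webapp's "Re-process All" button takes effect. A sketch of the UI-side trigger this implies; `requestReparse` is hypothetical (the webapp handler is not part of this diff), but the collection and status names come from this commit:

```go
package webapp // hypothetical package, not in this commit

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// requestReparse flags one release so the ingester's 1-second ticker
// finds it on the next tick and runs ParseRelease again.
func requestReparse(ctx context.Context, releases *mongo.Collection, releaseID string) error {
	_, err := releases.UpdateByID(ctx, releaseID,
		bson.M{"$set": bson.M{"release_status": "awaiting_parse"}})
	return err
}
```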
54 changes: 26 additions & 28 deletions packages/ddex/ingester/common/common.go
@@ -30,41 +30,39 @@ func MustGetChoreography() constants.DDEXChoreography {
}

type Ingester struct {
DDEXChoreography constants.DDEXChoreography
Ctx context.Context
MongoClient *mongo.Client
S3Client *s3.S3
S3Downloader *s3manager.Downloader
S3Uploader *s3manager.Uploader
RawBucket string
CrawledBucket string
CrawlerCursorColl *mongo.Collection
DeliveriesColl *mongo.Collection
- PendingReleasesColl *mongo.Collection
- PublishedReleasesColl *mongo.Collection
+ ReleasesColl *mongo.Collection
UsersColl *mongo.Collection
Logger *slog.Logger
}

func NewIngester(ctx context.Context) *Ingester {
s3, s3Session := InitS3Client(slog.Default())
mongoClient := InitMongoClient(ctx, slog.Default())

return &Ingester{
DDEXChoreography: MustGetChoreography(),
S3Client: s3,
S3Downloader: s3manager.NewDownloader(s3Session),
S3Uploader: s3manager.NewUploader(s3Session),
MongoClient: mongoClient,
RawBucket: MustGetenv("AWS_BUCKET_RAW"),
CrawledBucket: MustGetenv("AWS_BUCKET_CRAWLED"),
CrawlerCursorColl: mongoClient.Database("ddex").Collection("crawler_cursor"),
DeliveriesColl: mongoClient.Database("ddex").Collection("deliveries"),
- PendingReleasesColl: mongoClient.Database("ddex").Collection("pending_releases"),
- PublishedReleasesColl: mongoClient.Database("ddex").Collection("published_releases"),
+ ReleasesColl: mongoClient.Database("ddex").Collection("releases"),
UsersColl: mongoClient.Database("ddex").Collection("users"),
Ctx: ctx,
Logger: slog.Default(),
}
}

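
The net effect of this change is that `pending_releases` and `published_releases` collapse into one `releases` collection, so "published" becomes a status filter rather than a separate collection. A minimal sketch (`findPublished` is illustrative, not from the commit):

```go
package ingesterquery // hypothetical package, not in this commit

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// findPublished replaces what used to be a scan of published_releases:
// the same documents now live in "releases" with a "published" status.
func findPublished(ctx context.Context, releases *mongo.Collection) (*mongo.Cursor, error) {
	return releases.Find(ctx, bson.M{"release_status": "published"})
}
```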
54 changes: 26 additions & 28 deletions packages/ddex/ingester/common/types.go
@@ -27,6 +27,7 @@ type UnprocessedBatch struct {
NumMessages int `bson:"num_messages"`
}

+ // TODO: Delivery should just store IDs of releases that it inserted into the releases collection
// Delivery represents crawled upload contents that are ready to be parsed
type Delivery struct {
RemotePath string `bson:"_id"`
@@ -35,39 +36,36 @@
CreatedAt time.Time `bson:"created_at"`
ValidationErrors []string `bson:"validation_errors"`

- // Note: these only contain the data to be parsed. They're moved to PendingRelease after parsing
+ // TODO: Don't store releases and batches in the delivery
+ // Note: these only contain the data to be parsed. They're moved to the releases collection after parsing
Releases []UnprocessedRelease `bson:"releases"`
Batches []UnprocessedBatch `bson:"batches"`
}

+ // TODO: When processing a release where a ReleaseID already exists, we can update the existing document and have a field with edit history

- // PendingRelease represents a fully formed release that waiting to be uploaded to Audius
- type PendingRelease struct {
- ReleaseID string `bson:"_id"`
- DeliveryRemotePath string `bson:"delivery_remote_path"` // aka Delivery._id
- Release Release `bson:"release"`
- CreatedAt time.Time `bson:"created_at"`
- PublishErrors []string `bson:"publish_errors"`
- FailureCount int `bson:"failure_count"`
- FailedAfterUpload bool `bson:"failed_after_upload"` // If the release failed after uploading to Audius, which means there could be some cleanup to do
- }
-
- // PublishedRelease represents a release that has been successfully uploaded to Audius
- type PublishedRelease struct {
- ReleaseID string `bson:"_id"`
- DeliveryRemotePath string `bson:"delivery_remote_path"`
- EntityID string `bson:"entity_id"`
- Blockhash string `bson:"blockhash"`
- Blocknumber int64 `bson:"blocknumber"`
- Release Release `bson:"release"`
- CreatedAt time.Time `bson:"created_at"`
- }

// Release represents a track or album to be uploaded to Audius
type Release struct {
- ReleaseProfile ReleaseProfile `bson:"release_profile"` // "ReleaseProfileVersionId" from the DDEX XML
- ParsedReleaseElems []ParsedReleaseElement `bson:"parsed_release_elems"` // Releases parsed from XML
- SDKUploadMetadata SDKUploadMetadata `bson:"sdk_upload_metadata"` // Metadata for the publisher to upload to Audius via SDK
+ ReleaseID string `bson:"_id"`
+ DeliveryRemotePath string `bson:"delivery_remote_path"` // aka Delivery._id (the delivery this release comes from)
+ BatchID string `bson:"batch_id,omitempty"` // If this release is part of a batch
+ RawXML primitive.Binary `bson:"raw_xml"` // The raw XML content of the release
+ ParseErrors []string `bson:"parse_errors"`
+ PublishErrors []string `bson:"publish_errors"`
+ FailureCount int `bson:"failure_count"`
+ ReleaseStatus string `bson:"release_status"`
+
+ // TODO: Probably want to change this and/or add LastParsed
+ CreatedAt time.Time `bson:"created_at"`
+
+ // Parsed from the release's XML
+ ExpectedERNVersion string `bson:"expected_ern_version,omitempty"` // From the batch's XML
+ ReleaseProfile ReleaseProfile `bson:"release_profile"` // "ReleaseProfileVersionId" from the DDEX XML
+ ParsedReleaseElems []ParsedReleaseElement `bson:"parsed_release_elems"` // Releases parsed from XML
+ SDKUploadMetadata SDKUploadMetadata `bson:"sdk_upload_metadata"` // Metadata for the publisher to upload to Audius via SDK
+
+ // Only set if the release was successfully published
+ EntityID string `bson:"entity_id"`
+ Blockhash string `bson:"blockhash"`
+ Blocknumber int64 `bson:"blocknumber"`
}

// ParsedReleaseElement contains parsed details of a <Release> element
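
Because the merged `Release` document now carries `entity_id`, `blockhash`, and `blocknumber` alongside `release_status`, publishing becomes an in-place update rather than a move between collections. A hedged sketch of what the publisher side might do (`markPublished` is hypothetical; the field names are the bson tags above):

```go
package publisher // hypothetical package, not in this commit

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// markPublished records a successful Audius upload on the same releases
// document instead of moving it to a published_releases collection.
func markPublished(ctx context.Context, releases *mongo.Collection,
	releaseID, entityID, blockhash string, blocknumber int64) error {
	_, err := releases.UpdateByID(ctx, releaseID, bson.M{"$set": bson.M{
		"release_status": "published",
		"entity_id":      entityID,
		"blockhash":      blockhash,
		"blocknumber":    blocknumber,
	}})
	return err
}
```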
12 changes: 12 additions & 0 deletions packages/ddex/ingester/constants/constants.go
@@ -7,10 +7,22 @@
ERNBatched DDEXChoreography = "ERNBatched"
)

+ // TODO: We don't want to store statuses about deliveries for the most part. We should just store the releases and batches
const (
DeliveryStatusParsing = "parsing" // The delivery was crawled, and its XML is being parsed
DeliveryStatusSuccess = "success" // All releases were parsed and moved to PendingRelease. Over and done with forever, unless it will be re-processed for bug fixes. Even if we later fail to upload a PendingRelease created by this delivery, the delivery itself was successful
DeliveryStatusErrorCrawling = "error_crawling" // There was an error crawling the delivery
DeliveryStatusErrorParsing = "error_parsing" // There was an error parsing the crawled content. Any releases that were successfully parsed were ignored (not moved to PendingRelease)
DeliveryStatusRejected = "rejected" // Crawling and parsing succeeded, but the delivery was rejected for some reason
)

+ const (
+ ReleaseStatusAwaitingParse = "awaiting_parse" // The release is waiting to be parsed
+ ReleaseStatusAwaitingPublish = "awaiting_publish" // The release is waiting to be uploaded to Audius
+ ReleaseStatusErrorUserMatch = "error_user_match" // The release didn't have a user that matched with an OAuthed Audius user
+ ReleaseStatusErrorGenreMatch = "error_genre_match" // The release didn't have a genre that matched with an Audius genre
+ ReleaseStatusErrorParsing = "error_parsing" // Some other error occurred during parsing. See ParseErrors
+ ReleaseStatusErrorDuringUpload = "error_during_upload" // An error occurred while trying to publish to Audius
+ ReleaseStatusErrorAfterUpload = "error_after_upload" // The release was published to Audius, but there was an error after publishing
+ ReleaseStatusPublished = "published" // The release was successfully published to Audius
+ )
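
Taken together, these statuses form a small state machine: `awaiting_parse` → `awaiting_publish` → `published`, with `error_*` branches at each stage that the UI can retry by resetting the status to `awaiting_parse` (per the re-parse loop in cmd/main.go). An illustrative helper, not part of the commit:

```go
package constants

// IsErrorStatus reports whether a release is parked in one of the error
// states; any of these can be retried by setting release_status back to
// "awaiting_parse", which the ingester's polling loop picks up.
func IsErrorStatus(status string) bool {
	switch status {
	case ReleaseStatusErrorUserMatch,
		ReleaseStatusErrorGenreMatch,
		ReleaseStatusErrorParsing,
		ReleaseStatusErrorDuringUpload,
		ReleaseStatusErrorAfterUpload:
		return true
	default:
		return false
	}
}
```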
(Diffs for the remaining 30 changed files are not shown.)