Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PROTO-1784] Improve existing ddex UI for local dev #8235

Merged
merged 3 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions packages/ddex/ingester/.air.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Configuration for Air (live-reload tool for Go apps), used for local dev of the ddex ingester.
# Run `air` in this directory to rebuild and restart the server on code changes (see README step 4).
root = "."
testdata_dir = "testdata"
tmp_dir = "tmp"

[build]
# Build the ingester from ./cmd/ into ./tmp/main and re-run it whenever watched files change.
args_bin = []
bin = "./tmp/main"
cmd = "go build -o ./tmp/main ./cmd/"
delay = 1000
exclude_dir = ["assets", "tmp", "vendor", "testdata"]
exclude_file = []
exclude_regex = ["_test.go"]  # don't trigger rebuilds on test files
exclude_unchanged = false
follow_symlink = false
full_bin = ""
include_dir = []
include_ext = ["go", "tpl", "tmpl", "html"]
include_file = []
kill_delay = "5s"
log = "build-errors.log"
poll = false
poll_interval = 0
post_cmd = []
pre_cmd = []
rerun = false
rerun_delay = 500
send_interrupt = true  # interrupt the running binary (rather than hard-kill) so it can shut down gracefully
stop_on_error = false

[color]
# Per-component colors in Air's console output.
app = ""
build = "yellow"
main = "magenta"
runner = "green"
watcher = "cyan"

[log]
main_only = false
time = false

[misc]
clean_on_exit = true  # remove tmp_dir when Air exits

[screen]
clear_on_rebuild = false
keep_scroll = true
3 changes: 2 additions & 1 deletion packages/ddex/ingester/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ Crawls and parses new DDEX uploads.
### Local Dev
1. Make sure the DDEX dependencies are running: `audius-compose up --ddex-deps`
2. (Optional) See the webapp README to start that server and go through the OAuth flow with a staging user
3. Parse a file: `IS_DEV=true AWS_ENDPOINT=http://ingress:4566 DDEX_CHOREOGRAPHY=ERNReleaseByRelease IS_DEV=true AWS_ENDPOINT=http://ingress:4566 DDEX_CHOREOGRAPHY=ERNBatched go run cmd/main.go ./e2e_test/fixtures/batch/fuga/20240305090456555 --wipe`
3. Parse a file: `IS_DEV=true AWS_ENDPOINT=http://ingress:4566 DDEX_CHOREOGRAPHY=ERNBatched go run cmd/main.go ./e2e_test/fixtures/batch/fuga/20240305090456555 --wipe`
4. Alternatively, run `IS_DEV=true AWS_ENDPOINT=http://ingress:4566 DDEX_CHOREOGRAPHY=ERNBatched air` to start the server with hot reloading. Then run the webapp (see its README) and use its buttons to trigger re-parses while you make code changes to the parser.


### Getting files
Expand Down
72 changes: 60 additions & 12 deletions packages/ddex/ingester/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"fmt"
"ingester/artistutils"
"ingester/common"
"ingester/constants"
"log"
"log/slog"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"

"ingester/crawler"
"ingester/parser"
Expand Down Expand Up @@ -53,6 +55,11 @@ func main() {

// Optionally wipe all state except for OAuthed users
if os.Getenv("IS_DEV") == "true" && len(os.Args) > 2 && os.Args[2] == "--wipe" {
if ingester.S3Client.Endpoint != "http://ingress:4566" && ingester.S3Client.Endpoint != "http://localhost:4566" {
logger.Error("not honoring the --wipe flag because the AWS bucket is not localstack")
return
}

if err := wipeBucket(ingester.S3Client, ingester.RawBucket); err != nil {
logger.Error("Error creating raw bucket", "err", err)
}
Expand All @@ -67,19 +74,14 @@ func main() {
logger.Error("Error wiping crawler_cursor collection", "err", err)
}
if result, err := ingester.DeliveriesColl.DeleteMany(ingester.Ctx, filter); err == nil {
log.Printf("Deleted %d deliveries\n", result.DeletedCount)
log.Printf("Deleted %d deliveries documents\n", result.DeletedCount)
} else {
logger.Error("Error wiping deliveries collection", "err", err)
}
if result, err := ingester.PendingReleasesColl.DeleteMany(ingester.Ctx, filter); err == nil {
log.Printf("Deleted %d pending_releases\n", result.DeletedCount)
} else {
logger.Error("Error wiping pending_releases collection", "err", err)
}
if result, err := ingester.PublishedReleasesColl.DeleteMany(ingester.Ctx, filter); err == nil {
log.Printf("Deleted %d published_releases\n", result.DeletedCount)
if result, err := ingester.ReleasesColl.DeleteMany(ingester.Ctx, filter); err == nil {
log.Printf("Deleted %d releases documents\n", result.DeletedCount)
} else {
logger.Error("Error wiping published_releases collection", "err", err)
logger.Error("Error wiping releases collection", "err", err)
}
}

Expand All @@ -88,8 +90,50 @@ func main() {
log.Fatal(err)
}

// Start the crawler and parser
go crawler.CrawlThenParse(ingester, parser.NewParser(ingester).ProcessDelivery)
// Crawl and parse each new delivery that gets put into S3
p := parser.NewParser(ingester)
go crawler.CrawlThenParse(ingester, p.ProcessDelivery)

// Re-parse releases (UI sets release_status to "awaiting_parse" to re-parse)
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
cursor, err := ingester.ReleasesColl.Find(ingester.Ctx, bson.M{"release_status": constants.ReleaseStatusAwaitingParse})
if err != nil {
logger.Error("Error querying releases", "err", err)
continue
}

for cursor.Next(ingester.Ctx) {
var release common.Release
err := cursor.Decode(&release)
if err != nil {
logger.Error("Error unmarshalling release", "err", err)
continue
}

logger.Info("Re-parsing release", "release_id", release.ReleaseID)
if ok := p.ParseRelease(&release); !ok {
logger.Error("Failed to parse release in an unexpected way (couldn't update status)", "release_id", release.ReleaseID)
}
}

// Close the cursor and check for errors
if err := cursor.Close(ctx); err != nil {
logger.Error("Error closing cursor", "err", err)
}
if err := cursor.Err(); err != nil {
logger.Error("Error during cursor iteration", "err", err)
}
}
}
}()

// Test the ingester with a delivery if provided
if os.Getenv("IS_DEV") == "true" && len(os.Args) > 1 {
Expand All @@ -103,7 +147,7 @@ func main() {

testDeliveryPath := os.Args[1]
testDeliveryURL := uploadTestDelivery(ingester, testDeliveryPath)
logger.Info("Uploaded test delivery", "url", testDeliveryURL)
logger.Info("Uploaded test delivery", "local path", testDeliveryPath, "url", testDeliveryURL)
}

<-ctx.Done() // Wait until the context is canceled
Expand Down Expand Up @@ -134,6 +178,10 @@ func createBucket(s3Client *s3.S3, bucket string) error {
}

func wipeBucket(s3Client *s3.S3, bucketName string) error {
if s3Client.Endpoint != "http://ingress:4566" && s3Client.Endpoint != "http://localhost:4566" {
return fmt.Errorf("cannot wipe bucket '%s' because endpoint is not localstack", bucketName)
}

listParams := &s3.ListObjectsV2Input{
Bucket: aws.String(bucketName),
}
Expand Down
54 changes: 26 additions & 28 deletions packages/ddex/ingester/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,41 +30,39 @@ func MustGetChoreography() constants.DDEXChoreography {
}

type Ingester struct {
DDEXChoreography constants.DDEXChoreography
Ctx context.Context
MongoClient *mongo.Client
S3Client *s3.S3
S3Downloader *s3manager.Downloader
S3Uploader *s3manager.Uploader
RawBucket string
CrawledBucket string
CrawlerCursorColl *mongo.Collection
DeliveriesColl *mongo.Collection
PendingReleasesColl *mongo.Collection
PublishedReleasesColl *mongo.Collection
UsersColl *mongo.Collection
Logger *slog.Logger
DDEXChoreography constants.DDEXChoreography
Ctx context.Context
MongoClient *mongo.Client
S3Client *s3.S3
S3Downloader *s3manager.Downloader
S3Uploader *s3manager.Uploader
RawBucket string
CrawledBucket string
CrawlerCursorColl *mongo.Collection
DeliveriesColl *mongo.Collection
ReleasesColl *mongo.Collection
UsersColl *mongo.Collection
Logger *slog.Logger
}

func NewIngester(ctx context.Context) *Ingester {
s3, s3Session := InitS3Client(slog.Default())
mongoClient := InitMongoClient(ctx, slog.Default())

return &Ingester{
DDEXChoreography: MustGetChoreography(),
S3Client: s3,
S3Downloader: s3manager.NewDownloader(s3Session),
S3Uploader: s3manager.NewUploader(s3Session),
MongoClient: mongoClient,
RawBucket: MustGetenv("AWS_BUCKET_RAW"),
CrawledBucket: MustGetenv("AWS_BUCKET_CRAWLED"),
CrawlerCursorColl: mongoClient.Database("ddex").Collection("crawler_cursor"),
DeliveriesColl: mongoClient.Database("ddex").Collection("deliveries"),
PendingReleasesColl: mongoClient.Database("ddex").Collection("pending_releases"),
PublishedReleasesColl: mongoClient.Database("ddex").Collection("published_releases"),
UsersColl: mongoClient.Database("ddex").Collection("users"),
Ctx: ctx,
Logger: slog.Default(),
DDEXChoreography: MustGetChoreography(),
S3Client: s3,
S3Downloader: s3manager.NewDownloader(s3Session),
S3Uploader: s3manager.NewUploader(s3Session),
MongoClient: mongoClient,
RawBucket: MustGetenv("AWS_BUCKET_RAW"),
CrawledBucket: MustGetenv("AWS_BUCKET_CRAWLED"),
CrawlerCursorColl: mongoClient.Database("ddex").Collection("crawler_cursor"),
DeliveriesColl: mongoClient.Database("ddex").Collection("deliveries"),
ReleasesColl: mongoClient.Database("ddex").Collection("releases"),
UsersColl: mongoClient.Database("ddex").Collection("users"),
Ctx: ctx,
Logger: slog.Default(),
}
}

Expand Down
54 changes: 26 additions & 28 deletions packages/ddex/ingester/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type UnprocessedBatch struct {
NumMessages int `bson:"num_messages"`
}

// TODO: Delivery should just store IDs of releases that it inserted into the releases collection
// Delivery represents crawled upload contents that are ready to be parsed
type Delivery struct {
RemotePath string `bson:"_id"`
Expand All @@ -35,39 +36,36 @@ type Delivery struct {
CreatedAt time.Time `bson:"created_at"`
ValidationErrors []string `bson:"validation_errors"`

// Note: these only contain the data to be parsed. They're moved to PendingRelease after parsing
// TODO: Don't store releases and batches in the delivery
// Note: these only contain the data to be parsed. They're moved to the releases collection after parsing
Releases []UnprocessedRelease `bson:"releases"`
Batches []UnprocessedBatch `bson:"batches"`
}

// TODO: When processing a release where a ReleaseID already exists, we can update the existing document and have a field with edit history

// PendingRelease represents a fully formed release that is waiting to be uploaded to Audius.
type PendingRelease struct {
	ReleaseID          string    `bson:"_id"`
	DeliveryRemotePath string    `bson:"delivery_remote_path"` // aka Delivery._id
	Release            Release   `bson:"release"`
	CreatedAt          time.Time `bson:"created_at"`
	PublishErrors      []string  `bson:"publish_errors"` // errors accumulated from publish attempts — presumably one entry per failed attempt; confirm against the publisher
	FailureCount       int       `bson:"failure_count"`
	FailedAfterUpload  bool      `bson:"failed_after_upload"` // If the release failed after uploading to Audius, which means there could be some cleanup to do
}

// PublishedRelease represents a release that has been successfully uploaded to Audius.
type PublishedRelease struct {
	ReleaseID          string    `bson:"_id"`
	DeliveryRemotePath string    `bson:"delivery_remote_path"` // aka Delivery._id
	EntityID           string    `bson:"entity_id"`  // NOTE(review): presumably the ID of the entity created on Audius — confirm against the publisher
	Blockhash          string    `bson:"blockhash"`  // Blockhash/Blocknumber: presumably where the publish landed on chain — confirm
	Blocknumber        int64     `bson:"blocknumber"`
	Release            Release   `bson:"release"`
	CreatedAt          time.Time `bson:"created_at"`
}

// Release represents a track or album to be uploaded to Audius
type Release struct {
ReleaseProfile ReleaseProfile `bson:"release_profile"` // "ReleaseProfileVersionId" from the DDEX XML
ParsedReleaseElems []ParsedReleaseElement `bson:"parsed_release_elems"` // Releases parsed from XML
SDKUploadMetadata SDKUploadMetadata `bson:"sdk_upload_metadata"` // Metadata for the publisher to upload to Audius via SDK
ReleaseID string `bson:"_id"`
DeliveryRemotePath string `bson:"delivery_remote_path"` // aka Delivery._id (the delivery this release comes from)
BatchID string `bson:"batch_id,omitempty"` // If this release is part of a batch
RawXML primitive.Binary `bson:"raw_xml"` // The raw XML content of the release
ParseErrors []string `bson:"parse_errors"`
PublishErrors []string `bson:"publish_errors"`
FailureCount int `bson:"failure_count"`
ReleaseStatus string `bson:"release_status"`

// TODO: Probably want to change this and/or add LastParsed
CreatedAt time.Time `bson:"created_at"`

// Parsed from the release's XML
ExpectedERNVersion string `bson:"expected_ern_version,omitempty"` // From the batch's XML
ReleaseProfile ReleaseProfile `bson:"release_profile"` // "ReleaseProfileVersionId" from the DDEX XML
ParsedReleaseElems []ParsedReleaseElement `bson:"parsed_release_elems"` // Releases parsed from XML
SDKUploadMetadata SDKUploadMetadata `bson:"sdk_upload_metadata"` // Metadata for the publisher to upload to Audius via SDK

// Only set if the release was successfully published
EntityID string `bson:"entity_id"`
Blockhash string `bson:"blockhash"`
Blocknumber int64 `bson:"blocknumber"`
}

// ParsedReleaseElement contains parsed details of a <Release> element
Expand Down
12 changes: 12 additions & 0 deletions packages/ddex/ingester/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,22 @@ const (
ERNBatched DDEXChoreography = "ERNBatched"
)

// TODO: We don't want to store statuses about deliveries for the most part. We should just store the releases and batches
// Status values stored on a Delivery document.
const (
	DeliveryStatusParsing       = "parsing"        // The delivery was crawled, and its XML is being parsed
	DeliveryStatusSuccess       = "success"        // All releases were parsed and moved to the releases collection. Over and done with forever, unless it will be re-processed for bug fixes. Even if a release created by this delivery later fails to upload, the delivery itself was successful
	DeliveryStatusErrorCrawling = "error_crawling" // There was an error crawling the delivery
	DeliveryStatusErrorParsing  = "error_parsing"  // There was an error parsing the crawled content. Any releases that were successfully parsed were ignored (not moved to the releases collection)
	DeliveryStatusRejected      = "rejected"       // Crawling and parsing succeeded, but the delivery was rejected for some reason
)

// Status values stored in a Release document's "release_status" field.
// The ingester polls for ReleaseStatusAwaitingParse to (re-)parse releases; the UI sets it to trigger a re-parse.
const (
	ReleaseStatusAwaitingParse   = "awaiting_parse"      // The release is waiting to be parsed
	ReleaseStatusAwaitingPublish = "awaiting_publish"    // The release is waiting to be uploaded to Audius
	ReleaseStatusErrorUserMatch  = "error_user_match"    // The release didn't have a user that matched with an OAuthed Audius user
	ReleaseStatusErrorGenreMatch = "error_genre_match"   // The release didn't have a genre that matched with an Audius genre
	ReleaseStatusErrorParsing    = "error_parsing"       // Some other error occurred during parsing. See ParseErrors
	ReleaseStatusErrorDuringUpload = "error_during_upload" // An error occurred while trying to publish to Audius
	ReleaseStatusErrorAfterUpload  = "error_after_upload"  // The release was published to Audius, but there was an error after publishing
	ReleaseStatusPublished         = "published"           // The release was successfully published to Audius
)
Loading