Skip to content

Commit

Permalink
More DDEX cleanup (#7516)
Browse files Browse the repository at this point in the history
  • Loading branch information
michellebrier authored Feb 8, 2024
1 parent cf0042b commit 1e99ac8
Show file tree
Hide file tree
Showing 24 changed files with 258 additions and 204 deletions.
18 changes: 9 additions & 9 deletions packages/ddex/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,35 @@ Processes and uploads DDEX releases to Audius.
DDEX requires these services: `ddex-webapp`, `ddex-crawler`, `ddex-indexer`, `ddex-parser`, `ddex-publisher`, `ddex-mongo`.

### Env configuration
All services read from `.env`.
All services read from `packages/ddex/.env`.

To use stage envs: `cp .env.stage .env`
To use stage envs: `cp packages/ddex/.env.stage packages/ddex/.env`

To use dev envs: `cp .env.dev .env`
To use dev envs: `cp packages/ddex/.env.dev packages/ddex/.env`

Fill in all missing values. See the `Creating a bucket in S3` section below for how to set up S3.

For docker compose to work: (at monorepo root) `cat packages/ddex/.env >> dev-tools/compose/.env`
For docker compose to work: `cat packages/ddex/.env >> dev-tools/compose/.env`

### Setup
1. (At the monorepo root) Generate a keyfile for mongodb:
```
openssl rand -base64 756 > packages/ddex/mongo-keyfile
chmod 400 packages/ddex/mongo-keyfile
```
2. `audius-compose connect` to update your /etc/hosts
2. `audius-compose connect` to update your `/etc/hosts`
3. `audius-compose up --ddex`
4. Once the `ddex-mongo` container is running: manually initiate the mongodb replica set with `docker exec -it ddex-mongo mongosh -u mongo -p mongo --authenticationDatabase admin --eval 'rs.initiate({_id:"rs0", members:[{_id:0, host:"ddex-mongo:27017"}]})'`. The other ddex containers will be blocked from starting until this command succeeds.

### Bring up the ddex stack subsequently
`audius-compose up --ddex`
`audius-compose up --ddex` or `docker compose -f dev-tools/compose/docker-compose.ddex.yml --profile ddex up -d`

Note: `audius-compose down` removes the `ddex-mongo-db` volume, so if you run this, you will need to initiate the mongodb replica set again the next time you bring up the `ddex-mongo` container. See step 4 in the Setup section above.
Note: `audius-compose down` removes the `ddex-mongo-db` volume, so if you run this, you will need to initiate the mongodb replica set again the next time you bring up the `ddex-mongo` container. See step 4 in the `Setup` section above.

To access the ddex db via the mongo shell: `docker exec -it ddex-mongo mongosh -u mongo -p mongo --authenticationDatabase admin` then `use ddex`

### Develop with hot reloading
Each service can be run independently as long as `ddex-mongo` is up (from `audius-compose up --ddex`` or `docker compose --project-directory=dev-tools/compose --file=dev-tools/compose/docker-compose.yml --profile=ddex up ddex-mongo -d`). See the respective subdirectories' READMEs.
Each service can be run independently as long as `ddex-mongo` is up (from `audius-compose up --ddex` or `docker compose --project-directory=dev-tools/compose --file=dev-tools/compose/docker-compose.yml --profile=ddex up ddex-mongo -d`). See the respective subdirectories' READMEs.

### Creating a bucket in S3
1. Create a new bucket in the S3 console with the name `ddex-[dev|staging]-<label/distributor>-raw`. Use all the defaults, including "ACLs disabled"
Expand All @@ -51,7 +51,7 @@ Each service can be run independently as long as `ddex-mongo` is up (from `audiu
4. Create an IAM User [here](https://us-east-1.console.aws.amazon.com/iamv2/home?region=us-west-2#/users/create) (or search IAM and click Users > Create User).
* Name the user `ddex-[dev|staging]-<label/distributor>-user` and press Next.
* Select "Attach policies directly," and search for the policy you created (`ddex-[dev|staging]-<label/distributor>-policy`). Check the box next to it and press Next and then Create User.
5. Search for your new user and press "Create access key" and then "Third-party service." Copy the access key and secret access key into your .env file (assuming you've already done `cp .env.dev .env`).
5. Search for your new user and press "Create access key" and then "Third-party service." Copy the access key and secret access key into your `.env` file (assuming you've already done `cp .env.[dev|stage] .env`).
6. Go back to the bucket ending with `raw`, and add CORS at the bottom of the Permissions tab. Here's an example for dev, but for a prod environment you'll want to replace "*" in "AllowedOrigins" with the DNS that the frontend will be served from:
```json
[
Expand Down
4 changes: 2 additions & 2 deletions packages/ddex/ingester/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func main() {

switch *service {
case "crawler":
go crawler.Run(ctx)
go crawler.RunNewCrawler(ctx)
case "indexer":
go indexer.RunNewIndexer(ctx)
case "parser":
go parser.Run(ctx)
go parser.RunNewParser(ctx)
default:
fmt.Println("Unknown service: " + *service)
}
Expand Down
9 changes: 5 additions & 4 deletions packages/ddex/ingester/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package common
import (
"context"
"log"
"log/slog"
"os"
"time"

Expand All @@ -14,7 +15,7 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
)

func InitMongoClient(ctx context.Context) *mongo.Client {
func InitMongoClient(ctx context.Context, logger *slog.Logger) *mongo.Client {
mongoUrl := os.Getenv("DDEX_MONGODB_URL")
if mongoUrl == "" {
mongoUrl = "mongodb://mongo:mongo@localhost:27017/ddex?authSource=admin&replicaSet=rs0"
Expand All @@ -23,11 +24,11 @@ func InitMongoClient(ctx context.Context) *mongo.Client {
if err != nil {
panic(err)
}
log.Println("Connected to mongo")
logger.Info("Connected to mongo")
return client
}

func InitS3Client() (*s3.S3, *session.Session) {
func InitS3Client(logger *slog.Logger) (*s3.S3, *session.Session) {
awsRegion := MustGetenv("AWS_REGION")
awsKey := MustGetenv("AWS_ACCESS_KEY_ID")
awsSecret := MustGetenv("AWS_SECRET_ACCESS_KEY")
Expand All @@ -38,7 +39,7 @@ func InitS3Client() (*s3.S3, *session.Session) {
if err != nil {
panic(err)
}
log.Println("Connected to s3")
logger.Info("Connected to s3")
return s3.New(sess), sess
}

Expand Down
11 changes: 5 additions & 6 deletions packages/ddex/ingester/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,28 @@ type Upload struct {
CreatedAt time.Time `bson:"created_at"`
}

type Indexed struct {
type Delivery struct {
ID primitive.ObjectID `bson:"_id"`
UploadETag string `bson:"upload_etag"`
DeliveryID string `bson:"delivery_id"`
DeliveryStatus string `bson:"delivery_status"`
XmlFilePath string `bson:"xml_file_path"`
XmlContent primitive.Binary `bson:"xml_content"`
CreatedAt time.Time `bson:"created_at"`
}

type Parsed struct {
type PendingRelease struct {
ID primitive.ObjectID `bson:"_id"`
UploadETag string `bson:"upload_etag"`
DeliveryID string `bson:"delivery_id"`
DeliveryID primitive.ObjectID `bson:"delivery_id"`
Entity string `bson:"entity"`
PublishDate time.Time `bson:"publish_date"`
CreatedAt time.Time `bson:"created_at"`
}

type Published struct {
type PublishedRelease struct {
ID primitive.ObjectID `bson:"_id"`
UploadETag string `bson:"upload_etag"`
DeliveryID string `bson:"delivery_id"`
DeliveryID primitive.ObjectID `bson:"delivery_id"`
Entity string `bson:"entity"`
PublishDate time.Time `bson:"publish_date"`
EntityID string `bson:"entity_id"`
Expand Down
74 changes: 45 additions & 29 deletions packages/ddex/ingester/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package crawler

import (
"context"
"fmt"
"ingester/common"
"log"
"log/slog"
"strings"
"time"

Expand All @@ -14,12 +15,32 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
)

func Run(ctx context.Context) {
s3Client, _ := common.InitS3Client()
mongoClient := common.InitMongoClient(ctx)
type Crawler struct {
s3Client *s3.S3
mongoClient *mongo.Client
rawBucket string
cursorsColl *mongo.Collection
uploadsColl *mongo.Collection
ctx context.Context
logger *slog.Logger
}

func RunNewCrawler(ctx context.Context) {
logger := slog.With("service", "crawler")
s3Client, _ := common.InitS3Client(logger)
mongoClient := common.InitMongoClient(ctx, logger)
defer mongoClient.Disconnect(ctx)

bucketName := common.MustGetenv("AWS_BUCKET_RAW")
c := &Crawler{
s3Client: s3Client,
mongoClient: mongoClient,
rawBucket: common.MustGetenv("AWS_BUCKET_RAW"),
cursorsColl: mongoClient.Database("ddex").Collection("cursors"),
uploadsColl: mongoClient.Database("ddex").Collection("uploads"),
ctx: ctx,
logger: logger,
}

ticker := time.NewTicker(3 * time.Minute)
defer ticker.Stop()

Expand All @@ -28,30 +49,30 @@ func Run(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
lastPolledTime, err := getLastPolledTime(mongoClient, ctx)
lastPolledTime, err := c.getLastPolledTime()
if err != nil {
log.Println("Failed to retrieve s3 raw bucket's last polled time:", err)
c.logger.Error("Failed to retrieve s3 raw bucket's last polled time", "error", err)
continue
}

uploads, err := pollS3Bucket(s3Client, bucketName, lastPolledTime)
uploads, err := c.pollS3Bucket(lastPolledTime)
if err != nil {
log.Println("Error polling S3 bucket:", err)
c.logger.Error("Error polling S3 bucket", "error", err)
continue
}

if len(uploads) > 0 {
err = persistUploads(mongoClient, bucketName, uploads, ctx)
err = c.persistUploads(uploads)
if err != nil {
log.Println("Error inserting into mongodb:", err)
c.logger.Error("Error inserting into mongodb", "error", err)
continue
}
log.Printf("Processed %d new uploads", len(uploads))
c.logger.Info(fmt.Sprintf("Processed %d new uploads", len(uploads)))
}

err = updateLastPolledTime(mongoClient, time.Now(), ctx)
err = c.updateLastPolledTime(time.Now())
if err != nil {
log.Println("Failed to update s3 raw bucket's last polled time:", err)
c.logger.Error("Failed to update s3 raw bucket's last polled time", "error", err)
}

if ctx.Err() != nil {
Expand All @@ -61,26 +82,22 @@ func Run(ctx context.Context) {
}
}

func updateLastPolledTime(client *mongo.Client, lastPolledTime time.Time, ctx context.Context) error {
collection := client.Database("ddex").Collection("cursors")

func (c *Crawler) updateLastPolledTime(lastPolledTime time.Time) error {
filter := bson.M{"service": "crawler"}
update := bson.M{"$set": bson.M{"s3RawLastPolledTime": lastPolledTime}}
opts := options.Update().SetUpsert(true)

_, err := collection.UpdateOne(ctx, filter, update, opts)
_, err := c.cursorsColl.UpdateOne(c.ctx, filter, update, opts)
return err
}

func getLastPolledTime(client *mongo.Client, ctx context.Context) (time.Time, error) {
collection := client.Database("ddex").Collection("cursors")

func (c *Crawler) getLastPolledTime() (time.Time, error) {
var result struct {
LastPolledTime time.Time `bson:"s3RawLastPolledTime"`
}
filter := bson.M{"service": "crawler"}

err := collection.FindOne(ctx, filter).Decode(&result)
err := c.cursorsColl.FindOne(c.ctx, filter).Decode(&result)
if err != nil {
if err == mongo.ErrNoDocuments {
// No record found, return zero time
Expand All @@ -92,9 +109,9 @@ func getLastPolledTime(client *mongo.Client, ctx context.Context) (time.Time, er
return result.LastPolledTime, nil
}

func pollS3Bucket(s3Client *s3.S3, bucketName string, lastPolledTime time.Time) ([]*s3.Object, error) {
resp, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{
Bucket: aws.String(bucketName),
func (c *Crawler) pollS3Bucket(lastPolledTime time.Time) ([]*s3.Object, error) {
resp, err := c.s3Client.ListObjectsV2(&s3.ListObjectsV2Input{
Bucket: aws.String(c.rawBucket),
})
if err != nil {
return nil, err
Expand All @@ -109,16 +126,15 @@ func pollS3Bucket(s3Client *s3.S3, bucketName string, lastPolledTime time.Time)
return newUploads, nil
}

func persistUploads(client *mongo.Client, bucket string, uploads []*s3.Object, ctx context.Context) error {
uploadsColl := client.Database("ddex").Collection("uploads")
func (c *Crawler) persistUploads(uploads []*s3.Object) error {
for _, upload := range uploads {
path := "s3://" + bucket + "/" + *upload.Key
path := "s3://" + c.rawBucket + "/" + *upload.Key
etag := strings.Trim(*upload.ETag, "\"")
// Only insert if a document doesn't already exist with this path and etag
filter := bson.M{"path": path, "upload_etag": etag}
update := bson.M{"$setOnInsert": bson.M{"path": path, "upload_etag": etag, "created_at": upload.LastModified}}
opts := options.Update().SetUpsert(true)
_, err := uploadsColl.UpdateOne(ctx, filter, update, opts)
_, err := c.uploadsColl.UpdateOne(c.ctx, filter, update, opts)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 1e99ac8

Please sign in to comment.