Skip to content

Commit

Permalink
DDEX indexer -> publisher talking (#7452)
Browse files Browse the repository at this point in the history
  • Loading branch information
michellebrier authored Feb 5, 2024
1 parent 109be52 commit 9f169e7
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 16 deletions.
5 changes: 3 additions & 2 deletions packages/ddex/README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# Audius DDEX

Ingests, parses, and uploads DDEX uploads.
Processes and uploads DDEX releases to Audius.

## Local Dev
DDEX requires these services: `ddex-webapp`, `ddex-crawler`, `ddex-indexer`, `ddex-parser`, `ddex-publisher`, `ddex-mongo`.

### Setup
1. (At the monorepo root) Generate a keyfile for mongo:
1. (At the monorepo root) Generate a keyfile for mongodb:
```
openssl rand -base64 756 > packages/ddex/mongo-keyfile
chmod 400 packages/ddex/mongo-keyfile
Expand All @@ -17,6 +17,7 @@ chmod 400 packages/ddex/mongo-keyfile

### Bring up the ddex stack subsequently
`audius-compose up --ddex`

Note: `audius-compose down` removes the `ddex-mongo-db` volume. After running it, you must initiate the MongoDB replica set again the next time you bring up the `ddex-mongo` container (see step 4 in the Setup section above).

To access the ddex db via the mongo shell: `docker exec -it ddex-mongo mongosh -u mongo -p mongo --authenticationDatabase admin` then `use ddex`
Expand Down
8 changes: 8 additions & 0 deletions packages/ddex/ingester/constants/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package constants

// Values stored in the "delivery_status" field of documents in the 'indexed'
// collection, listed roughly in the order a delivery moves through them.
const (
	// DeliveryStatusValidating is set by the indexer when it writes a delivery
	// to the 'indexed' collection.
	DeliveryStatusValidating = "validating"
	// DeliveryStatusAwaitingPublishing is set by the parser once the delivery's
	// release has been written to the 'parsed' collection.
	DeliveryStatusAwaitingPublishing = "awaiting_publishing"
	// DeliveryStatusRejected marks a rejected delivery.
	// NOTE(review): no code in this package sets it yet - confirm intended use.
	DeliveryStatusRejected = "rejected"
	// DeliveryStatusError is set when processing a delivery fails.
	DeliveryStatusError = "error"
)
48 changes: 41 additions & 7 deletions packages/ddex/ingester/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package indexer

import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"ingester/constants"
"log"
"os"
)

Expand All @@ -17,19 +19,51 @@ func Run() {
if err != nil {
panic(err)
}
fmt.Println("Indexer: connected to mongo")
log.Println("Indexer: connected to mongo")
defer client.Disconnect(context.Background())

coll := client.Database("ddex").Collection("uploads")
changeStream, err := coll.Watch(context.Background(), mongo.Pipeline{})
uploadsColl := client.Database("ddex").Collection("uploads")
changeStream, err := uploadsColl.Watch(context.Background(), mongo.Pipeline{})
if err != nil {
panic(err)
}
fmt.Println("Indexer: watching collection 'uploads'")
log.Println("Indexer: watching collection 'uploads'")
defer changeStream.Close(context.Background())

for changeStream.Next(context.Background()) {
fmt.Printf("Indexer: received change event: %v\n", changeStream.Current)
// TODO process the event
var changeDoc bson.M
if err := changeStream.Decode(&changeDoc); err != nil {
log.Fatal(err)
}
fullDocument, _ := changeDoc["fullDocument"].(bson.M)
indexUpload(client, fullDocument)
}

if err := changeStream.Err(); err != nil {
log.Fatal(err)
}
}

// indexUpload copies an uploaded delivery document into the 'indexed'
// collection with its delivery status set to validating.
//
// client is the connected mongo client. fullDocument is the "fullDocument"
// field of a change event on the 'uploads' collection; it is mutated in place
// (its "_id" is removed and "delivery_status" is set). A nil fullDocument is
// logged and skipped. A failed insert terminates the process via log.Fatal.
func indexUpload(client *mongo.Client, fullDocument bson.M) {
	// The caller discards the result of its type assertion, so fullDocument is
	// nil whenever the change event carries no full document. Assigning into a
	// nil map below would panic, so skip such events instead.
	if fullDocument == nil {
		log.Println("Indexer: change event has no full document, skipping")
		return
	}

	log.Printf("Indexer: Processing new upload: %v\n", fullDocument)
	indexedColl := client.Database("ddex").Collection("indexed")

	// Drop the upload's _id so the document inserted into 'indexed' does not
	// reuse it.
	delete(fullDocument, "_id")

	// TODO process upload:
	// 1. download zip from raw s3 bucket
	// 2. unzip
	// 3. upload files to indexed s3 bucket

	// Write delivery to 'indexed' collection
	fullDocument["delivery_status"] = constants.DeliveryStatusValidating
	// TODO download xml from bucket
	// fullDocument["delivery_xml"] = ...

	result, err := indexedColl.InsertOne(context.Background(), fullDocument)
	if err != nil {
		log.Fatal(err)
	}

	log.Println("Indexer: New indexed doc ID: ", result.InsertedID)
}
85 changes: 78 additions & 7 deletions packages/ddex/ingester/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package parser

import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"ingester/constants"
"log"
"os"
"time"
)

func Run() {
Expand All @@ -17,19 +21,86 @@ func Run() {
if err != nil {
panic(err)
}
fmt.Println("Parser: connected to mongo")
log.Println("Parser: connected to mongo")
defer client.Disconnect(context.Background())

coll := client.Database("ddex").Collection("indexed")
changeStream, err := coll.Watch(context.Background(), mongo.Pipeline{})
indexedColl := client.Database("ddex").Collection("indexed")
pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"operationType", "insert"}}}}}
changeStream, err := indexedColl.Watch(context.Background(), pipeline)
if err != nil {
panic(err)
}
fmt.Println("Parser: watching collection 'indexed'")
log.Println("Parser: watching collection 'indexed'")
defer changeStream.Close(context.Background())

for changeStream.Next(context.Background()) {
fmt.Printf("Parser: received change event: %v\n", changeStream.Current)
// TODO process the event
var changeDoc bson.M
if err := changeStream.Decode(&changeDoc); err != nil {
log.Fatal(err)
}
fullDocument, _ := changeDoc["fullDocument"].(bson.M)
parseIndexed(client, fullDocument)
}

if err := changeStream.Err(); err != nil {
log.Fatal(err)
}
}

// parseIndexed processes one document from the 'indexed' collection: inside a
// single MongoDB transaction it writes a release document to the 'parsed'
// collection and sets the delivery's status to awaiting publishing.
//
// client is the connected mongo client. fullDocument is the "fullDocument"
// field of an insert change event on the 'indexed' collection. A document
// without an ObjectID "_id" is logged and skipped. Any other failure marks the
// delivery with the error status and terminates the process via
// failAndUpdateStatus.
func parseIndexed(client *mongo.Client, fullDocument bson.M) {
	log.Printf("Parser: Processing new indexed document: %v\n", fullDocument)
	indexedColl := client.Database("ddex").Collection("indexed")
	parsedColl := client.Database("ddex").Collection("parsed")

	// Resolve the _id once with a checked assertion; an unchecked assertion
	// panics when fullDocument is nil or its _id is not an ObjectID.
	docID, ok := fullDocument["_id"].(primitive.ObjectID)
	if !ok {
		log.Printf("Parser: skipping indexed document without an ObjectID _id: %v\n", fullDocument)
		return
	}

	// TODO process the delivery xml
	// 1. Validate all referenced audio/image files exist within the delivery

	session, err := client.StartSession()
	if err != nil {
		failAndUpdateStatus(err, indexedColl, context.Background(), docID)
		// Not reached while failAndUpdateStatus exits the process; guards
		// against using a nil session if that ever changes.
		return
	}
	defer session.EndSession(context.Background())

	err = mongo.WithSession(context.Background(), session, func(sessionContext mongo.SessionContext) error {
		if err := session.StartTransaction(); err != nil {
			return err
		}

		// abort rolls the transaction back and reports (rather than drops) a
		// failure to do so; the original error is what gets returned.
		abort := func() {
			if abortErr := session.AbortTransaction(sessionContext); abortErr != nil {
				log.Printf("Parser: failed to abort transaction: %v\n", abortErr)
			}
		}

		// 2. Write each release in "delivery_xml" in the indexed doc as a bson doc in the 'parsed' collection
		parsedDoc := bson.M{
			"delivery_id":  fullDocument["delivery_id"],
			"entity":       "track",
			"publish_date": time.Now(),
		}
		// The insert must use sessionContext: an operation run with a plain
		// context executes outside the transaction and would not be rolled
		// back if the status update below fails.
		result, err := parsedColl.InsertOne(sessionContext, parsedDoc)
		if err != nil {
			abort()
			return err
		}
		log.Println("Parser: New parsed release doc ID: ", result.InsertedID)

		// 3. Set delivery status for delivery in 'indexed' collection
		err = setDeliveryStatus(indexedColl, sessionContext, docID, constants.DeliveryStatusAwaitingPublishing)
		if err != nil {
			abort()
			return err
		}

		return session.CommitTransaction(sessionContext)
	})

	if err != nil {
		failAndUpdateStatus(err, indexedColl, context.Background(), docID)
	}
}

// setDeliveryStatus sets the "delivery_status" field of the document with ID
// documentID in collection to status, and returns the error (if any) from the
// update. Pass a session context as ctx to run the update inside that
// session's transaction.
//
// The context parameter is named ctx so it no longer shadows the context
// package; its position is kept as-is so existing callers still compile.
func setDeliveryStatus(collection *mongo.Collection, ctx context.Context, documentID primitive.ObjectID, status string) error {
	update := bson.M{"$set": bson.M{"delivery_status": status}}
	_, err := collection.UpdateByID(ctx, documentID, update)
	return err
}

// failAndUpdateStatus marks the document with ID documentID in collection with
// the error delivery status, then logs err and terminates the process via
// log.Fatal. It never returns.
//
// The status update is best-effort: if it fails, that failure is logged and
// the process still exits with the original err.
func failAndUpdateStatus(err error, collection *mongo.Collection, ctx context.Context, documentID primitive.ObjectID) {
	if statusErr := setDeliveryStatus(collection, ctx, documentID, constants.DeliveryStatusError); statusErr != nil {
		log.Printf("Parser: failed to set error status on document %v: %v\n", documentID, statusErr)
	}
	log.Fatal(err)
}
3 changes: 3 additions & 0 deletions packages/ddex/publisher/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ dotenv.config({ path: path.resolve(process.cwd(), envFile) })

import createApp from './app'
import { dialDb } from './services/dbService'
import { publishReleases } from './services/publisherService'

const port = process.env.DDEX_PORT || 9001

Expand All @@ -19,6 +20,8 @@ const port = process.env.DDEX_PORT || 9001

const app = createApp()

publishReleases()

app.listen(port, () => {
console.log(`[server]: Server is running at http://localhost:${port}`)
})
Expand Down
12 changes: 12 additions & 0 deletions packages/ddex/publisher/src/models/parsed.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import mongoose from 'mongoose'

// Releases parsed from indexed DDEX deliveries, awaiting publishing.
// Documents are written by the ingester's parser and read by the publisher.
const parsedSchema = new mongoose.Schema({
// Copied by the parser from the indexed delivery's delivery_id
delivery_id: String,
// Kind of release; the parser currently always writes "track"
entity: String,
// The publisher picks up releases whose publish_date is at or before now
publish_date: Date,
})

// Third argument pins the MongoDB collection name to 'parsed'
const Parsed = mongoose.model('Parsed', parsedSchema, 'parsed')

export default Parsed
13 changes: 13 additions & 0 deletions packages/ddex/publisher/src/models/published.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import mongoose from 'mongoose'

// DDEX releases that have been published.
// The publisher builds each document from a 'parsed' document plus a track_id.
const publishedSchema = new mongoose.Schema({
// Carried over from the parsed release
delivery_id: String,
// Carried over from the parsed release
entity: String,
// Carried over from the parsed release
publish_date: Date,
// ID of the published track; the publisher currently writes the placeholder 'todo'
track_id: String,
})

// Third argument pins the MongoDB collection name to 'published'
const Published = mongoose.model('Published', publishedSchema, 'published')

export default Published
44 changes: 44 additions & 0 deletions packages/ddex/publisher/src/services/publisherService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import mongoose from 'mongoose'
import Parsed from '../models/parsed'
import Published from '../models/published'

/**
 * Runs one publishing pass: inside a single transaction, moves every 'parsed'
 * release whose publish_date is at or before now into the 'published'
 * collection.
 *
 * Errors raised while publishing are logged and the transaction is rolled
 * back. Errors from starting, aborting or ending the session propagate to the
 * caller.
 */
const publishDueReleases = async () => {
  const currentDate = new Date()
  const session = await mongoose.startSession()

  try {
    session.startTransaction()

    const documents = await Parsed.find({
      publish_date: { $lte: currentDate },
    }).session(session)

    for (const doc of documents) {
      // TODO publish release using SDK

      // Move document to 'published' collection
      const publishedData = {
        ...doc.toObject(),
        track_id: 'todo',
      }
      const publishedDoc = new Published(publishedData)
      await publishedDoc.save({ session })
      await Parsed.deleteOne({ _id: doc._id }).session(session)
      // TODO update indexed delivery_status to 'published'
      console.log('Published release: ', publishedData)
    }

    await session.commitTransaction()
  } catch (error) {
    console.error('Error publishing release, rolling back.', error)
    // Only abort while a transaction is still open: aborting after a commit
    // attempt throws, which would mask the original error.
    if (session.inTransaction()) {
      await session.abortTransaction()
    }
  } finally {
    // endSession returns a promise; await it so the session is released
    // before the next pass starts.
    await session.endSession()
  }
}

/**
 * Polls forever, running a publishing pass every 10 seconds.
 *
 * The returned promise never resolves. A failure in one pass (including
 * session start/abort/end failures) is logged and does not stop the loop.
 */
export const publishReleases = async () => {
  // eslint-disable-next-line no-constant-condition
  while (true) {
    try {
      await publishDueReleases()
    } catch (error) {
      // Without this guard a single session error would reject the promise
      // and silently stop all future publishing.
      console.error('Error during publish pass, will retry.', error)
    }

    // 10 seconds
    await new Promise((resolve) => setTimeout(resolve, 10000))
  }
}

0 comments on commit 9f169e7

Please sign in to comment.