Skip to content

Commit

Permalink
[PROTO-1621] Implement ddex indexer (#7486)
Browse files Browse the repository at this point in the history
  • Loading branch information
theoilie authored Feb 7, 2024
1 parent de047ad commit de70135
Show file tree
Hide file tree
Showing 12 changed files with 292 additions and 195 deletions.
4 changes: 2 additions & 2 deletions packages/ddex/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ chmod 400 packages/ddex/mongo-keyfile
```
2. `audius-compose connect` to update your /etc/hosts
3. `audius-compose up --ddex`
4. Once the `ddex-mongo` container is running: manually intiate the mongodb replica set with `docker exec -it ddex-mongo mongosh -u mongo -p mongo --authenticationDatabase admin --eval 'rs.initiate({_id:"rs0", members:[{_id:0, host:"ddex-mongo:27017"}]})`. The other ddex containers will be blocked from starting until this command succeeds.
4. Once the `ddex-mongo` container is running: manually intiate the mongodb replica set with `docker exec -it ddex-mongo mongosh -u mongo -p mongo --authenticationDatabase admin --eval 'rs.initiate({_id:"rs0", members:[{_id:0, host:"ddex-mongo:27017"}]})'`. The other ddex containers will be blocked from starting until this command succeeds.

### Bring up the ddex stack subsequently
`audius-compose up --ddex`
Expand All @@ -32,7 +32,7 @@ Note: `audius-compose down` removes the `ddex-mongo-db` volume, so if you run th
To access the ddex db via the mongo shell: `docker exec -it ddex-mongo mongosh -u mongo -p mongo --authenticationDatabase admin` then `use ddex`

### Develop with hot reloading
Each service can be run independently as long as `ddex-mongo` is up. See the respective subdirectories' READMEs.
Each service can be run independently as long as `ddex-mongo` is up (from `audius-compose up --ddex`` or `docker compose --project-directory=dev-tools/compose --file=dev-tools/compose/docker-compose.yml --profile=ddex up ddex-mongo -d`). See the respective subdirectories' READMEs.

### Creating a bucket in S3
1. Create a new bucket in the S3 console with the name `ddex-[dev|staging]-<label/distributor>-raw`. Use all the defaults, including "ACLs disabled"
Expand Down
6 changes: 5 additions & 1 deletion packages/ddex/ingester/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"ingester/indexer"
"ingester/parser"
"log"
"log/slog"
"os"
"os/signal"
"syscall"
Expand All @@ -30,6 +31,9 @@ func main() {
cancel()
}()

logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{AddSource: true}))
slog.SetDefault(logger)

err := godotenv.Load("../.env")
if err != nil {
if os.IsNotExist(err) {
Expand All @@ -43,7 +47,7 @@ func main() {
case "crawler":
go crawler.Run(ctx)
case "indexer":
go indexer.Run(ctx)
go indexer.RunNewIndexer(ctx)
case "parser":
go parser.Run(ctx)
default:
Expand Down
4 changes: 2 additions & 2 deletions packages/ddex/ingester/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func InitMongoClient(ctx context.Context) *mongo.Client {
return client
}

func InitS3Client() *s3.S3 {
func InitS3Client() (*s3.S3, *session.Session) {
awsRegion := MustGetenv("AWS_REGION")
awsKey := MustGetenv("AWS_ACCESS_KEY_ID")
awsSecret := MustGetenv("AWS_SECRET_ACCESS_KEY")
Expand All @@ -39,7 +39,7 @@ func InitS3Client() *s3.S3 {
panic(err)
}
log.Println("Connected to s3")
return s3.New(sess)
return s3.New(sess), sess
}

func MustGetenv(key string) string {
Expand Down
6 changes: 3 additions & 3 deletions packages/ddex/ingester/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

func Run(ctx context.Context) {
s3Client := common.InitS3Client()
s3Client, _ := common.InitS3Client()
mongoClient := common.InitMongoClient(ctx)
defer mongoClient.Disconnect(ctx)

Expand Down Expand Up @@ -115,8 +115,8 @@ func persistUploads(client *mongo.Client, bucket string, uploads []*s3.Object, c
path := "s3://" + bucket + "/" + *upload.Key
etag := strings.Trim(*upload.ETag, "\"")
// Only insert if a document doesn't already exist with this path and etag
filter := bson.M{"path": path, "delivery_etag": etag}
update := bson.M{"$setOnInsert": bson.M{"path": path, "delivery_etag": etag}}
filter := bson.M{"path": path, "upload_etag": etag}
update := bson.M{"$setOnInsert": bson.M{"path": path, "upload_etag": etag}}
opts := options.Update().SetUpsert(true)
_, err := uploadsColl.UpdateOne(ctx, filter, update, opts)
if err != nil {
Expand Down
31 changes: 10 additions & 21 deletions packages/ddex/ingester/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,25 @@ module ingester
go 1.21.5

require (
dario.cat/mergo v1.0.0 // indirect
github.com/aws/aws-sdk-go v1.50.11 // indirect
github.com/bep/godartsass v1.2.0 // indirect
github.com/bep/godartsass/v2 v2.0.0 // indirect
github.com/bep/golibsass v1.1.1 // indirect
github.com/cli/safeexec v1.0.1 // indirect
github.com/cosmtrek/air v1.49.0 // indirect
github.com/creack/pty v1.1.21 // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gohugoio/hugo v0.122.0 // indirect
github.com/aws/aws-sdk-go v1.50.11
github.com/joho/godotenv v1.5.1
go.mongodb.org/mongo-driver v1.13.1
)

require (
github.com/golang/snappy v0.0.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.1.1 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/tdewolff/parse/v2 v2.7.11 // indirect
github.com/oklog/ulid/v2 v2.1.0
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.mongodb.org/mongo-driver v1.13.1 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
Loading

0 comments on commit de70135

Please sign in to comment.