[PROTO-1669] Find Mongo changes when offline and clean up DDEX (#7680)
theoilie authored Feb 22, 2024
1 parent 043c3a5 commit 336dc7a
Showing 4 changed files with 193 additions and 160 deletions.
121 changes: 121 additions & 0 deletions packages/ddex/ingester/common/common.go
@@ -11,10 +11,116 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

type Cursor struct {
CollectionName string `bson:"collection_name"`
ResumeToken bson.Raw `bson:"resume_token"`
LastUpdated time.Time `bson:"last_updated"`
}

type Ingester interface {
GetResumeToken(collectionName string) (bson.Raw, error)
UpdateResumeToken(collectionName string, resumeToken bson.Raw) error
}

type BaseIngester struct {
Ctx context.Context
MongoClient *mongo.Client
S3Client *s3.S3
S3Downloader *s3manager.Downloader
S3Uploader *s3manager.Uploader
RawBucket string
IndexedBucket string
CursorsColl *mongo.Collection
UploadsColl *mongo.Collection
DeliveriesColl *mongo.Collection
PendingReleasesColl *mongo.Collection
UsersColl *mongo.Collection
Logger *slog.Logger
}

func NewBaseIngester(ctx context.Context, service string) *BaseIngester {
logger := slog.With("service", service)
s3, s3Session := InitS3Client(logger)
mongoClient := InitMongoClient(ctx, logger)

return &BaseIngester{
S3Client: s3,
S3Downloader: s3manager.NewDownloader(s3Session),
S3Uploader: s3manager.NewUploader(s3Session),
MongoClient: mongoClient,
RawBucket: MustGetenv("AWS_BUCKET_RAW"),
IndexedBucket: MustGetenv("AWS_BUCKET_INDEXED"),
CursorsColl: mongoClient.Database("ddex").Collection("cursors"),
UploadsColl: mongoClient.Database("ddex").Collection("uploads"),
DeliveriesColl: mongoClient.Database("ddex").Collection("deliveries"),
PendingReleasesColl: mongoClient.Database("ddex").Collection("pending_releases"),
UsersColl: mongoClient.Database("ddex").Collection("users"),
Ctx: ctx,
Logger: logger,
}
}

// ProcessChangeStream watches a collection for new insert operations and processes them with the provided function.
func (bi *BaseIngester) ProcessChangeStream(c *mongo.Collection, exec func(*mongo.ChangeStream)) {
p := mongo.Pipeline{bson.D{{Key: "$match", Value: bson.D{{Key: "operationType", Value: "insert"}}}}}
rt, _ := bi.GetResumeToken(c.Name()) // an error here just means no cursor doc is stored yet, so rt stays nil
var opts *options.ChangeStreamOptions
if rt == nil {
// No saved resume token: start from the oldest timestamp in the oplog (the chosen start time must still exist in the oplog for the change stream to open)
oldestOplogTimestamp, err := getOldestOplogTimestamp(bi.MongoClient)
if err != nil {
log.Fatal(err)
}
bi.Logger.Info("Starting at oldest oplog timestamp", "timestamp", oldestOplogTimestamp)
opts = options.ChangeStream().SetStartAtOperationTime(&oldestOplogTimestamp)
} else {
// Note: if the collection is dropped and recreated, inserts will still be replayed from this resume token
bi.Logger.Info("Resuming from resume token", "resume_token", rt)
opts = options.ChangeStream().SetStartAfter(rt)
}

cs, err := c.Watch(bi.Ctx, p, opts)
if err != nil {
panic(err)
}
bi.Logger.Info("Watching collection", "collection", c.Name())
defer cs.Close(bi.Ctx)

for cs.Next(bi.Ctx) {
exec(cs)
rt := cs.ResumeToken()
if err := bi.UpdateResumeToken(c.Name(), rt); err != nil {
log.Fatalf("Failed to update resume token for '%s': %v", c.Name(), err)
}
}

if err := cs.Err(); err != nil {
log.Fatal(err)
}
}

func (bi *BaseIngester) GetResumeToken(collectionName string) (bson.Raw, error) {
var cursorDoc Cursor
if err := bi.CursorsColl.FindOne(bi.Ctx, bson.M{"collection_name": collectionName}).Decode(&cursorDoc); err != nil {
return nil, err
}
return cursorDoc.ResumeToken, nil
}

func (bi *BaseIngester) UpdateResumeToken(collectionName string, resumeToken bson.Raw) error {
filter := bson.M{"collection_name": collectionName}
update := bson.M{"$set": Cursor{CollectionName: collectionName, ResumeToken: resumeToken, LastUpdated: time.Now()}}
_, err := bi.CursorsColl.UpdateOne(bi.Ctx, filter, update, options.Update().SetUpsert(true))
return err
}

func InitMongoClient(ctx context.Context, logger *slog.Logger) *mongo.Client {
mongoUrl := os.Getenv("DDEX_MONGODB_URL")
if mongoUrl == "" {
@@ -53,3 +159,18 @@ func MustGetenv(key string) string {
}
return val
}

func getOldestOplogTimestamp(client *mongo.Client) (primitive.Timestamp, error) {
var result struct {
TS primitive.Timestamp `bson:"ts"`
}

oplogCollection := client.Database("local").Collection("oplog.rs")
opts := options.FindOne().SetSort(bson.D{{Key: "ts", Value: 1}})
err := oplogCollection.FindOne(context.TODO(), bson.D{}, opts).Decode(&result)
if err != nil {
return primitive.Timestamp{}, err
}

return result.TS, nil
}
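
For context, here is a minimal sketch (not part of this commit) of how a service might consume the new ProcessChangeStream helper. On first run there is no saved cursor, so the stream starts at getOldestOplogTimestamp and replays every insert still present in the oplog, including ones made while the ingester was offline; after that, the resume token persisted on each event keeps restarts gap-free. The service name and the choice of PendingReleasesColl below are illustrative assumptions, not code from this commit.

package main

import (
	"context"

	"ingester/common"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

func main() {
	ctx := context.Background()
	bi := common.NewBaseIngester(ctx, "example-consumer") // hypothetical service name
	defer bi.MongoClient.Disconnect(ctx)

	// Blocks, invoking the callback once per insert; the resume token is
	// saved after every event, so a crash here resumes from the last
	// processed insert rather than skipping or reprocessing.
	bi.ProcessChangeStream(bi.PendingReleasesColl, func(cs *mongo.ChangeStream) {
		var event bson.M
		if err := cs.Decode(&event); err != nil {
			bi.Logger.Error("failed to decode change event", "error", err)
			return
		}
		bi.Logger.Info("handling insert", "fullDocument", event["fullDocument"])
	})
}

Keeping one cursor document per collection (the Cursor type above) means each watched collection tracks its own position, so streams can be replayed or reset independently.
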
45 changes: 14 additions & 31 deletions packages/ddex/ingester/crawler/crawler.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"ingester/common"
"log/slog"
"strings"
"time"

@@ -16,30 +15,14 @@ import (
)

type Crawler struct {
- s3Client *s3.S3
- mongoClient *mongo.Client
- rawBucket string
- cursorsColl *mongo.Collection
- uploadsColl *mongo.Collection
- ctx context.Context
- logger *slog.Logger
+ *common.BaseIngester
}

func RunNewCrawler(ctx context.Context) {
- logger := slog.With("service", "crawler")
- s3Client, _ := common.InitS3Client(logger)
- mongoClient := common.InitMongoClient(ctx, logger)
- defer mongoClient.Disconnect(ctx)
-
c := &Crawler{
- s3Client: s3Client,
- mongoClient: mongoClient,
- rawBucket: common.MustGetenv("AWS_BUCKET_RAW"),
- cursorsColl: mongoClient.Database("ddex").Collection("cursors"),
- uploadsColl: mongoClient.Database("ddex").Collection("uploads"),
- ctx: ctx,
- logger: logger,
+ BaseIngester: common.NewBaseIngester(ctx, "crawler"),
}
+ defer c.MongoClient.Disconnect(ctx)

ticker := time.NewTicker(3 * time.Minute)
defer ticker.Stop()
@@ -51,28 +34,28 @@ func RunNewCrawler(ctx context.Context) {
case <-ticker.C:
lastPolledTime, err := c.getLastPolledTime()
if err != nil {
c.logger.Error("Failed to retrieve s3 raw bucket's last polled time", "error", err)
c.Logger.Error("Failed to retrieve s3 raw bucket's last polled time", "error", err)
continue
}

uploads, err := c.pollS3Bucket(lastPolledTime)
if err != nil {
c.logger.Error("Error polling S3 bucket", "error", err)
c.Logger.Error("Error polling S3 bucket", "error", err)
continue
}

if len(uploads) > 0 {
err = c.persistUploads(uploads)
if err != nil {
c.logger.Error("Error inserting into mongodb", "error", err)
c.Logger.Error("Error inserting into mongodb", "error", err)
continue
}
- c.logger.Info(fmt.Sprintf("Processed %d new uploads", len(uploads)))
+ c.Logger.Info(fmt.Sprintf("Processed %d new uploads", len(uploads)))
}

err = c.updateLastPolledTime(time.Now())
if err != nil {
c.logger.Error("Failed to update s3 raw bucket's last polled time", "error", err)
c.Logger.Error("Failed to update s3 raw bucket's last polled time", "error", err)
}

if ctx.Err() != nil {
@@ -87,7 +70,7 @@ func (c *Crawler) updateLastPolledTime(lastPolledTime time.Time) error {
update := bson.M{"$set": bson.M{"s3RawLastPolledTime": lastPolledTime}}
opts := options.Update().SetUpsert(true)

- _, err := c.cursorsColl.UpdateOne(c.ctx, filter, update, opts)
+ _, err := c.CursorsColl.UpdateOne(c.Ctx, filter, update, opts)
return err
}

@@ -97,7 +80,7 @@ func (c *Crawler) getLastPolledTime() (time.Time, error) {
}
filter := bson.M{"service": "crawler"}

- err := c.cursorsColl.FindOne(c.ctx, filter).Decode(&result)
+ err := c.CursorsColl.FindOne(c.Ctx, filter).Decode(&result)
if err != nil {
if err == mongo.ErrNoDocuments {
// No record found, return zero time
@@ -110,8 +93,8 @@ func (c *Crawler) getLastPolledTime() (time.Time, error) {
}

func (c *Crawler) pollS3Bucket(lastPolledTime time.Time) ([]*s3.Object, error) {
- resp, err := c.s3Client.ListObjectsV2(&s3.ListObjectsV2Input{
- Bucket: aws.String(c.rawBucket),
+ resp, err := c.S3Client.ListObjectsV2(&s3.ListObjectsV2Input{
+ Bucket: aws.String(c.RawBucket),
})
if err != nil {
return nil, err
@@ -128,13 +111,13 @@ func (c *Crawler) pollS3Bucket(lastPolledTime time.Time) ([]*s3.Object, error) {

func (c *Crawler) persistUploads(uploads []*s3.Object) error {
for _, upload := range uploads {
path := "s3://" + c.rawBucket + "/" + *upload.Key
path := "s3://" + c.RawBucket + "/" + *upload.Key
etag := strings.Trim(*upload.ETag, "\"")
// Only insert if a document doesn't already exist with this path and etag
filter := bson.M{"path": path, "upload_etag": etag}
update := bson.M{"$setOnInsert": bson.M{"path": path, "upload_etag": etag, "created_at": upload.LastModified}}
opts := options.Update().SetUpsert(true)
- _, err := c.uploadsColl.UpdateOne(c.ctx, filter, update, opts)
+ _, err := c.UploadsColl.UpdateOne(c.Ctx, filter, update, opts)
if err != nil {
return err
}
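
The $setOnInsert upsert in persistUploads is what makes repeated polls idempotent: the filter matches on (path, etag), and $setOnInsert only takes effect when the upsert actually inserts. Below is a standalone sketch of the same pattern, with a hypothetical function name and caller-supplied values:

package crawler

import (
	"context"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// recordUpload mirrors the dedupe in persistUploads above: re-listing the
// same S3 object on a later poll matches the existing document and becomes
// a no-op instead of creating a duplicate.
func recordUpload(ctx context.Context, uploads *mongo.Collection, path, etag string, lastModified time.Time) error {
	filter := bson.M{"path": path, "upload_etag": etag}
	update := bson.M{"$setOnInsert": bson.M{"path": path, "upload_etag": etag, "created_at": lastModified}}
	_, err := uploads.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true))
	return err
}
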
