diff --git a/.gitignore b/.gitignore
index 0d780dea..2b3ab4ef 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,6 +24,7 @@ peer.key
 wallet/
 duplicateGuard/
 pinQueue/
+stagingdata/
 cidlistsdir/*
diff --git a/cmd/estuary-shuttle/rpc.go b/cmd/estuary-shuttle/rpc.go
index 70cb4c1f..34ea0905 100644
--- a/cmd/estuary-shuttle/rpc.go
+++ b/cmd/estuary-shuttle/rpc.go
@@ -321,7 +321,7 @@ func (d *Shuttle) handleRpcTakeContent(ctx context.Context, cmd *drpc.TakeConten
 }
 
 func (s *Shuttle) handleRpcAggregateStagedContent(ctx context.Context, aggregate *drpc.AggregateContents) error {
-	// only progress if aggr is not allready in progress
+	// only progress if aggr is not already in progress
 	if !s.markStartAggr(aggregate.DBID) {
 		return nil
 	}
diff --git a/contentmgr/gc.go b/contentmgr/gc.go
index 86e3ffc7..9bb45fd7 100644
--- a/contentmgr/gc.go
+++ b/contentmgr/gc.go
@@ -3,7 +3,6 @@ package contentmgr
 import (
 	"context"
 	"fmt"
-	"github.com/application-research/estuary/util"
 
 	"github.com/ipfs/go-cid"
 	"golang.org/x/xerrors"
@@ -152,18 +151,17 @@ func (cm *ContentManager) UnpinContent(ctx context.Context, contid uint) error {
 		return err
 	}
 
-	if err := cm.DB.Delete(&util.Content{ID: pin.ID}).Error; err != nil {
-		return err
-	}
-
+	// delete object refs rows for deleted content
 	if err := cm.DB.Where("content = ?", pin.ID).Delete(&util.ObjRef{}).Error; err != nil {
 		return err
 	}
 
+	// delete objects that no longer have obj refs
 	if err := cm.clearUnreferencedObjects(ctx, objs); err != nil {
 		return err
 	}
 
+	// delete objects from blockstore that are no longer present in db
 	for _, o := range objs {
 		// TODO: this is safe, but... slow?
 		if _, err := cm.deleteIfNotPinned(ctx, o); err != nil {
@@ -171,14 +169,28 @@ func (cm *ContentManager) UnpinContent(ctx context.Context, contid uint) error {
 		}
 	}
 
-	buckets, ok := cm.Buckets[pin.UserID]
-	if ok {
-		for _, bucket := range buckets {
-			if _, err := cm.tryRemoveContent(bucket, pin); err != nil {
+	// delete from contents table and adjust aggregate size, if applicable, in one tx
+	err = cm.DB.Transaction(func(tx *gorm.DB) error {
+		// delete contid row from contents table
+		if err := tx.Model(util.Content{}).Delete(&util.Content{ID: pin.ID}).Error; err != nil {
+			return err
+		}
+
+		if pin.AggregatedIn > 0 {
+			// decrease aggregate's size by cont's size in contents table
+			if err := tx.Model(util.Content{}).
+				Where("id = ?", pin.AggregatedIn).
+				UpdateColumn("size", gorm.Expr("size - ?", pin.Size)).Error; err != nil {
 				return err
 			}
 		}
+
+		return nil
+	})
+	if err != nil {
+		return err
 	}
+
 	return nil
 }
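The UnpinContent hunk above moves the content-row delete and the aggregate size adjustment into a single gorm transaction: the callback's return value decides the outcome, a nil return commits both statements and any error rolls both back, and gorm.Expr keeps the subtraction inside the database rather than doing a read-modify-write in Go. A minimal sketch of that pattern, using a hypothetical parent/child Item model rather than Estuary's contents table:

	// sketch only: Item, childID, parentID and childSize are stand-ins
	err := db.Transaction(func(tx *gorm.DB) error {
		// delete the child row
		if err := tx.Delete(&Item{ID: childID}).Error; err != nil {
			return err // returning non-nil rolls the whole transaction back
		}
		// UPDATE items SET size = size - <childSize> WHERE id = <parentID>
		return tx.Model(&Item{}).
			Where("id = ?", parentID).
			UpdateColumn("size", gorm.Expr("size - ?", childSize)).Error
	}) // returning nil commits both statements together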
and aggregate", uid).Error; err != nil { return "" } @@ -428,6 +426,10 @@ func (cm *ContentManager) handlePinningComplete(ctx context.Context, handle stri }).Error; err != nil { return xerrors.Errorf("failed to update content in database: %w", err) } + + // if the content is a consolidated aggregate, it means aggregation has been completed and we can mark as finished + cm.MarkFinishedAggregating(cont) + // after aggregate is done, make deal for it cm.ToCheck(cont.ID) return nil diff --git a/contentmgr/replication.go b/contentmgr/replication.go index 02d5a77f..76ee7862 100644 --- a/contentmgr/replication.go +++ b/contentmgr/replication.go @@ -79,9 +79,13 @@ type ContentManager struct { retrLk sync.Mutex retrievalsInProgress map[uint]*util.RetrievalProgress contentLk sync.RWMutex - // deal bucketing stuff - BucketLk sync.Mutex - Buckets map[uint][]*ContentStagingZone + // consolidatingZonesLk is used to serialize reads and writes to consolidatingZones + consolidatingZonesLk sync.Mutex + // aggregatingZonesLk is used to serialize reads and writes to aggregatingZones + aggregatingZonesLk sync.Mutex + // addStagingContentLk is used to serialize content adds to staging zones + // otherwise, we'd risk creating multiple "initial" staging zones, or exceeding MaxDealContentSize + addStagingContentLk sync.Mutex // some behavior flags FailDealOnTransferFailure bool dealDisabledLk sync.Mutex @@ -96,6 +100,8 @@ type ContentManager struct { IncomingRPCMessages chan *drpc.Message minerManager miner.IMinerManager log *zap.SugaredLogger + consolidatingZones map[uint]bool + aggregatingZones map[uint]bool } func (cm *ContentManager) isInflight(c cid.Cid) bool { @@ -144,7 +150,7 @@ func (cb *ContentStagingZone) DeepCopy() *ContentStagingZone { return cb2 } -func (cm *ContentManager) newContentStagingZone(user uint, loc string) (*ContentStagingZone, error) { +func (cm *ContentManager) newContentStagingZone(user uint, loc string) (*util.Content, error) { content := &util.Content{ Size: 0, Name: "aggregate", @@ -160,99 +166,36 @@ func (cm *ContentManager) newContentStagingZone(user uint, loc string) (*Content return nil, err } - return &ContentStagingZone{ - ZoneOpened: content.CreatedAt, - MinSize: cm.cfg.Content.MinSize, - MaxSize: cm.cfg.Content.MaxSize, - User: user, - ContID: content.ID, - Location: content.Location, - Readiness: stagingZoneReadiness{false, "Readiness not yet evaluated"}, - }, nil -} - -func (cb *ContentStagingZone) updateReadiness() { - // if it's above the size requirement, go right ahead - if cb.CurSize > cb.MinSize { - cb.Readiness.IsReady = true - cb.Readiness.ReadinessReason = fmt.Sprintf( - "Current deal size of %d bytes is above minimum size requirement of %d bytes", - cb.CurSize, - cb.MinSize) - return - } - - cb.Readiness.IsReady = false - cb.Readiness.ReadinessReason = fmt.Sprintf( - "Minimum size requirement not met (current: %d bytes, minimum: %d bytes)\n", - cb.CurSize, - cb.MinSize) + return content, nil } -func (cm *ContentManager) tryAddContent(cb *ContentStagingZone, c util.Content) (bool, error) { - cb.lk.Lock() - defer cb.lk.Unlock() - defer cb.updateReadiness() - - // TODO: propagate reason for failing to add content to bucket to end user - - // if this bucket is being consolidated, do not add anymore content - if cb.IsConsolidating { +func (cm *ContentManager) tryAddContent(zone util.Content, c util.Content) (bool, error) { + // if this bucket is being consolidated or aggregated, do not add anymore content + if cm.IsZoneConsolidating(zone.ID) || 
@@ -270,7 +213,6 @@ func NewContentManager(db *gorm.DB, api api.Gateway, fc *filclient.FilClient, tb
 		Tracker:              tbs,
 		toCheck:              make(chan uint, 100000),
 		retrievalsInProgress: make(map[uint]*util.RetrievalProgress),
-		Buckets:              make(map[uint][]*ContentStagingZone),
 		PinMgr:               pinmgr,
 		remoteTransferStatus: cache,
 		Shuttles:             make(map[string]*ShuttleConnection),
@@ -282,6 +224,8 @@ func NewContentManager(db *gorm.DB, api api.Gateway, fc *filclient.FilClient, tb
 		IncomingRPCMessages: make(chan *drpc.Message, cfg.RPCMessage.IncomingQueueSize),
 		minerManager:        minerManager,
 		log:                 log,
+		consolidatingZones:  make(map[uint]bool),
+		aggregatingZones:    make(map[uint]bool),
 	}
 
 	cm.queueMgr = newQueueManager(func(c uint) {
@@ -297,6 +241,16 @@ func (cm *ContentManager) ToCheck(contID uint) {
 	}
 }
 
+func (cm *ContentManager) getReadyStagingZones() ([]util.Content, error) {
+	var readyZones []util.Content
+	if err := cm.DB.Model(&util.Content{}).
+		Where("not active and pinning and aggregate and size >= ?", constants.MinDealContentSize).
+		Find(&readyZones).Error; err != nil {
+		return nil, err
+	}
+	return readyZones, nil
+}
+
 func (cm *ContentManager) runStagingBucketWorker(ctx context.Context) {
 	timer := time.NewTicker(cm.cfg.StagingBucket.AggregateInterval)
 	for {
@@ -304,10 +258,16 @@ func (cm *ContentManager) runStagingBucketWorker(ctx context.Context) {
 		case <-timer.C:
 			cm.log.Debugw("content check queue", "length", len(cm.queueMgr.queue.elems), "nextEvent", cm.queueMgr.nextEvent)
 
-			zones := cm.popReadyStagingZone()
-			for _, z := range zones {
+			cm.log.Debugf("checking for ready staging zones")
+			readyZones, err := cm.getReadyStagingZones()
+			if err != nil {
+				cm.log.Errorf("failed to get ready staging zones: %s", err)
+			}
+
+			cm.log.Debugf("found ready staging zones: %d", len(readyZones))
+			for _, z := range readyZones {
 				if err := cm.processStagingZone(ctx, z); err != nil {
-					cm.log.Errorf("content aggregation failed (zone %d): %s", z.ContID, err)
+					cm.log.Errorf("content aggregation failed (zone %d): %s", z.ID, err)
 					continue
 				}
 			}
@@ -340,16 +300,15 @@ func (cm *ContentManager) runDealWorker(ctx context.Context) {
 }
 
 func (cm *ContentManager) Run(ctx context.Context) {
-	// if staging buckets are enabled, rebuild the buckets, and run the bucket aggregate worker
+	// if staging buckets are enabled, run the bucket aggregate worker
 	if cm.cfg.StagingBucket.Enabled {
-		// rebuild the staging buckets
-		if err := cm.rebuildStagingBuckets(); err != nil {
-			cm.log.Fatalf("failed to rebuild staging buckets: %s", err)
+		// recompute staging zone sizes
+		if err := cm.recomputeStagingZoneSizes(); err != nil {
+			cm.log.Errorf("failed to recompute staging zone sizes: %s", err)
 		}
 
-		// run the staging bucket aggregator worker
 		go cm.runStagingBucketWorker(ctx)
-		cm.log.Infof("rebuilt staging buckets and spun up staging bucket worker")
+		cm.log.Infof("spun up staging bucket worker")
 	}
 
 	// if FilecoinStorage is enabled, check content deals or make content deals
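With getReadyStagingZones, zone readiness becomes a plain SQL predicate over the contents table (the old in-memory updateReadiness pass is gone). A roughly equivalent raw form of that query, assuming Postgres-style boolean columns for active, pinning and aggregate:

	// sketch of the readiness check the worker runs each tick
	var readyZones []util.Content
	err := cm.DB.Raw(
		`SELECT * FROM contents
		 WHERE not active AND pinning AND aggregate AND size >= ?`,
		constants.MinDealContentSize,
	).Scan(&readyZones).Error
	// note: the worker only logs a query error; ranging over the
	// resulting nil slice is a safe no-op in Go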
@@ -477,24 +436,15 @@ func (qm *queueManager) processQueue() {
 	qm.nextEvent = time.Time{}
 }
 
-func (cm *ContentManager) currentLocationForContent(cntId uint) (string, error) {
-	var cont util.Content
-	if err := cm.DB.First(&cont, "id = ?", cntId).Error; err != nil {
-		return "", err
+func (cm *ContentManager) getStagedContentsGroupedByLocation(ctx context.Context, zoneID uint) (map[string][]util.Content, error) {
+	var conts []util.Content
+	if err := cm.DB.Find(&conts, "aggregated_in = ?", zoneID).Error; err != nil {
+		return nil, err
 	}
-	return cont.Location, nil
-}
 
-func (cm *ContentManager) getGroupedStagedContentLocations(ctx context.Context, b *ContentStagingZone) (map[string]string, error) {
-	out := make(map[string]string)
-	for _, c := range b.Contents {
-		// need to get current location from db, incase this stage content was a part of a consolidated content - its location would have changed.
-		// so we can group it into its current location
-		loc, err := cm.currentLocationForContent(c.ID)
-		if err != nil {
-			return nil, err
-		}
-		out[c.Location] = loc
+	out := make(map[string][]util.Content)
+	for _, c := range conts {
+		out[c.Location] = append(out[c.Location], c)
 	}
 	if len(out) == 0 {
 		return nil, fmt.Errorf("no location for staged contents")
@@ -502,22 +452,21 @@ func (cm *ContentManager) getGroupedStagedContentLocations(ctx context.Context,
 	return out, nil
 }
 
-func (cm *ContentManager) consolidateStagedContent(ctx context.Context, b *ContentStagingZone) error {
+func (cm *ContentManager) consolidateStagedContent(ctx context.Context, zone util.Content) error {
 	var dstLocation string
 	var curMax int64
 
 	dataByLoc := make(map[string]int64)
-	contentByLoc := make(map[string][]util.Content)
+	contentByLoc, err := cm.getStagedContentsGroupedByLocation(ctx, zone.ID)
+	if err != nil {
+		return err
+	}
 
-	// TODO: make this one batch DB query instead of querying per content
-	for _, c := range b.Contents {
-		loc, err := cm.currentLocationForContent(c.ID)
-		if err != nil {
-			return err
+	for loc, contents := range contentByLoc {
+		var ntot int64
+		for _, c := range contents {
+			ntot = dataByLoc[loc] + c.Size
+			dataByLoc[loc] = ntot
 		}
-		contentByLoc[loc] = append(contentByLoc[loc], c)
-
-		ntot := dataByLoc[loc] + c.Size
-		dataByLoc[loc] = ntot
 
 		// temp: dont ever migrate content back to primary instance for aggregation, always prefer elsewhere
 		if loc == constants.ContentLocationLocal {
@@ -538,19 +487,71 @@ func (cm *ContentManager) consolidateStagedContent(ctx context.Context, b *Conte
 		}
 	}
 
-	cm.log.Debugw("consolidating content to single location for aggregation", "user", b.User, "dstLocation", dstLocation, "numItems", len(toMove), "primaryWeight", curMax)
+	cm.log.Debugw("consolidating content to single location for aggregation", "user", zone.UserID, "dstLocation", dstLocation, "numItems", len(toMove), "primaryWeight", curMax)
+
 	if dstLocation == constants.ContentLocationLocal {
+		defer cm.MarkFinishedConsolidating(zone)
 		return cm.migrateContentsToLocalNode(ctx, toMove)
-	} else {
+	} else if dstLocation != "" {
 		return cm.SendConsolidateContentCmd(ctx, dstLocation, toMove)
+	} else {
+		// unable to find a destination location, unmark as consolidating and let it retry later
+		cm.log.Warnf("unable to find a destination location for consolidating zone: %d", zone.ID)
+		cm.MarkFinishedConsolidating(zone)
+		return nil
+	}
+}
+
+func (cm *ContentManager) IsZoneConsolidating(zoneID uint) bool {
+	cm.consolidatingZonesLk.Lock()
+	defer cm.consolidatingZonesLk.Unlock()
+	return cm.consolidatingZones[zoneID]
+}
+
+func (cm *ContentManager) MarkStartedConsolidating(zone util.Content) bool {
+	cm.consolidatingZonesLk.Lock()
+	defer cm.consolidatingZonesLk.Unlock()
+	if cm.consolidatingZones[zone.ID] {
+		// skip since it is already processing
+		return false
+	}
+	cm.consolidatingZones[zone.ID] = true
+	return true
+}
+
+func (cm *ContentManager) MarkFinishedConsolidating(zone util.Content) {
+	cm.consolidatingZonesLk.Lock()
+	delete(cm.consolidatingZones, zone.ID)
+	cm.consolidatingZonesLk.Unlock()
+}
+
+func (cm *ContentManager) IsZoneAggregating(zoneID uint) bool {
+	cm.aggregatingZonesLk.Lock()
+	defer cm.aggregatingZonesLk.Unlock()
+	return cm.aggregatingZones[zoneID]
 }
 
-func (cm *ContentManager) processStagingZone(ctx context.Context, b *ContentStagingZone) error {
+func (cm *ContentManager) MarkStartedAggregating(zone util.Content) bool {
+	cm.aggregatingZonesLk.Lock()
+	defer cm.aggregatingZonesLk.Unlock()
+	if cm.aggregatingZones[zone.ID] {
+		// skip since it is already processing
+		return false
+	}
+	cm.aggregatingZones[zone.ID] = true
+	return true
+}
+
+func (cm *ContentManager) MarkFinishedAggregating(zone util.Content) {
+	cm.aggregatingZonesLk.Lock()
+	delete(cm.aggregatingZones, zone.ID)
+	cm.aggregatingZonesLk.Unlock()
+}
+
+func (cm *ContentManager) processStagingZone(ctx context.Context, zone util.Content) error {
 	ctx, span := cm.tracer.Start(ctx, "aggregateContent")
 	defer span.End()
 
-	grpLocs, err := cm.getGroupedStagedContentLocations(ctx, b)
+	grpLocs, err := cm.getStagedContentsGroupedByLocation(ctx, zone.ID)
 	if err != nil {
 		return err
 	}
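MarkStartedConsolidating and MarkStartedAggregating are try-locks: the first caller to claim a zone ID wins, every other caller backs off until the matching MarkFinished* call deletes the entry, and that is what lets processStagingZone spawn at most one consolidation goroutine per zone. The same pattern condensed into a standalone sketch (names hypothetical; assumes a sync import):

	// inflight tracks IDs currently being processed
	type inflight struct {
		lk sync.Mutex
		m  map[uint]bool
	}

	func (f *inflight) tryStart(id uint) bool {
		f.lk.Lock()
		defer f.lk.Unlock()
		if f.m[id] {
			return false // another worker owns this ID, caller should skip
		}
		f.m[id] = true
		return true
	}

	func (f *inflight) finish(id uint) {
		f.lk.Lock()
		delete(f.m, id)
		f.lk.Unlock()
	}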
@@ -559,42 +560,54 @@ func (cm *ContentManager) processStagingZone(ctx context.Context, b *ContentStag
 
 	// Need to migrate/consolidate the contents to the same location
 	// TODO - we should avoid doing this, best we have staging by location - this process is just too expensive
 	if len(grpLocs) > 1 {
-		cm.BucketLk.Lock()
 		// Need to migrate content all to the same shuttle
 		// Only attempt consolidation on a zone if one is not ongoing, prevents re-consolidation request
-		if !b.IsConsolidating {
-			b.IsConsolidating = true
-			go func() {
-				if err := cm.consolidateStagedContent(ctx, b); err != nil {
-					cm.log.Errorf("failed to consolidate staged content: %s", err)
-				}
-			}()
+
+		// should never be aggregating here but check anyway
+		if cm.IsZoneAggregating(zone.ID) || !cm.MarkStartedConsolidating(zone) {
+			// skip if it is aggregating or already consolidating
+			return nil
 		}
 
-		// put the staging zone back in the list
-		cm.Buckets[b.User] = append(cm.Buckets[b.User], b)
-		cm.BucketLk.Unlock()
+		go func() {
+			// sometimes this call just ends in sending the take content cmd
+			if err := cm.consolidateStagedContent(ctx, zone); err != nil {
+				cm.log.Errorf("failed to consolidate staged content: %s", err)
+			}
+		}()
+		return nil
+	}
+
+	var loc string
+	for grpLoc := range grpLocs {
+		loc = grpLoc
+		break
+	}
+
+	// if we reached here, consolidation is done and we can move to aggregating
+	cm.MarkFinishedConsolidating(zone)
+	if !cm.MarkStartedAggregating(zone) {
+		// skip if zone is already aggregating
 		return nil
 	}
+
 	// if all contents are already in one location, proceed to aggregate them
-	return cm.AggregateStagingZone(ctx, b, grpLocs)
+	return cm.AggregateStagingZone(ctx, zone, loc)
 }
 
-func (cm *ContentManager) AggregateStagingZone(ctx context.Context, z *ContentStagingZone, grpLocs map[string]string) error {
+// AggregateStagingZone assumes zone is already marked as aggregating in aggregatingZones
+func (cm *ContentManager) AggregateStagingZone(ctx context.Context, zone util.Content, loc string) error {
 	ctx, span := cm.tracer.Start(ctx, "aggregateStagingZone")
 	defer span.End()
 
-	cm.log.Debugf("aggregating zone: %d", z.ContID)
+	cm.log.Debugf("aggregating zone: %d", zone.ID)
 
-	// get the aggregate content location
-	var loc string
-	for _, cl := range grpLocs {
-		loc = cl
-		break
+	var contents []util.Content
+	if err := cm.DB.Find(&contents, "aggregated_in = ?", zone.ID).Error; err != nil {
+		return err
 	}
 
 	if loc == constants.ContentLocationLocal {
-		dir, err := cm.CreateAggregate(ctx, z.Contents)
+		dir, err := cm.CreateAggregate(ctx, contents)
 		if err != nil {
 			return xerrors.Errorf("failed to create aggregate: %w", err)
 		}
@@ -606,7 +619,7 @@ func (cm *ContentManager) AggregateStagingZone(ctx context.Context, z *ContentSt
 		}
 
 		if size == 0 {
-			cm.log.Warnf("content %d aggregate dir apparent size is zero", z.ContID)
+			cm.log.Warnf("content %d aggregate dir apparent size is zero", zone.ID)
 		}
 
 		obj := &util.Object{
@@ -618,7 +631,7 @@ func (cm *ContentManager) AggregateStagingZone(ctx context.Context, z *ContentSt
 		}
 
 		if err := cm.DB.Create(&util.ObjRef{
-			Content: z.ContID,
+			Content: zone.ID,
 			Object:  obj.ID,
 		}).Error; err != nil {
 			return err
@@ -628,7 +641,7 @@ func (cm *ContentManager) AggregateStagingZone(ctx context.Context, z *ContentSt
 			return err
 		}
 
-		if err := cm.DB.Model(util.Content{}).Where("id = ?", z.ContID).UpdateColumns(map[string]interface{}{
+		if err := cm.DB.Model(util.Content{}).Where("id = ?", zone.ID).UpdateColumns(map[string]interface{}{
 			"active":  true,
 			"pinning": false,
 			"cid":     util.DbCID{CID: ncid},
@@ -639,19 +652,22 @@ func (cm *ContentManager) AggregateStagingZone(ctx context.Context, z *ContentSt
 		}
 
 		go func() {
-			cm.ToCheck(z.ContID)
+			cm.ToCheck(zone.ID)
 		}()
+
+		cm.MarkFinishedAggregating(zone)
+
 		return nil
 	}
 
 	// handle aggregate on shuttle
 	var aggrConts []drpc.AggregateContent
-	for _, c := range z.Contents {
+	for _, c := range contents {
 		aggrConts = append(aggrConts, drpc.AggregateContent{ID: c.ID, Name: c.Name, CID: c.Cid.CID})
 	}
 
 	var bContent util.Content
-	if err := cm.DB.First(&bContent, "id = ?", z.ContID).Error; err != nil {
+	if err := cm.DB.First(&bContent, "id = ?", zone.ID).Error; err != nil {
 		return err
 	}
 	return cm.SendAggregateCmd(ctx, loc, bContent, aggrConts)
@@ -687,42 +703,63 @@ func (cm *ContentManager) CreateAggregate(ctx context.Context, conts []util.Cont
 	return dirNd, nil
 }
 
-func (cm *ContentManager) rebuildStagingBuckets() error {
-	cm.log.Info("rebuilding staging buckets.......")
+type zoneSize struct {
+	ID   uint
+	Size int64
+}
+
+func (cm *ContentManager) recomputeStagingZoneSizes() error {
+	cm.log.Info("recomputing staging zone sizes .......")
 
-	var stages []util.Content
-	if err := cm.DB.Find(&stages, "not active and pinning and aggregate").Error; err != nil {
+	var storedZoneSizes []zoneSize
+	if err := cm.DB.Model(&util.Content{}).
+		Where("not active and pinning and aggregate and size = 0").
+		Select("id, size").
+		Find(&storedZoneSizes).Error; err != nil {
 		return err
 	}
 
-	zones := make(map[uint][]*ContentStagingZone)
-	for _, c := range stages {
-		var inZones []util.Content
-		if err := cm.DB.Find(&inZones, "aggregated_in = ?", c.ID).Error; err != nil {
+	if len(storedZoneSizes) > 0 {
+		var zoneIDs []uint
+		for _, zone := range storedZoneSizes {
+			zoneIDs = append(zoneIDs, zone.ID)
+		}
+
+		var actualZoneSizes []zoneSize
+		if err := cm.DB.Model(&util.Content{}).
+			Where("aggregated_in IN ?", zoneIDs).
+			Group("aggregated_in").
+			Select("aggregated_in as id, sum(size) as size").
+			Find(&actualZoneSizes).Error; err != nil {
 			return err
 		}
 
-		var zSize int64
-		for _, zc := range inZones {
-			// TODO: do some sanity checking that we havent messed up and added
-			// too many items to this staging zone
-			zSize += zc.Size
+		var zoneToStoredSize = make(map[uint]int64)
+		var toUpdate = make(map[uint]int64)
+		for _, zone := range storedZoneSizes {
+			zoneToStoredSize[zone.ID] = zone.Size
+		}
+		for _, zone := range actualZoneSizes {
+			storedSize := zoneToStoredSize[zone.ID]
+			if zone.Size != storedSize {
+				toUpdate[zone.ID] = zone.Size
+			}
 		}
 
-		z := &ContentStagingZone{
-			ZoneOpened: c.CreatedAt,
-			Contents:   inZones,
-			MinSize:    cm.cfg.Content.MinSize,
-			MaxSize:    cm.cfg.Content.MaxSize,
-			CurSize:    zSize,
-			User:       c.UserID,
-			ContID:     c.ID,
-			Location:   c.Location,
+		for id, size := range toUpdate {
+			if err := cm.DB.Model(util.Content{}).
+				Where("id = ?", id).
+				UpdateColumn("size", size).Error; err != nil {
+				return err
+			}
 		}
-		z.updateReadiness()
-		zones[c.UserID] = append(zones[c.UserID], z)
+
+		cm.log.Infof("completed recomputing staging zone sizes, %d updates made", len(toUpdate))
+	} else {
+		cm.log.Infof("no staging zones to recompute")
 	}
-	cm.Buckets = zones
+
 	return nil
 }
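recomputeStagingZoneSizes runs one grouped SELECT to find each zone's true size (the SUM of its members' size values, keyed by aggregated_in), then issues one UPDATE per drifted zone. Under the same assumptions (Postgres, zones recognizable by the size = 0 filter), the whole repair could also be expressed as a single statement; a sketch of that alternative, not what the patch does:

	// one round trip instead of SELECT + N UPDATEs (sketch)
	err := db.Exec(`
		UPDATE contents SET size = sub.total
		FROM (
			SELECT aggregated_in, SUM(size) AS total
			FROM contents
			WHERE aggregated_in > 0
			GROUP BY aggregated_in
		) AS sub
		WHERE contents.id = sub.aggregated_in
		  AND contents.aggregate AND NOT contents.active
		  AND contents.pinning AND contents.size = 0`).Error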
+ Where("id = ?", id). + UpdateColumn("size", size).Error; err != nil { + return err + } } - z.updateReadiness() - zones[c.UserID] = append(zones[c.UserID], z) + + cm.log.Infof("completed recomputing staging zone sizes, %d updates made", len(toUpdate)) + } else { + cm.log.Infof("no staging zones to recompute") } - cm.Buckets = zones + return nil } @@ -780,51 +817,57 @@ func (cm *ContentManager) SetDataTransferStartedOrFinished(ctx context.Context, return nil } -func (cm *ContentManager) contentInStagingZone(ctx context.Context, content util.Content) bool { - cm.BucketLk.Lock() - defer cm.BucketLk.Unlock() - - bucks, ok := cm.Buckets[content.UserID] - if !ok { - return false +func (cm *ContentManager) buildStagingZoneFromContent(zone util.Content) (*ContentStagingZone, error) { + var contents []util.Content + if err := cm.DB.Find(&contents, "aggregated_in = ?", zone.ID).Error; err != nil { + return nil, errors.Wrapf(err, "could not build ContentStagingZone struct from content id") } - - for _, b := range bucks { - if b.hasContent(content) { - return true - } + var zSize int64 + for _, c := range contents { + zSize += c.Size } - return false + return &ContentStagingZone{ + ZoneOpened: zone.CreatedAt, + Contents: contents, + MinSize: constants.MinDealContentSize, + MaxSize: constants.MaxDealContentSize, + CurSize: zSize, + User: zone.UserID, + ContID: zone.ID, + Location: zone.Location, + }, nil } func (cm *ContentManager) GetStagingZonesForUser(ctx context.Context, user uint) []*ContentStagingZone { - cm.BucketLk.Lock() - defer cm.BucketLk.Unlock() - - blist, ok := cm.Buckets[user] - if !ok { - return []*ContentStagingZone{} + var zones []util.Content + if err := cm.DB.Find(&zones, "not active and pinning and aggregate and user_id = ?", user).Error; err != nil { + return nil } var out []*ContentStagingZone - for _, b := range blist { - out = append(out, b.DeepCopy()) + for _, zone := range zones { + stagingZone, err := cm.buildStagingZoneFromContent(zone) + if err != nil { + continue + } + out = append(out, stagingZone) } return out } func (cm *ContentManager) GetStagingZoneSnapshot(ctx context.Context) map[uint][]*ContentStagingZone { - cm.BucketLk.Lock() - defer cm.BucketLk.Unlock() + var zones []util.Content + if err := cm.DB.Find(&zones, "not active and pinning and aggregate").Error; err != nil { + return nil + } out := make(map[uint][]*ContentStagingZone) - for u, blist := range cm.Buckets { - var copylist []*ContentStagingZone - - for _, b := range blist { - copylist = append(copylist, b.DeepCopy()) + for _, zone := range zones { + stagingZone, err := cm.buildStagingZoneFromContent(zone) + if err != nil { + continue } - out[u] = copylist + out[zone.UserID] = append(out[zone.UserID], stagingZone) } return out } @@ -832,33 +875,38 @@ func (cm *ContentManager) GetStagingZoneSnapshot(ctx context.Context) map[uint][ func (cm *ContentManager) addContentToStagingZone(ctx context.Context, content util.Content) error { _, span := cm.tracer.Start(ctx, "stageContent") defer span.End() + + cm.addStagingContentLk.Lock() + defer cm.addStagingContentLk.Unlock() + if content.AggregatedIn > 0 { cm.log.Warnf("attempted to add content to staging zone that was already staged: %d (is in %d)", content.ID, content.AggregatedIn) return nil } cm.log.Debugf("adding content to staging zone: %d", content.ID) - cm.BucketLk.Lock() - defer cm.BucketLk.Unlock() - blist, ok := cm.Buckets[content.UserID] - if !ok { - b, err := cm.newContentStagingZone(content.UserID, content.Location) + // TODO: move processing state 
@@ -868,13 +916,12 @@ func (cm *ContentManager) addContentToStagingZone(ctx context.Context, content u
 		}
 	}
 
-	b, err := cm.newContentStagingZone(content.UserID, content.Location)
+	zone, err := cm.newContentStagingZone(content.UserID, content.Location)
 	if err != nil {
 		return err
 	}
-	cm.Buckets[content.UserID] = append(blist, b)
 
-	_, err = cm.tryAddContent(b, content)
+	_, err = cm.tryAddContent(*zone, content)
 	if err != nil {
 		return err
 	}
@@ -882,26 +929,6 @@ func (cm *ContentManager) addContentToStagingZone(ctx context.Context, content u
 	return nil
 }
 
-func (cm *ContentManager) popReadyStagingZone() []*ContentStagingZone {
-	cm.BucketLk.Lock()
-	defer cm.BucketLk.Unlock()
-
-	var out []*ContentStagingZone
-	for uid, blist := range cm.Buckets {
-		var keep []*ContentStagingZone
-		for _, b := range blist {
-			b.updateReadiness()
-			if b.Readiness.IsReady {
-				out = append(out, b)
-			} else {
-				keep = append(keep, b)
-			}
-		}
-		cm.Buckets[uid] = keep
-	}
-	return out
-}
-
 func (cm *ContentManager) ensureStorage(ctx context.Context, content util.Content, done func(time.Duration)) error {
 	ctx, span := cm.tracer.Start(ctx, "ensureStorage", trace.WithAttributes(
 		attribute.Int("content", int(content.ID)),
@@ -944,11 +971,6 @@ func (cm *ContentManager) ensureStorage(ctx context.Context, content util.Conten
 		return nil
 	}
 
-	// If this content is already scheduled to be aggregated and is waiting in a bucket
-	if cm.contentInStagingZone(ctx, content) {
-		return nil
-	}
-
 	// it's too big, need to split it up into chunks, no need to requeue dagsplit root content
 	if content.Size > cm.cfg.Content.MaxSize {
 		return cm.splitContent(ctx, content, cm.cfg.Content.MaxSize)
@@ -967,7 +989,7 @@ func (cm *ContentManager) ensureStorage(ctx context.Context, content util.Conten
 		return cm.addContentToStagingZone(ctx, content)
 	}
 
-	// check on each of the existing deals, see if they any needs fixing
+	// check on each of the existing deals, see if any needs fixing
 	var countLk sync.Mutex
 	var numSealed, numPublished, numProgress int
 	var wg sync.WaitGroup
@@ -2699,6 +2721,7 @@ func (cm *ContentManager) sendSplitContentCmd(ctx context.Context, loc string, c
 }
 
 func (cm *ContentManager) SendConsolidateContentCmd(ctx context.Context, loc string, contents []util.Content) error {
+	cm.log.Debugf("attempting to send consolidate content cmd to %s", loc)
 	tc := &drpc.TakeContent{}
 	for _, c := range contents {
 		prs := make([]*peer.AddrInfo, 0)
diff --git a/contentmgr/shuttle.go b/contentmgr/shuttle.go
index 0bcb0a0b..0b2c7d1c 100644
--- a/contentmgr/shuttle.go
+++ b/contentmgr/shuttle.go
@@ -271,7 +271,7 @@ func (cm *ContentManager) ShuttleCanAddContent(handle string) bool {
 	defer cm.ShuttlesLk.Unlock()
 	d, ok := cm.Shuttles[handle]
 	if ok {
-		return d.ContentAddingDisabled
+		return !d.ContentAddingDisabled
 	}
 	return true
 }
diff --git a/handlers.go b/handlers.go
index 725b3c42..7d49d577 100644
--- a/handlers.go
+++ b/handlers.go
@@ -26,7 +26,6 @@ import (
 
 	"github.com/application-research/estuary/collections"
 	"github.com/application-research/estuary/constants"
-	"github.com/application-research/estuary/contentmgr"
 	"github.com/application-research/estuary/miner"
 	"github.com/application-research/estuary/model"
 	"github.com/application-research/estuary/node/modules/peering"
@@ -2744,10 +2743,7 @@ func (s *Server) handleGetContentFailures(c echo.Context, u *util.User) error {
 }
 
 func (s *Server) handleAdminGetStagingZones(c echo.Context) error {
-	s.CM.BucketLk.Lock()
-	defer s.CM.BucketLk.Unlock()
-
-	return c.JSON(http.StatusOK, s.CM.Buckets)
+	return c.JSON(http.StatusOK, s.CM.GetStagingZoneSnapshot(c.Request().Context()))
 }
 
 func (s *Server) handleGetOffloadingCandidates(c echo.Context) error {
@@ -4472,23 +4468,17 @@ func (s *Server) handleContentHealthCheck(c echo.Context) error {
 	case 0:
 		log.Warnf("content %d has nothing aggregated in it", cont.ID)
 	case 1:
-		var zSize int64
-		for _, zc := range aggr {
-			zSize += zc.Size
+		var aggrLoc string
+		for loc := range aggrLocs {
+			aggrLoc = loc
+			break
 		}
 
-		z := &contentmgr.ContentStagingZone{
-			ZoneOpened: cont.CreatedAt,
-			Contents:   aggr,
-			MinSize:    s.cfg.Content.MinSize,
-			MaxSize:    s.cfg.Content.MaxSize,
-			CurSize:    zSize,
-			User:       cont.UserID,
-			ContID:     cont.ID,
-			Location:   cont.Location,
+		if !s.CM.MarkStartedAggregating(cont) {
+			// skip since it is already aggregating
+			return nil
 		}
-
-		if err := s.CM.AggregateStagingZone(ctx, z, aggrLocs); err != nil {
+		if err := s.CM.AggregateStagingZone(ctx, cont, aggrLoc); err != nil {
 			return err
 		}
 		fixedAggregateSize = true
diff --git a/pinning.go b/pinning.go
index 702eeb4c..f64cf975 100644
--- a/pinning.go
+++ b/pinning.go
@@ -498,6 +498,20 @@ func (s *Server) handleDeletePin(e echo.Context, u *util.User) error {
 		return err
 	}
 
+	if content.AggregatedIn > 0 {
+		if s.CM.IsZoneConsolidating(content.AggregatedIn) || s.CM.IsZoneAggregating(content.AggregatedIn) {
+			return fmt.Errorf("unable to unpin content while zone is consolidating or aggregating (pin: %d, zone: %d)", content.ID, content.AggregatedIn)
+		}
+		var zone util.Content
+		if err := s.DB.Find(&zone, "id = ?", content.AggregatedIn).Error; err != nil {
+			return err
+		}
+		if zone.Active {
+			// don't unpin content belonging to a pinned aggregate
+			return fmt.Errorf("unable to unpin content belonging to a pinned aggregate (pin: %d, zone: %d)", content.ID, content.AggregatedIn)
+		}
+	}
+
 	// mark as replace since it will be removed and so it should not be fetched anymore
 	if err := s.DB.Model(&util.Content{}).Where("id = ?", pinID).Update("replace", true).Error; err != nil {
 		return err
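The zone lookup added to handleDeletePin uses gorm's inline-condition form deliberately: Find executes its query the moment it is called, so a Where chained after Find is silently ignored rather than applied. The ordering rules, sketched:

	// wrong: runs the SELECT without the filter; the trailing Where does nothing
	db.Find(&zone).Where("id = ?", zoneID)

	// right: the condition is bound before the query executes
	db.Find(&zone, "id = ?", zoneID)
	// or equivalently
	db.Where("id = ?", zoneID).Find(&zone)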