Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track buckets in DB instead of in-memory #609

Merged
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
41e9505
contentInStagingZone and primaryStagingLocation use DB
neelvirdy Nov 21, 2022
89cbe47
Remove redundant aggregated in check
neelvirdy Nov 28, 2022
9964f1d
use DB for content locations during aggregation
neelvirdy Nov 28, 2022
5ee8310
comments for how to remove other uses of in-memory buckets
neelvirdy Nov 28, 2022
2a291cc
replace popReadyStagingZone with DB query
neelvirdy Nov 28, 2022
c850514
Rewrite addContent, build resps for API, remove all buckets refs
neelvirdy Nov 28, 2022
627b79b
Fix inviableZones logic and some cleanup + validation
neelvirdy Nov 28, 2022
e18c699
gosec fix?
neelvirdy Nov 28, 2022
f777964
gosec fix
neelvirdy Nov 28, 2022
fb75d9e
Track size incrementally
neelvirdy Nov 29, 2022
95e49ab
Recompute staging zone sizes at startup
neelvirdy Nov 29, 2022
e1e480f
Recompute staging zone sizes at startup
neelvirdy Nov 29, 2022
087aec7
nit: zoneSize struct
neelvirdy Nov 29, 2022
47075b6
polish + only run recompute on size = 0 and stop if no results
neelvirdy Nov 30, 2022
e849c7b
Unable to unpin content belonging to pinned aggregate
neelvirdy Nov 30, 2022
1e7d479
Lock around ZonesConsolidating, return nil instead of err if unpinnin…
neelvirdy Nov 30, 2022
46a92d5
Simplify inviable zones check and defer unlock
neelvirdy Nov 30, 2022
4501ea5
more readable size viability query
neelvirdy Nov 30, 2022
ebcf928
Mark/unmark processingZones
neelvirdy Dec 2, 2022
ffe7a2c
unpin update contents db in one transaction
neelvirdy Dec 2, 2022
c1fc46b
update contents db in upin after deleting blocks
neelvirdy Dec 2, 2022
74be82c
Remove bucketLk in addContentToStagingZone
neelvirdy Dec 2, 2022
4c839a7
init processingZones map and use addStagingContentLk
neelvirdy Dec 2, 2022
d844432
clarifying comment
neelvirdy Dec 2, 2022
527499d
Fix processing marks timing
neelvirdy Dec 2, 2022
654556e
Dont defer finish processing for aggregation - only do it on pinComplete
neelvirdy Dec 2, 2022
5d37bd9
Use separate consolidating and aggregating zones maps
neelvirdy Dec 2, 2022
fc14e99
Use separate lks for zone maps, check for aggregating on add and unpin
neelvirdy Dec 2, 2022
9b71719
Check both aggregating/consolidating before acting, mark finished con…
neelvirdy Dec 2, 2022
3d84fa2
dummy commit
neelvirdy Dec 2, 2022
e7b607b
Back to not unpinning contents belonging to pinned aggregates
neelvirdy Dec 2, 2022
7aa3234
fix lock timings and handle no dstLoc case
neelvirdy Dec 5, 2022
00657b0
Fix cm.ShuttleCanAddContent logic
neelvirdy Dec 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ peer.key
wallet/
duplicateGuard/
pinQueue/
stagingdata/

cidlistsdir/*

Expand Down
2 changes: 1 addition & 1 deletion cmd/estuary-shuttle/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func (d *Shuttle) handleRpcTakeContent(ctx context.Context, cmd *drpc.TakeConten
}

func (s *Shuttle) handleRpcAggregateStagedContent(ctx context.Context, aggregate *drpc.AggregateContents) error {
// only progress if aggr is not allready in progress
// only progress if aggr is not already in progress
if !s.markStartAggr(aggregate.DBID) {
return nil
}
Expand Down
30 changes: 21 additions & 9 deletions contentmgr/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package contentmgr
import (
"context"
"fmt"

"github.com/application-research/estuary/util"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -152,33 +151,46 @@ func (cm *ContentManager) UnpinContent(ctx context.Context, contid uint) error {
return err
}

if err := cm.DB.Delete(&util.Content{ID: pin.ID}).Error; err != nil {
return err
}

// delete object refs rows for deleted content
if err := cm.DB.Where("content = ?", pin.ID).Delete(&util.ObjRef{}).Error; err != nil {
return err
}

// delete objects that no longer have obj refs
if err := cm.clearUnreferencedObjects(ctx, objs); err != nil {
return err
}

// delete objects from blockstore that are no longer present in db
for _, o := range objs {
// TODO: this is safe, but... slow?
if _, err := cm.deleteIfNotPinned(ctx, o); err != nil {
return err
}
}

buckets, ok := cm.Buckets[pin.UserID]
if ok {
for _, bucket := range buckets {
if _, err := cm.tryRemoveContent(bucket, pin); err != nil {
// delete from contents table and adjust aggregate size, if applicable, in one tx
err = cm.DB.Transaction(func(tx *gorm.DB) error {
// delete contid row from contents table
if err := tx.Model(util.Content{}).Delete(&util.Content{ID: pin.ID}).Error; err != nil {
return err
}

if pin.AggregatedIn > 0 {
// decrease aggregate's size by cont's size in contents table
if err := tx.Model(util.Content{}).
Where("id = ?", pin.AggregatedIn).
UpdateColumn("size", gorm.Expr("size - ?", pin.Size)).Error; err != nil {
return err
}
}

return nil
})
if err != nil {
return err
}

return nil
}

Expand Down
10 changes: 6 additions & 4 deletions contentmgr/pinning.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,8 @@ func (cm *ContentManager) selectLocationForRetrieval(ctx context.Context, cont u
}

func (cm *ContentManager) primaryStagingLocation(ctx context.Context, uid uint) string {
cm.BucketLk.Lock()
defer cm.BucketLk.Unlock()
zones, ok := cm.Buckets[uid]
if !ok {
var zones []util.Content
if err := cm.DB.Find(&zones, "user_id = ? and aggregate", uid).Error; err != nil {
return ""
}

Expand Down Expand Up @@ -428,6 +426,10 @@ func (cm *ContentManager) handlePinningComplete(ctx context.Context, handle stri
}).Error; err != nil {
return xerrors.Errorf("failed to update content in database: %w", err)
}

// if the content is a consolidated aggregate, it means aggregation has been completed and we can mark as finished
cm.MarkFinishedAggregating(cont)

// after aggregate is done, make deal for it
cm.ToCheck(cont.ID)
return nil
Expand Down
Loading