From ef7334b310bd56f4764b625d0c51fd4940495138 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 12 Feb 2024 15:11:00 -0800 Subject: [PATCH 1/3] extract carstore postgres backend --- carstore/bs.go | 360 ++++++---------------------------------- carstore/pgbackend.go | 376 ++++++++++++++++++++++++++++++++++++++++++ carstore/repo_test.go | 7 +- 3 files changed, 429 insertions(+), 314 deletions(-) create mode 100644 carstore/pgbackend.go diff --git a/carstore/bs.go b/carstore/bs.go index 1471be3d2..c55957350 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -10,7 +10,6 @@ import ( "os" "path/filepath" "sort" - "strings" "sync" "sync/atomic" "time" @@ -51,14 +50,30 @@ const MaxSliceLength = 2 << 20 const BigShardThreshold = 2 << 20 type CarStore struct { - meta *gorm.DB rootDir string + backend Backend + lscLk sync.Mutex lastShardCache map[models.Uid]*CarShard } -func NewCarStore(meta *gorm.DB, root string) (*CarStore, error) { +type Backend interface { + UserHasBlock(ctx context.Context, usr models.Uid, blk cid.Cid) (bool, error) + FindBlock(ctx context.Context, usr models.Uid, k cid.Cid) (string, int64, bool, error) + GetLastShard(ctx context.Context, user models.Uid) (*CarShard, error) + ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error + PutShard(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool) error + GetAllShards(ctx context.Context, usr models.Uid) ([]CarShard, error) + WipeUserData(ctx context.Context, user models.Uid) error + deleteShards(ctx context.Context, shs []*CarShard) error + GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) + GetBlockRefsForShards(ctx context.Context, shardIds []uint) ([]blockRef, error) + DeleteStaleRefs(ctx context.Context, uid models.Uid, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error + GetStaleRefs(ctx context.Context, user models.Uid) ([]staleRef, error) +} + +func NewCarStore(backend Backend, root string) (*CarStore, error) { if _, err := os.Stat(root); err != nil { if !os.IsNotExist(err) { return nil, err @@ -68,15 +83,9 @@ func NewCarStore(meta *gorm.DB, root string) (*CarStore, error) { return nil, err } } - if err := meta.AutoMigrate(&CarShard{}, &blockRef{}); err != nil { - return nil, err - } - if err := meta.AutoMigrate(&staleRef{}); err != nil { - return nil, err - } return &CarStore{ - meta: meta, + backend: backend, rootDir: root, lastShardCache: make(map[models.Uid]*CarShard), }, nil @@ -164,17 +173,7 @@ func (uv *userView) HashOnRead(hor bool) { } func (uv *userView) Has(ctx context.Context, k cid.Cid) (bool, error) { - var count int64 - if err := uv.cs.meta. - Model(blockRef{}). - Select("path, block_refs.offset"). - Joins("left join car_shards on block_refs.shard = car_shards.id"). - Where("usr = ? AND cid = ?", uv.user, models.DbCID{CID: k}). - Count(&count).Error; err != nil { - return false, err - } - - return count > 0, nil + return uv.cs.backend.UserHasBlock(ctx, uv.user, k) } var CacheHits int64 @@ -195,30 +194,13 @@ func (uv *userView) Get(ctx context.Context, k cid.Cid) (blockformat.Block, erro } atomic.AddInt64(&CacheMiss, 1) - // TODO: for now, im using a join to ensure we only query blocks from the - // correct user. maybe it makes sense to put the user in the blockRef - // directly? tradeoff of time vs space - var info struct { - Path string - Offset int64 - Usr models.Uid - } - if err := uv.cs.meta.Raw(`SELECT - (select path from car_shards where id = block_refs.shard) as path, - block_refs.offset, - (select usr from car_shards where id = block_refs.shard) as usr -FROM block_refs -WHERE - block_refs.cid = ? -LIMIT 1;`, models.DbCID{CID: k}).Scan(&info).Error; err != nil { + path, offset, prefetchOkay, err := uv.cs.backend.FindBlock(ctx, uv.user, k) + if err != nil { return nil, err } - if info.Path == "" { - return nil, ipld.ErrNotFound{Cid: k} - } prefetch := uv.prefetch - if info.Usr != uv.user { + if !prefetchOkay { blockGetTotalCounterUsrskip.Add(1) prefetch = false } else { @@ -226,9 +208,9 @@ LIMIT 1;`, models.DbCID{CID: k}).Scan(&info).Error; err != nil { } if prefetch { - return uv.prefetchRead(ctx, k, info.Path, info.Offset) + return uv.prefetchRead(ctx, k, path, offset) } else { - return uv.singleRead(ctx, k, info.Path, info.Offset) + return uv.singleRead(ctx, k, path, offset) } } @@ -388,18 +370,13 @@ func (cs *CarStore) getLastShard(ctx context.Context, user models.Uid) (*CarShar return maybeLs, nil } - var lastShard CarShard - // this is often slow (which is why we're caching it) but could be sped up with an extra index: - // CREATE INDEX idx_car_shards_usr_id ON car_shards (usr, seq DESC); - if err := cs.meta.WithContext(ctx).Model(CarShard{}).Limit(1).Order("seq desc").Find(&lastShard, "usr = ?", user).Error; err != nil { - //if err := cs.meta.Model(CarShard{}).Where("user = ?", user).Last(&lastShard).Error; err != nil { - //if err != gorm.ErrRecordNotFound { + lastShard, err := cs.backend.GetLastShard(ctx, user) + if err != nil { return nil, err - //} } - cs.putLastShardCache(&lastShard) - return &lastShard, nil + cs.putLastShardCache(lastShard) + return lastShard, nil } var ErrRepoBaseMismatch = fmt.Errorf("attempted a delta session on top of the wrong previous head") @@ -454,47 +431,10 @@ func (cs *CarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev s ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar") defer span.End() - var earlySeq int - if sinceRev != "" { - var untilShard CarShard - if err := cs.meta.Where("rev >= ? AND usr = ?", sinceRev, user).Order("rev").First(&untilShard).Error; err != nil { - return fmt.Errorf("finding early shard: %w", err) - } - earlySeq = untilShard.Seq - } - - var shards []CarShard - if err := cs.meta.Order("seq desc").Where("usr = ? AND seq >= ?", user, earlySeq).Find(&shards).Error; err != nil { - return err - } - - if !incremental && earlySeq > 0 { - // have to do it the ugly way - return fmt.Errorf("nyi") - } - - if len(shards) == 0 { - return fmt.Errorf("no data found for user %d", user) - } - - // fast path! - if err := car.WriteHeader(&car.CarHeader{ - Roots: []cid.Cid{shards[0].Root.CID}, - Version: 1, - }, w); err != nil { - return err - } - - for _, sh := range shards { - if err := cs.writeShardBlocks(ctx, &sh, w); err != nil { - return err - } - } - - return nil + return cs.backend.ReadUserCar(ctx, user, sinceRev, incremental, w) } -func (cs *CarStore) writeShardBlocks(ctx context.Context, sh *CarShard, w io.Writer) error { +func writeShardBlocks(ctx context.Context, sh *CarShard, w io.Writer) error { ctx, span := otel.Tracer("carstore").Start(ctx, "writeShardBlocks") defer span.End() @@ -669,7 +609,7 @@ func (cs *CarStore) writeNewShardFile(ctx context.Context, user models.Uid, seq return fname, nil } -func (cs *CarStore) deleteShardFile(ctx context.Context, sh *CarShard) error { +func deleteShardFile(ctx context.Context, sh *CarShard) error { return os.Remove(sh.Path) } @@ -767,87 +707,14 @@ func (cs *CarStore) putShard(ctx context.Context, shard *CarShard, brefs []map[s ctx, span := otel.Tracer("carstore").Start(ctx, "putShard") defer span.End() - // TODO: there should be a way to create the shard and block_refs that - // reference it in the same query, would save a lot of time - tx := cs.meta.WithContext(ctx).Begin() - - if err := tx.WithContext(ctx).Create(shard).Error; err != nil { - return fmt.Errorf("failed to create shard in DB tx: %w", err) + if err := cs.backend.PutShard(ctx, shard, brefs, rmcids); err != nil { + return err } if !nocache { cs.putLastShardCache(shard) } - for _, ref := range brefs { - ref["shard"] = shard.ID - } - - if err := createBlockRefs(ctx, tx, brefs); err != nil { - return fmt.Errorf("failed to create block refs: %w", err) - } - - if len(rmcids) > 0 { - cids := make([]cid.Cid, 0, len(rmcids)) - for c := range rmcids { - cids = append(cids, c) - } - - if err := tx.Create(&staleRef{ - Cids: packCids(cids), - Usr: shard.Usr, - }).Error; err != nil { - return err - } - } - - err := tx.WithContext(ctx).Commit().Error - if err != nil { - return fmt.Errorf("failed to commit shard DB transaction: %w", err) - } - - return nil -} - -func createBlockRefs(ctx context.Context, tx *gorm.DB, brefs []map[string]any) error { - ctx, span := otel.Tracer("carstore").Start(ctx, "createBlockRefs") - defer span.End() - - if err := createInBatches(ctx, tx, brefs, 2000); err != nil { - return err - } - - return nil -} - -func generateInsertQuery(data []map[string]any) (string, []any) { - placeholders := strings.Repeat("(?, ?, ?),", len(data)) - placeholders = placeholders[:len(placeholders)-1] // trim trailing comma - - query := "INSERT INTO block_refs (\"cid\", \"offset\", \"shard\") VALUES " + placeholders - - values := make([]any, 0, 3*len(data)) - for _, entry := range data { - values = append(values, entry["cid"], entry["offset"], entry["shard"]) - } - - return query, values -} - -// Function to create in batches -func createInBatches(ctx context.Context, tx *gorm.DB, data []map[string]any, batchSize int) error { - for i := 0; i < len(data); i += batchSize { - batch := data[i:] - if len(batch) > batchSize { - batch = batch[:batchSize] - } - - query, values := generateInsertQuery(batch) - - if err := tx.WithContext(ctx).Exec(query, values...).Error; err != nil { - return err - } - } return nil } @@ -1027,8 +894,8 @@ type UserStat struct { } func (cs *CarStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) { - var shards []CarShard - if err := cs.meta.Order("seq asc").Find(&shards, "usr = ?", usr).Error; err != nil { + shards, err := cs.backend.GetAllShards(ctx, usr) + if err != nil { return nil, err } @@ -1045,74 +912,15 @@ func (cs *CarStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error } func (cs *CarStore) WipeUserData(ctx context.Context, user models.Uid) error { - var shards []*CarShard - if err := cs.meta.Find(&shards, "usr = ?", user).Error; err != nil { + if err := cs.backend.WipeUserData(ctx, user); err != nil { return err } - if err := cs.deleteShards(ctx, shards); err != nil { - if !os.IsNotExist(err) { - return err - } - } - cs.removeLastShardCache(user) return nil } -func (cs *CarStore) deleteShards(ctx context.Context, shs []*CarShard) error { - ctx, span := otel.Tracer("carstore").Start(ctx, "deleteShards") - defer span.End() - - deleteSlice := func(ctx context.Context, subs []*CarShard) error { - var ids []uint - for _, sh := range subs { - ids = append(ids, sh.ID) - } - - txn := cs.meta.Begin() - - if err := txn.Delete(&CarShard{}, "id in (?)", ids).Error; err != nil { - return err - } - - if err := txn.Delete(&blockRef{}, "shard in (?)", ids).Error; err != nil { - return err - } - - if err := txn.Commit().Error; err != nil { - return err - } - - var outErr error - for _, sh := range subs { - if err := cs.deleteShardFile(ctx, sh); err != nil { - if !os.IsNotExist(err) { - return err - } - outErr = err - } - } - - return outErr - } - - chunkSize := 10000 - for i := 0; i < len(shs); i += chunkSize { - sl := shs[i:] - if len(sl) > chunkSize { - sl = sl[:chunkSize] - } - - if err := deleteSlice(ctx, sl); err != nil { - return err - } - } - - return nil -} - type shardStat struct { ID uint Dirty int @@ -1241,12 +1049,7 @@ func (cs *CarStore) GetCompactionTargets(ctx context.Context, shardCount int) ([ ctx, span := otel.Tracer("carstore").Start(ctx, "GetCompactionTargets") defer span.End() - var targets []CompactionTarget - if err := cs.meta.Raw(`select usr, count(*) as num_shards from car_shards group by usr having count(*) > ? order by num_shards desc`, shardCount).Scan(&targets).Error; err != nil { - return nil, err - } - - return targets, nil + return cs.backend.GetCompactionTargets(ctx, shardCount) } func (cs *CarStore) getBlockRefsForShards(ctx context.Context, shardIds []uint) ([]blockRef, error) { @@ -1255,20 +1058,9 @@ func (cs *CarStore) getBlockRefsForShards(ctx context.Context, shardIds []uint) span.SetAttributes(attribute.Int("shards", len(shardIds))) - chunkSize := 10000 - out := make([]blockRef, 0, len(shardIds)) - for i := 0; i < len(shardIds); i += chunkSize { - sl := shardIds[i:] - if len(sl) > chunkSize { - sl = sl[:chunkSize] - } - - var brefs []blockRef - if err := cs.meta.Raw(`select * from block_refs where shard in (?)`, sl).Scan(&brefs).Error; err != nil { - return nil, err - } - - out = append(out, brefs...) + out, err := cs.backend.GetBlockRefsForShards(ctx, shardIds) + if err != nil { + return nil, err } span.SetAttributes(attribute.Int("refs", len(out))) @@ -1300,8 +1092,8 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid, skip span.SetAttributes(attribute.Int64("user", int64(user))) - var shards []CarShard - if err := cs.meta.WithContext(ctx).Find(&shards, "usr = ?", user).Error; err != nil { + shards, err := cs.backend.GetAllShards(ctx, user) + if err != nil { return nil, err } @@ -1354,10 +1146,7 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid, skip span.SetAttributes(attribute.Int("blockRefs", len(brefs))) - var staleRefs []staleRef - if err := cs.meta.WithContext(ctx).Find(&staleRefs, "usr = ?", user).Error; err != nil { - return nil, err - } + staleRefs, err := cs.backend.GetStaleRefs(ctx, user) span.SetAttributes(attribute.Int("staleRefs", len(staleRefs))) @@ -1496,14 +1285,14 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid, skip } stats.ShardsDeleted += len(todelete) - if err := cs.deleteShards(ctx, todelete); err != nil { + if err := cs.backend.deleteShards(ctx, todelete); err != nil { return nil, fmt.Errorf("deleting shards: %w", err) } } // now we need to delete the staleRefs we successfully cleaned up // we can safely delete a staleRef if all the shards that have blockRefs with matching stale refs were processed - if err := cs.deleteStaleRefs(ctx, user, brefs, staleRefs, removedShards); err != nil { + if err := cs.backend.DeleteStaleRefs(ctx, user, brefs, staleRefs, removedShards); err != nil { return nil, err } @@ -1512,61 +1301,6 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid, skip return stats, nil } -func (cs *CarStore) deleteStaleRefs(ctx context.Context, uid models.Uid, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error { - ctx, span := otel.Tracer("carstore").Start(ctx, "deleteStaleRefs") - defer span.End() - - brByCid := make(map[cid.Cid][]blockRef) - for _, br := range brefs { - brByCid[br.Cid.CID] = append(brByCid[br.Cid.CID], br) - } - - var staleToKeep []cid.Cid - for _, sr := range staleRefs { - cids, err := sr.getCids() - if err != nil { - return fmt.Errorf("getCids on staleRef failed (%d): %w", sr.ID, err) - } - - for _, c := range cids { - brs := brByCid[c] - del := true - for _, br := range brs { - if !removedShards[br.Shard] { - del = false - break - } - } - - if !del { - staleToKeep = append(staleToKeep, c) - } - } - } - - txn := cs.meta.Begin() - - if err := txn.Delete(&staleRef{}, "usr = ?", uid).Error; err != nil { - return err - } - - // now create a new staleRef with all the refs we couldn't clear out - if len(staleToKeep) > 0 { - if err := txn.Create(&staleRef{ - Usr: uid, - Cids: packCids(staleToKeep), - }).Error; err != nil { - return err - } - } - - if err := txn.Commit().Error; err != nil { - return fmt.Errorf("failed to commit staleRef updates: %w", err) - } - - return nil -} - func (cs *CarStore) compactBucket(ctx context.Context, user models.Uid, b *compBucket, shardsById map[uint]CarShard, keep map[cid.Cid]bool) error { ctx, span := otel.Tracer("carstore").Start(ctx, "compactBucket") defer span.End() @@ -1627,7 +1361,7 @@ func (cs *CarStore) compactBucket(ctx context.Context, user models.Uid, b *compB Rev: lastsh.Rev, } - if err := cs.putShard(ctx, &shard, nbrefs, nil, true); err != nil { + if err := cs.backend.PutShard(ctx, &shard, nbrefs, nil); err != nil { // if writing the shard fails, we should also delete the file _ = fi.Close() diff --git a/carstore/pgbackend.go b/carstore/pgbackend.go new file mode 100644 index 000000000..211b11d7d --- /dev/null +++ b/carstore/pgbackend.go @@ -0,0 +1,376 @@ +package carstore + +import ( + "context" + "fmt" + "io" + "os" + "strings" + + "github.com/bluesky-social/indigo/models" + "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" + car "github.com/ipld/go-car" + "go.opentelemetry.io/otel" + "gorm.io/gorm" +) + +func NewPostgresBackend(db *gorm.DB) (Backend, error) { + if err := db.AutoMigrate(&CarShard{}, &blockRef{}); err != nil { + return nil, err + } + if err := db.AutoMigrate(&staleRef{}); err != nil { + return nil, err + } + + return &PgBackend{ + db: db, + }, nil +} + +type PgBackend struct { + db *gorm.DB +} + +func (pgb *PgBackend) UserHasBlock(ctx context.Context, usr models.Uid, blk cid.Cid) (bool, error) { + var count int64 + if err := pgb.db. + Model(blockRef{}). + Select("path, block_refs.offset"). + Joins("left join car_shards on block_refs.shard = car_shards.id"). + Where("usr = ? AND cid = ?", usr, models.DbCID{CID: blk}). + Count(&count).Error; err != nil { + return false, err + } + + return count > 0, nil +} + +func (pgb *PgBackend) FindBlock(ctx context.Context, usr models.Uid, k cid.Cid) (string, int64, bool, error) { + // TODO: for now, im using a join to ensure we only query blocks from the + // correct user. maybe it makes sense to put the user in the blockRef + // directly? tradeoff of time vs space + var info struct { + Path string + Offset int64 + Usr models.Uid + } + if err := pgb.db.Raw(`SELECT + (select path from car_shards where id = block_refs.shard) as path, + block_refs.offset, + (select usr from car_shards where id = block_refs.shard) as usr +FROM block_refs +WHERE + block_refs.cid = ? +LIMIT 1;`, models.DbCID{CID: k}).Scan(&info).Error; err != nil { + return "", 0, false, err + } + if info.Path == "" { + return "", 0, false, ipld.ErrNotFound{Cid: k} + } + + return info.Path, info.Offset, usr == info.Usr, nil +} + +func (pgb *PgBackend) GetLastShard(ctx context.Context, user models.Uid) (*CarShard, error) { + var lastShard CarShard + // this is often slow (which is why we're caching it) but could be sped up with an extra index: + // CREATE INDEX idx_car_shards_usr_id ON car_shards (usr, seq DESC); + if err := pgb.db.WithContext(ctx).Model(CarShard{}).Limit(1).Order("seq desc").Find(&lastShard, "usr = ?", user).Error; err != nil { + //if err := cs.meta.Model(CarShard{}).Where("user = ?", user).Last(&lastShard).Error; err != nil { + //if err != gorm.ErrRecordNotFound { + return nil, err + //} + } + + return &lastShard, nil +} + +func (pgb *PgBackend) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error { + var earlySeq int + if sinceRev != "" { + var untilShard CarShard + if err := pgb.db.Where("rev >= ? AND usr = ?", sinceRev, user).Order("rev").First(&untilShard).Error; err != nil { + return fmt.Errorf("finding early shard: %w", err) + } + earlySeq = untilShard.Seq + } + + var shards []CarShard + if err := pgb.db.Order("seq desc").Where("usr = ? AND seq >= ?", user, earlySeq).Find(&shards).Error; err != nil { + return err + } + + if !incremental && earlySeq > 0 { + // have to do it the ugly way + return fmt.Errorf("nyi") + } + + if len(shards) == 0 { + return fmt.Errorf("no data found for user %d", user) + } + + // fast path! + if err := car.WriteHeader(&car.CarHeader{ + Roots: []cid.Cid{shards[0].Root.CID}, + Version: 1, + }, w); err != nil { + return err + } + + for _, sh := range shards { + if err := writeShardBlocks(ctx, &sh, w); err != nil { + return err + } + } + + return nil +} + +func (pgb *PgBackend) PutShard(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool) error { + // TODO: there should be a way to create the shard and block_refs that + // reference it in the same query, would save a lot of time + tx := pgb.db.WithContext(ctx).Begin() + + if err := tx.WithContext(ctx).Create(shard).Error; err != nil { + return fmt.Errorf("failed to create shard in DB tx: %w", err) + } + + for _, ref := range brefs { + ref["shard"] = shard.ID + } + + if err := createBlockRefs(ctx, tx, brefs); err != nil { + return fmt.Errorf("failed to create block refs: %w", err) + } + + if len(rmcids) > 0 { + cids := make([]cid.Cid, 0, len(rmcids)) + for c := range rmcids { + cids = append(cids, c) + } + + if err := tx.Create(&staleRef{ + Cids: packCids(cids), + Usr: shard.Usr, + }).Error; err != nil { + return err + } + } + + err := tx.WithContext(ctx).Commit().Error + if err != nil { + return fmt.Errorf("failed to commit shard DB transaction: %w", err) + } + + return nil +} + +func createBlockRefs(ctx context.Context, tx *gorm.DB, brefs []map[string]any) error { + ctx, span := otel.Tracer("carstore").Start(ctx, "createBlockRefs") + defer span.End() + + if err := createInBatches(ctx, tx, brefs, 2000); err != nil { + return err + } + + return nil +} + +func generateInsertQuery(data []map[string]any) (string, []any) { + placeholders := strings.Repeat("(?, ?, ?),", len(data)) + placeholders = placeholders[:len(placeholders)-1] // trim trailing comma + + query := "INSERT INTO block_refs (\"cid\", \"offset\", \"shard\") VALUES " + placeholders + + values := make([]any, 0, 3*len(data)) + for _, entry := range data { + values = append(values, entry["cid"], entry["offset"], entry["shard"]) + } + + return query, values +} + +// Function to create in batches +func createInBatches(ctx context.Context, tx *gorm.DB, data []map[string]any, batchSize int) error { + for i := 0; i < len(data); i += batchSize { + batch := data[i:] + if len(batch) > batchSize { + batch = batch[:batchSize] + } + + query, values := generateInsertQuery(batch) + + if err := tx.WithContext(ctx).Exec(query, values...).Error; err != nil { + return err + } + } + return nil +} + +func (pgb *PgBackend) GetAllShards(ctx context.Context, usr models.Uid) ([]CarShard, error) { + var shards []CarShard + if err := pgb.db.Order("seq asc").Find(&shards, "usr = ?", usr).Error; err != nil { + return nil, err + } + return shards, nil +} + +func (pgb *PgBackend) WipeUserData(ctx context.Context, user models.Uid) error { + var shards []*CarShard + if err := pgb.db.Find(&shards, "usr = ?", user).Error; err != nil { + return err + } + + if err := pgb.deleteShards(ctx, shards); err != nil { + if !os.IsNotExist(err) { + return err + } + } + + return nil +} + +func (pgb *PgBackend) deleteShards(ctx context.Context, shs []*CarShard) error { + ctx, span := otel.Tracer("carstore").Start(ctx, "deleteShards") + defer span.End() + + deleteSlice := func(ctx context.Context, subs []*CarShard) error { + var ids []uint + for _, sh := range subs { + ids = append(ids, sh.ID) + } + + txn := pgb.db.Begin() + + if err := txn.Delete(&CarShard{}, "id in (?)", ids).Error; err != nil { + return err + } + + if err := txn.Delete(&blockRef{}, "shard in (?)", ids).Error; err != nil { + return err + } + + if err := txn.Commit().Error; err != nil { + return err + } + + var outErr error + for _, sh := range subs { + if err := deleteShardFile(ctx, sh); err != nil { + if !os.IsNotExist(err) { + return err + } + outErr = err + } + } + + return outErr + } + + chunkSize := 10000 + for i := 0; i < len(shs); i += chunkSize { + sl := shs[i:] + if len(sl) > chunkSize { + sl = sl[:chunkSize] + } + + if err := deleteSlice(ctx, sl); err != nil { + return err + } + } + + return nil +} + +func (pgb *PgBackend) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) { + var targets []CompactionTarget + if err := pgb.db.Raw(`select usr, count(*) as num_shards from car_shards group by usr having count(*) > ? order by num_shards desc`, shardCount).Scan(&targets).Error; err != nil { + return nil, err + } + + return targets, nil +} + +func (pgb *PgBackend) GetBlockRefsForShards(ctx context.Context, shardIds []uint) ([]blockRef, error) { + chunkSize := 10000 + out := make([]blockRef, 0, len(shardIds)) + for i := 0; i < len(shardIds); i += chunkSize { + sl := shardIds[i:] + if len(sl) > chunkSize { + sl = sl[:chunkSize] + } + + var brefs []blockRef + if err := pgb.db.Raw(`select * from block_refs where shard in (?)`, sl).Scan(&brefs).Error; err != nil { + return nil, err + } + + out = append(out, brefs...) + } + + return out, nil +} + +func (pgb *PgBackend) DeleteStaleRefs(ctx context.Context, uid models.Uid, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error { + ctx, span := otel.Tracer("pgbackend").Start(ctx, "deleteStaleRefs") + defer span.End() + + brByCid := make(map[cid.Cid][]blockRef) + for _, br := range brefs { + brByCid[br.Cid.CID] = append(brByCid[br.Cid.CID], br) + } + + var staleToKeep []cid.Cid + for _, sr := range staleRefs { + cids, err := sr.getCids() + if err != nil { + return fmt.Errorf("getCids on staleRef failed (%d): %w", sr.ID, err) + } + + for _, c := range cids { + brs := brByCid[c] + del := true + for _, br := range brs { + if !removedShards[br.Shard] { + del = false + break + } + } + + if !del { + staleToKeep = append(staleToKeep, c) + } + } + } + + txn := pgb.db.Begin() + + if err := txn.Delete(&staleRef{}, "usr = ?", uid).Error; err != nil { + return err + } + + // now create a new staleRef with all the refs we couldn't clear out + if len(staleToKeep) > 0 { + if err := txn.Create(&staleRef{ + Usr: uid, + Cids: packCids(staleToKeep), + }).Error; err != nil { + return err + } + } + + if err := txn.Commit().Error; err != nil { + return fmt.Errorf("failed to commit staleRef updates: %w", err) + } + + return nil +} + +func (pgb *PgBackend) GetStaleRefs(ctx context.Context, user models.Uid) ([]staleRef, error) { + var staleRefs []staleRef + if err := pgb.db.WithContext(ctx).Find(&staleRefs, "usr = ?", user).Error; err != nil { + return nil, err + } + return staleRefs, nil +} diff --git a/carstore/repo_test.go b/carstore/repo_test.go index 084f16f36..a04b25338 100644 --- a/carstore/repo_test.go +++ b/carstore/repo_test.go @@ -45,7 +45,12 @@ func testCarStore() (*CarStore, func(), error) { return nil, nil, err } - cs, err := NewCarStore(db, sharddir) + pgb, err := NewPostgresBackend(db) + if err != nil { + return nil, nil, err + } + + cs, err := NewCarStore(pgb, sharddir) if err != nil { return nil, nil, err } From 4a65093a726595d6231580fc937c6c28782e381e Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 12 Feb 2024 15:29:03 -0800 Subject: [PATCH 2/3] fixup all usage of carstore constructor --- cmd/bigsky/main.go | 7 ++++++- cmd/gosky/admin.go | 22 ++++++++++++---------- cmd/labelmaker/main.go | 7 ++++++- cmd/laputa/main.go | 7 ++++++- cmd/supercollider/main.go | 7 ++++++- events/dbpersist_test.go | 7 ++++++- indexer/posts_test.go | 7 ++++++- labeler/service_test.go | 7 ++++++- pds/handlers_test.go | 7 ++++++- repomgr/bench_test.go | 7 ++++++- repomgr/ingest_test.go | 14 ++++++++++++-- testing/labelmaker_fakedata_test.go | 7 ++++++- testing/utils.go | 14 ++++++++++++-- 13 files changed, 96 insertions(+), 24 deletions(-) diff --git a/cmd/bigsky/main.go b/cmd/bigsky/main.go index 66daff184..bdbac5d60 100644 --- a/cmd/bigsky/main.go +++ b/cmd/bigsky/main.go @@ -246,7 +246,12 @@ func Bigsky(cctx *cli.Context) error { } os.MkdirAll(filepath.Dir(csdir), os.ModePerm) - cstore, err := carstore.NewCarStore(csdb, csdir) + csb, err := carstore.NewPostgresBackend(csdb) + if err != nil { + return err + } + + cstore, err := carstore.NewCarStore(csb, csdir) if err != nil { return err } diff --git a/cmd/gosky/admin.go b/cmd/gosky/admin.go index bcb8d7f7f..4a9d5c2e6 100644 --- a/cmd/gosky/admin.go +++ b/cmd/gosky/admin.go @@ -97,17 +97,19 @@ var checkUserCmd = &cli.Command{ } } else { var invby string - if fa := rep.InvitedBy.ForAccount; fa != "" { - if fa == "admin" { - invby = fa - } else { - handle, _, err := api.ResolveDidToHandle(ctx, plcc, phr, fa) - if err != nil { - fmt.Println("ERROR: failed to resolve inviter: ", err) - handle = fa - } + if rep.InvitedBy != nil { + if fa := rep.InvitedBy.ForAccount; fa != "" { + if fa == "admin" { + invby = fa + } else { + handle, _, err := api.ResolveDidToHandle(ctx, plcc, phr, fa) + if err != nil { + fmt.Println("ERROR: failed to resolve inviter: ", err) + handle = fa + } - invby = handle + invby = handle + } } } diff --git a/cmd/labelmaker/main.go b/cmd/labelmaker/main.go index f9e36c5bf..39445cb16 100644 --- a/cmd/labelmaker/main.go +++ b/cmd/labelmaker/main.go @@ -183,7 +183,12 @@ func run(args []string) error { } os.MkdirAll(filepath.Dir(csdir), os.ModePerm) - cstore, err := carstore.NewCarStore(csdb, csdir) + csb, err := carstore.NewPostgresBackend(csdb) + if err != nil { + return err + } + + cstore, err := carstore.NewCarStore(csb, csdir) if err != nil { return err } diff --git a/cmd/laputa/main.go b/cmd/laputa/main.go index 2cedb393a..fc193e3bd 100644 --- a/cmd/laputa/main.go +++ b/cmd/laputa/main.go @@ -158,7 +158,12 @@ func run(args []string) { } } - cstore, err := carstore.NewCarStore(csdb, csdir) + csb, err := carstore.NewPostgresBackend(csdb) + if err != nil { + return err + } + + cstore, err := carstore.NewCarStore(csb, csdir) if err != nil { return err } diff --git a/cmd/supercollider/main.go b/cmd/supercollider/main.go index 4d5b64bc0..a5a89b1eb 100644 --- a/cmd/supercollider/main.go +++ b/cmd/supercollider/main.go @@ -565,7 +565,12 @@ func initSpeedyRepoMan(key *godid.PrivKey) (*repomgr.RepoManager, *godid.PrivKey return nil, nil, err } - cs, err := carstore.NewCarStore(cardb, cspath) + csb, err := carstore.NewPostgresBackend(cardb) + if err != nil { + return nil, nil, err + } + + cs, err := carstore.NewCarStore(csb, cspath) if err != nil { return nil, nil, err } diff --git a/events/dbpersist_test.go b/events/dbpersist_test.go index 6954812df..defe38f1f 100644 --- a/events/dbpersist_test.go +++ b/events/dbpersist_test.go @@ -301,7 +301,12 @@ func setupDBs(t testing.TB) (*gorm.DB, *gorm.DB, *carstore.CarStore, string, err return nil, nil, nil, "", err } - cs, err := carstore.NewCarStore(cardb, cspath) + csb, err := carstore.NewPostgresBackend(cardb) + if err != nil { + return nil, nil, nil, "", err + } + + cs, err := carstore.NewCarStore(csb, cspath) if err != nil { return nil, nil, nil, "", err } diff --git a/indexer/posts_test.go b/indexer/posts_test.go index 19c14b407..378104260 100644 --- a/indexer/posts_test.go +++ b/indexer/posts_test.go @@ -50,7 +50,12 @@ func testIndexer(t *testing.T) *testIx { t.Fatal(err) } - cs, err := carstore.NewCarStore(cardb, cspath) + csb, err := carstore.NewPostgresBackend(cardb) + if err != nil { + t.Fatal(err) + } + + cs, err := carstore.NewCarStore(csb, cspath) if err != nil { t.Fatal(err) } diff --git a/labeler/service_test.go b/labeler/service_test.go index b5fec72dc..fcc8e4f25 100644 --- a/labeler/service_test.go +++ b/labeler/service_test.go @@ -26,7 +26,12 @@ func testLabelMaker(t *testing.T) *Server { t.Fatal(err) } - cs, err := carstore.NewCarStore(db, sharddir) + csb, err := carstore.NewPostgresBackend(db) + if err != nil { + t.Fatal(err) + } + + cs, err := carstore.NewCarStore(csb, sharddir) if err != nil { t.Fatal(err) } diff --git a/pds/handlers_test.go b/pds/handlers_test.go index d83c23fb4..b3862037c 100644 --- a/pds/handlers_test.go +++ b/pds/handlers_test.go @@ -29,7 +29,12 @@ func testCarStore(t *testing.T, db *gorm.DB) (*carstore.CarStore, func()) { t.Fatal(err) } - cs, err := carstore.NewCarStore(db, sharddir) + csb, err := carstore.NewPostgresBackend(db) + if err != nil { + t.Fatal(err) + } + + cs, err := carstore.NewCarStore(csb, sharddir) if err != nil { t.Fatal(err) } diff --git a/repomgr/bench_test.go b/repomgr/bench_test.go index 271813909..fc4f4799b 100644 --- a/repomgr/bench_test.go +++ b/repomgr/bench_test.go @@ -54,7 +54,12 @@ func BenchmarkRepoMgrCreates(b *testing.B) { b.Fatal(err) } - cs, err := carstore.NewCarStore(cardb, cspath) + csb, err := carstore.NewPostgresBackend(cardb) + if err != nil { + b.Fatal(err) + } + + cs, err := carstore.NewCarStore(csb, cspath) if err != nil { b.Fatal(err) } diff --git a/repomgr/ingest_test.go b/repomgr/ingest_test.go index 4296cd949..7bc20049e 100644 --- a/repomgr/ingest_test.go +++ b/repomgr/ingest_test.go @@ -50,7 +50,12 @@ func TestLoadNewRepo(t *testing.T) { t.Fatal(err) } - cs, err := carstore.NewCarStore(cardb, cspath) + csb, err := carstore.NewPostgresBackend(cardb) + if err != nil { + t.Fatal(err) + } + + cs, err := carstore.NewCarStore(csb, cspath) if err != nil { t.Fatal(err) } @@ -80,7 +85,12 @@ func testCarstore(t *testing.T, dir string) *carstore.CarStore { t.Fatal(err) } - cs, err := carstore.NewCarStore(cardb, cspath) + csb, err := carstore.NewPostgresBackend(cardb) + if err != nil { + t.Fatal(err) + } + + cs, err := carstore.NewCarStore(csb, cspath) if err != nil { t.Fatal(err) } diff --git a/testing/labelmaker_fakedata_test.go b/testing/labelmaker_fakedata_test.go index d182d9794..f1157112d 100644 --- a/testing/labelmaker_fakedata_test.go +++ b/testing/labelmaker_fakedata_test.go @@ -39,7 +39,12 @@ func testLabelMaker(t *testing.T) *labeler.Server { t.Fatal(err) } - cs, err := carstore.NewCarStore(db, sharddir) + csb, err := carstore.NewPostgresBackend(db) + if err != nil { + t.Fatal(err) + } + + cs, err := carstore.NewCarStore(csb, sharddir) if err != nil { t.Fatal(err) } diff --git a/testing/utils.go b/testing/utils.go index 84bbe1432..fa6d9cf37 100644 --- a/testing/utils.go +++ b/testing/utils.go @@ -115,7 +115,12 @@ func SetupPDS(ctx context.Context, suffix string, plc plc.PLCClient) (*TestPDS, return nil, err } - cs, err := carstore.NewCarStore(cardb, cspath) + csb, err := carstore.NewPostgresBackend(cardb) + if err != nil { + return nil, err + } + + cs, err := carstore.NewCarStore(csb, cspath) if err != nil { return nil, err } @@ -420,7 +425,12 @@ func SetupBGS(ctx context.Context, didr plc.PLCClient) (*TestBGS, error) { return nil, err } - cs, err := carstore.NewCarStore(cardb, cspath) + csb, err := carstore.NewPostgresBackend(cardb) + if err != nil { + return nil, err + } + + cs, err := carstore.NewCarStore(csb, cspath) if err != nil { return nil, err } From b5200095ac44412b190cbcd6526d074fd258758b Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 14 Feb 2024 11:30:58 -0800 Subject: [PATCH 3/3] capitalize --- carstore/bs.go | 4 ++-- carstore/pgbackend.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/carstore/bs.go b/carstore/bs.go index c55957350..65d04112d 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -66,7 +66,7 @@ type Backend interface { PutShard(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool) error GetAllShards(ctx context.Context, usr models.Uid) ([]CarShard, error) WipeUserData(ctx context.Context, user models.Uid) error - deleteShards(ctx context.Context, shs []*CarShard) error + DeleteShards(ctx context.Context, shs []*CarShard) error GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) GetBlockRefsForShards(ctx context.Context, shardIds []uint) ([]blockRef, error) DeleteStaleRefs(ctx context.Context, uid models.Uid, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error @@ -1285,7 +1285,7 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid, skip } stats.ShardsDeleted += len(todelete) - if err := cs.backend.deleteShards(ctx, todelete); err != nil { + if err := cs.backend.DeleteShards(ctx, todelete); err != nil { return nil, fmt.Errorf("deleting shards: %w", err) } } diff --git a/carstore/pgbackend.go b/carstore/pgbackend.go index 211b11d7d..e8e1cc85a 100644 --- a/carstore/pgbackend.go +++ b/carstore/pgbackend.go @@ -222,7 +222,7 @@ func (pgb *PgBackend) WipeUserData(ctx context.Context, user models.Uid) error { return err } - if err := pgb.deleteShards(ctx, shards); err != nil { + if err := pgb.DeleteShards(ctx, shards); err != nil { if !os.IsNotExist(err) { return err } @@ -231,7 +231,7 @@ func (pgb *PgBackend) WipeUserData(ctx context.Context, user models.Uid) error { return nil } -func (pgb *PgBackend) deleteShards(ctx context.Context, shs []*CarShard) error { +func (pgb *PgBackend) DeleteShards(ctx context.Context, shs []*CarShard) error { ctx, span := otel.Tracer("carstore").Start(ctx, "deleteShards") defer span.End()