diff --git a/extern/boostd-data/yugabyte/service.go b/extern/boostd-data/yugabyte/service.go index 4ffb33bd5..5a90a98a0 100644 --- a/extern/boostd-data/yugabyte/service.go +++ b/extern/boostd-data/yugabyte/service.go @@ -13,7 +13,6 @@ import ( "github.com/filecoin-project/go-address" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" - "github.com/multiformats/go-multihash" mh "github.com/multiformats/go-multihash" "github.com/yugabyte/gocql" "github.com/yugabyte/pgx/v4/pgxpool" @@ -30,6 +29,10 @@ const defaultKeyspace = "idx" const CqlTimeout = 60 +// The Cassandra driver has a 50k limit on batch statements. Keeping +// batch size small makes sure we're under the limit. +const InsertBatchSize = 10000 + type DBSettings struct { // The cassandra hosts to connect to Hosts []string @@ -39,6 +42,8 @@ type DBSettings struct { PayloadPiecesParallelism int // CQL timeout in seconds CQLTimeout int + // Number of records per insert batch + InsertBatchSize int } type StoreOpt func(*Store) @@ -65,6 +70,9 @@ func NewStore(settings DBSettings, migrator *Migrator, opts ...StoreOpt) *Store if settings.PayloadPiecesParallelism == 0 { settings.PayloadPiecesParallelism = 16 } + if settings.InsertBatchSize == 0 { + settings.InsertBatchSize = InsertBatchSize + } cluster := gocql.NewCluster(settings.Hosts...) cluster.Timeout = time.Duration(settings.CQLTimeout) * time.Second @@ -290,7 +298,7 @@ func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) (<-chan types.In } // Parse the multihash bytes - _, pmh, err := multihash.MHFromBytes(payloadMHBz) + _, pmh, err := mh.MHFromBytes(payloadMHBz) if err != nil { records <- types.IndexRecord{Error: err} return @@ -442,54 +450,63 @@ func (s *Store) addMultihashesToPieces(ctx context.Context, pieceCid cid.Cid, re ctx, span := tracing.Tracer.Start(ctx, "store.add_index.payloadpiece") defer span.End() - var count float64 - return s.execParallel(ctx, recs, s.settings.PayloadPiecesParallelism, func(rec model.Record) error { - multihashBytes := rec.Cid.Hash() - q := `INSERT INTO PayloadToPieces (PayloadMultihash, PieceCid) VALUES (?, ?)` - err := s.session.Query(q, trimMultihash(multihashBytes), pieceCid.Bytes()).Exec() - if err != nil { - return fmt.Errorf("inserting into PayloadToPieces: %w", err) + insertPieceOffsetsQry := `INSERT INTO PayloadToPieces (PayloadMultihash, PieceCid) VALUES (?, ?)` + pieceCidBytes := pieceCid.Bytes() + + var batch *gocql.Batch + for allIdx, rec := range recs { + if batch == nil { + batch = s.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx) + batch.Entries = make([]gocql.BatchEntry, 0, s.settings.InsertBatchSize) } - count++ - progress(count / float64(len(recs))) - return nil - }) + batch.Entries = append(batch.Entries, gocql.BatchEntry{ + Stmt: insertPieceOffsetsQry, + Args: []interface{}{trimMultihash(rec.Cid.Hash()), pieceCidBytes}, + Idempotent: true, + }) + + if allIdx == len(recs)-1 || len(batch.Entries) == s.settings.InsertBatchSize { + err := s.session.ExecuteBatch(batch) + if err != nil { + return fmt.Errorf("inserting into PayloadToPieces: %w", err) + } + batch = nil + + progress(float64(allIdx+1) / float64(len(recs))) + } + } + return nil } func (s *Store) addPieceInfos(ctx context.Context, pieceCid cid.Cid, recs []model.Record, progress func(addProgress float64)) error { ctx, span := tracing.Tracer.Start(ctx, "store.add_index.pieceinfo") defer span.End() - batchEntries := make([]gocql.BatchEntry, 0, len(recs)) insertPieceOffsetsQry := `INSERT INTO PieceBlockOffsetSize (PieceCid, PayloadMultihash, BlockOffset, BlockSize) VALUES (?, ?, ?, ?)` - for _, rec := range recs { - batchEntries = append(batchEntries, gocql.BatchEntry{ - Stmt: insertPieceOffsetsQry, - Args: []interface{}{pieceCid.Bytes(), rec.Cid.Hash(), rec.Offset, rec.Size}, - Idempotent: true, - }) - } + pieceCidBytes := pieceCid.Bytes() - // The Cassandra driver has a 50k limit on batch statements. Keeping - // batch size small makes sure we're under the limit. - const batchSize = 5000 var batch *gocql.Batch - for allIdx, entry := range batchEntries { + for allIdx, rec := range recs { if batch == nil { batch = s.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx) + batch.Entries = make([]gocql.BatchEntry, 0, s.settings.InsertBatchSize) } - batch.Entries = append(batch.Entries, entry) + batch.Entries = append(batch.Entries, gocql.BatchEntry{ + Stmt: insertPieceOffsetsQry, + Args: []any{pieceCidBytes, rec.Cid.Hash(), rec.Offset, rec.Size}, + Idempotent: true, + }) - if allIdx == len(batchEntries)-1 || len(batch.Entries) == batchSize { + if allIdx == len(recs)-1 || len(batch.Entries) == s.settings.InsertBatchSize { err := s.session.ExecuteBatch(batch) if err != nil { return fmt.Errorf("executing offset / size batch insert for piece %s: %w", pieceCid, err) } batch = nil - progress((float64(allIdx+1) / float64(len(batchEntries)))) + progress(float64(allIdx+1) / float64(len(recs))) } } @@ -655,36 +672,3 @@ func (s *Store) RemoveIndexes(ctx context.Context, pieceCid cid.Cid) error { return nil } - -func (s *Store) execParallel(ctx context.Context, recs []model.Record, parallelism int, f func(record model.Record) error) error { - queue := make(chan model.Record, len(recs)) - for _, rec := range recs { - queue <- rec - } - close(queue) - - var eg errgroup.Group - for i := 0; i < parallelism; i++ { - eg.Go(func() error { - for ctx.Err() == nil { - select { - case <-ctx.Done(): - return ctx.Err() - case rec, ok := <-queue: - if !ok { - // Finished adding all the queued items, exit the thread - return nil - } - - err := f(rec) - if err != nil { - return err - } - } - } - - return ctx.Err() - }) - } - return eg.Wait() -}