Skip to content

Commit

Permalink
fix: speed up ydb insert (#1793)
Browse files Browse the repository at this point in the history
* speed up ydb indexing by executing inserts in batches instead of concurrently
* add insert batch size database setting that is default to 10k
  • Loading branch information
ischasny authored Nov 1, 2023
1 parent 9eb0db3 commit 350adad
Showing 1 changed file with 45 additions and 61 deletions.
106 changes: 45 additions & 61 deletions extern/boostd-data/yugabyte/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/filecoin-project/go-address"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/multiformats/go-multihash"
mh "github.com/multiformats/go-multihash"
"github.com/yugabyte/gocql"
"github.com/yugabyte/pgx/v4/pgxpool"
Expand All @@ -33,6 +32,10 @@ const defaultKeyspace = "idx"

const CqlTimeout = 60

// The Cassandra driver has a 50k limit on batch statements. Keeping
// batch size small makes sure we're under the limit.
const InsertBatchSize = 10000

type DBSettings struct {
// The cassandra hosts to connect to
Hosts []string
Expand All @@ -42,6 +45,8 @@ type DBSettings struct {
PayloadPiecesParallelism int
// CQL timeout in seconds
CQLTimeout int
// Number of records per insert batch
InsertBatchSize int
}

type StoreOpt func(*Store)
Expand Down Expand Up @@ -69,6 +74,9 @@ func NewStore(settings DBSettings, migrator *Migrator, opts ...StoreOpt) *Store
if settings.PayloadPiecesParallelism == 0 {
settings.PayloadPiecesParallelism = 16
}
if settings.InsertBatchSize == 0 {
settings.InsertBatchSize = InsertBatchSize
}

cluster := gocql.NewCluster(settings.Hosts...)
cluster.Timeout = time.Duration(settings.CQLTimeout) * time.Second
Expand Down Expand Up @@ -368,7 +376,7 @@ func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) (<-chan types.In
}

// Parse the multihash bytes
_, pmh, err := multihash.MHFromBytes(payloadMHBz)
_, pmh, err := mh.MHFromBytes(payloadMHBz)
if err != nil {
records <- types.IndexRecord{Error: err}
return
Expand Down Expand Up @@ -526,54 +534,63 @@ func (s *Store) addMultihashesToPieces(ctx context.Context, pieceCid cid.Cid, re
ctx, span := tracing.Tracer.Start(ctx, "store.add_index.payloadpiece")
defer span.End()

var count float64
return s.execParallel(ctx, recs, s.settings.PayloadPiecesParallelism, func(rec model.Record) error {
multihashBytes := rec.Cid.Hash()
q := `INSERT INTO PayloadToPieces (PayloadMultihash, PieceCid) VALUES (?, ?)`
err := s.session.Query(q, trimMultihash(multihashBytes), pieceCid.Bytes()).Exec()
if err != nil {
return fmt.Errorf("inserting into PayloadToPieces: %w", err)
insertPieceOffsetsQry := `INSERT INTO PayloadToPieces (PayloadMultihash, PieceCid) VALUES (?, ?)`
pieceCidBytes := pieceCid.Bytes()

var batch *gocql.Batch
for allIdx, rec := range recs {
if batch == nil {
batch = s.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx)
batch.Entries = make([]gocql.BatchEntry, 0, s.settings.InsertBatchSize)
}

count++
progress(count / float64(len(recs)))
return nil
})
batch.Entries = append(batch.Entries, gocql.BatchEntry{
Stmt: insertPieceOffsetsQry,
Args: []interface{}{trimMultihash(rec.Cid.Hash()), pieceCidBytes},
Idempotent: true,
})

if allIdx == len(recs)-1 || len(batch.Entries) == s.settings.InsertBatchSize {
err := s.session.ExecuteBatch(batch)
if err != nil {
return fmt.Errorf("inserting into PayloadToPieces: %w", err)
}
batch = nil

progress(float64(allIdx+1) / float64(len(recs)))
}
}
return nil
}

func (s *Store) addPieceInfos(ctx context.Context, pieceCid cid.Cid, recs []model.Record, progress func(addProgress float64)) error {
ctx, span := tracing.Tracer.Start(ctx, "store.add_index.pieceinfo")
defer span.End()

batchEntries := make([]gocql.BatchEntry, 0, len(recs))
insertPieceOffsetsQry := `INSERT INTO PieceBlockOffsetSize (PieceCid, PayloadMultihash, BlockOffset, BlockSize) VALUES (?, ?, ?, ?)`
for _, rec := range recs {
batchEntries = append(batchEntries, gocql.BatchEntry{
Stmt: insertPieceOffsetsQry,
Args: []interface{}{pieceCid.Bytes(), rec.Cid.Hash(), rec.Offset, rec.Size},
Idempotent: true,
})
}
pieceCidBytes := pieceCid.Bytes()

// The Cassandra driver has a 50k limit on batch statements. Keeping
// batch size small makes sure we're under the limit.
const batchSize = 5000
var batch *gocql.Batch
for allIdx, entry := range batchEntries {
for allIdx, rec := range recs {
if batch == nil {
batch = s.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx)
batch.Entries = make([]gocql.BatchEntry, 0, s.settings.InsertBatchSize)
}

batch.Entries = append(batch.Entries, entry)
batch.Entries = append(batch.Entries, gocql.BatchEntry{
Stmt: insertPieceOffsetsQry,
Args: []any{pieceCidBytes, rec.Cid.Hash(), rec.Offset, rec.Size},
Idempotent: true,
})

if allIdx == len(batchEntries)-1 || len(batch.Entries) == batchSize {
if allIdx == len(recs)-1 || len(batch.Entries) == s.settings.InsertBatchSize {
err := s.session.ExecuteBatch(batch)
if err != nil {
return fmt.Errorf("executing offset / size batch insert for piece %s: %w", pieceCid, err)
}
batch = nil

progress((float64(allIdx+1) / float64(len(batchEntries))))
progress(float64(allIdx+1) / float64(len(recs)))
}
}

Expand Down Expand Up @@ -824,36 +841,3 @@ func (s *Store) RemoveIndexes(ctx context.Context, pieceCid cid.Cid) error {
failureMetrics = false
return nil
}

func (s *Store) execParallel(ctx context.Context, recs []model.Record, parallelism int, f func(record model.Record) error) error {
queue := make(chan model.Record, len(recs))
for _, rec := range recs {
queue <- rec
}
close(queue)

var eg errgroup.Group
for i := 0; i < parallelism; i++ {
eg.Go(func() error {
for ctx.Err() == nil {
select {
case <-ctx.Done():
return ctx.Err()
case rec, ok := <-queue:
if !ok {
// Finished adding all the queued items, exit the thread
return nil
}

err := f(rec)
if err != nil {
return err
}
}
}

return ctx.Err()
})
}
return eg.Wait()
}

0 comments on commit 350adad

Please sign in to comment.