Skip to content

Commit

Permalink
Change service GetIndex / AddIndex to return channel instead of array (
Browse files Browse the repository at this point in the history
…#1444)

* feat: yugabyte db impl

* feat: run yugabyte tests against a dockerized yugabyte

* fix: use our own yugabyte docker image

* fix: use yugabyte 2.17.2.0 docker image

* feat: piece doctor yugabyte impl

* fix: go mod tidy

* refactor: remove SetCarSize as it's no longer being used

* refactor: remove functionality to mark index as errored (not being used)

* feat: implement delete commands

* refactor: consolidate test params

* feat: add lid yugabyte config

* fix: port map yugabyte postgres to standard port

* Fix yugabyte CI (#1433)

* fix: yugabyte tests in CI

* docker-compose.yml ; Dockerfile.test ; connect to `yugabyte` and not localhost

* add tag

* test lid

* make gen

* fixup

* move couchbase settings under build tag

---------

Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>

* wip: service GetIndex returns channel of records instead of array

* feat: return channel from AddIndex and GetIndex

---------

Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>
  • Loading branch information
2 people authored and LexLuthr committed Jul 20, 2023
1 parent eec2b97 commit 92a9606
Show file tree
Hide file tree
Showing 16 changed files with 385 additions and 212 deletions.
19 changes: 15 additions & 4 deletions cmd/migrate-lid/couch-to-yuga.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/filecoin-project/boostd-data/couchbase"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/yugabyte"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -211,16 +212,26 @@ func migrateLidToLidIndex(ctx context.Context, pieceCid cid.Cid, source StoreMig
}

// Load the index from the source store
records, err := source.GetIndex(ctx, pieceCid)
idx, err := source.GetIndex(ctx, pieceCid)
if err != nil {
return false, fmt.Errorf("loading index %s: %w", pieceCid, err)
}

var records []model.Record
for r := range idx {
if r.Error != nil {
return false, r.Error
}
records = append(records, r.Record)
}

// Add the index to the destination store
addStart := time.Now()
err = dest.AddIndex(ctx, pieceCid, records, true)
if err != nil {
return false, fmt.Errorf("adding index %s to store: %w", pieceCid, err)
respch := dest.AddIndex(ctx, pieceCid, records, true)
for resp := range respch {
if resp.Err != "" {
return false, fmt.Errorf("adding index %s to store: %s", pieceCid, err)
}
}
log.Debugw("AddIndex", "took", time.Since(addStart).String())

Expand Down
13 changes: 8 additions & 5 deletions cmd/migrate-lid/migrate_lid.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/boostd-data/ldb"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/svc"
"github.com/filecoin-project/boostd-data/svc/types"
"github.com/filecoin-project/go-address"
vfsm "github.com/filecoin-project/go-ds-versioning/pkg/fsm"
"github.com/filecoin-project/go-fil-markets/piecestore"
Expand Down Expand Up @@ -43,8 +44,8 @@ import (
type StoreMigrationApi interface {
Start(ctx context.Context) error
IsIndexed(ctx context.Context, pieceCid cid.Cid) (bool, error)
GetIndex(context.Context, cid.Cid) ([]model.Record, error)
AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) error
GetIndex(context.Context, cid.Cid) (<-chan types.IndexRecord, error)
AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) <-chan types.AddIndexProgress
AddDealForPiece(ctx context.Context, pcid cid.Cid, info model.DealInfo) error
ListPieces(ctx context.Context) ([]cid.Cid, error)
GetPieceMetadata(ctx context.Context, pieceCid cid.Cid) (model.Metadata, error)
Expand Down Expand Up @@ -333,9 +334,11 @@ func migrateIndex(ctx context.Context, ipath idxPath, store StoreMigrationApi, f

// Add the index to the store
addStart := time.Now()
err = store.AddIndex(ctx, pieceCid, records, false)
if err != nil {
return false, fmt.Errorf("adding index %s to store: %w", ipath.path, err)
respch := store.AddIndex(ctx, pieceCid, records, false)
for resp := range respch {
if resp.Err != "" {
return false, fmt.Errorf("adding index %s to store: %s", ipath.path, err)
}
}
log.Debugw("AddIndex", "took", time.Since(addStart).String())

Expand Down
33 changes: 25 additions & 8 deletions extern/boostd-data/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/svc/types"
"github.com/filecoin-project/go-jsonrpc"
"github.com/ipfs/go-cid"
logger "github.com/ipfs/go-log/v2"
Expand All @@ -18,10 +19,10 @@ var log = logger.Logger("boostd-data-client")
type Store struct {
client struct {
AddDealForPiece func(context.Context, cid.Cid, model.DealInfo) error
AddIndex func(context.Context, cid.Cid, []model.Record, bool) error
AddIndex func(context.Context, cid.Cid, []model.Record, bool) <-chan types.AddIndexProgress
IsIndexed func(ctx context.Context, pieceCid cid.Cid) (bool, error)
IsCompleteIndex func(ctx context.Context, pieceCid cid.Cid) (bool, error)
GetIndex func(context.Context, cid.Cid) ([]model.Record, error)
GetIndex func(context.Context, cid.Cid) (<-chan types.IndexRecord, error)
GetOffsetSize func(context.Context, cid.Cid, mh.Multihash) (*model.OffsetSize, error)
ListPieces func(ctx context.Context) ([]cid.Cid, error)
GetPieceMetadata func(ctx context.Context, pieceCid cid.Cid) (model.Metadata, error)
Expand All @@ -37,16 +38,17 @@ type Store struct {
FlaggedPiecesList func(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error)
FlaggedPiecesCount func(ctx context.Context) (int, error)
}
closer jsonrpc.ClientCloser
closer jsonrpc.ClientCloser
dialOpts []jsonrpc.Option
}

func NewStore() *Store {
return &Store{}
func NewStore(dialOpts ...jsonrpc.Option) *Store {
return &Store{dialOpts: dialOpts}
}

func (s *Store) Dial(ctx context.Context, addr string) error {
var err error
s.closer, err = jsonrpc.NewClient(ctx, addr, "boostddata", &s.client, nil)
s.closer, err = jsonrpc.NewMergeClient(ctx, addr, "boostddata", []interface{}{&s.client}, nil, s.dialOpts...)
if err != nil {
return fmt.Errorf("dialing local index directory server: %w", err)
}
Expand Down Expand Up @@ -93,7 +95,15 @@ func (s *Store) GetRecords(ctx context.Context, pieceCid cid.Cid) ([]model.Recor

log.Debugw("get-records", "piece-cid", pieceCid, "records", len(resp))

return resp, nil
var records []model.Record
for r := range resp {
if r.Error != nil {
return nil, r.Error
}
records = append(records, r.Record)
}

return records, nil
}

func (s *Store) GetPieceMetadata(ctx context.Context, pieceCid cid.Cid) (model.Metadata, error) {
Expand All @@ -115,7 +125,14 @@ func (s *Store) AddDealForPiece(ctx context.Context, pieceCid cid.Cid, dealInfo
func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) error {
log.Debugw("add-index", "piece-cid", pieceCid, "records", len(records))

return s.client.AddIndex(ctx, pieceCid, records, isCompleteIndex)
respch := s.client.AddIndex(ctx, pieceCid, records, isCompleteIndex)
for resp := range respch {
if resp.Err != "" {
return fmt.Errorf("add index with piece cid %s: %s", pieceCid, resp.Err)
}
//fmt.Printf("%s: Percent complete: %f%%\n", time.Now(), resp.Progress*100)
}
return nil
}

func (s *Store) IsIndexed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
Expand Down
172 changes: 66 additions & 106 deletions extern/boostd-data/ldb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (s *Store) PiecesContainingMultihash(ctx context.Context, m mh.Multihash) (
return pcs, normalizeMultihashError(m, err)
}

func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) ([]model.Record, error) {
func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) (<-chan types.IndexRecord, error) {
log.Warnw("handle.get-index", "pieceCid", pieceCid)

ctx, span := tracing.Tracer.Start(ctx, "store.get_index")
Expand Down Expand Up @@ -262,7 +262,7 @@ func (s *Store) IsCompleteIndex(ctx context.Context, pieceCid cid.Cid) (bool, er
return md.CompleteIndex, nil
}

func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) error {
func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) <-chan types.AddIndexProgress {
log.Debugw("handle.add-index", "records", len(records))

ctx, span := tracing.Tracer.Start(ctx, "store.add_index")
Expand All @@ -272,124 +272,84 @@ func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.
log.Debugw("handled.add-index", "took", time.Since(now).String())
}(time.Now())

s.Lock()
defer s.Unlock()

var recs []carindex.Record
for _, r := range records {
recs = append(recs, carindex.Record{
Cid: r.Cid,
Offset: r.Offset,
})
}

return md.IndexedAt, nil
}

func (s *Store) PiecesCount(ctx context.Context) (int, error) {
log.Debugw("handle.pieces-count")

ctx, span := tracing.Tracer.Start(ctx, "store.pieces_count")
defer span.End()

defer func(now time.Time) {
log.Debugw("handled.pieces-count", "took", time.Since(now).String())
}(time.Now())

return s.db.PiecesCount(ctx)
}
progress := make(chan types.AddIndexProgress, 1)
go func() {
defer close(progress)

func (s *Store) ListPieces(ctx context.Context) ([]cid.Cid, error) {
log.Debugw("handle.list-pieces")
s.Lock()
defer s.Unlock()

ctx, span := tracing.Tracer.Start(ctx, "store.list_pieces")
defer span.End()

defer func(now time.Time) {
log.Debugw("handled.list-pieces", "took", time.Since(now).String())
}(time.Now())

return s.db.ListPieces(ctx)
}

func (s *Store) NextPiecesToCheck(ctx context.Context) ([]cid.Cid, error) {
ctx, span := tracing.Tracer.Start(ctx, "store.next_pieces_to_check")
defer span.End()

defer func(now time.Time) {
log.Debugw("handled.next-pieces-to-check", "took", time.Since(now).String())
}(time.Now())

return s.db.NextPiecesToCheck(ctx)
}

func (s *Store) FlagPiece(ctx context.Context, pieceCid cid.Cid, hasUnsealedCopy bool) error {
log.Debugw("handle.flag-piece", "piece-cid", pieceCid, "hasUnsealedCopy", hasUnsealedCopy)

ctx, span := tracing.Tracer.Start(ctx, "store.flag_piece")
defer span.End()
var recs []carindex.Record
for _, r := range records {
recs = append(recs, carindex.Record{
Cid: r.Cid,
Offset: r.Offset,
})
}

defer func(now time.Time) {
log.Debugw("handled.flag-piece", "took", time.Since(now).String())
}(time.Now())
err := s.db.SetMultihashesToPieceCid(ctx, recs, pieceCid)
if err != nil {
progress <- types.AddIndexProgress{Err: err.Error()}
return
}
progress <- types.AddIndexProgress{Progress: 0.45}

s.Lock()
defer s.Unlock()
// get and set next cursor (handle synchronization, maybe with CAS)
cursor, keyCursorPrefix, err := s.db.NextCursor(ctx)
if err != nil {
progress <- types.AddIndexProgress{Err: err.Error()}
return
}

now := time.Now()
// allocate metadata for pieceCid
err = s.db.SetNextCursor(ctx, cursor+1)
if err != nil {
progress <- types.AddIndexProgress{Err: err.Error()}
return
}

// Get the existing deals for the piece
fm, err := s.db.GetPieceCidToFlagged(ctx, pieceCid)
if err != nil {
return fmt.Errorf("failed to add entry from mh to pieceCid: %w", err)
}
// process index and store entries
for _, rec := range records {
err := s.db.AddIndexRecord(ctx, keyCursorPrefix, rec)
if err != nil {
progress <- types.AddIndexProgress{Err: err.Error()}
return
}
}
progress <- types.AddIndexProgress{Progress: 0.9}

// get and set next cursor (handle synchronization, maybe with CAS)
cursor, keyCursorPrefix, err := s.db.NextCursor(ctx)
if err != nil {
return fmt.Errorf("couldnt generate next cursor: %w", err)
}
// get the metadata for the piece
md, err := s.db.GetPieceCidToMetadata(ctx, pieceCid)
if err != nil {
if !errors.Is(err, ds.ErrNotFound) {
progress <- types.AddIndexProgress{Err: err.Error()}
return
}
// there isn't yet any metadata, so create new metadata
md = newLeveldbMetadata()
}

// allocate metadata for pieceCid
err = s.db.SetNextCursor(ctx, cursor+1)
if err != nil {
return err
}
// mark indexing as complete
md.Cursor = cursor
md.IndexedAt = time.Now()
md.CompleteIndex = isCompleteIndex

// process index and store entries
for _, rec := range records {
err := s.db.AddIndexRecord(ctx, keyCursorPrefix, rec)
err = s.db.SetPieceCidToMetadata(ctx, pieceCid, md)
if err != nil {
return err
progress <- types.AddIndexProgress{Err: err.Error()}
return
}
}
progress <- types.AddIndexProgress{Progress: 0.95}

// get the metadata for the piece
md, err := s.db.GetPieceCidToMetadata(ctx, pieceCid)
if err != nil {
if !errors.Is(err, ds.ErrNotFound) {
return fmt.Errorf("getting piece cid metadata for piece %s: %w", pieceCid, err)
err = s.db.Sync(ctx, ds.NewKey(fmt.Sprintf("%d", cursor)))
if err != nil {
progress <- types.AddIndexProgress{Err: err.Error()}
return
}
// there isn't yet any metadata, so create new metadata
md = newLeveldbMetadata()
}

// mark indexing as complete
md.Cursor = cursor
md.IndexedAt = time.Now()
md.CompleteIndex = isCompleteIndex
progress <- types.AddIndexProgress{Progress: 1}
}()

err = s.db.SetPieceCidToMetadata(ctx, pieceCid, md)
if err != nil {
return err
}

err = s.db.Sync(ctx, ds.NewKey(fmt.Sprintf("%d", cursor)))
if err != nil {
return err
}

return nil
return progress
}

func (s *Store) IndexedAt(ctx context.Context, pieceCid cid.Cid) (time.Time, error) {
Expand Down
6 changes: 3 additions & 3 deletions extern/boostd-data/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ type Metadata struct {

// Record is the information stored in the index for each block in a piece
type Record struct {
Cid cid.Cid
Cid cid.Cid `json:"c"`
OffsetSize
}

type OffsetSize struct {
// Offset is the offset into the CAR file of the section, where a section
// is <section size><cid><block data>
Offset uint64
Offset uint64 `json:"o"`
// Size is the size of the block data (not the whole section)
Size uint64
Size uint64 `json:"s"`
}

func (ofsz *OffsetSize) MarshallBase64() string {
Expand Down
Loading

0 comments on commit 92a9606

Please sign in to comment.