diff --git a/.circleci/config.yml b/.circleci/config.yml index 288f21043..fd795db5f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -340,7 +340,5 @@ workflows: suite: all target: "`go list ./... | grep -v boost/itests`" - - test: - name: local index directory - suite: all - cwd: "./extern/boostd-data" + - lid-docker-compose + diff --git a/api/api.go b/api/api.go index 5877f52be..56e71b603 100644 --- a/api/api.go +++ b/api/api.go @@ -55,8 +55,7 @@ type Boost interface { BlockstoreGetSize(ctx context.Context, c cid.Cid) (int, error) //perm:read // MethodGroup: PieceDirectory - PdBuildIndexForPieceCid(ctx context.Context, piececid cid.Cid) error //perm:admin - PdMarkIndexErrored(ctx context.Context, piececid cid.Cid, err string) error //perm:admin + PdBuildIndexForPieceCid(ctx context.Context, piececid cid.Cid) error //perm:admin // RuntimeSubsystems returns the subsystems that are enabled // in this instance. diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 8ff0a22ed..c50f44d42 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -130,8 +130,6 @@ type BoostStruct struct { PdBuildIndexForPieceCid func(p0 context.Context, p1 cid.Cid) error `perm:"admin"` - PdMarkIndexErrored func(p0 context.Context, p1 cid.Cid, p2 string) error `perm:"admin"` - RuntimeSubsystems func(p0 context.Context) (lapi.MinerSubsystems, error) `perm:"read"` SectorsRefs func(p0 context.Context) (map[string][]lapi.SealedRef, error) `perm:"read"` @@ -780,17 +778,6 @@ func (s *BoostStub) PdBuildIndexForPieceCid(p0 context.Context, p1 cid.Cid) erro return ErrNotSupported } -func (s *BoostStruct) PdMarkIndexErrored(p0 context.Context, p1 cid.Cid, p2 string) error { - if s.Internal.PdMarkIndexErrored == nil { - return ErrNotSupported - } - return s.Internal.PdMarkIndexErrored(p0, p1, p2) -} - -func (s *BoostStub) PdMarkIndexErrored(p0 context.Context, p1 cid.Cid, p2 string) error { - return ErrNotSupported -} - func (s *BoostStruct) RuntimeSubsystems(p0 context.Context) (lapi.MinerSubsystems, error) { if s.Internal.RuntimeSubsystems == nil { return *new(lapi.MinerSubsystems), ErrNotSupported diff --git a/build/openrpc/boost.json.gz b/build/openrpc/boost.json.gz index be25749c3..966bf3abb 100644 Binary files a/build/openrpc/boost.json.gz and b/build/openrpc/boost.json.gz differ diff --git a/cmd/boostd/piecedir.go b/cmd/boostd/piecedir.go index da8f04628..54436b0aa 100644 --- a/cmd/boostd/piecedir.go +++ b/cmd/boostd/piecedir.go @@ -16,7 +16,6 @@ var pieceDirCmd = &cli.Command{ Usage: "Manage Local Index Directory", Subcommands: []*cli.Command{ pdIndexGenerate, - pdIndexMarkErroredCmd, }, } @@ -56,45 +55,3 @@ var pdIndexGenerate = &cli.Command{ return nil }, } - -var pdIndexMarkErroredCmd = &cli.Command{ - Name: "mark-index", - Usage: "Mark an index errored for a given piece in the local index directory", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "piece-cid", - Usage: "piece-cid of the index that will be marked as errored", - Required: true, - }, - &cli.StringFlag{ - Name: "error", - Usage: "error message", - Required: true, - }, - }, - Action: func(cctx *cli.Context) error { - ctx := lcli.ReqContext(cctx) - - // parse piececid - piececid, err := cid.Decode(cctx.String("piece-cid")) - if err != nil { - return err - } - - boostApi, ncloser, err := bcli.GetBoostAPI(cctx) - if err != nil { - return fmt.Errorf("getting boost api: %w", err) - } - defer ncloser() - - errMsg := cctx.String("error") - err = boostApi.PdMarkIndexErrored(ctx, piececid, errMsg) - if err != nil { - return err - } - - fmt.Printf("Marked %s as errored with \"%s\"\n", piececid, errMsg) - - return nil - }, -} diff --git a/documentation/en/api-v1-methods.md b/documentation/en/api-v1-methods.md index 1e680cadf..9bb6d0910 100644 --- a/documentation/en/api-v1-methods.md +++ b/documentation/en/api-v1-methods.md @@ -87,7 +87,6 @@ * [OnlineBackup](#onlinebackup) * [Pd](#pd) * [PdBuildIndexForPieceCid](#pdbuildindexforpiececid) - * [PdMarkIndexErrored](#pdmarkindexerrored) * [Runtime](#runtime) * [RuntimeSubsystems](#runtimesubsystems) * [Sectors](#sectors) @@ -1791,23 +1790,6 @@ Inputs: Response: `{}` -### PdMarkIndexErrored - - -Perms: admin - -Inputs: -```json -[ - { - "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" - }, - "string value" -] -``` - -Response: `{}` - ## Runtime diff --git a/extern/boostd-data/client/client.go b/extern/boostd-data/client/client.go index c91a39068..d221f7aeb 100644 --- a/extern/boostd-data/client/client.go +++ b/extern/boostd-data/client/client.go @@ -26,8 +26,6 @@ type Store struct { ListPieces func(ctx context.Context) ([]cid.Cid, error) GetPieceMetadata func(ctx context.Context, pieceCid cid.Cid) (model.Metadata, error) GetPieceDeals func(context.Context, cid.Cid) ([]model.DealInfo, error) - SetCarSize func(ctx context.Context, pieceCid cid.Cid, size uint64) error - MarkIndexErrored func(context.Context, cid.Cid, string) error IndexedAt func(context.Context, cid.Cid) (time.Time, error) PiecesContainingMultihash func(context.Context, mh.Multihash) ([]cid.Cid, error) RemoveDealForPiece func(context.Context, cid.Cid, string) error @@ -110,18 +108,10 @@ func (s *Store) PiecesContainingMultihash(ctx context.Context, m mh.Multihash) ( return s.client.PiecesContainingMultihash(ctx, m) } -func (s *Store) MarkIndexErrored(ctx context.Context, pieceCid cid.Cid, err string) error { - return s.client.MarkIndexErrored(ctx, pieceCid, err) -} - func (s *Store) AddDealForPiece(ctx context.Context, pieceCid cid.Cid, dealInfo model.DealInfo) error { return s.client.AddDealForPiece(ctx, pieceCid, dealInfo) } -func (s *Store) SetCarSize(ctx context.Context, pieceCid cid.Cid, size uint64) error { - return s.client.SetCarSize(ctx, pieceCid, size) -} - func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) error { log.Debugw("add-index", "piece-cid", pieceCid, "records", len(records)) diff --git a/extern/boostd-data/cmd/run.go b/extern/boostd-data/cmd/run.go index 109d804d3..a41bb35cd 100644 --- a/extern/boostd-data/cmd/run.go +++ b/extern/boostd-data/cmd/run.go @@ -9,6 +9,7 @@ import ( "github.com/filecoin-project/boostd-data/shared/cliutil" "github.com/filecoin-project/boostd-data/shared/tracing" "github.com/filecoin-project/boostd-data/svc" + "github.com/filecoin-project/boostd-data/yugabyte" "github.com/mitchellh/go-homedir" "github.com/urfave/cli/v2" ) @@ -18,6 +19,7 @@ var runCmd = &cli.Command{ Subcommands: []*cli.Command{ leveldbCmd, couchbaseCmd, + yugabyteCmd, }, } @@ -106,6 +108,35 @@ var couchbaseCmd = &cli.Command{ }, } +var yugabyteCmd = &cli.Command{ + Name: "yugabyte", + Usage: "Run boostd-data with a yugabyte database", + Before: before, + Flags: append([]cli.Flag{ + &cli.StringSliceFlag{ + Name: "hosts", + Usage: "yugabyte hosts to connect to over cassandra interface eg '127.0.0.1'", + Required: true, + }, + &cli.StringFlag{ + Name: "connect-string", + Usage: "postgres connect string eg 'postgresql://postgres:postgres@localhost'", + Required: true, + }}, + runFlags..., + ), + Action: func(cctx *cli.Context) error { + // Create a yugabyte data service + settings := yugabyte.DBSettings{ + Hosts: cctx.StringSlice("hosts"), + ConnectString: cctx.String("connect-string"), + } + + bdsvc := svc.NewYugabyte(settings) + return runAction(cctx, "yugabyte", bdsvc) + }, +} + func runAction(cctx *cli.Context, dbType string, store *svc.Service) error { ctx := cliutil.ReqContext(cctx) diff --git a/extern/boostd-data/couchbase/db.go b/extern/boostd-data/couchbase/db.go index c624b5df7..bc80f118a 100644 --- a/extern/boostd-data/couchbase/db.go +++ b/extern/boostd-data/couchbase/db.go @@ -337,42 +337,6 @@ func (db *DB) setPieceCidsForMultihash(ctx context.Context, mh multihash.Multiha return nil } -func (db *DB) SetCarSize(ctx context.Context, pieceCid cid.Cid, size uint64) error { - ctx, span := tracing.Tracer.Start(ctx, "db.set_car_size") - defer span.End() - - return db.mutatePieceMetadata(ctx, pieceCid, "set-car-size", func(metadata CouchbaseMetadata) *CouchbaseMetadata { - // Set the car size on each deal (should be the same for all deals) - var deals []model.DealInfo - for _, dl := range metadata.Deals { - dl.CarLength = size - - deals = append(deals, dl) - } - metadata.Deals = deals - return &metadata - }) -} - -func (db *DB) MarkIndexErrored(ctx context.Context, pieceCid cid.Cid, idxErr error) error { - ctx, span := tracing.Tracer.Start(ctx, "db.mark_piece_index_errored") - defer span.End() - - return db.mutatePieceMetadata(ctx, pieceCid, "mark-index-errored", func(metadata CouchbaseMetadata) *CouchbaseMetadata { - // If the error was already set, don't overwrite it - if metadata.Error != "" { - // If the error state has already been set, don't over-write the existing error - return nil - } - - // Set the error state - metadata.Error = idxErr.Error() - metadata.ErrorType = fmt.Sprintf("%T", idxErr) - - return &metadata - }) -} - func (db *DB) MarkIndexingComplete(ctx context.Context, pieceCid cid.Cid, blockCount int, isCompleteIndex bool) error { ctx, span := tracing.Tracer.Start(ctx, "db.mark_indexing_complete") defer span.End() @@ -382,8 +346,6 @@ func (db *DB) MarkIndexingComplete(ctx context.Context, pieceCid cid.Cid, blockC metadata.IndexedAt = time.Now() metadata.CompleteIndex = isCompleteIndex metadata.BlockCount = blockCount - metadata.Error = "" - metadata.ErrorType = "" if metadata.Deals == nil { metadata.Deals = []model.DealInfo{} } diff --git a/extern/boostd-data/go.mod b/extern/boostd-data/go.mod index 26b8a5996..c538f2c12 100644 --- a/extern/boostd-data/go.mod +++ b/extern/boostd-data/go.mod @@ -18,6 +18,7 @@ require ( github.com/ipfs/go-ds-leveldb v0.5.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/ipld/go-car/v2 v2.4.2-0.20220707083113-89de8134e58e + github.com/jackc/pgtype v1.10.0 github.com/mitchellh/go-homedir v1.1.0 github.com/multiformats/go-multicodec v0.6.0 github.com/multiformats/go-multihash v0.2.1 @@ -53,6 +54,7 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/gorilla/websocket v1.5.0 // indirect + github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect github.com/ipfs/bbloom v0.0.4 // indirect github.com/ipfs/go-block-format v0.0.3 // indirect @@ -77,6 +79,13 @@ require ( github.com/ipfs/go-verifcid v0.0.1 // indirect github.com/ipld/go-codec-dagpb v1.3.2 // indirect github.com/ipld/go-ipld-prime v0.18.0 // indirect + github.com/jackc/chunkreader/v2 v2.0.1 // indirect + github.com/jackc/pgconn v1.11.0 // indirect + github.com/jackc/pgio v1.0.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgproto3/v2 v2.2.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect + github.com/jackc/puddle v1.2.1 // indirect github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c // indirect github.com/jbenet/goprocess v0.1.4 // indirect github.com/klauspost/cpuid/v2 v2.1.1 // indirect @@ -117,10 +126,12 @@ require ( golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b // indirect golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect + golang.org/x/text v0.3.7 // indirect golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect google.golang.org/grpc v1.47.0 // indirect google.golang.org/protobuf v1.28.1 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect gotest.tools/v3 v3.4.0 // indirect lukechampine.com/blake3 v1.1.7 // indirect diff --git a/extern/boostd-data/go.sum b/extern/boostd-data/go.sum index 9b92c31ec..9cfb051cc 100644 --- a/extern/boostd-data/go.sum +++ b/extern/boostd-data/go.sum @@ -67,6 +67,7 @@ github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETF github.com/Kubuxu/imtui v0.0.0-20210401140320-41663d68d0fa/go.mod h1:WUmMvh9wMtqj1Xhf1hf3kp9RvL+y6odtdYxpyZjb90U= github.com/Masterminds/glide v0.13.2/go.mod h1:STyF5vcenH/rUqTEv+/hBXlSTo7KYwg2oc2f4tzPWic= github.com/Masterminds/semver v1.4.2/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= +github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= github.com/Masterminds/vcs v1.13.0/go.mod h1:N09YCmOQr6RLxC6UNHzuVwAdodYbbnycGHSmwVJjcKA= github.com/Microsoft/go-winio v0.5.1/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= @@ -129,9 +130,13 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bep/debounce v1.2.0/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/briandowns/spinner v1.11.1/go.mod h1:QOuQk7x+EaDASo80FEXwlwiA+j/PPIcX3FScO+3/ZPQ= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8= @@ -218,6 +223,7 @@ github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= @@ -488,6 +494,8 @@ github.com/godbus/dbus v0.0.0-20190402143921-271e53dc4968/go.mod h1:/YcGZj5zSblf github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= +github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/googleapis v1.4.0/go.mod h1:5YRNX2z1oM5gXdAkurHa942MDgEJyk02w4OecKY87+c= @@ -624,6 +632,8 @@ github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go github.com/gxed/go-shellwords v1.0.3/go.mod h1:N7paucT91ByIjmVJHhvoarjoQnmsi3Jd3vH7VqgtMxQ= github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU= github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= github.com/hako/durafmt v0.0.0-20200710122514-c0fb7b4da026/go.mod h1:5Scbynm8dF1XAPwIwkGPqzkM/shndPm79Jd1003hTjE= github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1/go.mod h1:jvfsLIxk0fY/2BKSQ1xf2406AKA5dwMmKKv0ADcOfN8= github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e/go.mod h1:I8h3MITA53gN9OnWGCgaMa0JWVRdXthWw4M3CPM54OY= @@ -1087,6 +1097,10 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lib/pq v1.10.4 h1:SO9z7FRPzA03QhHKJrH5BXA6HU1rS4V2nIVrrNC1iYk= github.com/lib/pq v1.10.4/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ= github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E= @@ -1529,6 +1543,7 @@ github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= @@ -1885,6 +1900,9 @@ github.com/sercand/kuberesolver v2.1.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJP github.com/sercand/kuberesolver v2.4.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shirou/gopsutil v2.18.12+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= +github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= +github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/shurcooL/component v0.0.0-20170202220835-f88ec8f54cc4/go.mod h1:XhFIlyj5a1fBNx5aJTbKoIq0mNaPvOagO+HjB3EtxrY= github.com/shurcooL/events v0.0.0-20181021180414-410e4ca65f48/go.mod h1:5u70Mqkb5O5cxEA8nxTsgrgLehJeAw6Oc4Ab1c/P1HM= github.com/shurcooL/github_flavored_markdown v0.0.0-20181002035957-2122de532470/go.mod h1:2dOwnU2uBioM+SGy2aZoq1f/Sd1l9OkAeAUvjSyvgU0= @@ -1954,6 +1972,7 @@ github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5J github.com/streadway/handy v0.0.0-20200128134331-0f66f006fb2e/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= @@ -2065,12 +2084,17 @@ github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1: github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= github.com/ybbus/jsonrpc/v2 v2.1.6/go.mod h1:rIuG1+ORoiqocf9xs/v+ecaAVeo3zcZHQgInyKFMeg0= +github.com/yugabyte/gocql v0.0.0-20221110041640-6fc475c5aeb0 h1:68nrJsrWe0A6JiKnsjWChAaWhj20v+AwYJObtp86D1k= +github.com/yugabyte/gocql v0.0.0-20221110041640-6fc475c5aeb0/go.mod h1:LAokR6+vevDCrTxk52U7p6ki+4qELu4XU7JUGYa2O2M= +github.com/yugabyte/pgx/v4 v4.14.5 h1:XLzEEiO3d/kWzpyctO8l4kwMLhzPQ9n2er7ATH7CJVA= +github.com/yugabyte/pgx/v4 v4.14.5/go.mod h1:nFSvjsVq4CuA61TWGriwWG74ZVxeuJCcNu42Mkn+rgw= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0= github.com/zondax/hid v0.9.0/go.mod h1:l5wttcP0jwtdLjqjMMWFVEE7d1zO0jvSPA9OPZxWpEM= github.com/zondax/ledger-go v0.12.1/go.mod h1:KatxXrVDzgWwbssUWsF5+cOJHXPvzQ09YSlzGNuhOEo= @@ -2196,6 +2220,7 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY= golang.org/x/crypto v0.0.0-20190927123631-a832865fa7ad/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -2212,6 +2237,7 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= @@ -2411,6 +2437,7 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190902133755-9109b7679e13/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -2533,6 +2560,7 @@ golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= @@ -2540,6 +2568,7 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20190624222133-a101b041ded4/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20190823170909-c4a336ef6a2f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190927191325-030b2cf1153e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -2592,6 +2621,8 @@ golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= golang.org/x/tools v0.1.11/go.mod h1:SgwaegtQh8clINPpECJMqnxLv9I09HLqnW3RMqW0CA4= golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= +golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/extern/boostd-data/ldb/db.go b/extern/boostd-data/ldb/db.go index 5489f5bce..2ca08ad41 100644 --- a/extern/boostd-data/ldb/db.go +++ b/extern/boostd-data/ldb/db.go @@ -290,43 +290,6 @@ func (db *DB) GetPieceCidToMetadata(ctx context.Context, pieceCid cid.Cid) (Leve return metadata, nil } -func (db *DB) SetCarSize(ctx context.Context, pieceCid cid.Cid, size uint64) error { - ctx, span := tracing.Tracer.Start(ctx, "db.set_car_size") - defer span.End() - - md, err := db.GetPieceCidToMetadata(ctx, pieceCid) - if err != nil { - return fmt.Errorf("getting piece metadata for piece %s: %w", pieceCid, err) - } - - // Set the car size on each deal (should be the same for all deals) - for _, dl := range md.Deals { - dl.CarLength = size - } - - return db.SetPieceCidToMetadata(ctx, pieceCid, md) -} - -func (db *DB) MarkIndexErrored(ctx context.Context, pieceCid cid.Cid, sourceErr error) error { - ctx, span := tracing.Tracer.Start(ctx, "db.mark_piece_index_errored") - defer span.End() - - md, err := db.GetPieceCidToMetadata(ctx, pieceCid) - if err != nil { - return fmt.Errorf("getting piece metadata for piece %s: %w", pieceCid, err) - } - - if md.Error != "" { - // If the error state has already been set, don't over-write the existing error - return nil - } - - md.Error = sourceErr.Error() - md.ErrorType = fmt.Sprintf("%T", sourceErr) - - return db.SetPieceCidToMetadata(ctx, pieceCid, md) -} - // AllRecords func (db *DB) AllRecords(ctx context.Context, cursor uint64) ([]model.Record, error) { ctx, span := tracing.Tracer.Start(ctx, "db.all_records") diff --git a/extern/boostd-data/ldb/service.go b/extern/boostd-data/ldb/service.go index eac757539..5a0e7226c 100644 --- a/extern/boostd-data/ldb/service.go +++ b/extern/boostd-data/ldb/service.go @@ -121,44 +121,6 @@ func (s *Store) AddDealForPiece(ctx context.Context, pieceCid cid.Cid, dealInfo return nil } -func (s *Store) SetCarSize(ctx context.Context, pieceCid cid.Cid, size uint64) error { - log.Debugw("handle.set-car-size", "piece-cid", pieceCid, "size", size) - - ctx, span := tracing.Tracer.Start(ctx, "store.set-car-size") - defer span.End() - - defer func(now time.Time) { - log.Debugw("handled.set-car-size", "took", time.Since(now).String()) - }(time.Now()) - - s.Lock() - defer s.Unlock() - - err := s.db.SetCarSize(ctx, pieceCid, size) - return normalizePieceCidError(pieceCid, err) -} - -func (s *Store) MarkIndexErrored(ctx context.Context, pieceCid cid.Cid, idxErr string) error { - log.Debugw("handle.mark-piece-index-errored", "piece-cid", pieceCid, "err", idxErr) - - ctx, span := tracing.Tracer.Start(ctx, "store.mark-piece-index-errored") - defer span.End() - - defer func(now time.Time) { - log.Debugw("handled.mark-piece-index-errored", "took", time.Since(now).String()) - }(time.Now()) - - s.Lock() - defer s.Unlock() - - err := s.db.MarkIndexErrored(ctx, pieceCid, errors.New(idxErr)) - if err != nil { - return normalizePieceCidError(pieceCid, err) - } - - return s.FlagPiece(ctx, pieceCid) -} - func (s *Store) GetOffsetSize(ctx context.Context, pieceCid cid.Cid, hash mh.Multihash) (*model.OffsetSize, error) { log.Debugw("handle.get-offset-size", "piece-cid", pieceCid) @@ -416,8 +378,6 @@ func (s *Store) FlagPiece(ctx context.Context, pieceCid cid.Cid, hasUnsealedCopy md.Cursor = cursor md.IndexedAt = time.Now() md.CompleteIndex = isCompleteIndex - md.Error = "" - md.ErrorType = "" err = s.db.SetPieceCidToMetadata(ctx, pieceCid, md) if err != nil { diff --git a/extern/boostd-data/model/model.go b/extern/boostd-data/model/model.go index c84d3d1e7..7ab0004a2 100644 --- a/extern/boostd-data/model/model.go +++ b/extern/boostd-data/model/model.go @@ -39,11 +39,10 @@ type Metadata struct { IndexedAt time.Time `json:"i"` // CompleteIndex indicates whether the index has all information or is // missing block size information. Note that indexes imported from the - // dagstore do not have block size information. + // dagstore do not have block size information (they only have block + // offsets). CompleteIndex bool `json:"c"` Deals []DealInfo `json:"d"` - Error string `json:"e"` - ErrorType string `json:"t"` } // Record is the information stored in the index for each block in a piece diff --git a/extern/boostd-data/svc/setup_yugabyte_test_util.go b/extern/boostd-data/svc/setup_yugabyte_test_util.go index 6934a0390..ff7f5e61f 100644 --- a/extern/boostd-data/svc/setup_yugabyte_test_util.go +++ b/extern/boostd-data/svc/setup_yugabyte_test_util.go @@ -1,30 +1,21 @@ package svc import ( + "testing" + "time" + "github.com/filecoin-project/boostd-data/yugabyte" - logging "github.com/ipfs/go-log/v2" "github.com/stretchr/testify/require" "github.com/yugabyte/gocql" "github.com/yugabyte/pgx/v4/pgxpool" "golang.org/x/net/context" - "testing" - "time" ) -var tlog = logging.Logger("ybtest") - var TestYugabyteSettings = yugabyte.DBSettings{ Hosts: []string{"yugabyte"}, ConnectString: "postgresql://postgres:postgres@yugabyte:5433", } -// Use when testing against a local yugabyte instance. -// Warning: This will delete all tables in the local yugabyte instance. -var TestYugabyteSettingsLocal = yugabyte.DBSettings{ - Hosts: []string{"localhost"}, - ConnectString: "postgresql://postgres:postgres@localhost:5433", -} - func SetupYugabyte(t *testing.T) { ctx := context.Background() diff --git a/extern/boostd-data/svc/svc.go b/extern/boostd-data/svc/svc.go index 56ba36871..bf20b009e 100644 --- a/extern/boostd-data/svc/svc.go +++ b/extern/boostd-data/svc/svc.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/boostd-data/couchbase" "github.com/filecoin-project/boostd-data/ldb" "github.com/filecoin-project/boostd-data/svc/types" + "github.com/filecoin-project/boostd-data/yugabyte" "github.com/filecoin-project/go-jsonrpc" "github.com/gorilla/mux" logging "github.com/ipfs/go-log/v2" @@ -22,11 +23,15 @@ var ( ) type Service struct { - impl types.ServiceImpl + Impl types.ServiceImpl +} + +func NewYugabyte(settings yugabyte.DBSettings) *Service { + return &Service{Impl: yugabyte.NewStore(settings)} } func NewCouchbase(settings couchbase.DBSettings) *Service { - return &Service{impl: couchbase.NewStore(settings)} + return &Service{Impl: couchbase.NewStore(settings)} } func NewLevelDB(repoPath string) (*Service, error) { @@ -38,7 +43,7 @@ func NewLevelDB(repoPath string) (*Service, error) { } } - return &Service{impl: ldb.NewStore(repoPath)}, nil + return &Service{Impl: ldb.NewStore(repoPath)}, nil } func MakeLevelDBDir(repoPath string) (string, error) { @@ -55,13 +60,13 @@ func (s *Service) Start(ctx context.Context, addr string) error { return fmt.Errorf("setting up listener for local index directory service: %w", err) } - err = s.impl.Start(ctx) + err = s.Impl.Start(ctx) if err != nil { return fmt.Errorf("starting local index directory service: %w", err) } server := jsonrpc.NewServer() - server.Register("boostddata", s.impl) + server.Register("boostddata", s.Impl) router := mux.NewRouter() router.Handle("/", server) diff --git a/extern/boostd-data/svc/svc_test.go b/extern/boostd-data/svc/svc_test.go index 91459e7ad..1d6099e4f 100644 --- a/extern/boostd-data/svc/svc_test.go +++ b/extern/boostd-data/svc/svc_test.go @@ -1,3 +1,6 @@ +//go:build test_lid +// +build test_lid + package svc import ( @@ -9,8 +12,6 @@ import ( "testing" "time" - "golang.org/x/sync/errgroup" - "github.com/filecoin-project/boost/testutil" "github.com/filecoin-project/boostd-data/client" "github.com/filecoin-project/boostd-data/couchbase" @@ -24,6 +25,7 @@ import ( "github.com/multiformats/go-multicodec" "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) var testCouchSettings = couchbase.DBSettings{ @@ -52,8 +54,10 @@ func TestService(t *testing.T) { defer cancel() bdsvc, err := NewLevelDB("") require.NoError(t, err) + testService(ctx, t, bdsvc, "localhost:8042") }) + t.Run("couchbase", func(t *testing.T) { // TODO: Unskip this test once the couchbase instance can be created // from a docker container in CI as part of the test @@ -64,7 +68,23 @@ func TestService(t *testing.T) { defer cancel() SetupCouchbase(t, testCouchSettings) bdsvc := NewCouchbase(testCouchSettings) - testService(ctx, t, bdsvc, "localhost:8043") + + addr := "localhost:8043" + testService(ctx, t, bdsvc, addr) + }) + + t.Run("yugabyte", func(t *testing.T) { + // Running yugabyte tests may require download the docker container + // so set a high timeout + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + + SetupYugabyte(t) + + bdsvc := NewYugabyte(TestYugabyteSettings) + + addr := "localhost:8044" + testService(ctx, t, bdsvc, addr) }) } @@ -116,6 +136,24 @@ func testService(ctx context.Context, t *testing.T, bdsvc *Service, addr string) require.Len(t, dis, 1) require.Equal(t, di, dis[0]) + // Add a second deal + di2 := model.DealInfo{ + DealUuid: uuid.NewString(), + SectorID: abi.SectorNumber(11), + PieceOffset: 11, + PieceLength: 12, + CarLength: 13, + } + err = cl.AddDealForPiece(ctx, pieceCid, di2) + require.NoError(t, err) + + // There should now be two deals + dis, err = cl.GetPieceDeals(ctx, pieceCid) + require.NoError(t, err) + require.Len(t, dis, 2) + require.Contains(t, dis, di) + require.Contains(t, dis, di2) + b, err := hex.DecodeString("1220ff63d7689e2d9567d1a90a7a68425f430137142e1fbc28fe4780b9ee8a5ef842") require.NoError(t, err) @@ -162,24 +200,39 @@ func TestServiceFuzz(t *testing.T) { t.Run("level db", func(t *testing.T) { bdsvc, err := NewLevelDB("") require.NoError(t, err) - testServiceFuzz(ctx, t, bdsvc, "localhost:8042") + addr := "localhost:8042" + err = bdsvc.Start(ctx, addr) + require.NoError(t, err) + testServiceFuzz(ctx, t, addr) }) + t.Run("couchbase", func(t *testing.T) { // TODO: Unskip this test once the couchbase instance can be created // from a docker container in CI as part of the test t.Skip() SetupCouchbase(t, testCouchSettings) bdsvc := NewCouchbase(testCouchSettings) - testServiceFuzz(ctx, t, bdsvc, "localhost:8043") + addr := "localhost:8043" + err := bdsvc.Start(ctx, addr) + require.NoError(t, err) + testServiceFuzz(ctx, t, addr) }) -} -func testServiceFuzz(ctx context.Context, t *testing.T, bdsvc *Service, addr string) { - err := bdsvc.Start(ctx, addr) - require.NoError(t, err) + t.Run("yugabyte", func(t *testing.T) { + SetupYugabyte(t) + bdsvc := NewYugabyte(TestYugabyteSettings) + + addr := "localhost:8044" + err := bdsvc.Start(ctx, addr) + require.NoError(t, err) + + testServiceFuzz(ctx, t, addr) + }) +} +func testServiceFuzz(ctx context.Context, t *testing.T, addr string) { cl := client.NewStore() - err = cl.Dial(context.Background(), "http://localhost:8042") + err := cl.Dial(context.Background(), "http://"+addr) require.NoError(t, err) defer cl.Close(ctx) @@ -381,6 +434,7 @@ func TestCleanup(t *testing.T) { require.NoError(t, err) testCleanup(ctx, t, bdsvc, "localhost:8042") }) + t.Run("couchbase", func(t *testing.T) { // TODO: Unskip this test once the couchbase instance can be created // from a docker container in CI as part of the test @@ -389,6 +443,18 @@ func TestCleanup(t *testing.T) { bdsvc := NewCouchbase(testCouchSettings) testCleanup(ctx, t, bdsvc, "localhost:8043") }) + + t.Run("yugabyte", func(t *testing.T) { + // Running yugabyte tests may require download the docker container + // so set a high timeout + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + + SetupYugabyte(t) + + bdsvc := NewYugabyte(TestYugabyteSettings) + testCleanup(ctx, t, bdsvc, "localhost:8044") + }) } func testCleanup(ctx context.Context, t *testing.T, bdsvc *Service, addr string) { @@ -411,29 +477,41 @@ func testCleanup(ctx context.Context, t *testing.T, bdsvc *Service, addr string) records, err := getRecords(subject) require.NoError(t, err) - randomuuid, err := uuid.Parse("4d8f5ce6-dbfd-40dc-8b03-29308e97357b") - require.NoError(t, err) - err = cl.AddIndex(ctx, pieceCid, records, true) require.NoError(t, err) di := model.DealInfo{ - DealUuid: randomuuid.String(), + DealUuid: uuid.NewString(), SectorID: abi.SectorNumber(1), PieceOffset: 1, PieceLength: 2, CarLength: 3, } + di2 := model.DealInfo{ + DealUuid: uuid.NewString(), + SectorID: abi.SectorNumber(10), + PieceOffset: 11, + PieceLength: 12, + CarLength: 13, + } - // Add a deal for the piece + // Add two deals for the piece err = cl.AddDealForPiece(ctx, pieceCid, di) require.NoError(t, err) + err = cl.AddDealForPiece(ctx, pieceCid, di2) + require.NoError(t, err) - // There should only be one deal dis, err := cl.GetPieceDeals(ctx, pieceCid) require.NoError(t, err) + require.Len(t, dis, 2) + + // Remove one deal for the piece + err = cl.RemoveDealForPiece(ctx, pieceCid, di.DealUuid) + require.NoError(t, err) + + dis, err = cl.GetPieceDeals(ctx, pieceCid) + require.NoError(t, err) require.Len(t, dis, 1) - require.Equal(t, di, dis[0]) b, err := hex.DecodeString("1220ff63d7689e2d9567d1a90a7a68425f430137142e1fbc28fe4780b9ee8a5ef842") require.NoError(t, err) @@ -444,7 +522,6 @@ func testCleanup(ctx context.Context, t *testing.T, bdsvc *Service, addr string) offset, err := cl.GetOffsetSize(ctx, pieceCid, mhash) require.NoError(t, err) require.EqualValues(t, 3039040395, offset.Offset) - require.EqualValues(t, 0, offset.Size) pcids, err := cl.PiecesContainingMultihash(ctx, mhash) require.NoError(t, err) @@ -455,18 +532,10 @@ func testCleanup(ctx context.Context, t *testing.T, bdsvc *Service, addr string) require.NoError(t, err) require.True(t, indexed) - recs, err := cl.GetRecords(ctx, pieceCid) - require.NoError(t, err) - require.Equal(t, len(records), len(recs)) - - loadedSubject, err := cl.GetIndex(ctx, pieceCid) - require.NoError(t, err) - - ok, err := compareIndices(subject, loadedSubject) - require.NoError(t, err) - require.True(t, ok) - - err = cl.RemoveDealForPiece(ctx, pieceCid, di.DealUuid) + // Remove the other deal for the piece. + // After this call there are no deals left, so it should also cause the + // piece metadata and indexes to be removed. + err = cl.RemoveDealForPiece(ctx, pieceCid, di2.DealUuid) require.NoError(t, err) _, err = cl.GetPieceDeals(ctx, pieceCid) diff --git a/extern/boostd-data/svc/types/types.go b/extern/boostd-data/svc/types/types.go index 631cf27d7..76106df32 100644 --- a/extern/boostd-data/svc/types/types.go +++ b/extern/boostd-data/svc/types/types.go @@ -18,6 +18,9 @@ var ErrNotFound = errors.New("not found") // We have to do string matching so that it can be used on errors that // cross the RPC boundary (we can't use errors.Is) func IsNotFound(err error) bool { + if err == nil { + return false + } return strings.Contains(err.Error(), ErrNotFound.Error()) } @@ -31,7 +34,6 @@ type Service interface { ListPieces(ctx context.Context) ([]cid.Cid, error) GetPieceMetadata(ctx context.Context, pieceCid cid.Cid) (model.Metadata, error) GetPieceDeals(context.Context, cid.Cid) ([]model.DealInfo, error) - SetCarSize(ctx context.Context, pieceCid cid.Cid, size uint64) error IndexedAt(context.Context, cid.Cid) (time.Time, error) PiecesContainingMultihash(context.Context, mh.Multihash) ([]cid.Cid, error) RemoveDealForPiece(context.Context, cid.Cid, string) error diff --git a/extern/boostd-data/yugabyte/create.cql b/extern/boostd-data/yugabyte/create.cql index fba47dbe4..0e7c01e48 100644 --- a/extern/boostd-data/yugabyte/create.cql +++ b/extern/boostd-data/yugabyte/create.cql @@ -41,6 +41,4 @@ CREATE TABLE IF NOT EXISTS idx.PieceDeal ( PieceOffset BIGINT, PieceLength BIGINT, CarLength BIGINT -) WITH transactions = { 'enabled' : true }; - -CREATE INDEX IF NOT EXISTS PieceDealPieceCid ON idx.PieceDeal (PieceCid); +); diff --git a/extern/boostd-data/yugabyte/create.sql b/extern/boostd-data/yugabyte/create.sql index 8ef72d60f..d296143af 100644 --- a/extern/boostd-data/yugabyte/create.sql +++ b/extern/boostd-data/yugabyte/create.sql @@ -10,8 +10,7 @@ CREATE INDEX IF NOT EXISTS PieceTrackerUpdatedAt ON PieceTracker (UpdatedAt); CREATE TABLE IF NOT EXISTS PieceFlagged ( PieceCid TEXT PRIMARY KEY, CreatedAt TIMESTAMP, - UpdatedAt TIMESTAMP, - HasUnsealedCopy BOOLEAN + UpdatedAt TIMESTAMP ); CREATE INDEX IF NOT EXISTS PieceFlaggedCreatedAt ON PieceFlagged (CreatedAt); diff --git a/extern/boostd-data/yugabyte/piecedoctor.go b/extern/boostd-data/yugabyte/piecedoctor.go index 02d054b25..93a6a9699 100644 --- a/extern/boostd-data/yugabyte/piecedoctor.go +++ b/extern/boostd-data/yugabyte/piecedoctor.go @@ -3,15 +3,13 @@ package yugabyte import ( "context" "fmt" - "time" - "github.com/filecoin-project/boostd-data/model" "github.com/filecoin-project/boostd-data/shared/tracing" - "github.com/filecoin-project/boostd-data/svc/types" "github.com/ipfs/go-cid" "github.com/jackc/pgtype" "go.opentelemetry.io/otel/attribute" "golang.org/x/sync/errgroup" + "time" ) var TrackerCheckBatchSize = 1024 @@ -167,7 +165,7 @@ func (s *Store) execWithConcurrency(ctx context.Context, pcids []pieceCreated, c } // The minimum frequency with which to check pieces for errors (eg bad index) -var MinPieceCheckPeriod = 5 * time.Minute +var MinPieceCheckPeriod = 30 * time.Second // Work out how frequently to check each piece, based on how many pieces // there are: if there are many pieces, each piece will be checked @@ -180,10 +178,10 @@ func (s *Store) getPieceCheckPeriod(ctx context.Context) (time.Duration, error) } // Check period: - // - 1k pieces; every 100s (5 minutes because of MinPieceCheckPeriod) - // - 100k pieces; every 150m - // - 1m pieces; every 20 hours - period := time.Duration(count*100) * time.Millisecond + // - 1k pieces; every 10s + // - 100k pieces; every 15m + // - 1m pieces; every 2 hours + period := time.Duration(count*10) * time.Millisecond if period < MinPieceCheckPeriod { period = MinPieceCheckPeriod } @@ -191,16 +189,16 @@ func (s *Store) getPieceCheckPeriod(ctx context.Context) (time.Duration, error) return period, nil } -func (s *Store) FlagPiece(ctx context.Context, pieceCid cid.Cid, hasUnsealedCopy bool) error { +func (s *Store) FlagPiece(ctx context.Context, pieceCid cid.Cid) error { ctx, span := tracing.Tracer.Start(ctx, "store.flag_piece") span.SetAttributes(attribute.String("pieceCid", pieceCid.String())) defer span.End() now := time.Now() - qry := `INSERT INTO PieceFlagged (PieceCid, CreatedAt, UpdatedAt, HasUnsealedCopy) ` + - `VALUES ($1, $2, $3, $4) ` + + qry := `INSERT INTO PieceFlagged (PieceCid, CreatedAt, UpdatedAt) ` + + `VALUES ($1, $2, $3) ` + `ON CONFLICT (PieceCid) DO UPDATE SET UpdatedAt = excluded.UpdatedAt` - _, err := s.db.Exec(ctx, qry, pieceCid.String(), now, now, hasUnsealedCopy) + _, err := s.db.Exec(ctx, qry, pieceCid.String(), now, now) if err != nil { return fmt.Errorf("flagging piece %s: %w", pieceCid, err) } @@ -221,7 +219,7 @@ func (s *Store) UnflagPiece(ctx context.Context, pieceCid cid.Cid) error { return nil } -func (s *Store) FlaggedPiecesList(ctx context.Context, filter *types.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) { +func (s *Store) FlaggedPiecesList(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error) { ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces") var spanCursor int if cursor != nil { @@ -234,24 +232,12 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, filter *types.FlaggedPiec var args []interface{} idx := 0 - qry := `SELECT PieceCid, CreatedAt, UpdatedAt, HasUnsealedCopy from PieceFlagged ` - where := "" + qry := `SELECT PieceCid, CreatedAt from PieceFlagged ` if cursor != nil { - where += `WHERE CreatedAt < $1 ` + qry += `WHERE CreatedAt < $1 ` args = append(args, cursor) idx++ } - if filter != nil { - if where == "" { - where += `WHERE ` - } else { - where += `AND ` - } - where += fmt.Sprintf(`HasUnsealedCopy = $%d `, idx+1) - args = append(args, filter.HasUnsealedCopy) - idx++ - } - qry += where qry += `ORDER BY CreatedAt desc ` qry += fmt.Sprintf(`LIMIT $%d OFFSET $%d`, idx+1, idx+2) @@ -266,10 +252,8 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, filter *types.FlaggedPiec var pieces []model.FlaggedPiece var pcid string var createdAt time.Time - var updatedAt time.Time - var hasUnsealedCopy bool for rows.Next() { - err := rows.Scan(&pcid, &createdAt, &updatedAt, &hasUnsealedCopy) + err := rows.Scan(&pcid, &createdAt) if err != nil { return nil, fmt.Errorf("scanning flagged piece: %w", err) } @@ -278,7 +262,7 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, filter *types.FlaggedPiec if err != nil { return nil, fmt.Errorf("parsing flagged piece cid %s: %w", pcid, err) } - pieces = append(pieces, model.FlaggedPiece{PieceCid: c, CreatedAt: createdAt, UpdatedAt: updatedAt, HasUnsealedCopy: hasUnsealedCopy}) + pieces = append(pieces, model.FlaggedPiece{PieceCid: c, CreatedAt: createdAt}) } if err := rows.Err(); err != nil { @@ -288,19 +272,13 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, filter *types.FlaggedPiec return pieces, nil } -func (s *Store) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) { +func (s *Store) FlaggedPiecesCount(ctx context.Context) (int, error) { ctx, span := tracing.Tracer.Start(ctx, "store.flagged_pieces_count") defer span.End() - var args []interface{} var count int qry := `SELECT COUNT(*) FROM PieceFlagged` - if filter != nil { - qry += ` WHERE HasUnsealedCopy = $1` - args = append(args, filter.HasUnsealedCopy) - } - - err := s.db.QueryRow(ctx, qry, args...).Scan(&count) + err := s.db.QueryRow(ctx, qry).Scan(&count) if err != nil { return 0, fmt.Errorf("getting flagged pieces count: %w", err) } diff --git a/extern/boostd-data/yugabyte/service.go b/extern/boostd-data/yugabyte/service.go index e52f9bc6e..b529cc7e6 100644 --- a/extern/boostd-data/yugabyte/service.go +++ b/extern/boostd-data/yugabyte/service.go @@ -4,9 +4,6 @@ import ( "context" _ "embed" "fmt" - "sync" - "time" - "github.com/filecoin-project/boostd-data/model" "github.com/filecoin-project/boostd-data/shared/tracing" "github.com/filecoin-project/boostd-data/svc/types" @@ -18,6 +15,8 @@ import ( "github.com/yugabyte/gocql" "github.com/yugabyte/pgx/v4/pgxpool" "golang.org/x/sync/errgroup" + "sync" + "time" ) var log = logging.Logger("boostd-data-yb") @@ -231,64 +230,37 @@ func (s *Store) PiecesContainingMultihash(ctx context.Context, m mh.Multihash) ( return pcids, nil } -func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) (<-chan types.IndexRecord, error) { +func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) ([]model.Record, error) { ctx, span := tracing.Tracer.Start(ctx, "store.get_index") defer span.End() qry := `SELECT PayloadMultihash, BlockOffset, BlockSize FROM idx.PieceBlockOffsetSize WHERE PieceCid = ?` iter := s.session.Query(qry, pieceCid.Bytes()).WithContext(ctx).Iter() - scannedRecordCh := make(chan struct{}, 1) - records := make(chan types.IndexRecord) - go func() { - defer close(scannedRecordCh) - defer close(records) - - var payloadMHBz []byte - var offset, size uint64 - for iter.Scan(&payloadMHBz, &offset, &size) { - // The scan was successful, which means there is at least one - // record - select { - case scannedRecordCh <- struct{}{}: - default: - } - - // Parse the multihash bytes - _, pmh, err := multihash.MHFromBytes(payloadMHBz) - if err != nil { - records <- types.IndexRecord{Error: err} - return - } - - records <- types.IndexRecord{ - Record: model.Record{ - Cid: cid.NewCidV1(cid.Raw, pmh), - OffsetSize: model.OffsetSize{ - Offset: offset, - Size: size, - }, - }, - } - } - if err := iter.Close(); err != nil { - err = fmt.Errorf("getting piece index for piece %s: %w", pieceCid, err) - records <- types.IndexRecord{Error: err} + var records []model.Record + var payloadMHBz []byte + var offset, size uint64 + for iter.Scan(&payloadMHBz, &offset, &size) { + _, pmh, err := multihash.MHFromBytes(payloadMHBz) + if err != nil { + return nil, fmt.Errorf("scanning mulithash: %w", err) } - }() - // Check if there were any records for this piece cid - var pieceHasRecords bool - select { - case <-ctx.Done(): - return nil, ctx.Err() - case _, pieceHasRecords = <-scannedRecordCh: + records = append(records, model.Record{ + Cid: cid.NewCidV1(cid.Raw, pmh), + OffsetSize: model.OffsetSize{ + Offset: offset, + Size: size, + }, + }) + } + if err := iter.Close(); err != nil { + return nil, fmt.Errorf("getting piece index for piece %s: %w", pieceCid, err) } - if !pieceHasRecords { - // For correctness, we should always return a not found error if there - // is no piece with the piece cid. Call getPieceMetadata which returns - // not found if it can't find the piece. + // For correctness, we should always return a not found error if there is + // no piece with the piece cid + if len(records) == 0 { _, err := s.getPieceMetadata(ctx, pieceCid) if err != nil { return nil, err @@ -298,116 +270,50 @@ func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) (<-chan types.In return records, nil } -func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, recs []model.Record, isCompleteIndex bool) <-chan types.AddIndexProgress { +func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, recs []model.Record, isCompleteIndex bool) error { ctx, span := tracing.Tracer.Start(ctx, "store.add_index") defer span.End() - // Set up the progress channel - progress := make(chan types.AddIndexProgress, 2) if len(recs) == 0 { - // If there are no records, set progress to 100% and close the channel - progress <- types.AddIndexProgress{Progress: 1} - close(progress) - return progress + return nil } - // Start by sending a progress update of zero - progress <- types.AddIndexProgress{Progress: 0} - lastUpdateTime := time.Now() - - var lastUpdateValue *float64 - updateProgress := func(prg float64) { - // Don't send updates more than once every few seconds - if time.Since(lastUpdateTime) < 5*time.Second { - lastUpdateValue = &prg - return - } - - // If the channel is full, don't send this progress update, just - // wait for the next one. - select { - case progress <- types.AddIndexProgress{Progress: prg}: - lastUpdateTime = time.Now() - lastUpdateValue = nil - default: - lastUpdateValue = &prg - } + // Add a mapping from multihash -> piece cid so that clients can look up + // which pieces contain a multihash + err := s.addMultihashesToPieces(ctx, pieceCid, recs) + if err != nil { + return err } - completeProgress := func(err error) { - var lastProg *types.AddIndexProgress - if err != nil { - // If there was an error, send it as the last progress update - lastProg = &types.AddIndexProgress{Err: err.Error()} - } else if lastUpdateValue != nil { - // If there is an outstanding update that hasn't been sent out - // yet, make sure it gets sent - lastProg = &types.AddIndexProgress{Progress: *lastUpdateValue} - } - - if lastProg != nil { - select { - case progress <- *lastProg: - case <-time.After(5 * time.Second): - } - } - - // Close the channel - close(progress) + // Add a mapping from piece cid -> offset / size of each block so that + // clients can get the block info for all blocks in a piece + err = s.addPieceInfos(ctx, pieceCid, recs) + if err != nil { + return err } - go func() { - // Add a mapping from multihash -> piece cid so that clients can look up - // which pieces contain a multihash - err := s.addMultihashesToPieces(ctx, pieceCid, recs, func(addProgress float64) { - // The first 45% of progress is for adding multihash -> pieces index - updateProgress(0.45 * addProgress) - }) - if err != nil { - completeProgress(err) - return - } - - // Add a mapping from piece cid -> offset / size of each block so that - // clients can get the block info for all blocks in a piece - err = s.addPieceInfos(ctx, pieceCid, recs, func(addProgress float64) { - // From 45% - 90% of progress is for adding piece infos - updateProgress(0.45 + 0.45*addProgress) - }) - if err != nil { - completeProgress(err) - return - } - - // Ensure the piece metadata exists - err = s.createPieceMetadata(ctx, pieceCid) - if err != nil { - completeProgress(err) - return - } - updateProgress(0.95) + // Ensure the piece metadata exists + err = s.createPieceMetadata(ctx, pieceCid) + if err != nil { + return err + } - // Mark indexing as complete for the piece - qry := `UPDATE idx.PieceMetadata ` + - `SET IndexedAt = ?, CompleteIndex = ? ` + - `WHERE PieceCid = ?` - err = s.session.Query(qry, time.Now(), isCompleteIndex, pieceCid.String()).WithContext(ctx).Exec() - if err != nil { - completeProgress(err) - return - } - updateProgress(1) - completeProgress(nil) - }() + // Mark indexing as complete for the piece + qry := `UPDATE idx.PieceMetadata ` + + `SET IndexedAt = ?, CompleteIndex = ? ` + + `WHERE PieceCid = ?` + err = s.session.Query(qry, time.Now(), isCompleteIndex, pieceCid.String()).WithContext(ctx).Exec() + if err != nil { + return fmt.Errorf("marking indexing as complete for piece %s", pieceCid) + } - return progress + return nil } -func (s *Store) addMultihashesToPieces(ctx context.Context, pieceCid cid.Cid, recs []model.Record, progress func(addProgress float64)) error { +func (s *Store) addMultihashesToPieces(ctx context.Context, pieceCid cid.Cid, recs []model.Record) error { ctx, span := tracing.Tracer.Start(ctx, "store.add_index.payloadpiece") defer span.End() - var count float64 return s.execParallel(ctx, recs, s.settings.PayloadPiecesParallelism, func(rec model.Record) error { multihashBytes := rec.Cid.Hash() q := `INSERT INTO idx.PayloadToPieces (PayloadMultihash, PieceCid) VALUES (?, ?)` @@ -415,14 +321,11 @@ func (s *Store) addMultihashesToPieces(ctx context.Context, pieceCid cid.Cid, re if err != nil { return fmt.Errorf("inserting into PayloadToPieces: %w", err) } - - count++ - progress(count / float64(len(recs))) return nil }) } -func (s *Store) addPieceInfos(ctx context.Context, pieceCid cid.Cid, recs []model.Record, progress func(addProgress float64)) error { +func (s *Store) addPieceInfos(ctx context.Context, pieceCid cid.Cid, recs []model.Record) error { ctx, span := tracing.Tracer.Start(ctx, "store.add_index.pieceinfo") defer span.End() @@ -438,7 +341,7 @@ func (s *Store) addPieceInfos(ctx context.Context, pieceCid cid.Cid, recs []mode // The Cassandra driver has a 50k limit on batch statements. Keeping // batch size small makes sure we're under the limit. - const batchSize = 5000 + const batchSize = 49000 var batch *gocql.Batch for allIdx, entry := range batchEntries { if batch == nil { @@ -453,8 +356,7 @@ func (s *Store) addPieceInfos(ctx context.Context, pieceCid cid.Cid, recs []mode return fmt.Errorf("executing offset / size batch insert for piece %s: %w", pieceCid, err) } batch = nil - - progress((float64(allIdx+1) / float64(len(batchEntries)))) + continue } } @@ -499,21 +401,6 @@ func (s *Store) IndexedAt(ctx context.Context, pieceCid cid.Cid) (time.Time, err return md.IndexedAt, nil } -func (s *Store) PiecesCount(ctx context.Context) (int, error) { - ctx, span := tracing.Tracer.Start(ctx, "store.pieces_count") - defer span.End() - - var count int - qry := `SELECT COUNT(*) FROM idx.PieceMetadata` - - err := s.session.Query(qry).WithContext(ctx).Scan(&count) - if err != nil { - return -1, fmt.Errorf("getting pieces count: %w", err) - } - - return count, nil -} - func (s *Store) ListPieces(ctx context.Context) ([]cid.Cid, error) { ctx, span := tracing.Tracer.Start(ctx, "store.list_pieces") defer span.End() @@ -589,44 +476,21 @@ func (s *Store) RemoveIndexes(ctx context.Context, pieceCid cid.Cid) error { ctx, span := tracing.Tracer.Start(ctx, "store.remove_indexes") defer span.End() - // Get multihashes for piece recs, err := s.GetIndex(ctx, pieceCid) if err != nil { return fmt.Errorf("removing indexes for piece %s: getting recs: %w", pieceCid, err) } - // Delete from multihash -> piece cids index - var eg errgroup.Group - for i := 0; i < s.settings.PayloadPiecesParallelism; i++ { - eg.Go(func() error { - for ctx.Err() == nil { - select { - case <-ctx.Done(): - return ctx.Err() - case rec, ok := <-recs: - if !ok { - // Finished adding all the queued items, exit the thread - return nil - } - - multihashBytes := rec.Cid.Hash() - q := `DELETE FROM idx.PayloadToPieces WHERE PayloadMultihash = ? AND PieceCid = ?` - err := s.session.Query(q, trimMultihash(multihashBytes), pieceCid.Bytes()).Exec() - if err != nil { - return fmt.Errorf("deleting from PayloadToPieces: %w", err) - } - } - } - - return ctx.Err() - }) - } - err = eg.Wait() - if err != nil { - return err - } + err = s.execParallel(ctx, recs, s.settings.PayloadPiecesParallelism, func(rec model.Record) error { + multihashBytes := rec.Cid.Hash() + q := `DELETE FROM idx.PayloadToPieces WHERE PayloadMultihash = ? AND PieceCid = ?` + err := s.session.Query(q, trimMultihash(multihashBytes), pieceCid.Bytes()).Exec() + if err != nil { + return fmt.Errorf("inserting into PayloadToPieces: %w", err) + } + return nil + }) - // Delete from piece offsets index qry := `DELETE FROM idx.PieceBlockOffsetSize WHERE PieceCid = ?` err = s.session.Query(qry, pieceCid.Bytes()).WithContext(ctx).Exec() if err != nil { diff --git a/go.mod b/go.mod index edbed72f3..a5af01338 100644 --- a/go.mod +++ b/go.mod @@ -371,8 +371,17 @@ require ( require ( github.com/google/pprof v0.0.0-20221203041831-ce31453925ec // indirect + github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect github.com/hashicorp/go-uuid v1.0.2 // indirect github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect + github.com/jackc/chunkreader/v2 v2.0.1 // indirect + github.com/jackc/pgconn v1.11.0 // indirect + github.com/jackc/pgio v1.0.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgproto3/v2 v2.2.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect + github.com/jackc/pgtype v1.10.0 // indirect + github.com/jackc/puddle v1.2.1 // indirect github.com/onsi/ginkgo/v2 v2.5.1 // indirect github.com/quic-go/qpack v0.4.0 // indirect github.com/quic-go/qtls-go1-19 v0.2.1 // indirect @@ -381,6 +390,9 @@ require ( github.com/quic-go/webtransport-go v0.5.2 // indirect github.com/tidwall/gjson v1.14.0 // indirect github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb // indirect + github.com/yugabyte/gocql v0.0.0-20221110041640-6fc475c5aeb0 // indirect + github.com/yugabyte/pgx/v4 v4.14.5 // indirect github.com/zyedidia/generic v1.2.1 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect nhooyr.io/websocket v1.8.7 // indirect ) diff --git a/go.sum b/go.sum index ed5d2a3f4..4ebff6fe0 100644 --- a/go.sum +++ b/go.sum @@ -204,6 +204,8 @@ github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= +github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/codegangsta/cli v1.20.0/go.mod h1:/qJNoX69yVSKu5o4jLyXAENLRyk1uhi7zkbQ3slBdOA= @@ -223,6 +225,7 @@ github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= @@ -538,6 +541,8 @@ github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= +github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/googleapis v1.4.1 h1:1Yx4Myt7BxzvUr5ldGSbwYiZG6t9wGBZ+8/fX3Wvtq0= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -1400,6 +1405,7 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= @@ -1758,6 +1764,7 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd/go.mod h1:hPqNNc0+uJM6H+SuU8sEs5K5IQeKccPqeSjfgcKGgPk= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/schollz/progressbar/v3 v3.13.1 h1:o8rySDYiQ59Mwzy2FELeHY5ZARXZTVJC7iHD6PEFUiE= github.com/schollz/progressbar/v3 v3.13.1/go.mod h1:xvrbki8kfT1fzWzBT/UZd9L6GA+jdL7HAgq2RFnO6fQ= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= @@ -1838,6 +1845,7 @@ github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3 github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= @@ -2379,6 +2387,7 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= @@ -2458,6 +2467,8 @@ golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.3.0 h1:SrNbZl6ECOS1qFzgTdQfWXZM9XBkiA6tkFrH9YSTPHM= golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k= +golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/gql/resolver_piece.go b/gql/resolver_piece.go index fa5dc5fb2..eebbf18ea 100644 --- a/gql/resolver_piece.go +++ b/gql/resolver_piece.go @@ -367,9 +367,6 @@ func (r *resolver) getIndexStatus(ctx context.Context, pieceCid cid.Cid, md pdty idxerr = mdErr.Error() case md.Indexing: idxst = IndexStatusIndexing - case md.Error != "": - idxst = IndexStatusFailed - idxerr = md.Error case md.IndexedAt.IsZero(): idxst = IndexStatusRegistered default: diff --git a/node/config/def.go b/node/config/def.go index 053de2796..2e01b19ab 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -80,6 +80,9 @@ func DefaultBoost() *Boost { }, LocalIndexDirectory: LocalIndexDirectoryConfig{ + Yugabyte: LocalIndexDirectoryYugabyteConfig{ + Enabled: false, + }, Couchbase: LocalIndexDirectoryCouchbaseConfig{ ConnectString: "", Username: "", diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 52b0fc0d0..b4dd0e653 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -456,6 +456,12 @@ them disabled. These will be completely deprecated soon.`, }, }, "LocalIndexDirectoryConfig": []DocField{ + { + Name: "Yugabyte", + Type: "LocalIndexDirectoryYugabyteConfig", + + Comment: ``, + }, { Name: "Couchbase", Type: "LocalIndexDirectoryCouchbaseConfig", @@ -534,6 +540,26 @@ If empty, a leveldb database is used instead.`, Comment: ``, }, }, + "LocalIndexDirectoryYugabyteConfig": []DocField{ + { + Name: "Enabled", + Type: "bool", + + Comment: ``, + }, + { + Name: "ConnectString", + Type: "string", + + Comment: `The yugabyte postgres connect string eg "postgresql://postgres:postgres@localhost"`, + }, + { + Name: "Hosts", + Type: "[]string", + + Comment: `The yugabyte cassandra hosts eg ["127.0.0.1"]`, + }, + }, "LotusDealmakingConfig": []DocField{ { Name: "PieceCidBlocklist", diff --git a/node/config/types.go b/node/config/types.go index 845d41793..f3948032f 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -418,7 +418,16 @@ type LocalIndexDirectoryCouchbaseConfig struct { PieceOffsetsBucket LocalIndexDirectoryCouchbaseBucketConfig } +type LocalIndexDirectoryYugabyteConfig struct { + Enabled bool + // The yugabyte postgres connect string eg "postgresql://postgres:postgres@localhost" + ConnectString string + // The yugabyte cassandra hosts eg ["127.0.0.1"] + Hosts []string +} + type LocalIndexDirectoryConfig struct { + Yugabyte LocalIndexDirectoryYugabyteConfig Couchbase LocalIndexDirectoryCouchbaseConfig // The maximum number of add index operations allowed to execute in parallel. // The add index operation is executed when a new deal is created - it fetches diff --git a/node/impl/boost.go b/node/impl/boost.go index 5b2886f31..fb2d24d57 100644 --- a/node/impl/boost.go +++ b/node/impl/boost.go @@ -543,10 +543,6 @@ func (sm *BoostAPI) PdBuildIndexForPieceCid(ctx context.Context, piececid cid.Ci return sm.Pd.BuildIndexForPiece(ctx, piececid) } -func (sm *BoostAPI) PdMarkIndexErrored(ctx context.Context, piececid cid.Cid, err string) error { - return sm.Pd.MarkIndexErrored(ctx, piececid, err) -} - func (sm *BoostAPI) OnlineBackup(ctx context.Context, dstDir string) error { return sm.Bkp.Backup(ctx, dstDir) } diff --git a/node/modules/piecedirectory.go b/node/modules/piecedirectory.go index 994542b12..e1a73a760 100644 --- a/node/modules/piecedirectory.go +++ b/node/modules/piecedirectory.go @@ -16,6 +16,7 @@ import ( "github.com/filecoin-project/boostd-data/couchbase" "github.com/filecoin-project/boostd-data/model" "github.com/filecoin-project/boostd-data/svc" + "github.com/filecoin-project/boostd-data/yugabyte" "github.com/filecoin-project/dagstore" "github.com/filecoin-project/dagstore/shard" "github.com/filecoin-project/go-address" @@ -52,9 +53,20 @@ func NewPieceDirectoryStore(cfg *config.Boost) func(lc fx.Lifecycle, r lotus_rep } svcCtx, cancel = context.WithCancel(ctx) - var bdsvc *svc.Service - if cfg.LocalIndexDirectory.Couchbase.ConnectString != "" { + switch { + case cfg.LocalIndexDirectory.Yugabyte.Enabled: + log.Infow("local index directory: connecting to yugabyte server", + "connect-string", cfg.LocalIndexDirectory.Yugabyte.ConnectString, + "hosts", cfg.LocalIndexDirectory.Yugabyte.Hosts) + + // Set up a local index directory service that connects to the yugabyte db + bdsvc = svc.NewYugabyte(yugabyte.DBSettings{ + Hosts: cfg.LocalIndexDirectory.Yugabyte.Hosts, + ConnectString: cfg.LocalIndexDirectory.Yugabyte.ConnectString, + }) + + case cfg.LocalIndexDirectory.Couchbase.ConnectString != "": log.Infow("local index directory: connecting to couchbase server", "connect-string", cfg.LocalIndexDirectory.Couchbase.ConnectString) @@ -76,7 +88,8 @@ func NewPieceDirectoryStore(cfg *config.Boost) func(lc fx.Lifecycle, r lotus_rep RAMQuotaMB: cfg.LocalIndexDirectory.Couchbase.PieceOffsetsBucket.RAMQuotaMB, }, }) - } else { + + default: log.Infow("local index directory: connecting to leveldb instance") // Setup a local index directory service that connects to the leveldb diff --git a/piecedirectory/doctor.go b/piecedirectory/doctor.go index 805b5359f..ec6cfe77e 100644 --- a/piecedirectory/doctor.go +++ b/piecedirectory/doctor.go @@ -86,16 +86,6 @@ func (d *Doctor) checkPiece(ctx context.Context, pieceCid cid.Cid) error { return fmt.Errorf("failed to get piece %s from local index directory: %w", pieceCid, err) } - // Check if the piece is in an error state - if md.Error != "" { - err = d.store.FlagPiece(ctx, pieceCid) - if err != nil { - return fmt.Errorf("failed to flag piece in error state %s: %w", pieceCid, err) - } - doclog.Debugw("piece is in error state", "err", md.Error) - return nil - } - // Check if piece has been indexed isIndexed, err := d.store.IsIndexed(ctx, pieceCid) if err != nil { diff --git a/piecedirectory/doctor_test.go b/piecedirectory/doctor_test.go index 8e6ec2110..eb8338d0f 100644 --- a/piecedirectory/doctor_test.go +++ b/piecedirectory/doctor_test.go @@ -1,3 +1,6 @@ +//go:build test_lid +// +build test_lid + package piecedirectory import ( @@ -7,11 +10,13 @@ import ( "testing" "time" + "github.com/filecoin-project/boost/testutil" "github.com/filecoin-project/boostd-data/client" "github.com/filecoin-project/boostd-data/couchbase" "github.com/filecoin-project/boostd-data/ldb" "github.com/filecoin-project/boostd-data/model" "github.com/filecoin-project/boostd-data/svc" + "github.com/filecoin-project/boostd-data/yugabyte" "github.com/filecoin-project/go-state-types/abi" "github.com/google/uuid" "github.com/ipfs/go-cid" @@ -28,9 +33,6 @@ func TestPieceDoctor(t *testing.T) { prev := ldb.MinPieceCheckPeriod ldb.MinPieceCheckPeriod = 1 * time.Second - prevp := ldb.PiecesToTrackerBatchSize - ldb.PiecesToTrackerBatchSize = 4 - bdsvc, err := svc.NewLevelDB("") require.NoError(t, err) @@ -44,7 +46,11 @@ func TestPieceDoctor(t *testing.T) { defer cl.Close(ctx) t.Run("next pieces pagination", func(t *testing.T) { - testNextPiecesPagination(ctx, t, cl, ldb.MinPieceCheckPeriod) + prevp := ldb.PiecesToTrackerBatchSize + testNextPiecesPagination(ctx, t, cl, func(pageSize int) { + ldb.PiecesToTrackerBatchSize = pageSize + }) + ldb.PiecesToTrackerBatchSize = prevp }) t.Run("check pieces", func(t *testing.T) { @@ -52,8 +58,8 @@ func TestPieceDoctor(t *testing.T) { }) ldb.MinPieceCheckPeriod = prev - ldb.PiecesToTrackerBatchSize = prevp }) + t.Run("couchbase", func(t *testing.T) { // TODO: Unskip this test once the couchbase instance can be created // from a docker container in CI @@ -84,6 +90,47 @@ func TestPieceDoctor(t *testing.T) { couchbase.MinPieceCheckPeriod = prev }) + + t.Run("yugabyte", func(t *testing.T) { + prev := yugabyte.MinPieceCheckPeriod + yugabyte.MinPieceCheckPeriod = 1 * time.Second + + svc.SetupYugabyte(t) + + bdsvc := svc.NewYugabyte(svc.TestYugabyteSettings) + + addr := "localhost:8044" + err := bdsvc.Start(ctx, addr) + require.NoError(t, err) + + cl := client.NewStore() + err = cl.Dial(ctx, fmt.Sprintf("http://%s", addr)) + require.NoError(t, err) + defer cl.Close(ctx) + + ybstore := bdsvc.Impl.(*yugabyte.Store) + + t.Run("next pieces", func(t *testing.T) { + svc.RecreateTables(ctx, t, ybstore) + testNextPieces(ctx, t, cl, yugabyte.MinPieceCheckPeriod) + }) + + t.Run("next pieces pagination", func(t *testing.T) { + svc.RecreateTables(ctx, t, ybstore) + prevp := yugabyte.TrackerCheckBatchSize + testNextPiecesPagination(ctx, t, cl, func(pageSize int) { + yugabyte.TrackerCheckBatchSize = pageSize + }) + yugabyte.TrackerCheckBatchSize = prevp + }) + + t.Run("check pieces", func(t *testing.T) { + svc.RecreateTables(ctx, t, ybstore) + testCheckPieces(ctx, t, cl) + }) + + yugabyte.MinPieceCheckPeriod = prev + }) } // Verify that after a new piece is added @@ -92,7 +139,6 @@ func TestPieceDoctor(t *testing.T) { func testNextPieces(ctx context.Context, t *testing.T, cl *client.Store, pieceCheckPeriod time.Duration) { // Add a new piece pieceCid := blocks.NewBlock([]byte(fmt.Sprintf("%d", time.Now().UnixMilli()))).Cid() - fmt.Println(pieceCid) di := model.DealInfo{ DealUuid: uuid.New().String(), ChainDealID: 1, @@ -127,14 +173,14 @@ func testNextPieces(ctx context.Context, t *testing.T, cl *client.Store, pieceCh require.Contains(t, pcids, pieceCid) } -func testNextPiecesPagination(ctx context.Context, t *testing.T, cl *client.Store, pieceCheckPeriod time.Duration) { - // Add 8 pieces - allPcids := make(map[cid.Cid]struct{}) +func testNextPiecesPagination(ctx context.Context, t *testing.T, cl *client.Store, setPageSize func(int)) { + setPageSize(4) + + // Add 9 pieces seen := make(map[cid.Cid]int) for i := 1; i <= 9; i++ { - pieceCid := blocks.NewBlock([]byte(fmt.Sprintf("%d%d", time.Now().UnixMilli(), i))).Cid() - fmt.Println(pieceCid) + pieceCid := testutil.GenerateCid() di := model.DealInfo{ DealUuid: uuid.New().String(), ChainDealID: abi.DealID(i), @@ -144,8 +190,6 @@ func testNextPiecesPagination(ctx context.Context, t *testing.T, cl *client.Stor } err := cl.AddDealForPiece(ctx, pieceCid, di) require.NoError(t, err) - - allPcids[pieceCid] = struct{}{} } // expect to get 4 pieces @@ -191,19 +235,16 @@ func testNextPiecesPagination(ctx context.Context, t *testing.T, cl *client.Stor // Add 1 more piece for i := 1; i <= 1; i++ { - pieceCid := blocks.NewBlock([]byte(fmt.Sprintf("%d%d", time.Now().UnixMilli(), i))).Cid() - fmt.Println(pieceCid) + pieceCid := testutil.GenerateCid() di := model.DealInfo{ DealUuid: uuid.New().String(), - ChainDealID: abi.DealID(i), - SectorID: abi.SectorNumber(i), + ChainDealID: abi.DealID(100), + SectorID: abi.SectorNumber(100), PieceOffset: 0, PieceLength: 2048, } err := cl.AddDealForPiece(ctx, pieceCid, di) require.NoError(t, err) - - allPcids[pieceCid] = struct{}{} } // wait to reset the interval and start from scratch diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go index 5b3aecef6..1e41cab14 100644 --- a/piecedirectory/piecedirectory.go +++ b/piecedirectory/piecedirectory.go @@ -3,7 +3,6 @@ package piecedirectory import ( "bufio" "context" - "encoding/binary" "errors" "fmt" "io" @@ -111,84 +110,6 @@ func (ps *PieceDirectory) GetOffsetSize(ctx context.Context, pieceCid cid.Cid, h return ps.store.GetOffsetSize(ctx, pieceCid, hash) } -func (ps *PieceDirectory) GetCarSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) { - // Get the deals for the piece - dls, err := ps.GetPieceDeals(ctx, pieceCid) - if err != nil { - return 0, fmt.Errorf("getting piece deals for piece %s: %w", pieceCid, err) - } - - if len(dls) == 0 { - return 0, fmt.Errorf("no deals for piece %s in index: piece not found", pieceCid) - } - - // The size of the CAR should be the same for any deal, so just return the - // first non-zero CAR size - for _, dl := range dls { - if dl.CarLength > 0 { - return dl.CarLength, nil - } - } - - // There are no deals with a non-zero CAR size. - // The CAR size is zero if it's been imported from the dagstore (the - // dagstore doesn't store CAR size information). So instead work out the - // size of the CAR by getting the offset of the last section in the CAR - // file, then reading the section information. - - // Get the offset of the last section in the CAR file from the index. - var lastSectionOffset uint64 - idx, err := ps.GetIterableIndex(ctx, pieceCid) - if err != nil { - return 0, fmt.Errorf("getting index for piece %s: %w", pieceCid, err) - } - err = idx.ForEach(func(_ mh.Multihash, offset uint64) error { - if offset > lastSectionOffset { - lastSectionOffset = offset - } - return nil - }) - if err != nil { - return 0, fmt.Errorf("iterating index for piece %s: %w", pieceCid, err) - } - - // Get a reader over the piece - pieceReader, err := ps.GetPieceReader(ctx, pieceCid) - if err != nil { - return 0, fmt.Errorf("getting piece reader for piece %s: %w", pieceCid, err) - } - - // Seek to the last section - _, err = pieceReader.Seek(int64(lastSectionOffset), io.SeekStart) - if err != nil { - return 0, fmt.Errorf("seeking to offset %d in piece data: %w", lastSectionOffset, err) - } - - // A section consists of - // - - // Get - cr := &countReader{r: bufio.NewReader(pieceReader)} - dataLength, err := binary.ReadUvarint(cr) - if err != nil { - return 0, fmt.Errorf("reading CAR section length: %w", err) - } - - // The number of bytes in the uvarint that records - dataLengthUvarSize := cr.count - - // Get the size of the (unpadded) CAR file - unpaddedCarSize := lastSectionOffset + dataLengthUvarSize + dataLength - - // Write the CAR size back to the store so that it's cached for next time - err = ps.store.SetCarSize(ctx, pieceCid, unpaddedCarSize) - if err != nil { - log.Errorw("writing CAR size to local index directory store", "pieceCid", pieceCid, "err", err) - } - - return unpaddedCarSize, nil -} - func (ps *PieceDirectory) AddDealForPiece(ctx context.Context, pieceCid cid.Cid, dealInfo model.DealInfo) error { ctx, span := tracing.Tracer.Start(ctx, "pm.add_deal_for_piece") defer span.End() @@ -348,10 +269,6 @@ func (ps *PieceDirectory) RemoveDealForPiece(ctx context.Context, pieceCid cid.C return nil } -func (ps *PieceDirectory) MarkIndexErrored(ctx context.Context, pieceCid cid.Cid, err string) error { - return ps.store.MarkIndexErrored(ctx, pieceCid, err) -} - //func (ps *piecedirectory) deleteIndexForPiece(pieceCid cid.Cid) interface{} { // TODO: Maybe mark for GC instead of deleting immediately @@ -586,20 +503,6 @@ func (ps *PieceDirectory) GetBlockstore(ctx context.Context, pieceCid cid.Cid) ( return bs, nil } -// countReader just counts the number of bytes read -type countReader struct { - r *bufio.Reader - count uint64 -} - -func (c *countReader) ReadByte() (byte, error) { - b, err := c.r.ReadByte() - if err == nil { - c.count++ - } - return b, err -} - type SectorAccessorAsPieceReader struct { dagstore.SectorAccessor } diff --git a/piecedirectory/piecedirectory_test.go b/piecedirectory/piecedirectory_test.go index ea4017d7a..567f4c8bc 100644 --- a/piecedirectory/piecedirectory_test.go +++ b/piecedirectory/piecedirectory_test.go @@ -1,3 +1,6 @@ +//go:build test_lid +// +build test_lid + package piecedirectory import ( @@ -22,6 +25,24 @@ import ( "github.com/stretchr/testify/require" ) +var testCouchSettings = couchbase.DBSettings{ + ConnectString: "couchbase://localhost", + Auth: couchbase.DBSettingsAuth{ + Username: "Administrator", + Password: "boostdemo", + }, + PieceMetadataBucket: couchbase.DBSettingsBucket{ + RAMQuotaMB: 128, + }, + MultihashToPiecesBucket: couchbase.DBSettingsBucket{ + RAMQuotaMB: 128, + }, + PieceOffsetsBucket: couchbase.DBSettingsBucket{ + RAMQuotaMB: 128, + }, + TestMode: true, +} + func TestPieceDirectory(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second) defer cancel() @@ -29,20 +50,31 @@ func TestPieceDirectory(t *testing.T) { t.Run("leveldb", func(t *testing.T) { bdsvc, err := svc.NewLevelDB("") require.NoError(t, err) - testPieceDirectory(ctx, t, bdsvc) + addr := "localhost:8042" + + testPieceDirectory(ctx, t, bdsvc, addr) }) + t.Run("couchbase", func(t *testing.T) { // TODO: Unskip this test once the couchbase instance can be created // from a docker container in CI as part of the test t.Skip() svc.SetupCouchbase(t, testCouchSettings) bdsvc := svc.NewCouchbase(testCouchSettings) - testPieceDirectory(ctx, t, bdsvc) + addr := "localhost:8043" + testPieceDirectory(ctx, t, bdsvc, addr) + }) + + t.Run("yugabyte", func(t *testing.T) { + svc.SetupYugabyte(t) + + bdsvc := svc.NewYugabyte(svc.TestYugabyteSettings) + addr := "localhost:8044" + testPieceDirectory(ctx, t, bdsvc, addr) }) } -func testPieceDirectory(ctx context.Context, t *testing.T, bdsvc *svc.Service) { - addr := "localhost:8044" +func testPieceDirectory(ctx context.Context, t *testing.T, bdsvc *svc.Service, addr string) { err := bdsvc.Start(ctx, addr) require.NoError(t, err) @@ -63,10 +95,6 @@ func testPieceDirectory(ctx context.Context, t *testing.T, bdsvc *svc.Service) { testImportedIndex(ctx, t, cl) }) - t.Run("car file size", func(t *testing.T) { - testCarFileSize(ctx, t, cl) - }) - t.Run("flagging pieces", func(t *testing.T) { testFlaggingPieces(ctx, t, cl) }) @@ -252,55 +280,6 @@ func testImportedIndex(ctx context.Context, t *testing.T, cl *client.Store) { require.Equal(t, len(blk.RawData()), sz) } -// Verify that if the deal info has been imported from the DAG store, meaning -// it does not have CAR size information, GetCarSize will correctly calculate -// the CAR size from the index + piece data -func testCarFileSize(ctx context.Context, t *testing.T, cl *client.Store) { - // Create a random CAR file - carFilePath := CreateCarFile(t) - carFile, err := os.Open(carFilePath) - require.NoError(t, err) - defer carFile.Close() - - carReader, err := car.OpenReader(carFilePath) - require.NoError(t, err) - defer carReader.Close() - carv1Reader, err := carReader.DataReader() - require.NoError(t, err) - - // Read the CAR bytes - carBytes, err := io.ReadAll(carv1Reader) - require.NoError(t, err) - - // Any calls to get a reader over data should return a reader over the random CAR file - pr := CreateMockPieceReader(t, carv1Reader) - - recs := GetRecords(t, carv1Reader) - commpCalc := CalculateCommp(t, carv1Reader) - err = cl.AddIndex(ctx, commpCalc.PieceCID, recs, false) - require.NoError(t, err) - - // Add deal info for the piece without a CAR file - di := model.DealInfo{ - DealUuid: uuid.New().String(), - ChainDealID: 1, - SectorID: 1, - PieceOffset: 0, - PieceLength: commpCalc.PieceSize, - } - err = cl.AddDealForPiece(ctx, commpCalc.PieceCID, di) - require.NoError(t, err) - - // Verify that getting the size of the CAR file works correctly: - // There is no CAR size information in the deal info, so the piece - // directory should work it out from the index and piece data. - pm := NewPieceDirectory(cl, pr, 1) - pm.Start(ctx) - size, err := pm.GetCarSize(ctx, commpCalc.PieceCID) - require.NoError(t, err) - require.Equal(t, len(carBytes), int(size)) -} - func testFlaggingPieces(ctx context.Context, t *testing.T, cl *client.Store) { // Create a random CAR file carFilePath := CreateCarFile(t) diff --git a/piecedirectory/test_util.go b/piecedirectory/test_util.go index f3aac7840..cc4bef77c 100644 --- a/piecedirectory/test_util.go +++ b/piecedirectory/test_util.go @@ -9,7 +9,6 @@ import ( "github.com/filecoin-project/boost/piecedirectory/types" mock_piecedirectory "github.com/filecoin-project/boost/piecedirectory/types/mocks" "github.com/filecoin-project/boost/testutil" - "github.com/filecoin-project/boostd-data/couchbase" "github.com/filecoin-project/boostd-data/model" "github.com/filecoin-project/go-commp-utils/writer" "github.com/filecoin-project/go-state-types/abi" @@ -18,24 +17,6 @@ import ( "github.com/stretchr/testify/require" ) -var testCouchSettings = couchbase.DBSettings{ - ConnectString: "couchbase://localhost", - Auth: couchbase.DBSettingsAuth{ - Username: "Administrator", - Password: "boostdemo", - }, - PieceMetadataBucket: couchbase.DBSettingsBucket{ - RAMQuotaMB: 128, - }, - MultihashToPiecesBucket: couchbase.DBSettingsBucket{ - RAMQuotaMB: 128, - }, - PieceOffsetsBucket: couchbase.DBSettingsBucket{ - RAMQuotaMB: 128, - }, - TestMode: true, -} - // Get the index records from the CAR file func GetRecords(t *testing.T, reader car.SectionReader) []model.Record { _, err := reader.Seek(0, io.SeekStart) diff --git a/piecedirectory/types/mocks/piecedirectory.go b/piecedirectory/types/mocks/piecedirectory.go index 70b61ff8e..d44e2031d 100644 --- a/piecedirectory/types/mocks/piecedirectory.go +++ b/piecedirectory/types/mocks/piecedirectory.go @@ -338,20 +338,6 @@ func (mr *MockStoreMockRecorder) ListPieces(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPieces", reflect.TypeOf((*MockStore)(nil).ListPieces), arg0) } -// MarkIndexErrored mocks base method. -func (m *MockStore) MarkIndexErrored(arg0 context.Context, arg1 cid.Cid, arg2 string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "MarkIndexErrored", arg0, arg1, arg2) - ret0, _ := ret[0].(error) - return ret0 -} - -// MarkIndexErrored indicates an expected call of MarkIndexErrored. -func (mr *MockStoreMockRecorder) MarkIndexErrored(arg0, arg1, arg2 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MarkIndexErrored", reflect.TypeOf((*MockStore)(nil).MarkIndexErrored), arg0, arg1, arg2) -} - // NextPiecesToCheck mocks base method. func (m *MockStore) NextPiecesToCheck(arg0 context.Context) ([]cid.Cid, error) { m.ctrl.T.Helper() @@ -424,20 +410,6 @@ func (mr *MockStoreMockRecorder) RemovePieceMetadata(arg0, arg1 interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemovePieceMetadata", reflect.TypeOf((*MockStore)(nil).RemovePieceMetadata), arg0, arg1) } -// SetCarSize mocks base method. -func (m *MockStore) SetCarSize(arg0 context.Context, arg1 cid.Cid, arg2 uint64) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SetCarSize", arg0, arg1, arg2) - ret0, _ := ret[0].(error) - return ret0 -} - -// SetCarSize indicates an expected call of SetCarSize. -func (mr *MockStoreMockRecorder) SetCarSize(arg0, arg1, arg2 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetCarSize", reflect.TypeOf((*MockStore)(nil).SetCarSize), arg0, arg1, arg2) -} - // UnflagPiece mocks base method. func (m *MockStore) UnflagPiece(arg0 context.Context, arg1 cid.Cid) error { m.ctrl.T.Helper() diff --git a/piecedirectory/types/types.go b/piecedirectory/types/types.go index 8221dd5d3..8ad5dae8a 100644 --- a/piecedirectory/types/types.go +++ b/piecedirectory/types/types.go @@ -39,9 +39,7 @@ type Store interface { GetPieceMetadata(ctx context.Context, pieceCid cid.Cid) (model.Metadata, error) ListPieces(ctx context.Context) ([]cid.Cid, error) GetPieceDeals(ctx context.Context, pieceCid cid.Cid) ([]model.DealInfo, error) - SetCarSize(ctx context.Context, pieceCid cid.Cid, size uint64) error PiecesContainingMultihash(ctx context.Context, m multihash.Multihash) ([]cid.Cid, error) - MarkIndexErrored(ctx context.Context, pieceCid cid.Cid, err string) error RemoveDealForPiece(context.Context, cid.Cid, string) error RemovePieceMetadata(context.Context, cid.Cid) error RemoveIndexes(context.Context, cid.Cid) error