From 4dc73ba48b4dd97159e724883b3d96480474b0e9 Mon Sep 17 00:00:00 2001 From: gammazero Date: Sat, 8 Jul 2023 10:39:26 -0700 Subject: [PATCH 1/2] Remove index counter functionality This counter is very expensive to maintain and ends up offering little value. - Remove index logic - Cleanup datastore to remove index count records --- command/daemon.go | 20 +- command/update_datastore.go | 96 ++++++- config/indexer.go | 7 - e2e_test.go | 8 - internal/counter/index_counts.go | 348 -------------------------- internal/counter/index_counts_test.go | 153 ----------- internal/ingest/ingest.go | 35 +-- internal/ingest/ingest_test.go | 49 +--- internal/ingest/linksystem.go | 22 -- internal/ingest/option.go | 34 --- internal/metrics/server.go | 6 - internal/registry/apiconv.go | 9 +- internal/registry/apiconv_test.go | 7 +- server/admin/server_test.go | 5 +- server/find/handler/handler.go | 38 +-- server/find/handler_test.go | 4 +- server/find/options.go | 11 - server/find/protocol_test.go | 22 +- server/find/server.go | 2 +- server/reframe/reframe.go | 2 +- 20 files changed, 133 insertions(+), 745 deletions(-) delete mode 100644 internal/counter/index_counts.go delete mode 100644 internal/counter/index_counts_test.go delete mode 100644 internal/ingest/option.go diff --git a/command/daemon.go b/command/daemon.go index 635c6db04..477d6804c 100644 --- a/command/daemon.go +++ b/command/daemon.go @@ -26,7 +26,6 @@ import ( "github.com/ipni/go-libipni/mautil" "github.com/ipni/storetheindex/config" "github.com/ipni/storetheindex/fsutil" - "github.com/ipni/storetheindex/internal/counter" "github.com/ipni/storetheindex/internal/ingest" "github.com/ipni/storetheindex/internal/registry" httpadmin "github.com/ipni/storetheindex/server/admin" @@ -39,12 +38,6 @@ import ( "github.com/urfave/cli/v2" ) -const ( - dsInfoPrefix = "/dsInfo/" - dsVersionKey = "version" - dsVersion = "001" -) - // Recognized valuestore type names. const ( vstoreDHStore = "dhstore" @@ -170,12 +163,6 @@ func daemonAction(cctx *cli.Context) error { // Create indexer core indexerCore := engine.New(valueStore, engine.WithCache(resultCache)) - var indexCounts *counter.IndexCounts - if cfg.Indexer.IndexCountEnabled { - indexCounts = counter.NewIndexCounts(dstore) - indexCounts.SetTotalAddend(cfg.Indexer.IndexCountTotalAddend) - } - // Create registry reg, err := registry.New(cctx.Context, cfg.Discovery, dstore, registry.WithFreezer(freezeDirs, cfg.Indexer.FreezeAtPercent)) @@ -200,7 +187,6 @@ func daemonAction(cctx *cli.Context) error { httpfind.WithWriteTimeout(time.Duration(cfg.Finder.ApiWriteTimeout)), httpfind.WithMaxConnections(cfg.Finder.MaxConnections), httpfind.WithHomepage(cfg.Finder.Webpage), - httpfind.WithIndexCounts(indexCounts), httpfind.WithVersion(cctx.App.Version), ) if err != nil { @@ -251,7 +237,7 @@ func daemonAction(cctx *cli.Context) error { } // Initialize ingester. - ingester, err = ingest.NewIngester(cfg.Ingest, p2pHost, indexerCore, reg, dstore, dsTmp, ingest.WithIndexCounts(indexCounts)) + ingester, err = ingest.NewIngester(cfg.Ingest, p2pHost, indexerCore, reg, dstore, dsTmp) if err != nil { return err } @@ -426,10 +412,6 @@ func daemonAction(cctx *cli.Context) error { } } - if indexCounts != nil { - indexCounts.SetTotalAddend(cfg.Indexer.IndexCountTotalAddend) - } - if errChan != nil { errChan <- nil } diff --git a/command/update_datastore.go b/command/update_datastore.go index 9059b64c9..dcd95e3f7 100644 --- a/command/update_datastore.go +++ b/command/update_datastore.go @@ -11,8 +11,14 @@ import ( "github.com/ipfs/go-datastore/query" ) -// updateBatchSize is the number of records to update at a time. -const updateBatchSize = 500000 +const ( + dsInfoPrefix = "/dsInfo/" + dsVersionKey = "version" + dsVersion = "002" + + // updateBatchSize is the number of records to update at a time. + updateBatchSize = 500000 +) func updateDatastore(ctx context.Context, ds datastore.Batching) error { dsVerKey := datastore.NewKey(dsInfoPrefix + dsVersionKey) @@ -20,7 +26,7 @@ func updateDatastore(ctx context.Context, ds datastore.Batching) error { if err != nil && !errors.Is(err, datastore.ErrNotFound) { return fmt.Errorf("cannot check datastore: %w", err) } - var curVer string + curVer := "000" if len(curVerData) != 0 { curVer = string(curVerData) } @@ -31,12 +37,29 @@ func updateDatastore(ctx context.Context, ds datastore.Batching) error { return fmt.Errorf("unknown datastore verssion: %s", curVer) } - log.Infof("Updating datastore to version %s", dsVersion) - if err = rmDtFsmRecords(ctx, ds); err != nil { - return err + var count int + + log.Infof("Updating datastore from version %s to %s", curVer, dsVersion) + if curVer < "001" { + count, err = deletePrefix(ctx, ds, "/data-transfer-v2") + if err != nil { + return err + } + if count != 0 { + log.Infow("Datastore update removed data-transfer fsm records", "count", count) + } + if err = rmOldTempRecords(ctx, ds); err != nil { + return err + } } - if err = rmOldTempRecords(ctx, ds); err != nil { - return err + if curVer < "002" { + count, err = deletePrefix(ctx, ds, "/indexCounts") + if err != nil { + return err + } + if count != 0 { + log.Infow("Datastore update removed index count records", "count", count) + } } if err = ds.Put(ctx, dsVerKey, []byte(dsVersion)); err != nil { @@ -179,8 +202,59 @@ func rmDtFsmRecords(ctx context.Context, ds datastore.Batching) error { return err } - if dtKeyCount != 0 { - log.Infow("Datastore update removed data-transfer fsm records", "count", dtKeyCount) - } return nil } + +func deletePrefix(ctx context.Context, ds datastore.Batching, prefix string) (int, error) { + q := query.Query{ + KeysOnly: true, + Prefix: prefix, + } + results, err := ds.Query(ctx, q) + if err != nil { + return 0, fmt.Errorf("cannot query datastore: %w", err) + } + defer results.Close() + + batch, err := ds.Batch(ctx) + if err != nil { + return 0, fmt.Errorf("cannot create datastore batch: %w", err) + } + + var keyCount, writeCount int + for result := range results.Next() { + if ctx.Err() != nil { + return 0, ctx.Err() + } + if writeCount >= updateBatchSize { + writeCount = 0 + if err = batch.Commit(ctx); err != nil { + return 0, fmt.Errorf("cannot commit datastore: %w", err) + } + log.Infow("Datastore update removed records", "count", keyCount) + } + if result.Error != nil { + return 0, fmt.Errorf("cannot read query result from datastore: %w", result.Error) + } + ent := result.Entry + if len(ent.Key) == 0 { + log.Warnf("result entry has empty key") + continue + } + + if err = batch.Delete(ctx, datastore.NewKey(ent.Key)); err != nil { + return 0, fmt.Errorf("cannot delete key from datastore: %w", err) + } + writeCount++ + keyCount++ + } + + if err = batch.Commit(ctx); err != nil { + return 0, fmt.Errorf("cannot commit datastore: %w", err) + } + if err = ds.Sync(context.Background(), datastore.NewKey(q.Prefix)); err != nil { + return 0, err + } + + return keyCount, nil +} diff --git a/config/indexer.go b/config/indexer.go index 8e35e1c4a..6050acb8e 100644 --- a/config/indexer.go +++ b/config/indexer.go @@ -31,13 +31,6 @@ type Indexer struct { // ValueStoreDir is on, at which to trigger the indexer to enter frozen // mode. A zero value uses the default. A negative value disables freezing. FreezeAtPercent float64 - // IndexCountEnabled sets whether ingest process should count the number of index records per provider and - // finder should show the number of index counts per provider. - IndexCountEnabled bool - // IndexCountTotalAddend is a value that is added to the index count total, - // to account for uncounted indexes that have existed in the value store - // before provider index counts were tracked. This value is reloadable. - IndexCountTotalAddend uint64 // ShutdownTimeout is the duration that a graceful shutdown has to complete // before the daemon process is terminated. If unset or zero, configures no // shutdown timeout. This value is reloadable. diff --git a/e2e_test.go b/e2e_test.go index 7ae87d5d6..6ffe3421a 100644 --- a/e2e_test.go +++ b/e2e_test.go @@ -326,10 +326,6 @@ func TestEndToEndWithReferenceProvider(t *testing.T) { require.Equal(t, 1, carCount) require.Equal(t, 1, headCount) - outProvider := e.run(ipni, "provider", "-pid", providerID, "--indexer", "localhost:3000") - // Check that IndexCount with correct value appears in providers output. - require.Contains(t, string(outProvider), "IndexCount: 1043") - root2 := filepath.Join(e.dir, ".storetheindex2") e.env = append(e.env, fmt.Sprintf("%s=%s", config.EnvDir, root2)) e.run(indexer, "init", "--store", "dhstore", "--pubsub-topic", "/indexer/ingest/mainnet", "--no-bootstrap", "--dhstore", "http://127.0.0.1:40080", @@ -421,10 +417,6 @@ func TestEndToEndWithReferenceProvider(t *testing.T) { return true }, 10*time.Second, time.Second) - outProvider = e.run(ipni, "provider", "-pid", providerID, "--indexer", "localhost:3000") - // Check that IndexCount is back to zero after removing car. - require.Contains(t, string(outProvider), "IndexCount: 0") - // Check that status is not frozen. outStatus := e.run(indexer, "admin", "status", "--indexer", "localhost:3202") require.Contains(t, string(outStatus), "Frozen: false", "expected indexer to be frozen") diff --git a/internal/counter/index_counts.go b/internal/counter/index_counts.go deleted file mode 100644 index b784b4da0..000000000 --- a/internal/counter/index_counts.go +++ /dev/null @@ -1,348 +0,0 @@ -package counter - -import ( - "context" - "encoding/base64" - "errors" - "fmt" - "strings" - "sync" - - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/query" - logging "github.com/ipfs/go-log/v2" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-varint" -) - -var log = logging.Logger("indexer/counters") - -const ( - // indexCountPrefix identifies all provider index counts. - indexCountPrefix = "/indexCounts/" -) - -// IndexCounts persists multihash counts associated with provider ContextIDs. -// It is not safe to allow concurrent calls to IndexCounts methods for the same -// provider. Concurrent calls for different providers are safe. -type IndexCounts struct { - ds datastore.Datastore - // mutex protexts counts and total. - mutex sync.Mutex - // counts is in-mem total index counts for each provider. - counts map[peer.ID]uint64 - // total is in-mem total index count for all providers. - total uint64 - // totalAddend is a value that gets added to the total. - totalAddend uint64 -} - -// NewIndexCounts creates a new IndexCounts given a Datastore. -func NewIndexCounts(ds datastore.Datastore) *IndexCounts { - return &IndexCounts{ - ds: ds, - counts: make(map[peer.ID]uint64), - } -} - -// SetTotalAddent sets a value that is added to the index count returned by -// Total. Its purpose is to account for uncounted indexes that have existed -// since before provider index counts were tracked. This only affects Total, -// and does not affect any individual provider values. -func (c *IndexCounts) SetTotalAddend(totalAddend uint64) { - c.mutex.Lock() - c.totalAddend = totalAddend - c.mutex.Unlock() -} - -// AddCount adds the index count to the existing count for the -// provider's context ID, and updates in-memory totals. -func (c *IndexCounts) AddCount(providerID peer.ID, contextID []byte, count uint64) { - if count == 0 { - return - } - - // Update in-mem values if they are present. - c.mutex.Lock() - prevCtxTotal, ok := c.counts[providerID] - if ok { - c.counts[providerID] = prevCtxTotal + count - } - if c.total != 0 { - c.total += count - } - c.mutex.Unlock() - - key := makeIndexCountKey(providerID, contextID) - - // Get any previous count for this contextID and add to it. - prevCtxCount, err := c.loadContextCount(context.Background(), key) - if err != nil { - return - } - err = c.ds.Put(context.Background(), key, varint.ToUvarint(prevCtxCount+count)) - if err != nil { - log.Errorw("Cannot update index count", "err", err) - } -} - -// AddMissingCount stores the count only if there is no existing count for the -// provider's context ID, and updates in-memory totals. -func (c *IndexCounts) AddMissingCount(providerID peer.ID, contextID []byte, count uint64) { - if count == 0 { - return - } - - key := makeIndexCountKey(providerID, contextID) - - has, err := c.ds.Has(context.Background(), key) - if err != nil { - log.Errorw("cannot load index count", "err", err) - return - } - if has { - return - } - err = c.ds.Put(context.Background(), key, varint.ToUvarint(count)) - if err != nil { - log.Errorw("Cannot store index count", "err", err) - } - - // Update in-mem values if they are present. - c.mutex.Lock() - prevCtxTotal, ok := c.counts[providerID] - if ok { - c.counts[providerID] = prevCtxTotal + count - } - if c.total != 0 { - c.total += count - } - c.mutex.Unlock() -} - -// RemoveCtx removes the index count for a provider's contextID. -func (c *IndexCounts) RemoveCtx(providerID peer.ID, contextID []byte) (uint64, error) { - key := makeIndexCountKey(providerID, contextID) - - count, err := c.loadContextCount(context.Background(), key) - if derr := c.ds.Delete(context.Background(), key); derr != nil { - log.Errorw("Cannot delete index count", "err", derr) - } - if err != nil { - return 0, err - } - - // Update in-mem values if they are present. - c.mutex.Lock() - ptotal, ok := c.counts[providerID] - if ok { - if count < ptotal { - c.counts[providerID] = ptotal - count - } else { - if count > ptotal { - log.Error("Index count in data store is greater than in memory") - count = ptotal - } - delete(c.counts, providerID) - } - } - if c.total != 0 { - c.total -= count - } - c.mutex.Unlock() - - return count, nil -} - -// Provider reads all index counts for a provider. -func (c *IndexCounts) Provider(providerID peer.ID) (uint64, error) { - // Return in-mem value if available. - c.mutex.Lock() - count, ok := c.counts[providerID] - c.mutex.Unlock() - if ok { - return count, nil - } - - total, err := c.loadProvider(context.Background(), providerID) - if err != nil { - return 0, err - } - - // Track value in memory. - c.mutex.Lock() - c.counts[providerID] = total - c.mutex.Unlock() - - return total, nil -} - -// Total returns the total of all index counts for all providers. -func (c *IndexCounts) Total() (uint64, error) { - // Return in-mem value if available. - c.mutex.Lock() - count := c.total - total := count + c.totalAddend - c.mutex.Unlock() - if count != 0 { - return total, nil - } - - q := query.Query{ - Prefix: indexCountPrefix, - } - results, err := c.ds.Query(context.Background(), q) - if err != nil { - return 0, fmt.Errorf("cannot query index counts: %v", err) - } - defer results.Close() - - for r := range results.Next() { - if r.Error != nil { - return 0, fmt.Errorf("cannot read index: %v", r.Error) - } - count, _, err = varint.FromUvarint(r.Entry.Value) - if err != nil { - log.Errorw("Cannot decode index count", "err", err) - continue - } - - total += count - } - - // Track value in memory. - c.mutex.Lock() - c.total = total - total += c.totalAddend - c.mutex.Unlock() - - return total, nil -} - -// RemoveProvider removes all index counts for a provider. -func (c *IndexCounts) RemoveProvider(providerID peer.ID) uint64 { - var count uint64 - var ok bool - - c.mutex.Lock() - if len(c.counts) != 0 { - if c.total != 0 { - count, ok = c.counts[providerID] - if ok { - c.total -= count - delete(c.counts, providerID) - } - } else { - ok = true - delete(c.counts, providerID) - } - } - c.mutex.Unlock() - - ctx := context.Background() - if !ok { - var err error - count, err = c.loadProvider(ctx, providerID) - if err != nil { - c.mutex.Lock() - c.total = 0 - c.mutex.Unlock() - log.Errorw("Cannot load provider count", "err", err) - } - - if count != 0 { - c.mutex.Lock() - if c.total != 0 { - c.total -= count - } - c.mutex.Unlock() - } - } - - n, err := c.deletePrefix(ctx, indexCountPrefix+providerID.String()) - if err != nil { - log.Errorw("Cannot delete provider contextID counts", "err", err) - } else { - log.Debugf("Removed %d contextID counts for provider", n) - } - - return count -} - -func (c *IndexCounts) loadContextCount(ctx context.Context, key datastore.Key) (uint64, error) { - data, err := c.ds.Get(context.Background(), key) - if err != nil { - if errors.Is(err, datastore.ErrNotFound) { - return 0, nil - } - return 0, fmt.Errorf("cannot load index count: %w", err) - } - count, _, err := varint.FromUvarint(data) - if err != nil { - return 0, fmt.Errorf("cannot decode index count: %w", err) - } - - return count, nil -} - -func (c *IndexCounts) loadProvider(ctx context.Context, providerID peer.ID) (uint64, error) { - q := query.Query{ - Prefix: indexCountPrefix + providerID.String(), - } - results, err := c.ds.Query(ctx, q) - if err != nil { - return 0, fmt.Errorf("cannot query all index counts: %v", err) - } - defer results.Close() - - var total uint64 - for r := range results.Next() { - if r.Error != nil { - return 0, fmt.Errorf("cannot read index: %v", r.Error) - } - count, _, err := varint.FromUvarint(r.Entry.Value) - if err != nil { - log.Errorw("Cannot decode index count", "err", err) - continue - } - - total += count - } - - return total, nil -} - -func makeIndexCountKey(provider peer.ID, contextID []byte) datastore.Key { - var keyBuf strings.Builder - keyBuf.WriteString(indexCountPrefix) - keyBuf.WriteString(provider.String()) - keyBuf.WriteString("/") - keyBuf.WriteString(base64.StdEncoding.EncodeToString(contextID)) - return datastore.NewKey(keyBuf.String()) -} - -func (c *IndexCounts) deletePrefix(ctx context.Context, prefix string) (int, error) { - q := query.Query{ - Prefix: prefix, - KeysOnly: true, - } - results, err := c.ds.Query(ctx, q) - if err != nil { - return 0, err - } - - var delKeys []string - for r := range results.Next() { - delKeys = append(delKeys, r.Entry.Key) - } - results.Close() - - for _, key := range delKeys { - err = c.ds.Delete(ctx, datastore.NewKey(key)) - if err != nil { - return 0, err - } - } - - return len(delKeys), nil -} diff --git a/internal/counter/index_counts_test.go b/internal/counter/index_counts_test.go deleted file mode 100644 index bda163ada..000000000 --- a/internal/counter/index_counts_test.go +++ /dev/null @@ -1,153 +0,0 @@ -package counter_test - -import ( - "testing" - - "github.com/ipfs/go-datastore" - "github.com/ipni/storetheindex/internal/counter" - crypto "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/test" - "github.com/stretchr/testify/require" -) - -func TestIndexCounts(t *testing.T) { - providerPriv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256) - require.NoError(t, err) - providerID1, err := peer.IDFromPrivateKey(providerPriv) - require.NoError(t, err) - providerPriv, _, err = test.RandTestKeyPair(crypto.Ed25519, 256) - require.NoError(t, err) - providerID2, err := peer.IDFromPrivateKey(providerPriv) - require.NoError(t, err) - - c := counter.NewIndexCounts(datastore.NewMapDatastore()) - - ctxid1 := []byte("ctxid1") - ctxid2 := []byte("ctxid2") - ctxid3 := []byte("ctxid3") - ctxid4 := []byte("ctxid4") - - c.AddCount(providerID1, ctxid1, 5) - c.AddCount(providerID1, ctxid2, 2) - c.AddCount(providerID1, ctxid3, 3) - total, err := c.Provider(providerID1) - require.NoError(t, err) - require.Equal(t, 10, int(total)) - - total, err = c.Total() - require.NoError(t, err) - require.Equal(t, 10, int(total)) - - c.AddCount(providerID2, ctxid4, 7) - total, err = c.Provider(providerID2) - require.NoError(t, err) - require.Equal(t, 7, int(total)) - - total, err = c.Total() - require.NoError(t, err) - require.Equal(t, 17, int(total)) - - count, err := c.RemoveCtx(providerID1, ctxid2) - require.NoError(t, err) - require.Equal(t, 2, int(count)) - - total, err = c.Provider(providerID1) - require.NoError(t, err) - require.Equal(t, 8, int(total)) - - total, err = c.Provider(providerID2) - require.NoError(t, err) - require.Equal(t, 7, int(total)) - - total, err = c.Total() - require.NoError(t, err) - require.Equal(t, 15, int(total)) - - // Remove context that provider does not have should change nothing. - count, err = c.RemoveCtx(providerID2, ctxid1) - require.NoError(t, err) - require.Zero(t, count) - - total, err = c.Total() - require.NoError(t, err) - require.Equal(t, 15, int(total)) - - // Remove first provider. - count = c.RemoveProvider(providerID1) - require.Equal(t, 8, int(count)) - count = c.RemoveProvider(providerID1) - require.Zero(t, int(count)) - - total, err = c.Total() - require.NoError(t, err) - require.Equal(t, 7, int(total)) - - // Remove the last provider. - count, err = c.RemoveCtx(providerID2, ctxid4) - require.NoError(t, err) - require.Equal(t, 7, int(count)) - - total, err = c.Total() - require.NoError(t, err) - require.Zero(t, total) - - // Remove provider with total not yet in mem. - c = counter.NewIndexCounts(datastore.NewMapDatastore()) - c.AddCount(providerID1, ctxid1, 5) - c.AddCount(providerID2, ctxid4, 7) - count = c.RemoveProvider(providerID1) - require.Equal(t, 5, int(count)) - - total, err = c.Total() - require.NoError(t, err) - require.Equal(t, 7, int(total)) -} - -func TestAddCount(t *testing.T) { - providerPriv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256) - require.NoError(t, err) - providerID, err := peer.IDFromPrivateKey(providerPriv) - require.NoError(t, err) - - c := counter.NewIndexCounts(datastore.NewMapDatastore()) - - ctxid1 := []byte("ctxid1") - ctxid2 := []byte("ctxid2") - - c.AddCount(providerID, ctxid1, 5) - c.AddCount(providerID, ctxid2, 2) - total, err := c.Total() - require.NoError(t, err) - require.Equal(t, 7, int(total)) - - // Show that AddCount adds to existing value. - c.AddCount(providerID, ctxid2, 2) - total, err = c.Provider(providerID) - require.NoError(t, err) - require.Equal(t, 9, int(total)) - - c.AddCount(providerID, ctxid1, 3) - total, err = c.Provider(providerID) - require.NoError(t, err) - require.Equal(t, 12, int(total)) - - c.AddMissingCount(providerID, ctxid1, 5) - total, err = c.Provider(providerID) - require.NoError(t, err) - require.Equal(t, 12, int(total)) - - total, err = c.Total() - require.NoError(t, err) - require.Equal(t, 12, int(total)) - - ctxid3 := []byte("ctxid3") - c.AddMissingCount(providerID, ctxid3, 5) - total, err = c.Provider(providerID) - require.NoError(t, err) - require.Equal(t, 17, int(total)) - - total, err = c.Total() - require.NoError(t, err) - require.Equal(t, 17, int(total)) -} diff --git a/internal/ingest/ingest.go b/internal/ingest/ingest.go index c09a75692..01c74fa50 100644 --- a/internal/ingest/ingest.go +++ b/internal/ingest/ingest.go @@ -25,7 +25,6 @@ import ( coremetrics "github.com/ipni/go-indexer-core/metrics" "github.com/ipni/go-libipni/dagsync" "github.com/ipni/storetheindex/config" - "github.com/ipni/storetheindex/internal/counter" "github.com/ipni/storetheindex/internal/metrics" "github.com/ipni/storetheindex/internal/registry" "github.com/libp2p/go-libp2p/core/host" @@ -171,18 +170,12 @@ type Ingester struct { mhsFromMirror atomic.Uint64 // Metrics - backlogs map[peer.ID]int32 - indexCounts *counter.IndexCounts + backlogs map[peer.ID]int32 } // NewIngester creates a new Ingester that uses a dagsync Subscriber to handle // communication with providers. -func NewIngester(cfg config.Ingest, h host.Host, idxr indexer.Interface, reg *registry.Registry, ds, dsTmp datastore.Batching, options ...Option) (*Ingester, error) { - opts, err := getOpts(options) - if err != nil { - return nil, err - } - +func NewIngester(cfg config.Ingest, h host.Host, idxr indexer.Interface, reg *registry.Registry, ds, dsTmp datastore.Batching) (*Ingester, error) { if cfg.IngestWorkerCount == 0 { return nil, errors.New("ingester worker count must be > 0") } @@ -210,12 +203,12 @@ func NewIngester(cfg config.Ingest, h host.Host, idxr indexer.Interface, reg *re minKeyLen: cfg.MinimumKeyLength, - indexCounts: opts.idxCounts, - backlogs: make(map[peer.ID]int32), + backlogs: make(map[peer.ID]int32), } ing.workersCtx, ing.cancelWorkers = context.WithCancel(context.Background()) + var err error ing.mirror, err = newMirror(cfg.AdvertisementMirror, ing.dsTmp) if err != nil { return nil, err @@ -745,20 +738,9 @@ func (ing *Ingester) metricsUpdater() { usage = usageStats.Percent } - if ing.indexCounts != nil { - indexCount, err := ing.indexCounts.Total() - if err != nil { - log.Errorw("Error getting index counts", "err", err) - } - stats.Record(context.Background(), - coremetrics.StoreSize.M(size), - metrics.IndexCount.M(int64(indexCount)), - metrics.PercentUsage.M(usage)) - } else { - stats.Record(context.Background(), - coremetrics.StoreSize.M(size), - metrics.PercentUsage.M(usage)) - } + stats.Record(context.Background(), + coremetrics.StoreSize.M(size), + metrics.PercentUsage.M(usage)) if ing.mirror.canRead() { mhsFromMirror := ing.MultihashesFromMirror() @@ -822,9 +804,6 @@ func (ing *Ingester) autoSync() { if err := ing.removePublisher(ctx, provInfo.Publisher); err != nil { log.Errorw("Error removing provider", "err", err, "provider", provInfo.AddrInfo.ID) } - if ing.indexCounts != nil { - ing.indexCounts.RemoveProvider(provInfo.AddrInfo.ID) - } // Do not remove provider info from core, because that requires // scanning the entire core valuestore. Instead, let the finder // delete provider contexts as deleted providers appear in find diff --git a/internal/ingest/ingest_test.go b/internal/ingest/ingest_test.go index dfbc3e64e..87e8b8cc6 100644 --- a/internal/ingest/ingest_test.go +++ b/internal/ingest/ingest_test.go @@ -33,7 +33,6 @@ import ( "github.com/ipni/go-libipni/test" "github.com/ipni/storetheindex/carstore" "github.com/ipni/storetheindex/config" - "github.com/ipni/storetheindex/internal/counter" "github.com/ipni/storetheindex/internal/registry" "github.com/ipni/storetheindex/test/typehelpers" "github.com/libp2p/go-libp2p" @@ -807,7 +806,7 @@ func TestSync(t *testing.T) { srcStore := dssync.MutexWrap(datastore.NewMapDatastore()) h := mkTestHost() pubHost := mkTestHost() - i, core, _, indexCounts := mkIngest(t, h) + i, core, _ := mkIngest(t, h) defer core.Close() defer i.Close() pub, lsys := mkMockPublisher(t, pubHost, h, srcStore) @@ -853,32 +852,9 @@ func TestSync(t *testing.T) { _, err = i.Sync(ctx, peerInfo, 1, true) require.NoError(t, err) - // Check that the total number of indexes is correct. - expectCount := uint64(testEntriesChunkCount * testEntriesChunkSize) - count, err := indexCounts.Total() - require.NoError(t, err) - require.Equal(t, int(expectCount), int(count)) - count, err = indexCounts.Provider(providerID) - require.NoError(t, err) - require.Equal(t, expectCount, count) - // Check again to check for correct value from memory. - count, err = indexCounts.Total() - require.NoError(t, err) - require.Equal(t, expectCount, count) - count, err = indexCounts.Provider(providerID) - require.NoError(t, err) - require.Equal(t, expectCount, count) - publishRemovalAd(t, pub, lsys, false, providerID, privKey) _, err = i.Sync(ctx, peerInfo, 0, false) require.NoError(t, err) - - count, err = indexCounts.Provider(providerID) - require.NoError(t, err) - require.Zero(t, count) - count, err = indexCounts.Provider(providerID) - require.NoError(t, err) - require.Zero(t, count) } func testSyncWithExtendedProviders(t *testing.T, @@ -892,7 +868,7 @@ func testSyncWithExtendedProviders(t *testing.T, srcStore := dssync.MutexWrap(datastore.NewMapDatastore()) h := mkTestHost() pubHost := mkTestHost() - ingester, core, reg, _ := mkIngest(t, h) + ingester, core, reg := mkIngest(t, h) defer core.Close() defer ingester.Close() pub, lsys := mkMockPublisher(t, pubHost, h, srcStore) @@ -1098,7 +1074,7 @@ func TestSyncTooLargeMetadata(t *testing.T) { srcStore := dssync.MutexWrap(datastore.NewMapDatastore()) h := mkTestHost() pubHost := mkTestHost() - i, core, _, _ := mkIngest(t, h) + i, core, _ := mkIngest(t, h) defer core.Close() defer i.Close() pub, lsys := mkMockPublisher(t, pubHost, h, srcStore) @@ -1138,7 +1114,7 @@ func TestSyncSkipNoMetadata(t *testing.T) { srcStore := dssync.MutexWrap(datastore.NewMapDatastore()) h := mkTestHost() pubHost := mkTestHost() - i, core, reg, _ := mkIngest(t, h) + i, core, reg := mkIngest(t, h) defer core.Close() defer i.Close() pub, lsys := mkMockPublisher(t, pubHost, h, srcStore) @@ -1260,7 +1236,7 @@ func TestRecursionDepthLimitsEntriesSync(t *testing.T) { srcStore := dssync.MutexWrap(datastore.NewMapDatastore()) h := mkTestHost() pubHost := mkTestHost() - ing, core, _, _ := mkIngest(t, h) + ing, core, _ := mkIngest(t, h) defer core.Close() defer ing.Close() pub, lsys := mkMockPublisher(t, pubHost, h, srcStore) @@ -1382,7 +1358,7 @@ func TestMultiplePublishers(t *testing.T) { pubHost2 := mkTestHost() pubHost2Priv := pubHost2.Peerstore().PrivKey(pubHost2.ID()) - i, core, _, _ := mkIngest(t, h) + i, core, _ := mkIngest(t, h) defer core.Close() defer i.Close() pub1, lsys1 := mkMockPublisher(t, pubHost1, h, srcStore1) @@ -1799,19 +1775,18 @@ func mkMockPublisher(t *testing.T, pubHost, testHost host.Host, store datastore. return pub, lsys } -func mkIngest(t *testing.T, h host.Host) (*Ingester, *engine.Engine, *registry.Registry, *counter.IndexCounts) { +func mkIngest(t *testing.T, h host.Host) (*Ingester, *engine.Engine, *registry.Registry) { return mkIngestWithConfig(t, h, defaultTestIngestConfig) } -func mkIngestWithConfig(t *testing.T, h host.Host, cfg config.Ingest) (*Ingester, *engine.Engine, *registry.Registry, *counter.IndexCounts) { +func mkIngestWithConfig(t *testing.T, h host.Host, cfg config.Ingest) (*Ingester, *engine.Engine, *registry.Registry) { store := dssync.MutexWrap(datastore.NewMapDatastore()) tmpStore := dssync.MutexWrap(datastore.NewMapDatastore()) reg := mkRegistry(t) core := mkIndexer(t, true) - indexCounts := counter.NewIndexCounts(store) - ing, err := NewIngester(cfg, h, core, reg, store, tmpStore, WithIndexCounts(indexCounts)) + ing, err := NewIngester(cfg, h, core, reg, store, tmpStore) require.NoError(t, err) - return ing, core, reg, indexCounts + return ing, core, reg } func connectHosts(t *testing.T, srcHost, dstHost host.Host) { @@ -2093,7 +2068,6 @@ type testEnv struct { publisherPriv crypto.PrivKey ingesterPriv crypto.PrivKey publisherLinkSys ipld.LinkSystem - indexCounts *counter.IndexCounts ingester *Ingester ingesterHost host.Host core indexer.Interface @@ -2162,7 +2136,7 @@ func setupTestEnv(t *testing.T, shouldConnectHosts bool, opts ...func(*testEnvOp require.NoError(t, err) pubHost := mkTestHost(libp2p.Identity(priv)) - i, core, reg, indexCounts := mkIngestWithConfig(t, ingesterHost, *testOpt.ingestConfig) + i, core, reg := mkIngestWithConfig(t, ingesterHost, *testOpt.ingestConfig) var lsys ipld.LinkSystem if testOpt.publisherLinkSysFn != nil { @@ -2191,7 +2165,6 @@ func setupTestEnv(t *testing.T, shouldConnectHosts bool, opts ...func(*testEnvOp core: core, reg: reg, skipIngCleanup: testOpt.skipIngesterCleanup, - indexCounts: indexCounts, } t.Cleanup(func() { diff --git a/internal/ingest/linksystem.go b/internal/ingest/linksystem.go index 6b217a6f3..4ccb43df3 100644 --- a/internal/ingest/linksystem.go +++ b/internal/ingest/linksystem.go @@ -266,14 +266,6 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci if err != nil { return adIngestError{adIngestIndexerErr, fmt.Errorf("failed to remove provider context: %w", err)} } - if ing.indexCounts != nil { - rmCount, err := ing.indexCounts.RemoveCtx(providerID, ad.ContextID) - if err != nil { - log.Errorw("Error removing index count", "err", err) - } else { - log.Debugf("Removal ad reduced index count by %d", rmCount) - } - } return nil } @@ -325,7 +317,6 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci mhCount, err = ing.ingestEntriesFromCar(ctx, ad, providerID, adCid, entriesCid, log) // If entries data successfully read from CAR file. if err == nil { - ing.updateIndexCounts(mhCount, providerID, ad.ContextID, resync) ing.mhsFromMirror.Add(uint64(mhCount)) return nil } @@ -390,22 +381,9 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci if err != nil { return err } - // Update index counts only if no error, since ad usually reindexed if error. - ing.updateIndexCounts(mhCount, providerID, ad.ContextID, resync) return nil } -func (ing *Ingester) updateIndexCounts(mhCount int, providerID peer.ID, contextID []byte, resync bool) { - if ing.indexCounts != nil && mhCount != 0 { - if resync { - // If resyncing, only add missing values so that counts are not duplicated. - ing.indexCounts.AddMissingCount(providerID, contextID, uint64(mhCount)) - } else { - ing.indexCounts.AddCount(providerID, contextID, uint64(mhCount)) - } - } -} - func (ing *Ingester) ingestHamtFromPublisher(ctx context.Context, ad schema.Advertisement, publisherID, providerID peer.ID, entsCid cid.Cid, log *zap.SugaredLogger) (int, error) { // Split HAMP into batches of 4096 entries. const batchSize = 4096 diff --git a/internal/ingest/option.go b/internal/ingest/option.go deleted file mode 100644 index 9924bec18..000000000 --- a/internal/ingest/option.go +++ /dev/null @@ -1,34 +0,0 @@ -package ingest - -import ( - "fmt" - - "github.com/ipni/storetheindex/internal/counter" -) - -// configIngest contains all options for the ingester. -type configIngest struct { - idxCounts *counter.IndexCounts -} - -// Option is a function that sets a value in a config. -type Option func(*configIngest) error - -// getOpts creates a configIngest and applies Options to it. -func getOpts(opts []Option) (configIngest, error) { - var cfg configIngest - for i, opt := range opts { - if err := opt(&cfg); err != nil { - return configIngest{}, fmt.Errorf("option %d error: %s", i, err) - } - } - return cfg, nil -} - -// WithIndexCounts configures counting indexes using an IndexCounts instance. -func WithIndexCounts(ic *counter.IndexCounts) Option { - return func(c *configIngest) error { - c.idxCounts = ic - return nil - } -} diff --git a/internal/metrics/server.go b/internal/metrics/server.go index 4b9b2ae7c..139c8f8c0 100644 --- a/internal/metrics/server.go +++ b/internal/metrics/server.go @@ -34,7 +34,6 @@ var ( AdLoadError = stats.Int64("ingest/adLoadError", "Number of times an ad failed to load", stats.UnitDimensionless) ProviderCount = stats.Int64("provider/count", "Number of known (registered) providers", stats.UnitDimensionless) EntriesSyncLatency = stats.Float64("ingest/entriessynclatency", "How long it took to sync an Ad's entries", stats.UnitMilliseconds) - IndexCount = stats.Int64("provider/indexCount", "Number of indexes stored for all providers", stats.UnitDimensionless) PercentUsage = stats.Float64("ingest/percentusage", "Percent usage of storage available in value store", stats.UnitDimensionless) NonRemoveAdCount = stats.Int64("ingest/nonremoveadcount", "Number of non-removal advertisements", stats.UnitDimensionless) RemoveAdCount = stats.Int64("ingest/removeadcount", "Number of removal advertisements", stats.UnitDimensionless) @@ -93,10 +92,6 @@ var ( Measure: AdLoadError, Aggregation: view.Count(), } - indexCountView = &view.View{ - Measure: IndexCount, - Aggregation: view.LastValue(), - } percentUsageView = &view.View{ Measure: PercentUsage, Aggregation: view.LastValue(), @@ -129,7 +124,6 @@ func Start(views []*view.View) http.Handler { adIngestSkipped, adIngestSuccess, adLoadError, - indexCountView, percentUsageView, nonRemoveAdCountView, removeAdCountView, diff --git a/internal/registry/apiconv.go b/internal/registry/apiconv.go index 109a19bca..1a8f62cc1 100644 --- a/internal/registry/apiconv.go +++ b/internal/registry/apiconv.go @@ -10,16 +10,15 @@ import ( ) // RegToApiProviderInfo converts provider info from registry to api objects. -func RegToApiProviderInfo(pi *ProviderInfo, indexCount uint64) *model.ProviderInfo { +func RegToApiProviderInfo(pi *ProviderInfo) *model.ProviderInfo { if pi == nil { return nil } apiPI := &model.ProviderInfo{ - AddrInfo: pi.AddrInfo, - IndexCount: indexCount, - Lag: pi.Lag, - Inactive: pi.Inactive(), + AddrInfo: pi.AddrInfo, + Lag: pi.Lag, + Inactive: pi.Inactive(), } if pi.LastAdvertisement != cid.Undef { apiPI.LastAdvertisement = pi.LastAdvertisement diff --git a/internal/registry/apiconv_test.go b/internal/registry/apiconv_test.go index a208d61f8..d1aaf3bde 100644 --- a/internal/registry/apiconv_test.go +++ b/internal/registry/apiconv_test.go @@ -47,8 +47,6 @@ func TestRegToApiProviderInfo(t *testing.T) { ep2, _, _ := test.RandomIdentity() ep2Metadata := []byte("ep2-metadata") - indexCount := uint64(1809246) - regPI := ProviderInfo{ AddrInfo: provAddrInfo, LastAdvertisement: lastAdCid, @@ -81,7 +79,7 @@ func TestRegToApiProviderInfo(t *testing.T) { FrozenAtTime: frozenAtTime, } - apiPI := RegToApiProviderInfo(®PI, indexCount) + apiPI := RegToApiProviderInfo(®PI) require.NotNil(t, apiPI) require.Equal(t, regPI.AddrInfo, apiPI.AddrInfo) @@ -90,7 +88,6 @@ func TestRegToApiProviderInfo(t *testing.T) { require.Equal(t, regPI.Publisher, apiPI.Publisher.ID) require.Equal(t, 1, len(apiPI.Publisher.Addrs)) require.Equal(t, regPI.PublisherAddr, apiPI.Publisher.Addrs[0]) - require.Equal(t, indexCount, apiPI.IndexCount) require.Equal(t, regPI.FrozenAt, apiPI.FrozenAt) require.Equal(t, regPI.FrozenAtTime.Format(time.RFC3339), apiPI.FrozenAtTime) @@ -126,6 +123,6 @@ func TestRegToApiProviderInfo(t *testing.T) { require.Equal(t, regPI, *regPI2) - require.Nil(t, RegToApiProviderInfo(nil, 0)) + require.Nil(t, RegToApiProviderInfo(nil)) require.Nil(t, apiToRegProviderInfo(nil)) } diff --git a/server/admin/server_test.go b/server/admin/server_test.go index 97825a903..4c45685f8 100644 --- a/server/admin/server_test.go +++ b/server/admin/server_test.go @@ -17,7 +17,6 @@ import ( "github.com/ipni/go-libipni/find/model" "github.com/ipni/storetheindex/admin/client" "github.com/ipni/storetheindex/config" - "github.com/ipni/storetheindex/internal/counter" "github.com/ipni/storetheindex/internal/ingest" "github.com/ipni/storetheindex/internal/registry" "github.com/ipni/storetheindex/server/admin" @@ -235,7 +234,7 @@ func makeTestenv(t *testing.T) *testenv { idx := initIndex(t, true) reg := initRegistry(t, peerIDStr) ing := initIngest(t, idx, reg) - s := setupServer(t, idx, ing, reg, nil) + s := setupServer(t, idx, ing, reg) c := setupClient(t, s.URL()) // Start server @@ -266,7 +265,7 @@ func (te *testenv) close(t *testing.T) { te.registry.Close() } -func setupServer(t *testing.T, ind indexer.Interface, ing *ingest.Ingester, reg *registry.Registry, idxCts *counter.IndexCounts) *admin.Server { +func setupServer(t *testing.T, ind indexer.Interface, ing *ingest.Ingester, reg *registry.Registry) *admin.Server { reloadErrChan := make(chan chan error) s, err := admin.New("127.0.0.1:0", serverID, ind, ing, reg, reloadErrChan) require.NoError(t, err) diff --git a/server/find/handler/handler.go b/server/find/handler/handler.go index 12cbf131f..0483f66ec 100644 --- a/server/find/handler/handler.go +++ b/server/find/handler/handler.go @@ -11,7 +11,6 @@ import ( "github.com/ipni/go-indexer-core" "github.com/ipni/go-libipni/apierror" "github.com/ipni/go-libipni/find/model" - "github.com/ipni/storetheindex/internal/counter" "github.com/ipni/storetheindex/internal/registry" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" @@ -27,18 +26,16 @@ const avg_mh_size = 40 // handler provides request handling functionality for the find server // that is common to all protocols. type Handler struct { - indexer indexer.Interface - registry *registry.Registry - indexCounts *counter.IndexCounts - stats *cachedStats + indexer indexer.Interface + registry *registry.Registry + stats *cachedStats } -func New(indexer indexer.Interface, reg *registry.Registry, indexCounts *counter.IndexCounts) *Handler { +func New(indexer indexer.Interface, reg *registry.Registry) *Handler { return &Handler{ - indexer: indexer, - registry: reg, - indexCounts: indexCounts, - stats: newCachedStats(indexer, time.Hour), + indexer: indexer, + registry: reg, + stats: newCachedStats(indexer, time.Hour), } } @@ -173,15 +170,7 @@ func (h *Handler) ListProviders() ([]byte, error) { responses := make([]model.ProviderInfo, len(infos)) for i, pInfo := range infos { - var indexCount uint64 - if h.indexCounts != nil { - var err error - indexCount, err = h.indexCounts.Provider(pInfo.AddrInfo.ID) - if err != nil { - log.Errorw("Could not get provider index count", "err", err) - } - } - responses[i] = *registry.RegToApiProviderInfo(pInfo, indexCount) + responses[i] = *registry.RegToApiProviderInfo(pInfo) } return json.Marshal(responses) @@ -192,16 +181,7 @@ func (h *Handler) GetProvider(providerID peer.ID) ([]byte, error) { if info == nil || !allowed || info.Inactive() { return nil, nil } - - var indexCount uint64 - if h.indexCounts != nil { - var err error - indexCount, err = h.indexCounts.Provider(providerID) - if err != nil { - log.Errorw("Could not get provider index count", "err", err) - } - } - rsp := registry.RegToApiProviderInfo(info, indexCount) + rsp := registry.RegToApiProviderInfo(info) return json.Marshal(rsp) } diff --git a/server/find/handler_test.go b/server/find/handler_test.go index 01060f10b..2feb9fbb1 100644 --- a/server/find/handler_test.go +++ b/server/find/handler_test.go @@ -274,7 +274,7 @@ func TestServer_StreamingResponse(t *testing.T) { func TestServer_Landing(t *testing.T) { ind := initIndex(t, false) reg := initRegistry(t) - s := setupServer(ind, reg, nil, t) + s := setupServer(ind, reg, t) go func() { err := s.Start() require.ErrorIs(t, err, http.ErrServerClosed) @@ -297,7 +297,7 @@ func TestServer_Landing(t *testing.T) { func setupTestServer(t *testing.T, iv indexer.Value, mhs []multihash.Multihash) *find.Server { ind := initIndex(t, false) reg := initRegistry(t) - s := setupServer(ind, reg, nil, t) + s := setupServer(ind, reg, t) go func() { err := s.Start() require.ErrorIs(t, err, http.ErrServerClosed) diff --git a/server/find/options.go b/server/find/options.go index 0b14c182b..ecbf418c2 100644 --- a/server/find/options.go +++ b/server/find/options.go @@ -3,8 +3,6 @@ package find import ( "fmt" "time" - - "github.com/ipni/storetheindex/internal/counter" ) const ( @@ -17,7 +15,6 @@ const ( // config contains all options for the server. type config struct { homepageURL string - indexCounts *counter.IndexCounts maxConns int readTimeout time.Duration writeTimeout time.Duration @@ -60,14 +57,6 @@ func WithMaxConnections(maxConnections int) Option { } } -// WithIndexCounts supplies a counter.IndexCounts for tracking index counts. -func WithIndexCounts(indexCounts *counter.IndexCounts) Option { - return func(c *config) error { - c.indexCounts = indexCounts - return nil - } -} - // WithReadTimeout configures server read timeout. func WithReadTimeout(t time.Duration) Option { return func(c *config) error { diff --git a/server/find/protocol_test.go b/server/find/protocol_test.go index 391c8408b..a78f59fa0 100644 --- a/server/find/protocol_test.go +++ b/server/find/protocol_test.go @@ -10,7 +10,6 @@ import ( "time" "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" reframeclient "github.com/ipfs/go-delegated-routing/client" "github.com/ipfs/go-delegated-routing/gen/proto" indexer "github.com/ipni/go-indexer-core" @@ -22,7 +21,6 @@ import ( "github.com/ipni/go-libipni/find/model" "github.com/ipni/go-libipni/test" "github.com/ipni/storetheindex/config" - "github.com/ipni/storetheindex/internal/counter" "github.com/ipni/storetheindex/internal/registry" httpserver "github.com/ipni/storetheindex/server/find" "github.com/libp2p/go-libp2p/core/peer" @@ -33,8 +31,8 @@ import ( const providerID = "12D3KooWKRyzVWW6ChFjQjK4miCty85Niy48tpPV95XdKu1BcvMA" -func setupServer(ind indexer.Interface, reg *registry.Registry, idxCts *counter.IndexCounts, t *testing.T) *httpserver.Server { - s, err := httpserver.New("127.0.0.1:0", ind, reg, httpserver.WithIndexCounts(idxCts)) +func setupServer(ind indexer.Interface, reg *registry.Registry, t *testing.T) *httpserver.Server { + s, err := httpserver.New("127.0.0.1:0", ind, reg) require.NoError(t, err) return s } @@ -49,7 +47,7 @@ func TestFindIndexData(t *testing.T) { // Initialize everything ind := initIndex(t, true) reg := initRegistry(t) - s := setupServer(ind, reg, nil, t) + s := setupServer(ind, reg, t) c := setupClient(s.URL(), t) // Start server @@ -84,7 +82,7 @@ func TestFindIndexWithExtendedProviders(t *testing.T) { ind := initIndex(t, true) // We don't want to have any restricitons around provider identities as they are generated in rkandom for extended providers reg := initRegistryWithRestrictivePolicy(t, false) - s := setupServer(ind, reg, nil, t) + s := setupServer(ind, reg, t) c := setupClient(s.URL(), t) // Start server @@ -121,7 +119,7 @@ func TestReframeFindIndexData(t *testing.T) { // Initialize everything ind := initIndex(t, true) reg := initRegistry(t) - s := setupServer(ind, reg, nil, t) + s := setupServer(ind, reg, t) c := setupClient(s.URL(), t) // create delegated routing client @@ -158,9 +156,8 @@ func TestProviderInfo(t *testing.T) { // Initialize everything ind := initIndex(t, true) reg := initRegistry(t) - idxCts := counter.NewIndexCounts(datastore.NewMapDatastore()) - s := setupServer(ind, reg, idxCts, t) + s := setupServer(ind, reg, t) findclient := setupClient(s.URL(), t) // Start server @@ -178,8 +175,6 @@ func TestProviderInfo(t *testing.T) { peerID := register(ctx, t, reg) - idxCts.AddCount(peerID, []byte("context-id"), 939) - getProviderTest(t, findclient, peerID) listProvidersTest(t, findclient, peerID) @@ -197,7 +192,7 @@ func TestGetStats(t *testing.T) { reg := initRegistry(t) defer reg.Close() - s := setupServer(ind, reg, nil, t) + s := setupServer(ind, reg, t) findclient := setupClient(s.URL(), t) // Start server @@ -224,7 +219,7 @@ func TestRemoveProvider(t *testing.T) { // Initialize everything ind := initIndex(t, true) reg := initRegistry(t) - s := setupServer(ind, reg, nil, t) + s := setupServer(ind, reg, t) c := setupClient(s.URL(), t) // Start server @@ -454,7 +449,6 @@ func listProvidersTest(t *testing.T, c client.Interface, providerID peer.ID) { func verifyProviderInfo(t *testing.T, provInfo *model.ProviderInfo) { require.NotNil(t, provInfo, "nil provider info") require.Equal(t, providerID, provInfo.AddrInfo.ID.String(), "wrong peer id") - require.Equal(t, uint64(939), provInfo.IndexCount, "expected IndexCount to be 939") require.NotNil(t, provInfo.ExtendedProviders, "expected to have extended providers") require.Equal(t, 1, len(provInfo.ExtendedProviders.Providers)) require.Equal(t, 1, len(provInfo.ExtendedProviders.Contextual)) diff --git a/server/find/server.go b/server/find/server.go index 855b6e2fe..20d760389 100644 --- a/server/find/server.go +++ b/server/find/server.go @@ -89,7 +89,7 @@ func New(listen string, indexer indexer.Interface, registry *registry.Registry, s := &Server{ server: server, listener: l, - handler: handler.New(indexer, registry, opts.indexCounts), + handler: handler.New(indexer, registry), } s.healthMsg = "ready" diff --git a/server/reframe/reframe.go b/server/reframe/reframe.go index 1790fccbf..f132efa92 100644 --- a/server/reframe/reframe.go +++ b/server/reframe/reframe.go @@ -25,7 +25,7 @@ import ( ) func NewReframeHTTPHandler(indexer indexer.Interface, registry *registry.Registry) http.HandlerFunc { - return server.DelegatedRoutingAsyncHandler(NewReframeService(handler.New(indexer, registry, nil))) + return server.DelegatedRoutingAsyncHandler(NewReframeService(handler.New(indexer, registry))) } func NewReframeService(fh *handler.Handler) *ReframeService { From 4e57259a217c832a16fb9a6753b230ff1caecf73 Mon Sep 17 00:00:00 2001 From: gammazero Date: Sat, 8 Jul 2023 11:10:19 -0700 Subject: [PATCH 2/2] rm unused function --- command/update_datastore.go | 54 ------------------------------------- 1 file changed, 54 deletions(-) diff --git a/command/update_datastore.go b/command/update_datastore.go index dcd95e3f7..d9aa069e8 100644 --- a/command/update_datastore.go +++ b/command/update_datastore.go @@ -151,60 +151,6 @@ func rmOldTempRecords(ctx context.Context, ds datastore.Batching) error { return nil } -func rmDtFsmRecords(ctx context.Context, ds datastore.Batching) error { - q := query.Query{ - KeysOnly: true, - Prefix: "/data-transfer-v2", - } - results, err := ds.Query(ctx, q) - if err != nil { - return fmt.Errorf("cannot query datastore: %w", err) - } - defer results.Close() - - batch, err := ds.Batch(ctx) - if err != nil { - return fmt.Errorf("cannot create datastore batch: %w", err) - } - - var dtKeyCount, writeCount int - for result := range results.Next() { - if ctx.Err() != nil { - return ctx.Err() - } - if writeCount >= updateBatchSize { - writeCount = 0 - if err = batch.Commit(ctx); err != nil { - return fmt.Errorf("cannot commit datastore: %w", err) - } - log.Infow("Datastore update removed data-transfer fsm records", "count", dtKeyCount) - } - if result.Error != nil { - return fmt.Errorf("cannot read query result from datastore: %w", result.Error) - } - ent := result.Entry - if len(ent.Key) == 0 { - log.Warnf("result entry has empty key") - continue - } - - if err = batch.Delete(ctx, datastore.NewKey(ent.Key)); err != nil { - return fmt.Errorf("cannot delete dt state key from datastore: %w", err) - } - writeCount++ - dtKeyCount++ - } - - if err = batch.Commit(ctx); err != nil { - return fmt.Errorf("cannot commit datastore: %w", err) - } - if err = ds.Sync(context.Background(), datastore.NewKey(q.Prefix)); err != nil { - return err - } - - return nil -} - func deletePrefix(ctx context.Context, ds datastore.Batching, prefix string) (int, error) { q := query.Query{ KeysOnly: true,