Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Remove index counter functionality #2101

Merged
merged 2 commits on Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 1 addition & 19 deletions command/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/ipni/go-libipni/mautil"
"github.com/ipni/storetheindex/config"
"github.com/ipni/storetheindex/fsutil"
"github.com/ipni/storetheindex/internal/counter"
"github.com/ipni/storetheindex/internal/ingest"
"github.com/ipni/storetheindex/internal/registry"
httpadmin "github.com/ipni/storetheindex/server/admin"
Expand All @@ -39,12 +38,6 @@ import (
"github.com/urfave/cli/v2"
)

const (
dsInfoPrefix = "/dsInfo/"
dsVersionKey = "version"
dsVersion = "001"
)

// Recognized valuestore type names.
const (
vstoreDHStore = "dhstore"
Expand Down Expand Up @@ -170,12 +163,6 @@ func daemonAction(cctx *cli.Context) error {
// Create indexer core
indexerCore := engine.New(valueStore, engine.WithCache(resultCache))

var indexCounts *counter.IndexCounts
if cfg.Indexer.IndexCountEnabled {
indexCounts = counter.NewIndexCounts(dstore)
indexCounts.SetTotalAddend(cfg.Indexer.IndexCountTotalAddend)
}

// Create registry
reg, err := registry.New(cctx.Context, cfg.Discovery, dstore,
registry.WithFreezer(freezeDirs, cfg.Indexer.FreezeAtPercent))
Expand All @@ -200,7 +187,6 @@ func daemonAction(cctx *cli.Context) error {
httpfind.WithWriteTimeout(time.Duration(cfg.Finder.ApiWriteTimeout)),
httpfind.WithMaxConnections(cfg.Finder.MaxConnections),
httpfind.WithHomepage(cfg.Finder.Webpage),
httpfind.WithIndexCounts(indexCounts),
httpfind.WithVersion(cctx.App.Version),
)
if err != nil {
Expand Down Expand Up @@ -251,7 +237,7 @@ func daemonAction(cctx *cli.Context) error {
}

// Initialize ingester.
ingester, err = ingest.NewIngester(cfg.Ingest, p2pHost, indexerCore, reg, dstore, dsTmp, ingest.WithIndexCounts(indexCounts))
ingester, err = ingest.NewIngester(cfg.Ingest, p2pHost, indexerCore, reg, dstore, dsTmp)
if err != nil {
return err
}
Expand Down Expand Up @@ -426,10 +412,6 @@ func daemonAction(cctx *cli.Context) error {
}
}

if indexCounts != nil {
indexCounts.SetTotalAddend(cfg.Indexer.IndexCountTotalAddend)
}

if errChan != nil {
errChan <- nil
}
Expand Down
70 changes: 45 additions & 25 deletions command/update_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,22 @@ import (
"github.com/ipfs/go-datastore/query"
)

// updateBatchSize is the number of records to update at a time.
const updateBatchSize = 500000
const (
dsInfoPrefix = "/dsInfo/"
dsVersionKey = "version"
dsVersion = "002"

// updateBatchSize is the number of records to update at a time.
updateBatchSize = 500000
)

func updateDatastore(ctx context.Context, ds datastore.Batching) error {
dsVerKey := datastore.NewKey(dsInfoPrefix + dsVersionKey)
curVerData, err := ds.Get(ctx, dsVerKey)
if err != nil && !errors.Is(err, datastore.ErrNotFound) {
return fmt.Errorf("cannot check datastore: %w", err)
}
var curVer string
curVer := "000"
if len(curVerData) != 0 {
curVer = string(curVerData)
}
Expand All @@ -31,12 +37,29 @@ func updateDatastore(ctx context.Context, ds datastore.Batching) error {
return fmt.Errorf("unknown datastore verssion: %s", curVer)
}

log.Infof("Updating datastore to version %s", dsVersion)
if err = rmDtFsmRecords(ctx, ds); err != nil {
return err
var count int

log.Infof("Updating datastore from version %s to %s", curVer, dsVersion)
if curVer < "001" {
count, err = deletePrefix(ctx, ds, "/data-transfer-v2")
if err != nil {
return err
}
if count != 0 {
log.Infow("Datastore update removed data-transfer fsm records", "count", count)
}
if err = rmOldTempRecords(ctx, ds); err != nil {
return err
}
}
if err = rmOldTempRecords(ctx, ds); err != nil {
return err
if curVer < "002" {
count, err = deletePrefix(ctx, ds, "/indexCounts")
if err != nil {
return err
}
if count != 0 {
log.Infow("Datastore update removed index count records", "count", count)
}
}

if err = ds.Put(ctx, dsVerKey, []byte(dsVersion)); err != nil {
Expand Down Expand Up @@ -128,36 +151,36 @@ func rmOldTempRecords(ctx context.Context, ds datastore.Batching) error {
return nil
}

func rmDtFsmRecords(ctx context.Context, ds datastore.Batching) error {
func deletePrefix(ctx context.Context, ds datastore.Batching, prefix string) (int, error) {
q := query.Query{
KeysOnly: true,
Prefix: "/data-transfer-v2",
Prefix: prefix,
}
results, err := ds.Query(ctx, q)
if err != nil {
return fmt.Errorf("cannot query datastore: %w", err)
return 0, fmt.Errorf("cannot query datastore: %w", err)
}
defer results.Close()

batch, err := ds.Batch(ctx)
if err != nil {
return fmt.Errorf("cannot create datastore batch: %w", err)
return 0, fmt.Errorf("cannot create datastore batch: %w", err)
}

var dtKeyCount, writeCount int
var keyCount, writeCount int
for result := range results.Next() {
if ctx.Err() != nil {
return ctx.Err()
return 0, ctx.Err()
}
if writeCount >= updateBatchSize {
writeCount = 0
if err = batch.Commit(ctx); err != nil {
return fmt.Errorf("cannot commit datastore: %w", err)
return 0, fmt.Errorf("cannot commit datastore: %w", err)
}
log.Infow("Datastore update removed data-transfer fsm records", "count", dtKeyCount)
log.Infow("Datastore update removed records", "count", keyCount)
}
if result.Error != nil {
return fmt.Errorf("cannot read query result from datastore: %w", result.Error)
return 0, fmt.Errorf("cannot read query result from datastore: %w", result.Error)
}
ent := result.Entry
if len(ent.Key) == 0 {
Expand All @@ -166,21 +189,18 @@ func rmDtFsmRecords(ctx context.Context, ds datastore.Batching) error {
}

if err = batch.Delete(ctx, datastore.NewKey(ent.Key)); err != nil {
return fmt.Errorf("cannot delete dt state key from datastore: %w", err)
return 0, fmt.Errorf("cannot delete key from datastore: %w", err)
}
writeCount++
dtKeyCount++
keyCount++
}

if err = batch.Commit(ctx); err != nil {
return fmt.Errorf("cannot commit datastore: %w", err)
return 0, fmt.Errorf("cannot commit datastore: %w", err)
}
if err = ds.Sync(context.Background(), datastore.NewKey(q.Prefix)); err != nil {
return err
return 0, err
}

if dtKeyCount != 0 {
log.Infow("Datastore update removed data-transfer fsm records", "count", dtKeyCount)
}
return nil
return keyCount, nil
}
7 changes: 0 additions & 7 deletions config/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,6 @@ type Indexer struct {
// ValueStoreDir is on, at which to trigger the indexer to enter frozen
// mode. A zero value uses the default. A negative value disables freezing.
FreezeAtPercent float64
// IndexCountEnabled sets whether ingest process should count the number of index records per provider and
// finder should show the number of index counts per provider.
IndexCountEnabled bool
// IndexCountTotalAddend is a value that is added to the index count total,
// to account for uncounted indexes that have existed in the value store
// before provider index counts were tracked. This value is reloadable.
IndexCountTotalAddend uint64
// ShutdownTimeout is the duration that a graceful shutdown has to complete
// before the daemon process is terminated. If unset or zero, configures no
// shutdown timeout. This value is reloadable.
Expand Down
8 changes: 0 additions & 8 deletions e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,6 @@ func TestEndToEndWithReferenceProvider(t *testing.T) {
require.Equal(t, 1, carCount)
require.Equal(t, 1, headCount)

outProvider := e.run(ipni, "provider", "-pid", providerID, "--indexer", "localhost:3000")
// Check that IndexCount with correct value appears in providers output.
require.Contains(t, string(outProvider), "IndexCount: 1043")

root2 := filepath.Join(e.dir, ".storetheindex2")
e.env = append(e.env, fmt.Sprintf("%s=%s", config.EnvDir, root2))
e.run(indexer, "init", "--store", "dhstore", "--pubsub-topic", "/indexer/ingest/mainnet", "--no-bootstrap", "--dhstore", "http://127.0.0.1:40080",
Expand Down Expand Up @@ -421,10 +417,6 @@ func TestEndToEndWithReferenceProvider(t *testing.T) {
return true
}, 10*time.Second, time.Second)

outProvider = e.run(ipni, "provider", "-pid", providerID, "--indexer", "localhost:3000")
// Check that IndexCount is back to zero after removing car.
require.Contains(t, string(outProvider), "IndexCount: 0")

// Check that status is not frozen.
outStatus := e.run(indexer, "admin", "status", "--indexer", "localhost:3202")
require.Contains(t, string(outStatus), "Frozen: false", "expected indexer to be frozen")
Expand Down
Loading