Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Config option to remove all temporary data at startup #2395

Merged
merged 5 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 5 additions & 44 deletions command/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"github.com/cockroachdb/pebble/bloom"
"github.com/ipfs/boxo/bootstrap"
"github.com/ipfs/boxo/peering"
"github.com/ipfs/go-datastore"
leveldb "github.com/ipfs/go-ds-leveldb"
logging "github.com/ipfs/go-log/v2"
"github.com/ipni/go-indexer-core"
"github.com/ipni/go-indexer-core/cache"
Expand Down Expand Up @@ -116,7 +114,7 @@ func daemonAction(cctx *cli.Context) error {
}

// Create datastore
dstore, dsDir, err := createDatastore(cctx.Context, cfg.Datastore.Dir, cfg.Datastore.Type)
dstore, dsDir, err := createDatastore(cctx.Context, cfg.Datastore.Dir, cfg.Datastore.Type, false)
if err != nil {
return err
}
Expand All @@ -128,12 +126,12 @@ func daemonAction(cctx *cli.Context) error {
freezeDirs = append(freezeDirs, dsDir)

// Create datastore for temporary ad data.
dsTmp, dsTmpDir, err := createDatastore(cctx.Context, cfg.Datastore.TmpDir, cfg.Datastore.TmpType)
dsTmp, dsTmpDir, err := createDatastore(cctx.Context, cfg.Datastore.TmpDir, cfg.Datastore.TmpType, cfg.Datastore.RemoveTmpAtStart)
if err != nil {
return err
}
defer dsTmp.Close()
err = cleanupTempData(cctx.Context, dsTmp)
err = cleanupDTTempData(cctx.Context, dsTmp)
if err != nil {
return err
}
Expand Down Expand Up @@ -396,7 +394,7 @@ func daemonAction(cctx *cli.Context) error {
ticker.Reset(time.Duration(cfg.Indexer.ConfigCheckInterval))
}

cfg, err = reloadConfig(cfgPath, ingester, reg, valueStore)
cfg, err = reloadConfig(cfgPath, ingester, reg)
if err != nil {
log.Errorw("Error reloading conifg", "err", err)
if errChan != nil {
Expand Down Expand Up @@ -605,7 +603,7 @@ func loadConfig(filePath string) (*config.Config, error) {
return cfg, nil
}

func reloadConfig(cfgPath string, ingester *ingest.Ingester, reg *registry.Registry, valueStore indexer.Interface) (*config.Config, error) {
func reloadConfig(cfgPath string, ingester *ingest.Ingester, reg *registry.Registry) (*config.Config, error) {
cfg, err := loadConfig(cfgPath)
if err != nil {
return nil, err
Expand Down Expand Up @@ -678,40 +676,3 @@ func reloadPeering(cfg config.Peering, peeringService *peering.PeeringService, p

return peeringService, nil
}

// createDatastore opens the leveldb-backed datastore located at dir.
//
// The only accepted dsType is "levelds"; any other value yields an error. dir
// is resolved with config.Path and checked with fsutil.DirWritable before the
// datastore is opened. On success the opened datastore is returned together
// with the resolved directory path. The ctx argument is not used.
func createDatastore(ctx context.Context, dir, dsType string) (datastore.Batching, string, error) {
	const supportedType = "levelds"
	if dsType != supportedType {
		return nil, "", fmt.Errorf("only levelds datastore type supported, %q not supported", dsType)
	}

	dsPath, pathErr := config.Path("", dir)
	if pathErr != nil {
		return nil, "", pathErr
	}
	if writableErr := fsutil.DirWritable(dsPath); writableErr != nil {
		return nil, "", writableErr
	}

	store, openErr := leveldb.NewDatastore(dsPath, nil)
	if openErr != nil {
		return nil, "", openErr
	}
	return store, dsPath, nil
}

// cleanupTempData deletes the records stored under the "/data-transfer-v2"
// prefix from ds, allowing at most 10 minutes for the deletion.
//
// If the deletion runs out of time, that is logged and the result of syncing
// the prefix (using a background context) is returned in place of the timeout
// error. Any other deletion error is returned unchanged.
func cleanupTempData(ctx context.Context, ds datastore.Batching) error {
	const (
		dtPrefix         = "/data-transfer-v2"
		dtCleanupTimeout = 10 * time.Minute
	)

	cleanupCtx, cancel := context.WithTimeout(ctx, dtCleanupTimeout)
	defer cancel()

	removed, err := deletePrefix(cleanupCtx, ds, dtPrefix)
	switch {
	case err == nil:
		log.Infow("Removed old temporary data-transfer fsm records", "count", removed)
		return nil
	case errors.Is(err, context.DeadlineExceeded):
		log.Info("Not enough time to finish data-transfer state cleanup")
		// The deadline has passed, so the sync runs on a fresh context.
		return ds.Sync(context.Background(), datastore.NewKey(dtPrefix))
	default:
		return err
	}
}
49 changes: 48 additions & 1 deletion command/update_datastore.go → command/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@ import (
"context"
"errors"
"fmt"
"os"
"strings"
"time"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
leveldb "github.com/ipfs/go-ds-leveldb"
"github.com/ipni/storetheindex/config"
"github.com/ipni/storetheindex/fsutil"
)

const (
Expand All @@ -20,6 +25,48 @@ const (
updateBatchSize = 500000
)

// createDatastore opens the leveldb-backed datastore located at dir.
//
// The only accepted dsType is "levelds"; any other value yields an error. dir
// is resolved with config.Path. When rmExisting is true, the resolved
// directory and everything in it is removed with os.RemoveAll before the
// directory is checked with fsutil.DirWritable and the datastore is opened.
// On success the opened datastore is returned together with the resolved
// directory path. The ctx argument is not used.
func createDatastore(ctx context.Context, dir, dsType string, rmExisting bool) (datastore.Batching, string, error) {
	const supportedType = "levelds"
	if dsType != supportedType {
		return nil, "", fmt.Errorf("only levelds datastore type supported, %q not supported", dsType)
	}

	dsPath, pathErr := config.Path("", dir)
	if pathErr != nil {
		return nil, "", pathErr
	}

	if rmExisting {
		// Start from an empty directory by discarding whatever is there.
		if rmErr := os.RemoveAll(dsPath); rmErr != nil {
			return nil, "", fmt.Errorf("cannot remove temporary datastore directory: %w", rmErr)
		}
	}

	if writableErr := fsutil.DirWritable(dsPath); writableErr != nil {
		return nil, "", writableErr
	}

	store, openErr := leveldb.NewDatastore(dsPath, nil)
	if openErr != nil {
		return nil, "", openErr
	}
	return store, dsPath, nil
}

// cleanupDTTempData deletes the records stored under the "/data-transfer-v2"
// prefix from ds, allowing at most 10 minutes for the deletion.
//
// If the deletion runs out of time, that is logged and the result of syncing
// the prefix (using a background context) is returned in place of the timeout
// error. Any other deletion error is returned unchanged.
func cleanupDTTempData(ctx context.Context, ds datastore.Batching) error {
	const (
		dtPrefix         = "/data-transfer-v2"
		dtCleanupTimeout = 10 * time.Minute
	)

	timedCtx, cancel := context.WithTimeout(ctx, dtCleanupTimeout)
	defer cancel()

	removed, err := deletePrefix(timedCtx, ds, dtPrefix)
	if err == nil {
		log.Infow("Removed old temporary data-transfer fsm records", "count", removed)
		return nil
	}
	if !errors.Is(err, context.DeadlineExceeded) {
		return err
	}

	log.Info("Not enough time to finish data-transfer state cleanup")
	// The deadline has passed, so the sync runs on a fresh context.
	return ds.Sync(context.Background(), datastore.NewKey(dtPrefix))
}

func updateDatastore(ctx context.Context, ds datastore.Batching) error {
dsVerKey := datastore.NewKey(dsInfoPrefix + dsVersionKey)
curVerData, err := ds.Get(ctx, dsVerKey)
Expand Down Expand Up @@ -177,7 +224,7 @@ func deletePrefix(ctx context.Context, ds datastore.Batching, prefix string) (in
if err = batch.Commit(ctx); err != nil {
return 0, fmt.Errorf("cannot commit datastore: %w", err)
}
log.Infow("Datastore update removed records", "count", keyCount)
log.Infow("Removed datastore records", "count", keyCount)
}
if result.Error != nil {
return 0, fmt.Errorf("cannot read query result from datastore: %w", result.Error)
Expand Down
94 changes: 94 additions & 0 deletions command/datastore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package command

import (
"context"
"errors"
"os"
"path/filepath"
"testing"
"time"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/stretchr/testify/require"
)

// TestCreateDatastore checks that createDatastore rejects an unknown
// datastore type, opens a "levelds" datastore at the requested directory, and
// removes existing directory content only when rmExisting is true.
func TestCreateDatastore(t *testing.T) {
	bg := context.Background()
	storeDir := filepath.Join(t.TempDir(), "testDataDir")

	// An unsupported datastore type is rejected.
	_, _, err := createDatastore(bg, storeDir, "unknown", false)
	require.Error(t, err)

	// A supported type opens a datastore and reports the directory used.
	store, gotPath, err := createDatastore(bg, storeDir, "levelds", false)
	require.NoError(t, err)
	require.NotNil(t, store)
	require.Equal(t, storeDir, gotPath)
	require.NoError(t, store.Close())

	// Place a marker file inside the datastore directory so that removal of
	// the directory can be detected.
	marker := filepath.Join(storeDir, "check.test")
	err = os.WriteFile(marker, []byte("Hello"), 0666)
	require.NoError(t, err)

	// Without rmExisting the directory, and so the marker, is kept.
	store, _, err = createDatastore(bg, storeDir, "levelds", false)
	require.NoError(t, err)
	require.NotNil(t, store)
	require.NoError(t, store.Close())
	require.True(t, fileExists(marker))

	// With rmExisting the directory is removed, taking the marker with it.
	store, _, err = createDatastore(bg, storeDir, "levelds", true)
	require.NoError(t, err)
	require.NotNil(t, store)
	require.NoError(t, store.Close())
	require.False(t, fileExists(marker))
}

// TestDeletePrefix stores two records under a common prefix, confirms that a
// prefix query finds both, and then checks that deletePrefix reports two
// deletions and leaves nothing behind under that prefix.
func TestDeletePrefix(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	storeDir := filepath.Join(t.TempDir(), "testDataDir")
	store, _, err := createDatastore(ctx, storeDir, "levelds", false)
	require.NoError(t, err)
	t.Cleanup(func() {
		store.Close()
	})

	const prefix = "testKeys"
	records := []struct {
		suffix string
		value  string
	}{
		{suffix: "/foo", value: "One"},
		{suffix: "/bar", value: "Two"},
	}
	for _, rec := range records {
		err = store.Put(ctx, datastore.NewKey(prefix+rec.suffix), []byte(rec.value))
		require.NoError(t, err)
	}
	err = store.Sync(ctx, datastore.NewKey(""))
	require.NoError(t, err)

	// entriesUnderPrefix returns every entry currently found by a query for
	// prefix, failing the test if the query cannot be run or read.
	entriesUnderPrefix := func() []query.Entry {
		results, err := store.Query(ctx, query.Query{Prefix: prefix})
		require.NoError(t, err)
		entries, err := results.Rest()
		results.Close()
		require.NoError(t, err)
		return entries
	}

	require.Len(t, entriesUnderPrefix(), 2)

	n, err := deletePrefix(ctx, store, prefix)
	require.NoError(t, err)
	require.Equal(t, 2, n)

	require.Empty(t, entriesUnderPrefix())
}

// fileExists reports whether filename refers to an existing file system
// entry. It returns false only when os.Lstat fails with an error matching
// os.ErrNotExist; any other outcome, including other Lstat errors, is
// reported as true. Because Lstat is used, a symbolic link is examined itself
// rather than followed.
func fileExists(filename string) bool {
	if _, statErr := os.Lstat(filename); errors.Is(statErr, os.ErrNotExist) {
		return false
	}
	return true
}
2 changes: 2 additions & 0 deletions config/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type Datastore struct {
TmpDir string
// TmpType is the type of datastore for temporary persisted data.
TmpType string
// RemoveTmpAtStart causes all temporary data to be removed at startup.
RemoveTmpAtStart bool
}

// NewDatastore returns Datastore with values set to their defaults.
Expand Down