diff --git a/common/common.go b/common/common.go index d24c74d7..65392a27 100644 --- a/common/common.go +++ b/common/common.go @@ -67,7 +67,7 @@ func DefaultNetwork(opts ...NetOption) (NetBoostrapper, error) { } fin.Add(pstore) - hostKey, err := getHostKey(config, litestore) + hostKey, err := getIPFSHostKey(config, litestore) if err != nil { return nil, fin.Cleanup(err) } @@ -174,12 +174,12 @@ func mongoStore(ctx context.Context, uri, db, collection string, fin *util.Final return dstore, nil } -func getHostKey(config NetConfig, store ds.Batching) (crypto.PrivKey, error) { +func getIPFSHostKey(config NetConfig, store ds.Datastore) (crypto.PrivKey, error) { if len(config.MongoUri) != 0 { k := ds.NewKey("key") bytes, err := store.Get(k) if errors.Is(err, ds.ErrNotFound) { - key, bytes, err := newHostKey() + key, bytes, err := newIPFSHostKey() if err != nil { return nil, err } @@ -194,16 +194,16 @@ func getHostKey(config NetConfig, store ds.Batching) (crypto.PrivKey, error) { } else { // If a local datastore is used, the key is written to a file dir := filepath.Join(config.BadgerRepoPath, "ipfslite") - if err := os.MkdirAll(dir, os.ModePerm); err != nil { - return nil, err - } pth := filepath.Join(dir, "key") _, err := os.Stat(pth) if os.IsNotExist(err) { - key, bytes, err := newHostKey() + key, bytes, err := newIPFSHostKey() if err != nil { return nil, err } + if err := os.MkdirAll(dir, os.ModePerm); err != nil { + return nil, err + } if err = ioutil.WriteFile(pth, bytes, 0400); err != nil { return nil, err } @@ -220,7 +220,7 @@ func getHostKey(config NetConfig, store ds.Batching) (crypto.PrivKey, error) { } } -func newHostKey() (crypto.PrivKey, []byte, error) { +func newIPFSHostKey() (crypto.PrivKey, []byte, error) { priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, 0) if err != nil { return nil, nil, err diff --git a/db/bench_test.go b/db/bench_test.go index 5501edca..0ba9f85f 100644 --- a/db/bench_test.go +++ b/db/bench_test.go @@ -66,7 +66,7 @@ func createBenchDB(b *testing.B, opts ...NewOption) (*DB, func()) { common.WithNetDebug(true), ) checkBenchErr(b, err) - store, err := util.NewBadgerDatastore(dir, false) + store, err := util.NewBadgerDatastore(dir, "eventstore", false) checkBenchErr(b, err) d, err := NewDB(context.Background(), store, n, thread.NewIDV1(thread.Raw, 32), opts...) checkBenchErr(b, err) diff --git a/db/db_test.go b/db/db_test.go index 7248d62e..ed8be29b 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -37,7 +37,7 @@ func TestE2EWithThreads(t *testing.T) { checkErr(t, err) defer n1.Close() - store, err := util.NewBadgerDatastore(tmpDir1, false) + store, err := util.NewBadgerDatastore(tmpDir1, "eventstore", false) checkErr(t, err) defer store.Close() @@ -92,7 +92,7 @@ func TestE2EWithThreads(t *testing.T) { Schema: util.SchemaFromInstance(&dummy{}, false), } - store2, err := util.NewBadgerDatastore(tmpDir2, false) + store2, err := util.NewBadgerDatastore(tmpDir2, "eventstore", false) checkErr(t, err) defer store2.Close() @@ -139,7 +139,7 @@ func TestMissingCollection(t *testing.T) { checkErr(t, err) defer n.Close() - store, err := util.NewBadgerDatastore(tmpDir, false) + store, err := util.NewBadgerDatastore(tmpDir, "eventstore", false) checkErr(t, err) defer store.Close() @@ -175,7 +175,7 @@ func TestWithNewName(t *testing.T) { ) checkErr(t, err) - store, err := util.NewBadgerDatastore(tmpDir, false) + store, err := util.NewBadgerDatastore(tmpDir, "eventstore", false) checkErr(t, err) defer store.Close() @@ -223,7 +223,7 @@ func TestWithNewEventCodec(t *testing.T) { ) checkErr(t, err) - store, err := util.NewBadgerDatastore(tmpDir, false) + store, err := util.NewBadgerDatastore(tmpDir, "eventstore", false) checkErr(t, err) defer store.Close() diff --git a/db/index.go b/db/index.go index 6016ede3..3c23cab3 100644 --- a/db/index.go +++ b/db/index.go @@ -11,11 +11,10 @@ import ( "fmt" "sort" - dse "github.com/textileio/go-datastore-extensions" - "github.com/alecthomas/jsonschema" ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" + dse "github.com/textileio/go-datastore-extensions" "github.com/tidwall/gjson" "github.com/tidwall/sjson" ) diff --git a/db/manager_test.go b/db/manager_test.go index ce8bb686..8524874f 100644 --- a/db/manager_test.go +++ b/db/manager_test.go @@ -119,7 +119,7 @@ func TestManager_GetDB(t *testing.T) { common.WithNetDebug(true), ) checkErr(t, err) - store, err := util.NewBadgerDatastore(dir, false) + store, err := util.NewBadgerDatastore(dir, "eventstore", false) checkErr(t, err) man, err := NewManager(store, n, WithNewDebug(true)) checkErr(t, err) @@ -256,7 +256,7 @@ func createTestManager(t *testing.T) (*Manager, func()) { common.WithNetDebug(true), ) checkErr(t, err) - store, err := util.NewBadgerDatastore(dir, false) + store, err := util.NewBadgerDatastore(dir, "eventstore", false) checkErr(t, err) m, err := NewManager(store, n, WithNewDebug(true)) checkErr(t, err) diff --git a/db/testutils_test.go b/db/testutils_test.go index de5c1ff1..b4a0c883 100644 --- a/db/testutils_test.go +++ b/db/testutils_test.go @@ -28,7 +28,7 @@ func createTestDB(t *testing.T, opts ...NewOption) (*DB, func()) { common.WithNetDebug(true), ) checkErr(t, err) - store, err := util.NewBadgerDatastore(dir, false) + store, err := util.NewBadgerDatastore(dir, "eventstore", false) checkErr(t, err) d, err := NewDB(context.Background(), store, n, thread.NewIDV1(thread.Raw, 32), opts...) checkErr(t, err) diff --git a/integrationtests/foldersync/client.go b/integrationtests/foldersync/client.go index 7c1a19de..8320d546 100644 --- a/integrationtests/foldersync/client.go +++ b/integrationtests/foldersync/client.go @@ -79,7 +79,7 @@ func newRootClient(name, folderPath, repoPath string) (*client, error) { return nil, err } - s, err := util.NewBadgerDatastore(repoPath, false) + s, err := util.NewBadgerDatastore(repoPath, "eventstore", false) if err != nil { return nil, err } @@ -106,7 +106,7 @@ func newJoinerClient(name, folderPath, repoPath string, addr ma.Multiaddr, key t return nil, err } - s, err := util.NewBadgerDatastore(repoPath, false) + s, err := util.NewBadgerDatastore(repoPath, "eventstore", false) if err != nil { return nil, err } diff --git a/integrationtests/foldersync/foldersync_test.go b/integrationtests/foldersync/foldersync_test.go index 115fef8a..b3f2f140 100644 --- a/integrationtests/foldersync/foldersync_test.go +++ b/integrationtests/foldersync/foldersync_test.go @@ -43,7 +43,7 @@ func TestSimple(t *testing.T) { network0, err := newNetwork(repoPath0) checkErr(t, err) - store0, err := util.NewBadgerDatastore(repoPath0, false) + store0, err := util.NewBadgerDatastore(repoPath0, "eventstore", false) checkErr(t, err) defer store0.Close() db0, err := db.NewDB(context.Background(), store0, network0, id, db.WithNewCollections(cc)) @@ -63,7 +63,7 @@ func TestSimple(t *testing.T) { network1, err := newNetwork(repoPath1) checkErr(t, err) - store1, err := util.NewBadgerDatastore(repoPath1, false) + store1, err := util.NewBadgerDatastore(repoPath1, "eventstore", false) checkErr(t, err) defer store1.Close() db1, err := db.NewDBFromAddr( @@ -87,7 +87,7 @@ func TestSimple(t *testing.T) { network2, err := newNetwork(repoPath2) checkErr(t, err) - store2, err := util.NewBadgerDatastore(repoPath2, false) + store2, err := util.NewBadgerDatastore(repoPath2, "eventstore", false) checkErr(t, err) defer store2.Close() db2, err := db.NewDBFromAddr( @@ -111,7 +111,7 @@ func TestSimple(t *testing.T) { network3, err := newNetwork(repoPath3) checkErr(t, err) - store3, err := util.NewBadgerDatastore(repoPath3, false) + store3, err := util.NewBadgerDatastore(repoPath3, "eventstore", false) checkErr(t, err) defer store3.Close() db3, err := db.NewDBFromAddr( diff --git a/threadsd/main.go b/threadsd/main.go index 29decad1..d54824b7 100644 --- a/threadsd/main.go +++ b/threadsd/main.go @@ -125,7 +125,7 @@ func main() { if *mongoUri != "" { store, err = mongods.New(ctx, *mongoUri, *mongoDatabase, mongods.WithCollName("eventstore")) } else { - store, err = util.NewBadgerDatastore(*repo, *badgerLowMem) + store, err = util.NewBadgerDatastore(*repo, "eventstore", *badgerLowMem) } if err != nil { log.Fatal(err) diff --git a/util/dscopy/main.go b/util/dscopy/main.go new file mode 100644 index 00000000..e6069bd7 --- /dev/null +++ b/util/dscopy/main.go @@ -0,0 +1,253 @@ +package main + +import ( + "context" + "fmt" + "io/ioutil" + "net/url" + "os" + "path/filepath" + "sync" + "time" + + ds "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/query" + logging "github.com/ipfs/go-log/v2" + "github.com/namsral/flag" + badger "github.com/textileio/go-ds-badger" + mongods "github.com/textileio/go-ds-mongo" +) + +var log = logging.Logger("dscopy") + +type store struct { + name string + files []string +} + +var stores = []store{ + { + name: "eventstore", + }, + { + name: "logstore", + }, + { + name: "ipfslite", + files: []string{"key"}, + }, +} + +func main() { + fs := flag.NewFlagSet(os.Args[0], 0) + + fromBadgerRepos := fs.String("fromBadgerRepos", "", "Source badger repos path") + toBadgerRepos := fs.String("toBadgerRepos", "", "Destination badger repos path") + + fromMongoUri := fs.String("fromMongoUri", "", "Source MongoDB URI") + fromMongoDatabase := fs.String("fromMongoDatabase", "", "Source MongoDB database") + toMongoUri := fs.String("toMongoUri", "", "Destination MongoDB URI") + toMongoDatabase := fs.String("toMongoDatabase", "", "Destination MongoDB database") + + parallel := fs.Int("parallel", 1000, "Number of parallel copy operations") + + verbose := fs.Bool("verbose", false, "More verbose output") + if err := fs.Parse(os.Args[1:]); err != nil { + log.Fatal(err) + } + + logging.SetupLogging(logging.Config{ + Format: logging.ColorizedOutput, + Stderr: true, + Level: logging.LevelError, + }) + if err := logging.SetLogLevel("dscopy", "info"); err != nil { + log.Fatal(err) + } + + start := time.Now() + + for _, s := range stores { + if err := copyDatastore( + s, + *fromBadgerRepos, + *toBadgerRepos, + *fromMongoUri, + *toMongoUri, + *fromMongoDatabase, + *toMongoDatabase, + *parallel, + *verbose, + ); err != nil { + log.Fatal(err) + } + } + + log.Infof("done in %s", time.Since(start)) +} + +func copyDatastore( + s store, + fromBadgerRepos, toBadgerRepos string, + fromMongoUri, toMongoUri string, + fromMongoDatabase, toMongoDatabase string, + parallel int, + verbose bool, +) error { + if len(fromBadgerRepos) != 0 && len(fromMongoUri) != 0 { + return fmt.Errorf("multiple sources specified") + } + if len(fromBadgerRepos) == 0 && len(fromMongoUri) == 0 { + return fmt.Errorf("source not specified") + } + if len(toBadgerRepos) != 0 && len(toMongoUri) != 0 { + return fmt.Errorf("multiple destinations specified") + } + if len(toBadgerRepos) == 0 && len(toMongoUri) == 0 { + return fmt.Errorf("destination not specified") + } + + var from, to ds.Datastore + var err error + if len(fromBadgerRepos) != 0 { + path := filepath.Join(fromBadgerRepos, s.name) + from, err = badger.NewDatastore(path, &badger.DefaultOptions) + if err != nil { + return fmt.Errorf("connecting to badger source: %v", err) + } + log.Infof("connected to badger source: %s", path) + } + if len(toBadgerRepos) != 0 { + path := filepath.Join(toBadgerRepos, s.name) + if err := os.MkdirAll(path, os.ModePerm); err != nil { + return fmt.Errorf("making destination path: %v", err) + } + to, err = badger.NewDatastore(path, &badger.DefaultOptions) + if err != nil { + return fmt.Errorf("connecting to badger destination: %v", err) + } + log.Infof("connected to badger destination: %s", path) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if len(fromMongoUri) != 0 { + uri, err := url.Parse(fromMongoUri) + if err != nil { + return fmt.Errorf("parsing source mongo URI: %v", err) + } + if len(fromMongoDatabase) == 0 { + return fmt.Errorf("source mongo database not specified") + } + from, err = mongods.New(ctx, fromMongoUri, fromMongoDatabase, mongods.WithCollName(s.name)) + if err != nil { + return fmt.Errorf("connecting to mongo source: %v", err) + } + log.Infof("connected to mongo source: %s", uri.Redacted()) + } + if len(toMongoUri) != 0 { + uri, err := url.Parse(toMongoUri) + if err != nil { + return fmt.Errorf("parsing destination mongo URI: %v", err) + } + if len(toMongoDatabase) == 0 { + return fmt.Errorf("destination mongo database not specified") + } + to, err = mongods.New(ctx, toMongoUri, toMongoDatabase, mongods.WithCollName(s.name)) + if err != nil { + return fmt.Errorf("connecting to mongo destination: %v", err) + } + log.Infof("connected to mongo destination: %s", uri.Redacted()) + } + + res, err := from.Query(query.Query{}) + if err != nil { + return fmt.Errorf("querying source: %v", err) + } + defer res.Close() + + var lock sync.Mutex + var errors []string + var count int + start := time.Now() + lim := make(chan struct{}, parallel) + for r := range res.Next() { + if r.Error != nil { + return fmt.Errorf("getting next source result: %v", r.Error) + } + lim <- struct{}{} + + r := r + go func() { + defer func() { <-lim }() + + if err := to.Put(ds.NewKey(r.Key), r.Value); err != nil { + lock.Lock() + errors = append(errors, fmt.Sprintf("copying %s: %v", r.Key, err)) + lock.Unlock() + return + } + if verbose { + log.Infof("copied %s", r.Key) + } + lock.Lock() + count++ + if count%parallel == 0 { + log.Infof("copied %d keys", count) + } + lock.Unlock() + }() + } + for i := 0; i < cap(lim); i++ { + lim <- struct{}{} + } + + if len(errors) > 0 { + for _, m := range errors { + log.Error(m) + } + return fmt.Errorf("had %d errors", len(errors)) + } + + log.Infof("copied %d keys in %s", count, time.Since(start)) + + for _, f := range s.files { + var file []byte + if len(fromBadgerRepos) != 0 { + dir := filepath.Join(fromBadgerRepos, s.name) + pth := filepath.Join(dir, f) + _, err := os.Stat(pth) + if os.IsNotExist(err) { + return fmt.Errorf("loading file %s: %v", pth, err) + } else { + file, err = ioutil.ReadFile(pth) + if err != nil { + return fmt.Errorf("reading file %s: %v", pth, err) + } + } + } else { + file, err = from.Get(ds.NewKey(f)) + if err != nil { + return fmt.Errorf("getting file %s: %v", f, err) + } + } + + if len(toBadgerRepos) != 0 { + dir := filepath.Join(toBadgerRepos, s.name) + if err := os.MkdirAll(dir, os.ModePerm); err != nil { + return fmt.Errorf("making dir %s: %v", dir, err) + } + pth := filepath.Join(dir, f) + if err = ioutil.WriteFile(pth, file, 0400); err != nil { + return fmt.Errorf("writing file %s: %v", pth, err) + } + } else { + if err := to.Put(ds.NewKey(f), file); err != nil { + return fmt.Errorf("putting file %s: %v", f, err) + } + } + + log.Infof("copied file %s", f) + } + return nil +} diff --git a/util/util.go b/util/util.go index b28e2709..b90557ec 100644 --- a/util/util.go +++ b/util/util.go @@ -39,8 +39,8 @@ var ( ) // NewBadgerDatastore returns a badger based datastore. -func NewBadgerDatastore(repoPath string, lowMem bool) (kt.TxnDatastoreExtended, error) { - path := filepath.Join(repoPath, "eventstore") +func NewBadgerDatastore(dirPath, name string, lowMem bool) (kt.TxnDatastoreExtended, error) { + path := filepath.Join(dirPath, name) if err := os.MkdirAll(path, os.ModePerm); err != nil { return nil, err }