From 655b2630bc5c2d4f0c087092810ac966848134dc Mon Sep 17 00:00:00 2001 From: Sabyasachi Patra Date: Wed, 6 Oct 2021 12:15:07 +0530 Subject: [PATCH] Autodisconnect.0 (#19) * auto disconnect working --- cli/cmd/add.go | 9 +- cli/cmd/completion.go | 28 +++++ cli/cmd/get.go | 2 +- cli/datahop.go | 2 +- go.mod | 5 +- go.sum | 14 +-- internal/matrix/matrix.go | 4 +- internal/replication/replication.go | 130 ++++++++++++++++++----- internal/replication/replication_test.go | 67 ++++++++++++ ipfs_test.go | 4 +- mobile/datahop.go | 112 +++++++++++++++++-- mobile/datahop_test.go | 6 +- mobile/discovery.go | 27 +++++ 13 files changed, 354 insertions(+), 56 deletions(-) create mode 100644 cli/cmd/completion.go diff --git a/cli/cmd/add.go b/cli/cmd/add.go index 3645a54..5829b03 100644 --- a/cli/cmd/add.go +++ b/cli/cmd/add.go @@ -87,6 +87,10 @@ Example: return err } kind, _ := filetype.Match(head) + tag, _ := cmd.Flags().GetString("tag") + if tag == "" { + tag = filepath.Base(f.Name()) + } meta := &replication.Metatag{ Size: fileinfo.Size(), Type: kind.MIME.Value, @@ -94,10 +98,7 @@ Example: Hash: n.Cid(), Timestamp: time.Now().Unix(), Owner: comm.LitePeer.Host.ID(), - } - tag, _ := cmd.Flags().GetString("tag") - if tag == "" { - tag = filepath.Base(f.Name()) + Tag: tag, } err = comm.LitePeer.Manager.Tag(tag, meta) if err != nil { diff --git a/cli/cmd/completion.go b/cli/cmd/completion.go new file mode 100644 index 0000000..6840d83 --- /dev/null +++ b/cli/cmd/completion.go @@ -0,0 +1,28 @@ +package cmd + +import ( + "os" + + "github.com/datahop/ipfs-lite/cli/common" + "github.com/spf13/cobra" +) + +// InitCompletionCmd creates the stop command +func InitCompletionCmd(comm *common.Common) *cobra.Command { + return &cobra.Command{ + Use: "completion [bash|zsh|fish|powershell]", + Short: "Generate completion script", + Long: "To load completions", + DisableFlagsInUseLine: true, + ValidArgs: []string{"bash", "zsh", "fish", "powershell"}, + Args: cobra.ExactValidArgs(1), + Run: func(cmd *cobra.Command, args []string) { + switch args[0] { + case "bash": + cmd.Root().GenBashCompletion(os.Stdout) + case "zsh": + cmd.Root().GenZshCompletion(os.Stdout) + } + }, + } +} diff --git a/cli/cmd/get.go b/cli/cmd/get.go index a889919..7c0b364 100644 --- a/cli/cmd/get.go +++ b/cli/cmd/get.go @@ -24,7 +24,7 @@ datahop network by a simple tag Example: - // To save at default "Download" location + // To save at default "download" location $ datahop get /test.txt diff --git a/cli/datahop.go b/cli/datahop.go index fef8fb3..987ead9 100644 --- a/cli/datahop.go +++ b/cli/datahop.go @@ -77,12 +77,12 @@ func main() { cmd.InitMatrixCmd(comm), cmd.InitializeDocCommand(comm), cmd.InitGetCmd(comm), + cmd.InitCompletionCmd(comm), ) for _, i := range allCommands { rootCmd.AddCommand(i) } - // check help flag for _, v := range os.Args { if v == "-h" || v == "--help" { diff --git a/go.mod b/go.mod index 14d589a..38d53a2 100644 --- a/go.mod +++ b/go.mod @@ -37,8 +37,11 @@ require ( github.com/mitchellh/go-homedir v1.1.0 github.com/multiformats/go-multiaddr v0.3.2 github.com/multiformats/go-multihash v0.0.15 + github.com/plexsysio/taskmanager v0.0.0-00010101000000-000000000000 github.com/spf13/cobra v0.0.5 github.com/spf13/pflag v1.0.3 - golang.org/x/mobile v0.0.0-20210902104108-5d9a33257ab5 // indirect + golang.org/x/tools v0.1.2 // indirect google.golang.org/protobuf v1.26.0 ) + +replace github.com/plexsysio/taskmanager => github.com/asabya/taskmanager v0.1.0 diff --git a/go.sum b/go.sum index 2a60a3f..4b284e0 100644 --- a/go.sum +++ b/go.sum @@ -39,6 +39,8 @@ github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYU github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/asabya/go-ipc-uds v0.1.1 h1:xpCn1761D9RXx0nbp5y/REYb0IFjoTe2cRn12YyhfLM= github.com/asabya/go-ipc-uds v0.1.1/go.mod h1:6fnoNAImh5CdGv6q9katuePtyNIhIfuNscswQJ13Wl4= +github.com/asabya/taskmanager v0.1.0 h1:vc1MxWLaKIqFS8d055QceIXEonfXvw+mi9mkgcGzOMg= +github.com/asabya/taskmanager v0.1.0/go.mod h1:jJPMZyf78X72eyZNrJS5Cz75OFBhQP2+81/FkAEvyOM= github.com/benbjohnson/clock v1.0.2/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg= github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= @@ -880,8 +882,9 @@ go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= -go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo= go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= @@ -927,7 +930,6 @@ golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= -golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56/go.mod h1:JhuoJpWY28nO4Vef9tZUw9qufEGTyX1+7lmHxV5q5G4= golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm080zCjGlMRFzhUhsZKEZO7MGek= golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY= golang.org/x/exp v0.0.0-20191129062945-2f5052295587/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= @@ -948,14 +950,12 @@ golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPI golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k= golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= +golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028 h1:4+4C/Iv2U4fMZBiMCc98MG1In4gJY5YRhtpDNeDeHWs= golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= -golang.org/x/mobile v0.0.0-20210902104108-5d9a33257ab5 h1:peBP2oZO/xVnGMaWMCyFEI0WENsGj71wx5K12mRELHQ= -golang.org/x/mobile v0.0.0-20210902104108-5d9a33257ab5/go.mod h1:c4YKU3ZylDmvbw+H/PSvm42vhdWbuxCzbonauEAP9B8= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -1049,12 +1049,10 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210426080607-c94f62235c83 h1:kHSDPqCtsHZOg0nVylfTo20DDhE9gG4Y0jn7hKQ0QAM= golang.org/x/sys v0.0.0-20210426080607-c94f62235c83/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -1106,8 +1104,6 @@ golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20200212150539-ea181f53ac56/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= -golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.2 h1:kRBLX7v7Af8W7Gdbbc908OJcdgtK8bOz9Uaj8/F1ACA= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/matrix/matrix.go b/internal/matrix/matrix.go index 0e61c36..a48b6dc 100644 --- a/internal/matrix/matrix.go +++ b/internal/matrix/matrix.go @@ -19,6 +19,7 @@ var ( // ContentMatrix keeps record for each hash type ContentMatrix struct { + Tag string Size int64 AvgSpeed float32 DownloadStartedAt int64 @@ -308,7 +309,7 @@ func (mKeeper *MatrixKeeper) NodeDisconnected(address string) { } // ContentDownloadStarted updates content download start time of a hash -func (mKeeper *MatrixKeeper) ContentDownloadStarted(hash string, size int64) { +func (mKeeper *MatrixKeeper) ContentDownloadStarted(tag, hash string, size int64) { mKeeper.mtx.Lock() defer mKeeper.mtx.Unlock() @@ -318,6 +319,7 @@ func (mKeeper *MatrixKeeper) ContentDownloadStarted(hash string, size int64) { } } contentMatrix := mKeeper.ContentMatrix[hash] + contentMatrix.Tag = tag contentMatrix.DownloadStartedAt = time.Now().Unix() contentMatrix.Size = size log.Debug("ContentDownloadStarted : ", contentMatrix) diff --git a/internal/replication/replication.go b/internal/replication/replication.go index b71efac..f977c41 100644 --- a/internal/replication/replication.go +++ b/internal/replication/replication.go @@ -3,6 +3,7 @@ package replication import ( "context" "encoding/json" + "sync" "time" "github.com/datahop/ipfs-lite/internal/repo" @@ -15,6 +16,7 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/plexsysio/taskmanager" ) var ( @@ -23,6 +25,7 @@ var ( // Metatag keeps meta information of a content in the crdt store type Metatag struct { + Tag string Size int64 Type string Name string @@ -33,12 +36,15 @@ type Metatag struct { // Manager handles replication type Manager struct { - ctx context.Context - cancel context.CancelFunc - crdt *crdt.Datastore - contentChan chan cid.Cid - syncer Syncer - repo repo.Repo + ctx context.Context + cancel context.CancelFunc + crdt *crdt.Datastore + syncer Syncer + repo repo.Repo + syncMtx sync.Mutex + + dlManager *taskmanager.TaskManager + download chan Metatag } // Syncer gets the file and finds file provider from the network @@ -69,29 +75,22 @@ func New( if err != nil { return nil, err } - contentChan := make(chan cid.Cid) + contentChan := make(chan Metatag) crdtOpts := crdt.DefaultOptions() crdtOpts.Logger = log crdtOpts.RebroadcastInterval = broadcastInterval crdtOpts.PutHook = func(k datastore.Key, v []byte) { - log.Debugf("Added: [%s] -> %s\n", k, string(v)) + log.Debugf("CRDT Replication :: Added Key: [%s] -> Value: %s\n", k, string(v)) m := &Metatag{} err := json.Unmarshal(v, m) if err != nil { log.Error(err.Error()) return } - contentChan <- m.Hash - state := r.State().Add([]byte(k.Name())) - log.Debugf("New State: %d\n", state) - err = r.SetState() - if err != nil { - log.Errorf("SetState failed %s\n", err.Error()) - } - r.Matrix().ContentDownloadStarted(m.Hash.String(), m.Size) + contentChan <- *m } crdtOpts.DeleteHook = func(k datastore.Key) { - log.Debugf("Removed: [%s]\n", k) + log.Debugf("CRDT Replication :: Removed: [%s]\n", k) state := r.State().Add([]byte("removed " + k.Name())) log.Debugf("New State: %d\n", state) err = r.SetState() @@ -105,17 +104,19 @@ func New( } return &Manager{ - ctx: ctx, - crdt: crdtStore, - contentChan: contentChan, - syncer: syncer, - cancel: cancel, - repo: r, + ctx: ctx, + crdt: crdtStore, + download: contentChan, + syncer: syncer, + cancel: cancel, + repo: r, + dlManager: taskmanager.New(1, 100, time.Second*15), }, nil } // Close the crdt store func (m *Manager) Close() error { + m.dlManager.Stop(m.ctx) m.cancel() return m.crdt.Close() } @@ -199,6 +200,29 @@ func (m *Manager) GetAllCids() ([]cid.Cid, error) { return cids, nil } +// DownloadManagerStatus returns taskmanager status for handling downloads +func (m *Manager) DownloadManagerStatus() (res map[string]taskmanager.TaskStatus) { + return m.dlManager.TaskStatus() +} + +// StartUnfinishedDownload starts unfinished downloads once peer comes back +func (m *Manager) StartUnfinishedDownload(pid peer.ID) { + cm := m.repo.Matrix().ContentMatrix + for _, v := range cm { + if v.DownloadFinishedAt == 0 { + for _, provider := range v.ProvidedBy { + if provider == pid { + meta, err := m.FindTag(v.Tag) + if err != nil { + continue + } + m.download <- *meta + } + } + } + } +} + // Put stores the object `value` named by `key`. func (m *Manager) Put(key datastore.Key, v []byte) error { return m.crdt.Put(key, v) @@ -230,21 +254,73 @@ func (m *Manager) StartContentWatcher() { select { case <-m.ctx.Done(): return - case id := <-m.contentChan: + case meta := <-m.download: + id := meta.Hash log.Debugf("got %s\n", id.String()) go func() { providers := m.syncer.FindProviders(m.ctx, id) for _, provider := range providers { m.repo.Matrix().ContentAddProvider(id.String(), provider) } - _, err := m.syncer.GetFile(m.ctx, id) + //_, err := m.syncer.GetFile(m.ctx, id) + //if err != nil { + // log.Errorf("replication sync failed for %s, Err : %s", id.String(), err.Error()) + // return + //} + ctx, cancel := context.WithCancel(m.ctx) + t := newDownloaderTask(ctx, cancel, meta, m.syncer) + done, err := m.dlManager.Go(t) if err != nil { - log.Errorf("replication sync failed for %s, Err : %s", id.String(), err.Error()) + log.Errorf("content watcher: unable to start downloader task for %s : %s", meta.Name, err.Error()) return } - m.repo.Matrix().ContentDownloadFinished(id.String()) + m.repo.Matrix().ContentDownloadStarted(meta.Tag, id.String(), meta.Size) + select { + case <-ctx.Done(): + return + case <-done: + m.repo.Matrix().ContentDownloadFinished(id.String()) + m.syncMtx.Lock() + state := m.repo.State().Add([]byte(meta.Tag)) + log.Debugf("New State: %d\n", state) + err = m.repo.SetState() + if err != nil { + log.Errorf("SetState failed %s\n", err.Error()) + } + m.syncMtx.Unlock() + } }() } } }() } + +type DownloaderTask struct { + ctx context.Context + cancel context.CancelFunc + metatag Metatag + syncer Syncer +} + +func newDownloaderTask(ctx context.Context, cancel context.CancelFunc, metatag Metatag, syncer Syncer) *DownloaderTask { + return &DownloaderTask{ + ctx: ctx, + cancel: cancel, + metatag: metatag, + syncer: syncer, + } +} + +func (d *DownloaderTask) Execute(ctx context.Context) error { + <-time.After(time.Second) + _, err := d.syncer.GetFile(d.ctx, d.metatag.Hash) + if err != nil { + log.Errorf("replication sync failed for %s, Err : %s", d.metatag.Hash.String(), err.Error()) + return err + } + return nil +} + +func (d *DownloaderTask) Name() string { + return d.metatag.Name +} diff --git a/internal/replication/replication_test.go b/internal/replication/replication_test.go index 21e4ed0..0e194fa 100644 --- a/internal/replication/replication_test.go +++ b/internal/replication/replication_test.go @@ -2,6 +2,7 @@ package replication import ( "context" + "fmt" "os" "path/filepath" "testing" @@ -111,6 +112,19 @@ func (m mockRepo) SetState() error { return nil } +type mockDownload struct { + mockName string +} + +func (m *mockDownload) Execute(ctx context.Context) error { + <-time.After(time.Second * 2) + return nil +} + +func (m *mockDownload) Name() string { + return m.mockName +} + func TestNewManager(t *testing.T) { <-time.After(time.Second) ctx, cancel := context.WithCancel(context.Background()) @@ -176,6 +190,59 @@ func TestNewManager(t *testing.T) { } } +func TestDownloadManager(t *testing.T) { + <-time.After(time.Second) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + root := filepath.Join("../../test", "root1") + d, err := leveldb.NewDatastore(root, &leveldb.Options{}) + if err != nil { + t.Fatal(err) + } + defer d.Close() + r := &mockRepo{ + path: root, + state: bloom.New(uint(2000), 5), + ds: syncds.MutexWrap(d), + } + defer r.Close() + defer removeRepo(root, t) + priv, _, err := crypto.GenerateKeyPair(crypto.RSA, 2048) + if err != nil { + t.Fatal(err) + } + opts := []libp2p.Option{ + libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/4832"), + libp2p.Identity(priv), + libp2p.DisableRelay(), + } + h, err := libp2p.New(ctx, opts...) + if err != nil { + t.Fatal(err) + } + defer h.Close() + ds := &mockDAGSyncer{} + sy := &mockSyncer{} + childCtx, childCancel := context.WithCancel(ctx) + m, err := New(childCtx, childCancel, r, h, ds, r.Datastore(), "/prefix", "topic", time.Second, sy) + if err != nil { + t.Fatal(err) + } + defer m.Close() + taskCount := 10 + for i := 0; i < taskCount; i++ { + dt := &mockDownload{mockName: fmt.Sprintf("some content %d", i)} + m.dlManager.Go(dt) + } + if len(m.DownloadManagerStatus()) != taskCount { + t.Fatalf("taskCount should be %d got %d", taskCount, len(m.DownloadManagerStatus())) + } + <-time.After(time.Second * 3) + if len(m.DownloadManagerStatus()) != 0 { + t.Fatalf("taskCount should be 0 got %d", len(m.DownloadManagerStatus())) + } +} + func TestGetAllCids(t *testing.T) { <-time.After(time.Second) ctx, cancel := context.WithCancel(context.Background()) diff --git a/ipfs_test.go b/ipfs_test.go index a9381cc..83b5784 100644 --- a/ipfs_test.go +++ b/ipfs_test.go @@ -384,6 +384,7 @@ func TestState(t *testing.T) { Hash: n.Cid(), Timestamp: time.Now().Unix(), Owner: p1.Host.ID(), + Tag: fmt.Sprintf("tag%d", i), } err = p1.Manager.Tag(fmt.Sprintf("tag%d", i), meta) if err != nil { @@ -437,10 +438,11 @@ func TestStateDualPeer(t *testing.T) { meta := &replication.Metatag{ Size: int64(len(content)), Type: filetype.Unknown.Extension, - Name: "tag", + Name: fmt.Sprintf("tag%d", i), Hash: n.Cid(), Timestamp: time.Now().Unix(), Owner: p1.Host.ID(), + Tag: fmt.Sprintf("tag%d", i), } err = p1.Manager.Tag(fmt.Sprintf("tag%d", i), meta) if err != nil { diff --git a/mobile/datahop.go b/mobile/datahop.go index 58c0eaf..ed65019 100644 --- a/mobile/datahop.go +++ b/mobile/datahop.go @@ -12,11 +12,10 @@ import ( "sync" "time" + ipfslite "github.com/datahop/ipfs-lite" "github.com/datahop/ipfs-lite/internal/config" "github.com/datahop/ipfs-lite/internal/replication" "github.com/datahop/ipfs-lite/internal/repo" - - ipfslite "github.com/datahop/ipfs-lite" types "github.com/datahop/ipfs-lite/pb" "github.com/datahop/ipfs-lite/version" "github.com/golang/protobuf/proto" @@ -25,6 +24,8 @@ import ( logger "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" + pubsub "github.com/libp2p/go-libp2p-pubsub" ma "github.com/multiformats/go-multiaddr" ) @@ -57,6 +58,8 @@ func (n *notifier) ListenClose(network.Network, ma.Multiaddr) {} func (n *notifier) Connected(net network.Network, c network.Conn) { // NodeMatrix management hop.peer.Repo.Matrix().NodeConnected(c.RemotePeer().String()) + hop.peer.Manager.StartUnfinishedDownload(c.RemotePeer()) + if hop.hook != nil { hop.hook.PeerConnected(c.RemotePeer().String()) } @@ -110,6 +113,7 @@ type datahop struct { func init() { logger.SetLogLevel("ipfslite", "Debug") logger.SetLogLevel("datahop", "Debug") + logger.SetLogLevel("replication", "Debug") logger.SetLogLevel("matrix", "Debug") } @@ -222,6 +226,11 @@ func Start(shouldBootstrap bool) error { return nil } +const ( + FloodSubID = protocol.ID("/hopfloodsub/1.0.0") + TopicCRDT = "CRDTStateLine" +) + // StartDiscovery starts BLE discovery func StartDiscovery(advertising bool, scanning bool, autoDisconnect bool) error { if hop != nil { @@ -243,6 +252,14 @@ func StartDiscovery(advertising bool, scanning bool, autoDisconnect bool) error } } }() + log.Debug("autoDisconnect : ", autoDisconnect) + if autoDisconnect { + err := startCRDTStateWatcher() + if err != nil { + log.Error("StartDiscovery: autoDisconnect: startCRDTStateWatcher failed ", err.Error()) + return err + } + } if advertising && scanning { hop.discService.Start() log.Debug("Stated discovery") @@ -255,15 +272,89 @@ func StartDiscovery(advertising bool, scanning bool, autoDisconnect bool) error hop.discService.StartOnlyScanning() log.Debug("Started discovery only scanning") return nil - } else { - return errors.New("no advertising and no scanning enabled") } - } else { - return errors.New("discovery service is not initialised") + return errors.New("no advertising and no scanning enabled") } - } else { - return errors.New("Datahop service is not initialised") + return errors.New("discovery service is not initialised") + } + return errors.New("datahop service is not initialised") +} + +func startCRDTStateWatcher() error { + log.Debug("startCRDTStateWatcher") + var pubsubOptions []pubsub.Option + ps, err := pubsub.NewFloodsubWithProtocols(context.Background(), hop.peer.Host, + []protocol.ID{FloodSubID}, pubsubOptions...) + if err != nil { + return err + } + topic, err := ps.Join(TopicCRDT) + if err != nil { + return err } + ctx, cancle := context.WithCancel(hop.ctx) + hop.discService.stateInformer = &stateInformer{ + ctx: ctx, + cancel: cancle, + Topic: topic, + } + sub, err := hop.discService.stateInformer.Subscribe() + if err != nil { + return err + } + go func() { + for { + state, err := FilterFromState() + if err != nil { + return + } + msg := &Message{ + Id: ID(), + CRDTState: state, + } + msgBytes, err := msg.Marshal() + if err != nil { + return + } + select { + case <-hop.discService.stateInformer.ctx.Done(): + log.Debug("Stop stateInformer Publisher") + return + case <-time.After(time.Second * 10): + err := hop.discService.stateInformer.Publish(hop.ctx, msgBytes) + if err != nil { + log.Error("startCRDTStateWatcher : message publish error ", err.Error()) + continue + } + log.Debugf("startCRDTStateWatcher : sent message %+v\n", msg) + } + } + }() + go func() { + for { + got, err := sub.Next(hop.ctx) + if err != nil { + return + } + msg := Message{} + err = msg.Unmarshal(got.GetData()) + if err != nil { + return + } + log.Debugf("startCRDTStateWatcher : got message %+v\n", msg) + if msg.Id != ID() { + state, err := FilterFromState() + if err != nil { + continue + } + if msg.CRDTState == state && !hop.discService.isHost { + hop.wifiCon.Disconnect() + hop.discService.connected = false + } + } + } + }() + return nil } // StopDiscovery stops BLE discovery @@ -459,6 +550,7 @@ func Add(tag string, content []byte) error { Hash: n.Cid(), Timestamp: time.Now().Unix(), Owner: hop.peer.Host.ID(), + Tag: tag, } err = hop.peer.Manager.Tag(tag, meta) if err != nil { @@ -558,6 +650,10 @@ func Matrix() (string, error) { return "", errors.New("datahop ipfs-lite node is not running") } +func DownloadsInProgress() int { + return len(hop.peer.Manager.DownloadManagerStatus()) +} + // GetDiscoveryNotifier returns discovery notifier func GetDiscoveryNotifier() DiscoveryNotifier { return hop.discService diff --git a/mobile/datahop_test.go b/mobile/datahop_test.go index 2f2faa2..8d4b9d8 100644 --- a/mobile/datahop_test.go +++ b/mobile/datahop_test.go @@ -198,7 +198,7 @@ func TestStartStopDiscovery(t *testing.T) { removeRepo(root, t) Close() }() - err = StartDiscovery(true, true, true) + err = StartDiscovery(true, true, false) if err != nil { t.Fatal(err) } @@ -419,6 +419,7 @@ func TestReplicationOut(t *testing.T) { t.Fatal(err) } } + <-time.After(time.Second * 5) bf2, err := State() if err != nil { t.Fatal(err) @@ -426,7 +427,6 @@ func TestReplicationOut(t *testing.T) { if bytes.Compare(bf1, bf2) == 0 { t.Fatal("bloom filter is same after addition") } - <-time.After(time.Second * 5) bf3, err := p.Repo.State().MarshalJSON() if err != nil { t.Fatal(err) @@ -564,6 +564,7 @@ func TestReplicationIn(t *testing.T) { } p.Manager.Tag(fmt.Sprintf("tag%d", i), meta) } + <-time.After(time.Second * 5) bf2, err := p.Repo.State().MarshalJSON() if err != nil { t.Fatal(err) @@ -571,7 +572,6 @@ func TestReplicationIn(t *testing.T) { if bytes.Compare(bf1, bf2) == 0 { t.Fatal("bloom filter are same") } - <-time.After(time.Second * 5) bf3, err := State() if err != nil { t.Fatal(err) diff --git a/mobile/discovery.go b/mobile/discovery.go index 4b4ffed..848ba58 100644 --- a/mobile/discovery.go +++ b/mobile/discovery.go @@ -1,11 +1,14 @@ package datahop import ( + "context" + "encoding/json" "io" "sync" "time" "github.com/libp2p/go-libp2p-core/peer" + pubsub "github.com/libp2p/go-libp2p-pubsub" ) const DiscoveryServiceTag = "_datahop-discovery._ble" @@ -28,6 +31,25 @@ type Notifee interface { HandlePeerFound(string) } +type stateInformer struct { + ctx context.Context + cancel context.CancelFunc + *pubsub.Topic +} + +type Message struct { + Id string + CRDTState string +} + +func (m *Message) Marshal() ([]byte, error) { + return json.Marshal(m) +} + +func (m *Message) Unmarshal(val []byte) error { + return json.Unmarshal(val, m) +} + type discoveryService struct { discovery DiscoveryDriver advertiser AdvertisingDriver @@ -43,6 +65,9 @@ type discoveryService struct { connected bool //wifi connection status connected/disconnected numConnected int service ServiceType + + isHost bool + stateInformer *stateInformer // handleConnectionRequest will take care of the incoming connection request. // but it is not safe to use this approach, as in case of multiple back to // back connection requests we might loose some connection request as @@ -124,6 +149,7 @@ func (b *discoveryService) Close() error { b.advertiser.Stop() hop.wifiCon.Disconnect() hop.wifiHS.Stop() + b.isHost = false return nil } func (b *discoveryService) RegisterNotifee(n Notifee) { @@ -181,6 +207,7 @@ func (b *discoveryService) AdvertiserPeerDifferentStatus(topic string, value []b hop.peer.Repo.Matrix().BLEDiscovered(id) b.discovery.Stop() b.wifiHS.Start() + b.isHost = true } func (b *discoveryService) OnConnectionSuccess(started int64, completed int64, rssi int, speed int, freq int) {