From 3ee09ba03511ad9a49e37c58f0c35b9c9771dd6f Mon Sep 17 00:00:00 2001 From: Elad Date: Thu, 14 Feb 2019 13:51:57 +0700 Subject: [PATCH] swarm/storage/netstore: add fetcher cancellation on shutdown (#19049) swarm/network/stream: remove netstore internal wg swarm/network/stream: run individual tests with t.Run --- swarm/network/stream/delivery_test.go | 229 +++++++++--------- .../network/stream/snapshot_retrieval_test.go | 15 +- swarm/storage/netstore.go | 20 +- 3 files changed, 144 insertions(+), 120 deletions(-) diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index e5821df4f934..49e4a423a746 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -453,133 +453,136 @@ func TestDeliveryFromNodes(t *testing.T) { } func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) { - sim := simulation.New(map[string]simulation.ServiceFunc{ - "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) - if err != nil { - return nil, nil, err - } - - r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - SkipCheck: skipCheck, - Syncing: SyncingDisabled, - Retrieval: RetrievalEnabled, - }, nil) - bucket.Store(bucketKeyRegistry, r) + t.Helper() + t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) { + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) + if err != nil { + return nil, nil, err + } - cleanup = func() { - r.Close() - clean() - } + r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + SkipCheck: skipCheck, + Syncing: SyncingDisabled, + Retrieval: RetrievalEnabled, + }, nil) + bucket.Store(bucketKeyRegistry, r) - return r, cleanup, nil - }, - }) - defer sim.Close() + cleanup = func() { + r.Close() + clean() + } - log.Info("Adding nodes to simulation") - _, err := sim.AddNodesAndConnectChain(nodes) - if err != nil { - t.Fatal(err) - } + return r, cleanup, nil + }, + }) + defer sim.Close() - log.Info("Starting simulation") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { - nodeIDs := sim.UpNodeIDs() - //determine the pivot node to be the first node of the simulation - pivot := nodeIDs[0] - - //distribute chunks of a random file into Stores of nodes 1 to nodes - //we will do this by creating a file store with an underlying round-robin store: - //the file store will create a hash for the uploaded file, but every chunk will be - //distributed to different nodes via round-robin scheduling - log.Debug("Writing file to round-robin file store") - //to do this, we create an array for chunkstores (length minus one, the pivot node) - stores := make([]storage.ChunkStore, len(nodeIDs)-1) - //we then need to get all stores from the sim.... - lStores := sim.NodesItems(bucketKeyStore) - i := 0 - //...iterate the buckets... - for id, bucketVal := range lStores { - //...and remove the one which is the pivot node - if id == pivot { - continue - } - //the other ones are added to the array... - stores[i] = bucketVal.(storage.ChunkStore) - i++ - } - //...which then gets passed to the round-robin file store - roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams()) - //now we can actually upload a (random) file to the round-robin store - size := chunkCount * chunkSize - log.Debug("Storing data to file store") - fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false) - // wait until all chunks stored + log.Info("Adding nodes to simulation") + _, err := sim.AddNodesAndConnectChain(nodes) if err != nil { - return err - } - err = wait(ctx) - if err != nil { - return err + t.Fatal(err) } - log.Debug("Waiting for kademlia") - // TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias - if _, err := sim.WaitTillHealthy(ctx); err != nil { - return err - } + log.Info("Starting simulation") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { + nodeIDs := sim.UpNodeIDs() + //determine the pivot node to be the first node of the simulation + pivot := nodeIDs[0] + + //distribute chunks of a random file into Stores of nodes 1 to nodes + //we will do this by creating a file store with an underlying round-robin store: + //the file store will create a hash for the uploaded file, but every chunk will be + //distributed to different nodes via round-robin scheduling + log.Debug("Writing file to round-robin file store") + //to do this, we create an array for chunkstores (length minus one, the pivot node) + stores := make([]storage.ChunkStore, len(nodeIDs)-1) + //we then need to get all stores from the sim.... + lStores := sim.NodesItems(bucketKeyStore) + i := 0 + //...iterate the buckets... + for id, bucketVal := range lStores { + //...and remove the one which is the pivot node + if id == pivot { + continue + } + //the other ones are added to the array... + stores[i] = bucketVal.(storage.ChunkStore) + i++ + } + //...which then gets passed to the round-robin file store + roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams()) + //now we can actually upload a (random) file to the round-robin store + size := chunkCount * chunkSize + log.Debug("Storing data to file store") + fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false) + // wait until all chunks stored + if err != nil { + return err + } + err = wait(ctx) + if err != nil { + return err + } - //get the pivot node's filestore - item, ok := sim.NodeItem(pivot, bucketKeyFileStore) - if !ok { - return fmt.Errorf("No filestore") - } - pivotFileStore := item.(*storage.FileStore) - log.Debug("Starting retrieval routine") - retErrC := make(chan error) - go func() { - // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks - // we must wait for the peer connections to have started before requesting - n, err := readAll(pivotFileStore, fileHash) - log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err) - retErrC <- err - }() + log.Debug("Waiting for kademlia") + // TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias + if _, err := sim.WaitTillHealthy(ctx); err != nil { + return err + } - disconnected := watchDisconnections(ctx, sim) - defer func() { - if err != nil && disconnected.bool() { - err = errors.New("disconnect events received") + //get the pivot node's filestore + item, ok := sim.NodeItem(pivot, bucketKeyFileStore) + if !ok { + return fmt.Errorf("No filestore") } - }() + pivotFileStore := item.(*storage.FileStore) + log.Debug("Starting retrieval routine") + retErrC := make(chan error) + go func() { + // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks + // we must wait for the peer connections to have started before requesting + n, err := readAll(pivotFileStore, fileHash) + log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err) + retErrC <- err + }() + + disconnected := watchDisconnections(ctx, sim) + defer func() { + if err != nil && disconnected.bool() { + err = errors.New("disconnect events received") + } + }() - //finally check that the pivot node gets all chunks via the root hash - log.Debug("Check retrieval") - success := true - var total int64 - total, err = readAll(pivotFileStore, fileHash) - if err != nil { - return err - } - log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err)) - if err != nil || total != int64(size) { - success = false - } + //finally check that the pivot node gets all chunks via the root hash + log.Debug("Check retrieval") + success := true + var total int64 + total, err = readAll(pivotFileStore, fileHash) + if err != nil { + return err + } + log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err)) + if err != nil || total != int64(size) { + success = false + } - if !success { - return fmt.Errorf("Test failed, chunks not available on all nodes") - } - if err := <-retErrC; err != nil { - return fmt.Errorf("requesting chunks: %v", err) + if !success { + return fmt.Errorf("Test failed, chunks not available on all nodes") + } + if err := <-retErrC; err != nil { + return fmt.Errorf("requesting chunks: %v", err) + } + log.Debug("Test terminated successfully") + return nil + }) + if result.Error != nil { + t.Fatal(result.Error) } - log.Debug("Test terminated successfully") - return nil }) - if result.Error != nil { - t.Fatal(result.Error) - } } func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) { diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index 0f20f1be0606..afb023ae295d 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -74,7 +74,7 @@ func TestRetrieval(t *testing.T) { //if nodes/chunks have been provided via commandline, //run the tests with these values if *nodes != 0 && *chunks != 0 { - err := runRetrievalTest(*chunks, *nodes) + err := runRetrievalTest(t, *chunks, *nodes) if err != nil { t.Fatal(err) } @@ -93,10 +93,12 @@ func TestRetrieval(t *testing.T) { } for _, n := range nodeCnt { for _, c := range chnkCnt { - err := runRetrievalTest(c, n) - if err != nil { - t.Fatal(err) - } + t.Run(fmt.Sprintf("TestRetrieval_%d_%d", n, c), func(t *testing.T) { + err := runRetrievalTest(t, c, n) + if err != nil { + t.Fatal(err) + } + }) } } } @@ -225,7 +227,8 @@ simulation's `action` function. The snapshot should have 'streamer' in its service list. */ -func runRetrievalTest(chunkCount int, nodeCount int) error { +func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error { + t.Helper() sim := simulation.New(retrievalSimServiceMap) defer sim.Close() diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go index a2595d9fadea..202af2bf58a9 100644 --- a/swarm/storage/netstore.go +++ b/swarm/storage/netstore.go @@ -128,7 +128,25 @@ func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Cont func (n *NetStore) Close() { close(n.closeC) n.store.Close() - // TODO: loop through fetchers to cancel them + + wg := sync.WaitGroup{} + for _, key := range n.fetchers.Keys() { + if f, ok := n.fetchers.Get(key); ok { + if fetch, ok := f.(*fetcher); ok { + wg.Add(1) + go func(fetch *fetcher) { + defer wg.Done() + fetch.cancel() + + select { + case <-fetch.deliveredC: + case <-fetch.cancelledC: + } + }(fetch) + } + } + } + wg.Wait() } // get attempts at retrieving the chunk from LocalStore