Skip to content

Commit

Permalink
swarm/storage/netstore: add fetcher cancellation on shutdown (#19049)
Browse files Browse the repository at this point in the history
swarm/network/stream: remove netstore internal wg
swarm/network/stream: run individual tests with t.Run
  • Loading branch information
acud authored and zelig committed Feb 14, 2019
1 parent e9f70c9 commit 3ee09ba
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 120 deletions.
229 changes: 116 additions & 113 deletions swarm/network/stream/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,133 +453,136 @@ func TestDeliveryFromNodes(t *testing.T) {
}

func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
Syncing: SyncingDisabled,
Retrieval: RetrievalEnabled,
}, nil)
bucket.Store(bucketKeyRegistry, r)
t.Helper()
t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}

cleanup = func() {
r.Close()
clean()
}
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
Syncing: SyncingDisabled,
Retrieval: RetrievalEnabled,
}, nil)
bucket.Store(bucketKeyRegistry, r)

return r, cleanup, nil
},
})
defer sim.Close()
cleanup = func() {
r.Close()
clean()
}

log.Info("Adding nodes to simulation")
_, err := sim.AddNodesAndConnectChain(nodes)
if err != nil {
t.Fatal(err)
}
return r, cleanup, nil
},
})
defer sim.Close()

log.Info("Starting simulation")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
nodeIDs := sim.UpNodeIDs()
//determine the pivot node to be the first node of the simulation
pivot := nodeIDs[0]

//distribute chunks of a random file into Stores of nodes 1 to nodes
//we will do this by creating a file store with an underlying round-robin store:
//the file store will create a hash for the uploaded file, but every chunk will be
//distributed to different nodes via round-robin scheduling
log.Debug("Writing file to round-robin file store")
//to do this, we create an array for chunkstores (length minus one, the pivot node)
stores := make([]storage.ChunkStore, len(nodeIDs)-1)
//we then need to get all stores from the sim....
lStores := sim.NodesItems(bucketKeyStore)
i := 0
//...iterate the buckets...
for id, bucketVal := range lStores {
//...and remove the one which is the pivot node
if id == pivot {
continue
}
//the other ones are added to the array...
stores[i] = bucketVal.(storage.ChunkStore)
i++
}
//...which then gets passed to the round-robin file store
roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
//now we can actually upload a (random) file to the round-robin store
size := chunkCount * chunkSize
log.Debug("Storing data to file store")
fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
// wait until all chunks stored
log.Info("Adding nodes to simulation")
_, err := sim.AddNodesAndConnectChain(nodes)
if err != nil {
return err
}
err = wait(ctx)
if err != nil {
return err
t.Fatal(err)
}

log.Debug("Waiting for kademlia")
// TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
if _, err := sim.WaitTillHealthy(ctx); err != nil {
return err
}
log.Info("Starting simulation")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
nodeIDs := sim.UpNodeIDs()
//determine the pivot node to be the first node of the simulation
pivot := nodeIDs[0]

//distribute chunks of a random file into Stores of nodes 1 to nodes
//we will do this by creating a file store with an underlying round-robin store:
//the file store will create a hash for the uploaded file, but every chunk will be
//distributed to different nodes via round-robin scheduling
log.Debug("Writing file to round-robin file store")
//to do this, we create an array for chunkstores (length minus one, the pivot node)
stores := make([]storage.ChunkStore, len(nodeIDs)-1)
//we then need to get all stores from the sim....
lStores := sim.NodesItems(bucketKeyStore)
i := 0
//...iterate the buckets...
for id, bucketVal := range lStores {
//...and remove the one which is the pivot node
if id == pivot {
continue
}
//the other ones are added to the array...
stores[i] = bucketVal.(storage.ChunkStore)
i++
}
//...which then gets passed to the round-robin file store
roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
//now we can actually upload a (random) file to the round-robin store
size := chunkCount * chunkSize
log.Debug("Storing data to file store")
fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
// wait until all chunks stored
if err != nil {
return err
}
err = wait(ctx)
if err != nil {
return err
}

//get the pivot node's filestore
item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
if !ok {
return fmt.Errorf("No filestore")
}
pivotFileStore := item.(*storage.FileStore)
log.Debug("Starting retrieval routine")
retErrC := make(chan error)
go func() {
// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
// we must wait for the peer connections to have started before requesting
n, err := readAll(pivotFileStore, fileHash)
log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
retErrC <- err
}()
log.Debug("Waiting for kademlia")
// TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
if _, err := sim.WaitTillHealthy(ctx); err != nil {
return err
}

disconnected := watchDisconnections(ctx, sim)
defer func() {
if err != nil && disconnected.bool() {
err = errors.New("disconnect events received")
//get the pivot node's filestore
item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
if !ok {
return fmt.Errorf("No filestore")
}
}()
pivotFileStore := item.(*storage.FileStore)
log.Debug("Starting retrieval routine")
retErrC := make(chan error)
go func() {
// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
// we must wait for the peer connections to have started before requesting
n, err := readAll(pivotFileStore, fileHash)
log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
retErrC <- err
}()

disconnected := watchDisconnections(ctx, sim)
defer func() {
if err != nil && disconnected.bool() {
err = errors.New("disconnect events received")
}
}()

//finally check that the pivot node gets all chunks via the root hash
log.Debug("Check retrieval")
success := true
var total int64
total, err = readAll(pivotFileStore, fileHash)
if err != nil {
return err
}
log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
if err != nil || total != int64(size) {
success = false
}
//finally check that the pivot node gets all chunks via the root hash
log.Debug("Check retrieval")
success := true
var total int64
total, err = readAll(pivotFileStore, fileHash)
if err != nil {
return err
}
log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
if err != nil || total != int64(size) {
success = false
}

if !success {
return fmt.Errorf("Test failed, chunks not available on all nodes")
}
if err := <-retErrC; err != nil {
return fmt.Errorf("requesting chunks: %v", err)
if !success {
return fmt.Errorf("Test failed, chunks not available on all nodes")
}
if err := <-retErrC; err != nil {
return fmt.Errorf("requesting chunks: %v", err)
}
log.Debug("Test terminated successfully")
return nil
})
if result.Error != nil {
t.Fatal(result.Error)
}
log.Debug("Test terminated successfully")
return nil
})
if result.Error != nil {
t.Fatal(result.Error)
}
}

func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
Expand Down
15 changes: 9 additions & 6 deletions swarm/network/stream/snapshot_retrieval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestRetrieval(t *testing.T) {
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
err := runRetrievalTest(*chunks, *nodes)
err := runRetrievalTest(t, *chunks, *nodes)
if err != nil {
t.Fatal(err)
}
Expand All @@ -93,10 +93,12 @@ func TestRetrieval(t *testing.T) {
}
for _, n := range nodeCnt {
for _, c := range chnkCnt {
err := runRetrievalTest(c, n)
if err != nil {
t.Fatal(err)
}
t.Run(fmt.Sprintf("TestRetrieval_%d_%d", n, c), func(t *testing.T) {
err := runRetrievalTest(t, c, n)
if err != nil {
t.Fatal(err)
}
})
}
}
}
Expand Down Expand Up @@ -225,7 +227,8 @@ simulation's `action` function.
The snapshot should have 'streamer' in its service list.
*/
func runRetrievalTest(chunkCount int, nodeCount int) error {
func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
t.Helper()
sim := simulation.New(retrievalSimServiceMap)
defer sim.Close()

Expand Down
20 changes: 19 additions & 1 deletion swarm/storage/netstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,25 @@ func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Cont
func (n *NetStore) Close() {
close(n.closeC)
n.store.Close()
// TODO: loop through fetchers to cancel them

wg := sync.WaitGroup{}
for _, key := range n.fetchers.Keys() {
if f, ok := n.fetchers.Get(key); ok {
if fetch, ok := f.(*fetcher); ok {
wg.Add(1)
go func(fetch *fetcher) {
defer wg.Done()
fetch.cancel()

select {
case <-fetch.deliveredC:
case <-fetch.cancelledC:
}
}(fetch)
}
}
}
wg.Wait()
}

// get attempts at retrieving the chunk from LocalStore
Expand Down

0 comments on commit 3ee09ba

Please sign in to comment.