swarm: fix network/stream data races (ethereum#19051)
* swarm/network/stream: newStreamerTester cleanup only if err is nil

* swarm/network/stream: raise newStreamerTester waitForPeers timeout

* swarm/network/stream: fix data races in GetPeerSubscriptions

* swarm/storage: prevent data race on LDBStore.batchesC

ethersphere/swarm#1198 (comment)

* swarm/network/stream: fix TestGetSubscriptionsRPC data race

ethersphere/swarm#1198 (comment)

* swarm/network/stream: correctly use Simulation.Run callback

ethersphere/swarm#1198 (comment)

* swarm/network: protect addrCountC in Kademlia.AddrCountC function

ethersphere/swarm#1198 (comment)

* p2p/simulations: fix a deadlock calling getRandomNode with lock

ethersphere/swarm#1198 (comment)

* swarm/network/stream: terminate disconnect goroutines in tests

* swarm/network/stream: reduce memory consumption when testing data races

* swarm/network/stream: add watchDisconnections helper function

* swarm/network/stream: add concurrent counter for tests

* swarm/network/stream: rename race/norace test files and use const

* swarm/network/stream: remove watchSim and its panic

* swarm/network/stream: pass context in watchDisconnections

* swarm/network/stream: add concurrent safe bool for watchDisconnections

* swarm/storage: fix LDBStore.batchesC data race by not closing it

(cherry picked from commit 3fd6db2)
janos authored and dshulyak committed Mar 14, 2019
1 parent b671640 commit f6485fe
Showing 14 changed files with 274 additions and 197 deletions.
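
Editor's note: the LDBStore.batchesC change named in the commit message is in a file not shown below. As general background only — a generic Go sketch with hypothetical names, not the LDBStore code — closing a channel that other goroutines may still send on panics and is flagged by the race detector, so a common fix is to leave the data channel open and signal shutdown through a separate quit channel:

```go
package main

import "fmt"

// worker drains requests until quit is closed; requests itself is never
// closed, so concurrent senders can never hit "send on closed channel".
func worker(requests <-chan int, quit <-chan struct{}, done chan<- struct{}) {
	defer close(done)
	for {
		select {
		case r := <-requests:
			fmt.Println("processing", r)
		case <-quit:
			return
		}
	}
}

func main() {
	requests := make(chan int)
	quit := make(chan struct{})
	done := make(chan struct{})
	go worker(requests, quit, done)

	requests <- 1
	close(quit) // signal shutdown instead of closing the requests channel
	<-done
}
```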
2 changes: 1 addition & 1 deletion p2p/simulations/network.go
@@ -460,7 +460,7 @@ func (net *Network) getRandomNode(ids []enode.ID, excludeIDs []enode.ID) *Node {
 	if l == 0 {
 		return nil
 	}
-	return net.GetNode(filtered[rand.Intn(l)])
+	return net.getNode(filtered[rand.Intn(l)])
 }

 func filterIDs(ids []enode.ID, excludeIDs []enode.ID) []enode.ID {
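
Editor's note on the deadlock (my reading of the change, not text from the commit): getRandomNode runs with the network lock already held, and the exported GetNode takes that same lock, so the original call could never return; the unexported getNode expects the caller to hold the lock. A minimal sketch of that exported-locks / unexported-assumes-lock convention, with hypothetical names:

```go
package main

import (
	"fmt"
	"sync"
)

type registry struct {
	lock  sync.Mutex
	nodes map[string]string
}

// GetNode is the public accessor: it takes the lock itself.
func (r *registry) GetNode(id string) string {
	r.lock.Lock()
	defer r.lock.Unlock()
	return r.getNode(id)
}

// getNode assumes the caller already holds r.lock.
func (r *registry) getNode(id string) string {
	return r.nodes[id]
}

// pickLocked runs with the lock held; calling r.GetNode here would deadlock,
// because Go mutexes are not reentrant, so it must use r.getNode instead.
func (r *registry) pickLocked(id string) string {
	r.lock.Lock()
	defer r.lock.Unlock()
	return r.getNode(id)
}

func main() {
	r := &registry{nodes: map[string]string{"a": "node-a"}}
	fmt.Println(r.pickLocked("a"))
}
```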
3 changes: 3 additions & 0 deletions swarm/network/kademlia.go
@@ -353,6 +353,9 @@ func (k *Kademlia) sendNeighbourhoodDepthChange() {
 // Not receiving from the returned channel will block Register function
 // when address count value changes.
 func (k *Kademlia) AddrCountC() <-chan int {
+	k.lock.Lock()
+	defer k.lock.Unlock()
+
 	if k.addrCountC == nil {
 		k.addrCountC = make(chan int)
 	}
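
Editor's note: the race here is the unsynchronized lazy initialization — two goroutines calling AddrCountC concurrently could both observe a nil channel and allocate different ones. A standalone sketch of the same check-then-act pattern guarded by a mutex (simplified, not the Kademlia type itself):

```go
package main

import (
	"fmt"
	"sync"
)

type counter struct {
	mu         sync.Mutex
	addrCountC chan int
}

// C lazily creates the notification channel; the mutex guarantees every
// caller observes the same channel even under concurrent access.
func (c *counter) C() <-chan int {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.addrCountC == nil {
		c.addrCountC = make(chan int)
	}
	return c.addrCountC
}

func main() {
	c := new(counter)
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = c.C() // safe under the race detector thanks to the lock
		}()
	}
	wg.Wait()
	fmt.Println("all callers observed one channel")
}
```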
62 changes: 58 additions & 4 deletions swarm/network/stream/common_test.go
@@ -151,7 +151,7 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste
 	// temp datadir
 	datadir, err := ioutil.TempDir("", "streamer")
 	if err != nil {
-		return nil, nil, nil, func() {}, err
+		return nil, nil, nil, nil, err
 	}
 	removeDataDir := func() {
 		os.RemoveAll(datadir)
@@ -163,12 +163,14 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste

 	localStore, err := storage.NewTestLocalStoreForAddr(params)
 	if err != nil {
-		return nil, nil, nil, removeDataDir, err
+		removeDataDir()
+		return nil, nil, nil, nil, err
 	}

 	netStore, err := storage.NewNetStore(localStore, nil)
 	if err != nil {
-		return nil, nil, nil, removeDataDir, err
+		removeDataDir()
+		return nil, nil, nil, nil, err
 	}

 	delivery := NewDelivery(to, netStore)
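
Editor's note: the convention these constructor changes adopt is that newStreamerTester cleans up whatever it already created and returns a nil teardown when it fails, so callers defer teardown() only after checking the error. A minimal sketch of that pattern with made-up names:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"os"
)

// newTempResource returns a teardown func only on success; on failure it
// removes anything it already created and returns a nil teardown.
func newTempResource() (string, func(), error) {
	dir, err := ioutil.TempDir("", "resource")
	if err != nil {
		return "", nil, err
	}
	if err := ioutil.WriteFile(dir+"/data", []byte("x"), 0644); err != nil {
		os.RemoveAll(dir) // clean up the partial state before returning
		return "", nil, err
	}
	teardown := func() { os.RemoveAll(dir) }
	return dir, teardown, nil
}

func main() {
	dir, teardown, err := newTempResource()
	if err != nil {
		fmt.Println("setup failed:", err)
		return
	}
	defer teardown() // deferred only after err has been checked
	fmt.Println("using", dir)
}
```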
@@ -180,8 +182,9 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste
 	}
 	protocolTester := p2ptest.NewProtocolTester(addr.ID(), 1, streamer.runProtocol)

-	err = waitForPeers(streamer, 1*time.Second, 1)
+	err = waitForPeers(streamer, 10*time.Second, 1)
 	if err != nil {
+		teardown()
 		return nil, nil, nil, nil, errors.New("timeout: peer is not created")
 	}

@@ -317,3 +320,54 @@ func createTestLocalStorageForID(id enode.ID, addr *network.BzzAddr) (storage.Ch
 	}
 	return store, datadir, nil
 }
+
+// watchDisconnections receives simulation peer events in a new goroutine and sets atomic value
+// disconnected to true in case of a disconnect event.
+func watchDisconnections(ctx context.Context, sim *simulation.Simulation) (disconnected *boolean) {
+	log.Debug("Watching for disconnections")
+	disconnections := sim.PeerEvents(
+		ctx,
+		sim.NodeIDs(),
+		simulation.NewPeerEventsFilter().Drop(),
+	)
+	disconnected = new(boolean)
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case d := <-disconnections:
+				if d.Error != nil {
+					log.Error("peer drop event error", "node", d.NodeID, "peer", d.PeerID, "err", d.Error)
+				} else {
+					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
+				}
+				disconnected.set(true)
+			}
+		}
+	}()
+	return disconnected
+}
+
+// boolean is used to concurrently set
+// and read a boolean value.
+type boolean struct {
+	v  bool
+	mu sync.RWMutex
+}
+
+// set sets the value.
+func (b *boolean) set(v bool) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	b.v = v
+}
+
+// bool reads the value.
+func (b *boolean) bool() bool {
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+
+	return b.v
+}
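
Editor's note: a self-contained illustration of how the new helper and flag are meant to interact in a test (the boolean type is repeated only so the snippet compiles on its own; the goroutine stands in for the one started by watchDisconnections):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// boolean is repeated from the diff above so this sketch is self-contained.
type boolean struct {
	v  bool
	mu sync.RWMutex
}

func (b *boolean) set(v bool) { b.mu.Lock(); defer b.mu.Unlock(); b.v = v }
func (b *boolean) bool() bool { b.mu.RLock(); defer b.mu.RUnlock(); return b.v }

// run mimics the shape of the updated tests: a watcher goroutine sets the
// flag, and a deferred check converts it into an error on the way out.
func run() (err error) {
	disconnected := new(boolean)
	defer func() {
		if err != nil && disconnected.bool() {
			err = errors.New("disconnect events received")
		}
	}()

	done := make(chan struct{})
	go func() { // stand-in for the goroutine inside watchDisconnections
		disconnected.set(true)
		close(done)
	}()
	<-done

	return errors.New("some test failure") // rewritten by the deferred check
}

func main() {
	fmt.Println(run())
}
```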
75 changes: 20 additions & 55 deletions swarm/network/stream/delivery_test.go
@@ -22,7 +22,6 @@ import (
 	"errors"
 	"fmt"
 	"sync"
-	"sync/atomic"
 	"testing"
 	"time"

@@ -48,10 +47,10 @@ func TestStreamerRetrieveRequest(t *testing.T) {
 		Syncing: SyncingDisabled,
 	}
 	tester, streamer, _, teardown, err := newStreamerTester(regOpts)
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()

 	node := tester.Nodes[0]

@@ -100,10 +99,10 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
 		Retrieval: RetrievalEnabled,
 		Syncing: SyncingDisabled, //do no syncing
 	})
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()

 	node := tester.Nodes[0]

@@ -172,10 +171,10 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
 		Retrieval: RetrievalEnabled,
 		Syncing: SyncingDisabled,
 	})
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()

 	node := tester.Nodes[0]

@@ -362,10 +361,10 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
 		Retrieval: RetrievalDisabled,
 		Syncing: SyncingDisabled,
 	})
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()

 	streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
 		return &testClient{
@@ -485,7 +484,8 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
 	}

 	log.Info("Starting simulation")
-	ctx := context.Background()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
 		nodeIDs := sim.UpNodeIDs()
 		//determine the pivot node to be the first node of the simulation
@@ -548,27 +548,10 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
 			retErrC <- err
 		}()

-		log.Debug("Watching for disconnections")
-		disconnections := sim.PeerEvents(
-			context.Background(),
-			sim.NodeIDs(),
-			simulation.NewPeerEventsFilter().Drop(),
-		)
-
-		var disconnected atomic.Value
-		go func() {
-			for d := range disconnections {
-				if d.Error != nil {
-					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
-					disconnected.Store(true)
-				}
-			}
-		}()
+		disconnected := watchDisconnections(ctx, sim)
 		defer func() {
-			if err != nil {
-				if yes, ok := disconnected.Load().(bool); ok && yes {
-					err = errors.New("disconnect events received")
-				}
+			if err != nil && disconnected.bool() {
+				err = errors.New("disconnect events received")
 			}
 		}()

@@ -589,7 +572,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
 			return fmt.Errorf("Test failed, chunks not available on all nodes")
 		}
 		if err := <-retErrC; err != nil {
-			t.Fatalf("requesting chunks: %v", err)
+			return fmt.Errorf("requesting chunks: %v", err)
 		}
 		log.Debug("Test terminated successfully")
 		return nil
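
Editor's note: switching to context.WithCancel, together with passing ctx into watchDisconnections, is what lets the disconnect watcher exit when the simulation callback returns instead of leaking. A generic sketch of that cancellation pattern, independent of the swarm APIs:

```go
package main

import (
	"context"
	"fmt"
)

// watch mirrors the select loop in watchDisconnections: it exits as soon as
// the context is cancelled instead of blocking on the events channel forever.
func watch(ctx context.Context, events <-chan string, stopped chan<- struct{}) {
	defer close(stopped)
	for {
		select {
		case <-ctx.Done():
			return
		case e := <-events:
			fmt.Println("event:", e)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	events := make(chan string)
	stopped := make(chan struct{})
	go watch(ctx, events, stopped)

	events <- "peer drop"
	cancel() // a deferred cancel in a test would do the same on return
	<-stopped
	fmt.Println("watcher terminated, no goroutine leak")
}
```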
@@ -657,48 +640,33 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
 		b.Fatal(err)
 	}

-	ctx := context.Background()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
 		nodeIDs := sim.UpNodeIDs()
 		node := nodeIDs[len(nodeIDs)-1]

 		item, ok := sim.NodeItem(node, bucketKeyFileStore)
 		if !ok {
-			b.Fatal("No filestore")
+			return errors.New("No filestore")
 		}
 		remoteFileStore := item.(*storage.FileStore)

 		pivotNode := nodeIDs[0]
 		item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore)
 		if !ok {
-			b.Fatal("No filestore")
+			return errors.New("No filestore")
 		}
 		netStore := item.(*storage.NetStore)

 		if _, err := sim.WaitTillHealthy(ctx); err != nil {
 			return err
 		}

-		disconnections := sim.PeerEvents(
-			context.Background(),
-			sim.NodeIDs(),
-			simulation.NewPeerEventsFilter().Drop(),
-		)
-
-		var disconnected atomic.Value
-		go func() {
-			for d := range disconnections {
-				if d.Error != nil {
-					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
-					disconnected.Store(true)
-				}
-			}
-		}()
+		disconnected := watchDisconnections(ctx, sim)
 		defer func() {
-			if err != nil {
-				if yes, ok := disconnected.Load().(bool); ok && yes {
-					err = errors.New("disconnect events received")
-				}
+			if err != nil && disconnected.bool() {
+				err = errors.New("disconnect events received")
 			}
 		}()
 		// benchmark loop
@@ -713,12 +681,12 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
 			ctx := context.TODO()
 			hash, wait, err := remoteFileStore.Store(ctx, testutil.RandomReader(i, chunkSize), int64(chunkSize), false)
 			if err != nil {
-				b.Fatalf("expected no error. got %v", err)
+				return fmt.Errorf("store: %v", err)
 			}
 			// wait until all chunks stored
 			err = wait(ctx)
 			if err != nil {
-				b.Fatalf("expected no error. got %v", err)
+				return fmt.Errorf("wait store: %v", err)
 			}
 			// collect the hashes
 			hashes[i] = hash
@@ -754,10 +722,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
 				break Loop
 			}
 		}
-		if err != nil {
-			b.Fatal(err)
-		}
-		return nil
+		return err
 	})
 	if result.Error != nil {
 		b.Fatal(result.Error)
30 changes: 5 additions & 25 deletions swarm/network/stream/intervals_test.go
@@ -22,7 +22,6 @@ import (
 	"errors"
 	"fmt"
 	"sync"
-	"sync/atomic"
 	"testing"
 	"time"

@@ -118,13 +117,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {

 		_, wait, err := fileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
 		if err != nil {
-			log.Error("Store error: %v", "err", err)
-			t.Fatal(err)
+			return fmt.Errorf("store: %v", err)
 		}
 		err = wait(ctx)
 		if err != nil {
-			log.Error("Wait error: %v", "err", err)
-			t.Fatal(err)
+			return fmt.Errorf("wait store: %v", err)
 		}

 		item, ok = sim.NodeItem(checker, bucketKeyRegistry)
@@ -136,32 +133,15 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
 		liveErrC := make(chan error)
 		historyErrC := make(chan error)

-		log.Debug("Watching for disconnections")
-		disconnections := sim.PeerEvents(
-			context.Background(),
-			sim.NodeIDs(),
-			simulation.NewPeerEventsFilter().Drop(),
-		)
-
 		err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
 		if err != nil {
 			return err
 		}

-		var disconnected atomic.Value
-		go func() {
-			for d := range disconnections {
-				if d.Error != nil {
-					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
-					disconnected.Store(true)
-				}
-			}
-		}()
+		disconnected := watchDisconnections(ctx, sim)
 		defer func() {
-			if err != nil {
-				if yes, ok := disconnected.Load().(bool); ok && yes {
-					err = errors.New("disconnect events received")
-				}
+			if err != nil && disconnected.bool() {
+				err = errors.New("disconnect events received")
 			}
 		}()
