Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm/network: refactor simulation tests bootstrap #18975

Merged
merged 1 commit into from
Feb 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions swarm/network/stream/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@ import (
"math/rand"
"os"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
"github.com/ethereum/go-ethereum/swarm/testutil"
colorable "github.com/mattn/go-colorable"
)
Expand Down Expand Up @@ -66,6 +69,80 @@ func init() {
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
}

// newNetStoreAndDelivery is a default constructor for BzzAddr, NetStore and Delivery, used in Simulations
func newNetStoreAndDelivery(ctx *adapters.ServiceContext, bucket *sync.Map) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
addr := network.NewAddr(ctx.Config.Node())

netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
if err != nil {
return nil, nil, nil, nil, err
}

netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

return addr, netStore, delivery, cleanup, nil
}

// newNetStoreAndDeliveryWithBzzAddr is a constructor for NetStore and Delivery, used in Simulations, accepting any BzzAddr
func newNetStoreAndDeliveryWithBzzAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
if err != nil {
return nil, nil, nil, err
}

netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

return netStore, delivery, cleanup, nil
}

// newNetStoreAndDeliveryWithRequestFunc is a constructor for NetStore and Delivery, used in Simulations, accepting any NetStore.RequestFunc
func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket *sync.Map, rf network.RequestFunc) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
addr := network.NewAddr(ctx.Config.Node())

netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
if err != nil {
return nil, nil, nil, nil, err
}

netStore.NewNetFetcherFunc = network.NewFetcherFactory(rf, true).New

return addr, netStore, delivery, cleanup, nil
}

func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
n := ctx.Config.Node()

store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
if *useMockStore {
store, datadir, err = createMockStore(mockmem.NewGlobalStore(), n.ID(), addr)
}
if err != nil {
return nil, nil, nil, err
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, nil, err
}

fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())

kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)

bucket.Store(bucketKeyStore, store)
bucket.Store(bucketKeyDB, netStore)
bucket.Store(bucketKeyDelivery, delivery)
bucket.Store(bucketKeyFileStore, fileStore)

cleanup := func() {
netStore.Close()
os.RemoveAll(datadir)
}

return netStore, delivery, cleanup, nil
}

func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
// setup
addr := network.RandomAddr() // tested peers peer address
Expand Down
51 changes: 11 additions & 40 deletions swarm/network/stream/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -457,39 +456,24 @@ func TestDeliveryFromNodes(t *testing.T) {
func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
node := ctx.Config.Node()
addr := network.NewAddr(node)
store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
cleanup = func() {
os.RemoveAll(datadir)
store.Close()
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}

kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
Syncing: SyncingDisabled,
Retrieval: RetrievalEnabled,
}, nil)
bucket.Store(bucketKeyRegistry, r)

fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
cleanup = func() {
r.Close()
clean()
}

return r, cleanup, nil

},
})
defer sim.Close()
Expand Down Expand Up @@ -644,38 +628,25 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
node := ctx.Config.Node()
addr := network.NewAddr(node)
store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
cleanup = func() {
os.RemoveAll(datadir)
store.Close()
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
Syncing: SyncingDisabled,
Retrieval: RetrievalDisabled,
SyncUpdateDelay: 0,
}, nil)
bucket.Store(bucketKeyRegistry, r)

fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
cleanup = func() {
r.Close()
clean()
}

return r, cleanup, nil

},
})
defer sim.Close()
Expand Down
28 changes: 6 additions & 22 deletions swarm/network/stream/intervals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/binary"
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"testing"
Expand All @@ -31,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
Expand Down Expand Up @@ -62,26 +60,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
externalStreamMaxKeys := uint64(100)

sim := simulation.New(map[string]simulation.ServiceFunc{
"intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
n := ctx.Config.Node()
addr := network.NewAddr(n)
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
"intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) {
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
cleanup = func() {
store.Close()
os.RemoveAll(datadir)
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Expand All @@ -97,11 +80,12 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
})

fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
cleanup := func() {
r.Close()
clean()
}

return r, cleanup, nil

},
})
defer sim.Close()
Expand Down
52 changes: 16 additions & 36 deletions swarm/network/stream/snapshot_retrieval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package stream
import (
"context"
"fmt"
"os"
"sync"
"testing"
"time"
Expand All @@ -27,7 +26,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
Expand Down Expand Up @@ -105,43 +103,25 @@ func TestRetrieval(t *testing.T) {
}

var retrievalSimServiceMap = map[string]simulation.ServiceFunc{
"streamer": retrievalStreamerFunc,
}

func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
n := ctx.Config.Node()
addr := network.NewAddr(n)
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)

localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalEnabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
}, nil)
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}

fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalEnabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
}, nil)

cleanup = func() {
os.RemoveAll(datadir)
netStore.Close()
r.Close()
}
cleanup = func() {
r.Close()
clean()
}

return r, cleanup, nil
return r, cleanup, nil
},
}

/*
Expand Down
51 changes: 18 additions & 33 deletions swarm/network/stream/snapshot_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,42 +107,27 @@ func TestSyncingViaGlobalSync(t *testing.T) {
}

var simServiceMap = map[string]simulation.ServiceFunc{
"streamer": streamerFunc,
}
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
if err != nil {
return nil, nil, err
}

func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
n := ctx.Config.Node()
addr := network.NewAddr(n)
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
}, nil)

bucket.Store(bucketKeyRegistry, r)

cleanup = func() {
os.RemoveAll(datadir)
netStore.Close()
r.Close()
}
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
}, nil)

return r, cleanup, nil
bucket.Store(bucketKeyRegistry, r)

cleanup = func() {
r.Close()
clean()
}

return r, cleanup, nil
},
}

func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
Expand Down
Loading