Skip to content

Commit

Permalink
remove goprocess from mock net and peernet
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Dec 14, 2021
1 parent 980033e commit 2b620cf
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 64 deletions.
3 changes: 2 additions & 1 deletion p2p/net/mock/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
)

type Mocknet interface {

// GenPeer generates a peer and its network.Network in the Mocknet
GenPeer() (host.Host, error)

Expand Down Expand Up @@ -63,6 +62,8 @@ type Mocknet interface {
DisconnectNets(network.Network, network.Network) error
LinkAll() error
ConnectAllButSelf() error

io.Closer
}

// LinkOptions are used to change aspects of the links.
Expand Down
18 changes: 7 additions & 11 deletions p2p/net/mock/mock.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package mocknet

import (
"context"

logging "github.com/ipfs/go-log/v2"
)

var log = logging.Logger("mocknet")

// WithNPeers constructs a Mocknet with N peers.
func WithNPeers(ctx context.Context, n int) (Mocknet, error) {
m := New(ctx)
func WithNPeers(n int) (Mocknet, error) {
m := New()
for i := 0; i < n; i++ {
if _, err := m.GenPeer(); err != nil {
return nil, err
Expand All @@ -22,30 +20,28 @@ func WithNPeers(ctx context.Context, n int) (Mocknet, error) {
// FullMeshLinked constructs a Mocknet with full mesh of Links.
// This means that all the peers **can** connect to each other
// (not that they already are connected. you can use m.ConnectAll())
func FullMeshLinked(ctx context.Context, n int) (Mocknet, error) {
m, err := WithNPeers(ctx, n)
func FullMeshLinked(n int) (Mocknet, error) {
m, err := WithNPeers(n)
if err != nil {
return nil, err
}

if err := m.LinkAll(); err != nil {
return nil, err
}

return m, nil
}

// FullMeshConnected constructs a Mocknet with full mesh of Connections.
// This means that all the peers have dialed and are ready to talk to
// each other.
func FullMeshConnected(ctx context.Context, n int) (Mocknet, error) {
m, err := FullMeshLinked(ctx, n)
func FullMeshConnected(n int) (Mocknet, error) {
m, err := FullMeshLinked(n)
if err != nil {
return nil, err
}

err = m.ConnectAllButSelf()
if err != nil {
if err := m.ConnectAllButSelf(); err != nil {
return nil, err
}
return m, nil
Expand Down
39 changes: 20 additions & 19 deletions p2p/net/mock/mock_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ import (

bhost "github.com/libp2p/go-libp2p/p2p/host/basic"

"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"

p2putil "github.com/libp2p/go-libp2p-netutil"
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
ma "github.com/multiformats/go-multiaddr"
Expand All @@ -30,7 +27,7 @@ var blackholeIP6 = net.ParseIP("100::")
// mocknet implements mocknet.Mocknet
type mocknet struct {
nets map[peer.ID]*peernet
hosts map[peer.ID]*bhost.BasicHost
hosts map[peer.ID]host.Host

// links make it possible to connect two peers.
// think of links as the physical medium.
Expand All @@ -40,22 +37,30 @@ type mocknet struct {

linkDefaults LinkOptions

proc goprocess.Process // for Context closing
ctx context.Context
ctxCancel context.CancelFunc
ctx context.Context
sync.Mutex
}

func New(ctx context.Context) Mocknet {
proc := goprocessctx.WithContext(ctx)
ctx = goprocessctx.WithProcessClosing(ctx, proc)

return &mocknet{
func New() Mocknet {
mn := &mocknet{
nets: map[peer.ID]*peernet{},
hosts: map[peer.ID]*bhost.BasicHost{},
hosts: map[peer.ID]host.Host{},
links: map[peer.ID]map[peer.ID]map[*link]struct{}{},
proc: proc,
ctx: ctx,
}
mn.ctx, mn.ctxCancel = context.WithCancel(context.Background())
return mn
}

func (mn *mocknet) Close() error {
mn.ctxCancel()
for _, h := range mn.hosts {
h.Close()
}
for _, n := range mn.nets {
n.Close()
}
return nil
}

func (mn *mocknet) GenPeer() (host.Host, error) {
Expand Down Expand Up @@ -104,7 +109,7 @@ func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (host.Host, error) {
}

func (mn *mocknet) AddPeerWithPeerstore(p peer.ID, ps peerstore.Peerstore) (host.Host, error) {
n, err := newPeernet(mn.ctx, mn, p, ps)
n, err := newPeernet(mn, p, ps)
if err != nil {
return nil, err
}
Expand All @@ -119,10 +124,6 @@ func (mn *mocknet) AddPeerWithPeerstore(p peer.ID, ps peerstore.Peerstore) (host
return nil, err
}

// Ensure we close the hoset when we close the mock network.
// Otherwise, tests leak memory.
mn.proc.AddChild(goprocess.WithTeardown(h.Close))

mn.Lock()
mn.nets[n.peer] = n
mn.hosts[n.peer] = h
Expand Down
8 changes: 3 additions & 5 deletions p2p/net/mock/mock_notif_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@ import (

func TestNotifications(t *testing.T) {
const swarmSize = 5
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
const timeout = 10 * time.Second

mn, err := FullMeshLinked(ctx, swarmSize)
mn, err := FullMeshLinked(swarmSize)
if err != nil {
t.Fatal(err)
}

timeout := 10 * time.Second
defer mn.Close()

// signup notifs
nets := mn.Nets()
Expand Down
20 changes: 2 additions & 18 deletions p2p/net/mock/mock_peernet.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"

"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"

ma "github.com/multiformats/go-multiaddr"
)

Expand All @@ -36,12 +33,11 @@ type peernet struct {
notifmu sync.Mutex
notifs map[network.Notifiee]struct{}

proc goprocess.Process
sync.RWMutex
}

// newPeernet constructs a new peernet
func newPeernet(ctx context.Context, m *mocknet, p peer.ID, ps peerstore.Peerstore) (*peernet, error) {
func newPeernet(m *mocknet, p peer.ID, ps peerstore.Peerstore) (*peernet, error) {
n := &peernet{
mocknet: m,
peer: p,
Expand All @@ -53,12 +49,10 @@ func newPeernet(ctx context.Context, m *mocknet, p peer.ID, ps peerstore.Peersto
notifs: make(map[network.Notifiee]struct{}),
}

n.proc = goprocessctx.WithContextAndTeardown(ctx, n.teardown)
return n, nil
}

func (pn *peernet) teardown() error {

func (pn *peernet) Close() error {
// close the connections
for _, c := range pn.allConns() {
c.Close()
Expand All @@ -79,11 +73,6 @@ func (pn *peernet) allConns() []*conn {
return cs
}

// Close calls the ContextCloser func
func (pn *peernet) Close() error {
return pn.proc.Close()
}

func (pn *peernet) Peerstore() peerstore.Peerstore {
return pn.ps
}
Expand Down Expand Up @@ -226,11 +215,6 @@ func (pn *peernet) removeConn(c *conn) {
delete(cs, c)
}

// Process returns the network's Process
func (pn *peernet) Process() goprocess.Process {
return pn.proc
}

// LocalPeer the network's LocalPeer
func (pn *peernet) LocalPeer() peer.ID {
return pn.peer
Expand Down
24 changes: 14 additions & 10 deletions p2p/net/mock/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func TestNetworkSetup(t *testing.T) {
id1 := tnet.RandIdentityOrFatal(t)
id2 := tnet.RandIdentityOrFatal(t)
id3 := tnet.RandIdentityOrFatal(t)
mn := New(ctx)
// peers := []peer.ID{p1, p2, p3}
mn := New()
defer mn.Close()

// add peers to mock net

Expand Down Expand Up @@ -267,10 +267,11 @@ func TestNetworkSetup(t *testing.T) {
func TestStreams(t *testing.T) {
ctx := context.Background()

mn, err := FullMeshConnected(context.Background(), 3)
mn, err := FullMeshConnected(3)
if err != nil {
t.Fatal(err)
}
defer mn.Close()

handler := func(s network.Stream) {
b := make([]byte, 4)
Expand Down Expand Up @@ -362,10 +363,11 @@ func TestStreamsStress(t *testing.T) {
nnodes = 30
}

mn, err := FullMeshConnected(context.Background(), nnodes)
mn, err := FullMeshConnected(nnodes)
if err != nil {
t.Fatal(err)
}
defer mn.Close()

errs := make(chan error)

Expand Down Expand Up @@ -411,7 +413,8 @@ func TestStreamsStress(t *testing.T) {
}

func TestAdding(t *testing.T) {
mn := New(context.Background())
mn := New()
defer mn.Close()

var peers []peer.ID
for i := 0; i < 3; i++ {
Expand Down Expand Up @@ -534,10 +537,11 @@ func within(t1 time.Duration, t2 time.Duration, tolerance time.Duration) bool {
}

func TestLimitedStreams(t *testing.T) {
mn, err := FullMeshConnected(context.Background(), 2)
mn, err := FullMeshConnected(2)
if err != nil {
t.Fatal(err)
}
defer mn.Close()

var wg sync.WaitGroup
messages := 4
Expand Down Expand Up @@ -598,22 +602,22 @@ func TestFuzzManyPeers(t *testing.T) {
peerCount = 100
}
for i := 0; i < peerCount; i++ {
ctx, cancel := context.WithCancel(context.Background())
_, err := FullMeshConnected(ctx, 2)
cancel()
mn, err := FullMeshConnected(2)
if err != nil {
t.Fatal(err)
}
mn.Close()
}
}

func TestStreamsWithLatency(t *testing.T) {
latency := time.Millisecond * 500

mn, err := WithNPeers(context.Background(), 2)
mn, err := WithNPeers(2)
if err != nil {
t.Fatal(err)
}
defer mn.Close()

// configure the Mocknet with some latency and link/connect its peers
mn.SetLinkDefaults(LinkOptions{Latency: latency})
Expand Down

0 comments on commit 2b620cf

Please sign in to comment.