Skip to content
This repository has been archived by the owner on Dec 7, 2019. It is now read-only.

Commit

Permalink
move swarm methods to swarm.go
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Apr 18, 2017
1 parent 36aaf85 commit 5b8e080
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 96 deletions.
95 changes: 0 additions & 95 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"sync"

smux "github.com/jbenet/go-stream-muxer"
iconn "github.com/libp2p/go-libp2p-interface-conn"
)

Expand Down Expand Up @@ -175,97 +174,3 @@ func ConnInConns(c1 *Conn, conns []*Conn) bool {
}
return false
}

// ------------------------------------------------------------------
// All the connection setup logic here, in one place.
// these are mostly *Swarm methods, but i wanted a less-crowded place
// for them.
// ------------------------------------------------------------------

// addConn is the internal version of AddConn
// TODO: there's no need to have an internal version anymore. Move to AddConn
func (s *Swarm) addConn(conn iconn.Conn) (*Conn, error) {
if conn == nil {
return nil, errors.New("nil conn")
}

// first, check if we already have it, to avoid constructing it
// if it is already there
s.connLock.Lock()
for c := range s.conns {
if c.conn == conn {
s.connLock.Unlock()
return c, nil
}
}
s.connLock.Unlock()

c := newConn(conn, s)
s.conns[c] = struct{}{}

s.ConnHandler()(c)

// go listen for incoming streams on this connection
go c.conn.Serve(func(ss smux.Stream) {
stream := s.setupStream(ss, c)
s.StreamHandler()(stream) // call our handler
})

s.notifyAll(func(n Notifiee) {
n.Connected(c)
})
return c, nil
}

// createStream is the internal function that creates a new stream. assumes
// all validation has happened.
func (s *Swarm) createStream(c *Conn) (*Stream, error) {
smuxStream, err := c.conn.OpenStream()
if err != nil {
return nil, err
}

return s.setupStream(smuxStream, c), nil
}

// newStream is the internal function that creates a new stream. assumes
// all validation has happened.
func (s *Swarm) setupStream(smuxStream smux.Stream, c *Conn) *Stream {
stream := newStream(smuxStream, c)

// add it to our streams maps
s.streamLock.Lock()
c.streamLock.Lock()
s.streams[stream] = struct{}{}
c.streams[stream] = struct{}{}
s.streamLock.Unlock()
c.streamLock.Unlock()

s.notifyAll(func(n Notifiee) {
n.OpenedStream(stream)
})
return stream
}

func (s *Swarm) removeStream(stream *Stream) error {
// remove from our maps
s.streamLock.Lock()
stream.conn.streamLock.Lock()
delete(s.streams, stream)
delete(stream.conn.streams, stream)
s.streamLock.Unlock()
stream.conn.streamLock.Unlock()

err := stream.smuxStream.Close()
s.notifyAll(func(n Notifiee) {
n.ClosedStream(stream)
})
return err
}

func (s *Swarm) removeConn(conn *Conn) {
// remove from our maps
s.connLock.Lock()
delete(s.conns, conn)
s.connLock.Unlock()
}
85 changes: 84 additions & 1 deletion swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

smux "github.com/jbenet/go-stream-muxer"
iconn "github.com/libp2p/go-libp2p-interface-conn"
)

Expand Down Expand Up @@ -223,7 +224,89 @@ func (s *Swarm) AddListener(l iconn.Listener) (*Listener, error) {
// Returns the resulting Swarm-associated peerstream.Conn.
// Idempotent: if the Connection has already been added, this is a no-op.
func (s *Swarm) AddConn(conn iconn.Conn) (*Conn, error) {
return s.addConn(conn)
if conn == nil {
return nil, errors.New("nil conn")
}

// first, check if we already have it, to avoid constructing it
// if it is already there
s.connLock.Lock()
for c := range s.conns {
if c.conn == conn {
s.connLock.Unlock()
return c, nil
}
}
s.connLock.Unlock()

c := newConn(conn, s)
s.conns[c] = struct{}{}

s.ConnHandler()(c)

// go listen for incoming streams on this connection
go c.conn.Serve(func(ss smux.Stream) {
stream := s.setupStream(ss, c)
s.StreamHandler()(stream) // call our handler
})

s.notifyAll(func(n Notifiee) {
n.Connected(c)
})
return c, nil
}

// newStream is the internal function that creates a new stream. assumes
// all validation has happened.
func (s *Swarm) setupStream(smuxStream smux.Stream, c *Conn) *Stream {
stream := newStream(smuxStream, c)

// add it to our streams maps
s.streamLock.Lock()
c.streamLock.Lock()
s.streams[stream] = struct{}{}
c.streams[stream] = struct{}{}
s.streamLock.Unlock()
c.streamLock.Unlock()

s.notifyAll(func(n Notifiee) {
n.OpenedStream(stream)
})
return stream
}

// createStream is the internal function that creates a new stream. assumes
// all validation has happened.
func (s *Swarm) createStream(c *Conn) (*Stream, error) {
smuxStream, err := c.conn.OpenStream()
if err != nil {
return nil, err
}

return s.setupStream(smuxStream, c), nil
}

func (s *Swarm) removeStream(stream *Stream) error {
// remove from our maps
s.streamLock.Lock()
stream.conn.streamLock.Lock()
delete(s.streams, stream)
delete(stream.conn.streams, stream)
s.streamLock.Unlock()
stream.conn.streamLock.Unlock()

err := stream.smuxStream.Close()
s.notifyAll(func(n Notifiee) {
n.ClosedStream(stream)
})
return err
}

func (s *Swarm) removeConn(conn *Conn) {
// remove from our maps
s.connLock.Lock()
delete(s.conns, conn)
s.connLock.Unlock()
}

// NewStream opens a new Stream on the best available connection,
Expand Down

0 comments on commit 5b8e080

Please sign in to comment.