Skip to content

Commit

Permalink
feat(network): first version of GraphSyncNetwork
Browse files Browse the repository at this point in the history
setup GraphSyncNetwork interfaces and provide an implementation based on libp2p.host

fix #1
  • Loading branch information
hannahhoward committed Feb 7, 2019
1 parent 2c3e40d commit 54127be
Show file tree
Hide file tree
Showing 6 changed files with 356 additions and 1 deletion.
Binary file modified docs/GraphSync.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
9 changes: 8 additions & 1 deletion docs/GraphSync.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 7 additions & 0 deletions docs/go-graphsync.puml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ package "go-graphsync" {
interface GraphSyncNetwork {
SendMessage(ctx context.Context, receiver peer.Id, m GraphSyncMessage)
SetDelegate(receiver Receiver)
NewMessageSender(context.Context, peer.ID) (MessageSender, error)
}

interface MessageSender {
SendMsg(context.Context, GraphSyncMessage) error
Close() error
Reset() error
}

Receiver <|-- GraphSync : receiver for
Expand Down
48 changes: 48 additions & 0 deletions network/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package network

import (
"context"

gsmsg "github.com/ipfs/go-graphsync/message"

peer "github.com/libp2p/go-libp2p-peer"
protocol "github.com/libp2p/go-libp2p-protocol"
)

var (
// ProtocolGraphsync is the protocol identifier for graphsync messages
ProtocolGraphsync protocol.ID = "/ipfs/graphsync/1.0.0"
)

// GraphSyncNetwork provides network connectivity for GraphSync.
type GraphSyncNetwork interface {

// SendMessage sends a GraphSync message to a peer.
SendMessage(
context.Context,
peer.ID,
gsmsg.GraphSyncMessage) error

// SetDelegate registers the Reciver to handle messages received from the
// network.
SetDelegate(Receiver)

NewMessageSender(context.Context, peer.ID) (MessageSender, error)
}

// MessageSender is an interface to send messages to a peer
type MessageSender interface {
SendMsg(context.Context, gsmsg.GraphSyncMessage) error
Close() error
Reset() error
}

// Receiver is an interface for receiving messages from the GraphSyncNetwork.
type Receiver interface {
ReceiveMessage(
ctx context.Context,
sender peer.ID,
incoming gsmsg.GraphSyncMessage)

ReceiveError(error)
}
161 changes: 161 additions & 0 deletions network/libp2p_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package network

import (
"bufio"
"context"
"fmt"
"io"
"time"

gsmsg "github.com/ipfs/go-graphsync/message"

ggio "github.com/gogo/protobuf/io"
logging "github.com/ipfs/go-log"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
)

var log = logging.Logger("graphsync_network")

var sendMessageTimeout = time.Minute * 10

// NewFromLibp2pHost returns a GraphSyncNetwork supported by underlying Libp2p host.
func NewFromLibp2pHost(host host.Host,
decodeSelectorFunc gsmsg.DecodeSelectorFunc,
decodeSelectionResponseFunc gsmsg.DecodeSelectionResponseFunc) GraphSyncNetwork {
graphSyncNetwork := libp2pGraphSyncNetwork{
host: host,
decodeSelectorFunc: decodeSelectorFunc,
decodeSelectionResponseFunc: decodeSelectionResponseFunc,
}
host.SetStreamHandler(ProtocolGraphsync, graphSyncNetwork.handleNewStream)

return &graphSyncNetwork
}

// libp2pGraphSyncNetwork transforms the libp2p host interface, which sends and receives
// NetMessage objects, into the graphsync network interface.
type libp2pGraphSyncNetwork struct {
host host.Host
decodeSelectionResponseFunc gsmsg.DecodeSelectionResponseFunc
decodeSelectorFunc gsmsg.DecodeSelectorFunc
// inbound messages from the network are forwarded to the receiver
receiver Receiver
}

type streamMessageSender struct {
s inet.Stream
}

func (s *streamMessageSender) Close() error {
return inet.FullClose(s.s)
}

func (s *streamMessageSender) Reset() error {
return s.s.Reset()
}

func (s *streamMessageSender) SendMsg(ctx context.Context, msg gsmsg.GraphSyncMessage) error {
return msgToStream(ctx, s.s, msg)
}

func msgToStream(ctx context.Context, s inet.Stream, msg gsmsg.GraphSyncMessage) error {
deadline := time.Now().Add(sendMessageTimeout)
if dl, ok := ctx.Deadline(); ok {
deadline = dl
}
if err := s.SetWriteDeadline(deadline); err != nil {
log.Warningf("error setting deadline: %s", err)
}

w := bufio.NewWriter(s)

switch s.Protocol() {
case ProtocolGraphsync:
if err := msg.ToNet(w); err != nil {
log.Debugf("error: %s", err)
return err
}
default:
return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol())
}

if err := w.Flush(); err != nil {
log.Debugf("error: %s", err)
return err
}

if err := s.SetWriteDeadline(time.Time{}); err != nil {
log.Warningf("error resetting deadline: %s", err)
}
return nil
}

func (gsnet *libp2pGraphSyncNetwork) NewMessageSender(ctx context.Context, p peer.ID) (MessageSender, error) {
s, err := gsnet.newStreamToPeer(ctx, p)
if err != nil {
return nil, err
}

return &streamMessageSender{s: s}, nil
}

func (gsnet *libp2pGraphSyncNetwork) newStreamToPeer(ctx context.Context, p peer.ID) (inet.Stream, error) {
return gsnet.host.NewStream(ctx, p, ProtocolGraphsync)
}

func (gsnet *libp2pGraphSyncNetwork) SendMessage(
ctx context.Context,
p peer.ID,
outgoing gsmsg.GraphSyncMessage) error {

s, err := gsnet.newStreamToPeer(ctx, p)
if err != nil {
return err
}

if err = msgToStream(ctx, s, outgoing); err != nil {
s.Reset()
return err
}

// TODO(https://github.com/libp2p/go-libp2p-net/issues/28): Avoid this goroutine.
go inet.AwaitEOF(s)
return s.Close()

}

func (gsnet *libp2pGraphSyncNetwork) SetDelegate(r Receiver) {
gsnet.receiver = r
}

// handleNewStream receives a new stream from the network.
func (gsnet *libp2pGraphSyncNetwork) handleNewStream(s inet.Stream) {
defer s.Close()

if gsnet.receiver == nil {
s.Reset()
return
}

reader := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
for {
received, err := gsmsg.FromPBReader(reader,
gsnet.decodeSelectorFunc,
gsnet.decodeSelectionResponseFunc)
if err != nil {
if err != io.EOF {
s.Reset()
go gsnet.receiver.ReceiveError(err)
log.Debugf("graphsync net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
}
return
}

p := s.Conn().RemotePeer()
ctx := context.Background()
log.Debugf("graphsync net handleNewStream from %s", s.Conn().RemotePeer())
gsnet.receiver.ReceiveMessage(ctx, p, received)
}
}
132 changes: 132 additions & 0 deletions network/libp2p_impl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package network

import (
"context"
"math/rand"
"reflect"
"testing"
"time"

gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/testselector"
"github.com/libp2p/go-libp2p-peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
)

// Receiver is an interface for receiving messages from the GraphSyncNetwork.
type receiver struct {
messageReceived chan struct{}
lastMessage gsmsg.GraphSyncMessage
lastSender peer.ID
}

func (r *receiver) ReceiveMessage(
ctx context.Context,
sender peer.ID,
incoming gsmsg.GraphSyncMessage) {
r.lastSender = sender
r.lastMessage = incoming
select {
case <-ctx.Done():
case r.messageReceived <- struct{}{}:
}
}

func (r *receiver) ReceiveError(err error) {
}

func TestMessageSendAndReceive(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
mn := mocknet.New(ctx)

host1, err := mn.GenPeer()
if err != nil {
t.Fatal("error generating host")
}
host2, err := mn.GenPeer()
if err != nil {
t.Fatal("error generating host")
}
err = mn.LinkAll()
if err != nil {
t.Fatal("error linking hosts")
}
_, err = mn.ConnectPeers(host1.ID(), host2.ID())
if err != nil {
t.Fatal("error linking peers")
}
gsnet1 := NewFromLibp2pHost(host1,
testselector.MockDecodeSelectorFunc,
testselector.MockDecodeSelectionResponseFunc)
gsnet2 := NewFromLibp2pHost(host2,
testselector.MockDecodeSelectorFunc,
testselector.MockDecodeSelectionResponseFunc)
r := &receiver{
messageReceived: make(chan struct{}),
}
gsnet1.SetDelegate(r)
gsnet2.SetDelegate(r)

selector := testselector.GenerateSelector()
root := testselector.GenerateRootCid()
selectionResponse := testselector.GenerateSelectionResponse()
id := gsmsg.GraphSyncRequestID(rand.Int31())
priority := gsmsg.GraphSyncPriority(rand.Int31())
status := gsmsg.RequestAcknowledged

sent := gsmsg.New()
sent.AddRequest(id, selector, root, priority)
sent.AddResponse(id, status, selectionResponse)

gsnet1.SendMessage(ctx, host2.ID(), sent)

select {
case <-ctx.Done():
t.Fatal("did not receive message sent")
case <-r.messageReceived:
}

sender := r.lastSender
if sender != host1.ID() {
t.Fatal("received message from wrong node")
}

received := r.lastMessage

sentRequests := sent.Requests()
if len(sentRequests) != 1 {
t.Fatal("Did not add request to sent message")
}
sentRequest := sentRequests[0]
receivedRequests := received.Requests()
if len(receivedRequests) != 1 {
t.Fatal("Did not add request to received message")
}
receivedRequest := receivedRequests[0]
if receivedRequest.ID() != sentRequest.ID() ||
receivedRequest.IsCancel() != sentRequest.IsCancel() ||
receivedRequest.Priority() != sentRequest.Priority() ||
!reflect.DeepEqual(receivedRequest.Root(), sentRequest.Root()) ||
!reflect.DeepEqual(receivedRequest.Selector(), sentRequest.Selector()) {
t.Fatal("Sent message requests did not match received message requests")
}

sentResponses := sent.Responses()
if len(sentResponses) != 1 {
t.Fatal("Did not add response to sent message")
}
sentResponse := sentResponses[0]
receivedResponses := received.Responses()
if len(receivedResponses) != 1 {
t.Fatal("Did not add response to received message")
}
receivedResponse := receivedResponses[0]
if receivedResponse.RequestID() != sentResponse.RequestID() ||
receivedResponse.Status() != sentResponse.Status() ||
!reflect.DeepEqual(receivedResponse.Response(), sentResponse.Response()) {
t.Fatal("Sent message responses did not match received message responses")
}
}

0 comments on commit 54127be

Please sign in to comment.