Skip to content

Commit

Permalink
Feat/refactor transport protocol update part 2 (#338)
Browse files Browse the repository at this point in the history
* feat(network): transport versioning and detection

Support multiple transports on the libp2p protocol, via different protocol naming, and using libp2p
to do protocol negotiation

* Update transport/helpers/network/libp2p_impl.go

Co-authored-by: Rod Vagg <rod@vagg.org>

* Update transport/helpers/network/libp2p_impl.go

Co-authored-by: Rod Vagg <rod@vagg.org>

* fix(network): add versions check for legacy transport

Co-authored-by: Rod Vagg <rod@vagg.org>
  • Loading branch information
hannahhoward and rvagg committed Jun 25, 2022
1 parent b374b08 commit c5f91ae
Show file tree
Hide file tree
Showing 18 changed files with 305 additions and 147 deletions.
1 change: 0 additions & 1 deletion impl/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,3 @@ func (m *manager) cancelMessage(chid datatransfer.ChannelID) datatransfer.Messag
}
return message.CancelResponse(chid.ID)
}

4 changes: 2 additions & 2 deletions itest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1779,7 +1779,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) {
r := &receiver{
messageReceived: make(chan receivedMessage),
}
dtnet2.SetDelegate("graphsync", r)
dtnet2.SetDelegate(datatransfer.LegacyTransportID, []datatransfer.Version{datatransfer.LegacyTransportVersion}, r)

gsr := &fakeGraphSyncReceiver{
receivedMessages: make(chan receivedGraphSyncMessage),
Expand Down Expand Up @@ -1857,7 +1857,7 @@ func TestResponseHookWhenExtensionNotFound(t *testing.T) {
r := &receiver{
messageReceived: make(chan receivedMessage),
}
dtnet2.SetDelegate("graphsync", r)
dtnet2.SetDelegate(datatransfer.LegacyTransportID, []datatransfer.Version{datatransfer.LegacyTransportVersion}, r)

gsr := &fakeGraphSyncReceiver{
receivedMessages: make(chan receivedGraphSyncMessage),
Expand Down
30 changes: 19 additions & 11 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,41 @@ import (
"github.com/ipld/go-ipld-prime/datamodel"
)

type MessageVersion struct {
type Version struct {
Major uint64
Minor uint64
Patch uint64
}

func (mv MessageVersion) String() string {
func (mv Version) String() string {
return fmt.Sprintf("%d.%d.%d", mv.Major, mv.Minor, mv.Patch)
}

// MessageVersionFromString parses a string into a message version
func MessageVersionFromString(versionString string) (MessageVersion, error) {
func MessageVersionFromString(versionString string) (Version, error) {
versions := strings.Split(versionString, ".")
if len(versions) != 3 {
return MessageVersion{}, errors.New("not a version string")
return Version{}, errors.New("not a version string")
}
major, err := strconv.ParseUint(versions[0], 10, 0)
if err != nil {
return MessageVersion{}, errors.New("unable to parse major version")
return Version{}, errors.New("unable to parse major version")
}
minor, err := strconv.ParseUint(versions[1], 10, 0)
if err != nil {
return MessageVersion{}, errors.New("unable to parse major version")
return Version{}, errors.New("unable to parse major version")
}
patch, err := strconv.ParseUint(versions[2], 10, 0)
if err != nil {
return MessageVersion{}, errors.New("unable to parse major version")
return Version{}, errors.New("unable to parse major version")
}
return MessageVersion{Major: major, Minor: minor, Patch: patch}, nil
return Version{Major: major, Minor: minor, Patch: patch}, nil
}

var (
// DataTransfer1_2 is the identifier for the current
// supported version of data-transfer
DataTransfer1_2 MessageVersion = MessageVersion{1, 2, 0}
DataTransfer1_2 Version = Version{1, 2, 0}
)

// Message is a message for the data transfer protocol
Expand All @@ -60,8 +60,16 @@ type Message interface {
TransferID() TransferID
ToNet(w io.Writer) error
ToIPLD() datamodel.Node
MessageForVersion(targetProtocol MessageVersion) (newMsg Message, err error)
WrappedForTransport(transportID TransportID) Message
MessageForVersion(targetProtocol Version) (newMsg Message, err error)
Version() Version
WrappedForTransport(transportID TransportID, transportVersion Version) TransportedMessage
}

// TransportedMessage is a message that can also report how it was transported
type TransportedMessage interface {
Message
TransportID() TransportID
TransportVersion() Version
}

// Request is a response message for the data transfer protocol
Expand Down
27 changes: 23 additions & 4 deletions message/message1_1prime/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,34 @@ func fromMessage(tresp *TransferMessage1_1) (datatransfer.Message, error) {
return tresp.Response, nil
}

func fromWrappedMessage(wtresp *WrappedTransferMessage1_1) (datatransfer.TransportedMessage, error) {
tresp := wtresp.Message
if (tresp.IsRequest && tresp.Request == nil) || (!tresp.IsRequest && tresp.Response == nil) {
return nil, xerrors.Errorf("invalid/malformed message")
}

if tresp.IsRequest {
return &WrappedTransferRequest1_1{
tresp.Request,
wtresp.TransportVersion,
wtresp.TransportID,
}, nil
}
return &WrappedTransferResponse1_1{
tresp.Response,
wtresp.TransportID,
wtresp.TransportVersion,
}, nil
}

// FromNetWrraped can read a network stream to deserialize a message + transport ID
func FromNetWrapped(r io.Reader) (datatransfer.TransportID, datatransfer.Message, error) {
func FromNetWrapped(r io.Reader) (datatransfer.TransportedMessage, error) {
tm, err := bindnodeRegistry.TypeFromReader(r, &WrappedTransferMessage1_1{}, dagcbor.Decode)
if err != nil {
return "", nil, err
return nil, err
}
wtresp := tm.(*WrappedTransferMessage1_1)
msg, err := fromMessage(&wtresp.Message)
return datatransfer.TransportID(wtresp.TransportID), msg, err
return fromWrappedMessage(wtresp)
}

// FromNet can read a network stream to deserialize a GraphSyncMessage
Expand Down
22 changes: 13 additions & 9 deletions message/message1_1prime/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ func TestToNetFromNetEquivalency(t *testing.T) {
})
t.Run("round-trip with wrapping", func(t *testing.T) {
transportID := datatransfer.TransportID("applesauce")
transportVersion := datatransfer.Version{Major: 1, Minor: 5, Patch: 0}
baseCid := testutil.GenerateCids(1)[0]
selector := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any).Matcher().Node()
isPull := false
Expand All @@ -519,15 +520,16 @@ func TestToNetFromNetEquivalency(t *testing.T) {
voucherResult := testutil.NewTestTypedVoucher()
request, err := message1_1.NewRequest(id, false, isPull, &voucher, baseCid, selector)
require.NoError(t, err)
wrequest := request.WrappedForTransport(transportID)
wrequest := request.WrappedForTransport(transportID, transportVersion)
buf := new(bytes.Buffer)
err = wrequest.ToNet(buf)
require.NoError(t, err)
require.Greater(t, buf.Len(), 0)
receivedTransportID, deserialized, err := message1_1.FromNetWrapped(buf)
deserialized, err := message1_1.FromNetWrapped(buf)
require.NoError(t, err)

require.Equal(t, transportID, receivedTransportID)
require.Equal(t, transportID, deserialized.TransportID())
require.Equal(t, transportVersion, deserialized.TransportVersion())
deserializedRequest, ok := deserialized.(datatransfer.Request)
require.True(t, ok)

Expand All @@ -541,12 +543,13 @@ func TestToNetFromNetEquivalency(t *testing.T) {

response, err := message1_1.NewResponse(id, accepted, false, &voucherResult)
require.NoError(t, err)
wresponse := response.WrappedForTransport(transportID)
wresponse := response.WrappedForTransport(transportID, transportVersion)
err = wresponse.ToNet(buf)
require.NoError(t, err)
receivedTransportID, deserialized, err = message1_1.FromNetWrapped(buf)
deserialized, err = message1_1.FromNetWrapped(buf)
require.NoError(t, err)
require.Equal(t, transportID, receivedTransportID)
require.Equal(t, transportID, deserialized.TransportID())
require.Equal(t, transportVersion, deserialized.TransportVersion())

deserializedResponse, ok := deserialized.(datatransfer.Response)
require.True(t, ok)
Expand All @@ -559,12 +562,13 @@ func TestToNetFromNetEquivalency(t *testing.T) {
testutil.AssertEqualTestVoucherResult(t, response, deserializedResponse)

request = message1_1.CancelRequest(id)
wrequest = request.WrappedForTransport(transportID)
wrequest = request.WrappedForTransport(transportID, transportVersion)
err = wrequest.ToNet(buf)
require.NoError(t, err)
receivedTransportID, deserialized, err = message1_1.FromNetWrapped(buf)
deserialized, err = message1_1.FromNetWrapped(buf)
require.NoError(t, err)
require.Equal(t, transportID, receivedTransportID)
require.Equal(t, transportID, deserialized.TransportID())
require.Equal(t, transportVersion, deserialized.TransportVersion())

deserializedRequest, ok = deserialized.(datatransfer.Request)
require.True(t, ok)
Expand Down
7 changes: 7 additions & 0 deletions message/message1_1prime/schema.ipldsch
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@ type TransferMessage1_1 struct {
Response nullable TransferResponse
}

type Version struct {
Major Int
Minor Int
Patch Int
} representation tuple

type WrappedTransferMessage1_1 struct {
TransportID TransportID (rename "ID")
TransportVersion Version (rename "TV")
Message TransferMessage1_1 (rename "Msg")
}
5 changes: 3 additions & 2 deletions message/message1_1prime/transfer_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ func init() {
}

type WrappedTransferMessage1_1 struct {
TransportID string
Message TransferMessage1_1
TransportID string
TransportVersion datatransfer.Version
Message TransferMessage1_1
}

func (wtm *WrappedTransferMessage1_1) BindnodeSchema() string {
Expand Down
32 changes: 25 additions & 7 deletions message/message1_1prime/transfer_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type TransferRequest1_1 struct {
RestartChannel datatransfer.ChannelID
}

func (trq *TransferRequest1_1) MessageForVersion(version datatransfer.MessageVersion) (datatransfer.Message, error) {
func (trq *TransferRequest1_1) MessageForVersion(version datatransfer.Version) (datatransfer.Message, error) {
switch version {
case datatransfer.DataTransfer1_2:
return trq, nil
Expand All @@ -38,8 +38,16 @@ func (trq *TransferRequest1_1) MessageForVersion(version datatransfer.MessageVer
}
}

func (trq *TransferRequest1_1) WrappedForTransport(transportID datatransfer.TransportID) datatransfer.Message {
return &WrappedTransferRequest1_1{trq, string(transportID)}
func (trq *TransferRequest1_1) Version() datatransfer.Version {
return datatransfer.DataTransfer1_2
}

func (trq *TransferRequest1_1) WrappedForTransport(transportID datatransfer.TransportID, transportVersion datatransfer.Version) datatransfer.TransportedMessage {
return &WrappedTransferRequest1_1{
TransferRequest1_1: trq,
transportID: string(transportID),
transportVersion: transportVersion,
}
}

// IsRequest always returns true in this case because this is a transfer request
Expand Down Expand Up @@ -164,15 +172,25 @@ func (trq *TransferRequest1_1) ToNet(w io.Writer) error {
// transport id
type WrappedTransferRequest1_1 struct {
*TransferRequest1_1
TransportID string
transportVersion datatransfer.Version
transportID string
}

func (trq *WrappedTransferRequest1_1) TransportID() datatransfer.TransportID {
return datatransfer.TransportID(trq.transportID)
}

func (trq *WrappedTransferRequest1_1) TransportVersion() datatransfer.Version {
return trq.transportVersion
}

func (trsp *WrappedTransferRequest1_1) toIPLD() schema.TypedNode {
func (trq *WrappedTransferRequest1_1) toIPLD() schema.TypedNode {
msg := WrappedTransferMessage1_1{
TransportID: trsp.TransportID,
TransportID: trq.transportID,
TransportVersion: trq.transportVersion,
Message: TransferMessage1_1{
IsRequest: true,
Request: trsp.TransferRequest1_1,
Request: trq.TransferRequest1_1,
Response: nil,
},
}
Expand Down
10 changes: 4 additions & 6 deletions message/message1_1prime/transfer_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,12 @@ func TestRequestMessageForVersion(t *testing.T) {
require.Equal(t, selector, n)
require.Equal(t, testutil.TestVoucherType, req.VoucherType())

wrappedOut12 := out12.WrappedForTransport(datatransfer.LegacyTransportID)
require.Equal(t, &message1_1.WrappedTransferRequest1_1{
TransferRequest1_1: request.(*message1_1.TransferRequest1_1),
TransportID: string(datatransfer.LegacyTransportID),
}, wrappedOut12)
wrappedOut12 := out12.WrappedForTransport(datatransfer.LegacyTransportID, datatransfer.LegacyTransportVersion)
require.Equal(t, datatransfer.LegacyTransportID, wrappedOut12.TransportID())
require.Equal(t, datatransfer.LegacyTransportVersion, wrappedOut12.TransportVersion())

// random protocol should fail
_, err = request.MessageForVersion(datatransfer.MessageVersion{
_, err = request.MessageForVersion(datatransfer.Version{
Major: rand.Uint64(),
Minor: rand.Uint64(),
Patch: rand.Uint64(),
Expand Down
27 changes: 22 additions & 5 deletions message/message1_1prime/transfer_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (trsp *TransferResponse1_1) EmptyVoucherResult() bool {
return trsp.VoucherTypeIdentifier == datatransfer.EmptyTypeIdentifier
}

func (trsp *TransferResponse1_1) MessageForVersion(version datatransfer.MessageVersion) (datatransfer.Message, error) {
func (trsp *TransferResponse1_1) MessageForVersion(version datatransfer.Version) (datatransfer.Message, error) {
switch version {
case datatransfer.DataTransfer1_2:
return trsp, nil
Expand All @@ -96,8 +96,16 @@ func (trsp *TransferResponse1_1) MessageForVersion(version datatransfer.MessageV
}
}

func (trsp *TransferResponse1_1) WrappedForTransport(transportID datatransfer.TransportID) datatransfer.Message {
return &WrappedTransferResponse1_1{trsp, string(transportID)}
func (trsp *TransferResponse1_1) Version() datatransfer.Version {
return datatransfer.DataTransfer1_2
}

func (trsp *TransferResponse1_1) WrappedForTransport(transportID datatransfer.TransportID, transportVersion datatransfer.Version) datatransfer.TransportedMessage {
return &WrappedTransferResponse1_1{
TransferResponse1_1: trsp,
transportID: string(transportID),
transportVersion: transportVersion,
}
}
func (trsp *TransferResponse1_1) toIPLD() schema.TypedNode {
msg := TransferMessage1_1{
Expand All @@ -121,12 +129,21 @@ func (trsp *TransferResponse1_1) ToNet(w io.Writer) error {
// transport id
type WrappedTransferResponse1_1 struct {
*TransferResponse1_1
TransportID string
transportID string
transportVersion datatransfer.Version
}

func (trsp *WrappedTransferResponse1_1) TransportID() datatransfer.TransportID {
return datatransfer.TransportID(trsp.transportID)
}
func (trsp *WrappedTransferResponse1_1) TransportVersion() datatransfer.Version {
return trsp.transportVersion
}

func (trsp *WrappedTransferResponse1_1) toIPLD() schema.TypedNode {
msg := WrappedTransferMessage1_1{
TransportID: trsp.TransportID,
TransportID: trsp.transportID,
TransportVersion: trsp.transportVersion,
Message: TransferMessage1_1{
IsRequest: false,
Request: nil,
Expand Down
10 changes: 4 additions & 6 deletions message/message1_1prime/transfer_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@ func TestResponseMessageForVersion(t *testing.T) {
require.Equal(t, testutil.TestVoucherType, resp.VoucherResultType())
require.True(t, resp.IsValidationResult())

wrappedOut := out.WrappedForTransport(datatransfer.LegacyTransportID)
require.Equal(t, &message1_1.WrappedTransferResponse1_1{
TransferResponse1_1: response.(*message1_1.TransferResponse1_1),
TransportID: string(datatransfer.LegacyTransportID),
}, wrappedOut)
wrappedOut := out.WrappedForTransport(datatransfer.LegacyTransportID, datatransfer.LegacyTransportVersion)
require.Equal(t, datatransfer.LegacyTransportID, wrappedOut.TransportID())
require.Equal(t, datatransfer.LegacyTransportVersion, wrappedOut.TransportVersion())

// random protocol should fail
_, err = response.MessageForVersion(datatransfer.MessageVersion{
_, err = response.MessageForVersion(datatransfer.Version{
Major: rand.Uint64(),
Minor: rand.Uint64(),
Patch: rand.Uint64(),
Expand Down
5 changes: 5 additions & 0 deletions testutil/faketransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func (ft *FakeTransport) ID() datatransfer.TransportID {
return "fake"
}

// Versions indicates what versions of this transport are supported
func (ft *FakeTransport) Versions() []datatransfer.Version {
return []datatransfer.Version{{Major: 1, Minor: 1, Patch: 0}}
}

// Capabilities tells datatransfer what kinds of capabilities this transport supports
func (ft *FakeTransport) Capabilities() datatransfer.TransportCapabilities {
return datatransfer.TransportCapabilities{
Expand Down
Loading

0 comments on commit c5f91ae

Please sign in to comment.