Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use CborGenCompatibleNode #336

Merged
merged 1 commit into from
Jun 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 16 additions & 45 deletions channels/channel_state.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
package channels

import (
"bytes"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/datamodel"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
peer "github.com/libp2p/go-libp2p-core/peer"

datatransfer "github.com/filecoin-project/go-data-transfer/v2"
"github.com/filecoin-project/go-data-transfer/v2/channels/internal"
ipldutils "github.com/filecoin-project/go-data-transfer/v2/ipldutils"
)

// channelState is immutable channel data plus mutable state
Expand Down Expand Up @@ -43,25 +38,16 @@ func (c channelState) BaseCID() cid.Cid { return c.ic.BaseCid }
// Selector returns the IPLD selector for this data transfer (represented as
// an IPLD node)
func (c channelState) Selector() datamodel.Node {
builder := basicnode.Prototype.Any.NewBuilder()
reader := bytes.NewReader(c.ic.Selector.Raw)
err := dagcbor.Decode(builder, reader)
if err != nil {
log.Error(err)
}
return builder.Build()
return c.ic.Selector.Node
}

// Voucher returns the voucher for this data transfer
func (c channelState) Voucher() (datatransfer.TypedVoucher, error) {
func (c channelState) Voucher() datatransfer.TypedVoucher {
if len(c.ic.Vouchers) == 0 {
return datatransfer.TypedVoucher{}, nil
}
node, err := ipldutils.DeferredToNode(c.ic.Vouchers[0].Voucher)
if err != nil {
return datatransfer.TypedVoucher{}, err
return datatransfer.TypedVoucher{}
}
return datatransfer.TypedVoucher{Voucher: node, Type: c.ic.Vouchers[0].Type}, nil
ev := c.ic.Vouchers[0]
return datatransfer.TypedVoucher{Voucher: ev.Voucher.Node, Type: ev.Type}
}

// ReceivedCidsTotal returns the number of (non-unique) cids received so far
Expand Down Expand Up @@ -107,46 +93,31 @@ func (c channelState) Message() string {
return c.ic.Message
}

func (c channelState) Vouchers() ([]datatransfer.TypedVoucher, error) {
func (c channelState) Vouchers() []datatransfer.TypedVoucher {
vouchers := make([]datatransfer.TypedVoucher, 0, len(c.ic.Vouchers))
for _, encoded := range c.ic.Vouchers {
node, err := ipldutils.DeferredToNode(encoded.Voucher)
if err != nil {
return nil, err
}
vouchers = append(vouchers, datatransfer.TypedVoucher{Voucher: node, Type: encoded.Type})
vouchers = append(vouchers, datatransfer.TypedVoucher{Voucher: encoded.Voucher.Node, Type: encoded.Type})
}
return vouchers, nil
return vouchers
}

func (c channelState) LastVoucher() (datatransfer.TypedVoucher, error) {
func (c channelState) LastVoucher() datatransfer.TypedVoucher {
ev := c.ic.Vouchers[len(c.ic.Vouchers)-1]
node, err := ipldutils.DeferredToNode(ev.Voucher)
if err != nil {
return datatransfer.TypedVoucher{}, err
}
return datatransfer.TypedVoucher{Voucher: node, Type: ev.Type}, nil

return datatransfer.TypedVoucher{Voucher: ev.Voucher.Node, Type: ev.Type}
}

func (c channelState) LastVoucherResult() (datatransfer.TypedVoucher, error) {
func (c channelState) LastVoucherResult() datatransfer.TypedVoucher {
evr := c.ic.VoucherResults[len(c.ic.VoucherResults)-1]
node, err := ipldutils.DeferredToNode(evr.VoucherResult)
if err != nil {
return datatransfer.TypedVoucher{}, err
}
return datatransfer.TypedVoucher{Voucher: node, Type: evr.Type}, nil
return datatransfer.TypedVoucher{Voucher: evr.VoucherResult.Node, Type: evr.Type}
}

func (c channelState) VoucherResults() ([]datatransfer.TypedVoucher, error) {
func (c channelState) VoucherResults() []datatransfer.TypedVoucher {
voucherResults := make([]datatransfer.TypedVoucher, 0, len(c.ic.VoucherResults))
for _, encoded := range c.ic.VoucherResults {
node, err := ipldutils.DeferredToNode(encoded.VoucherResult)
if err != nil {
return nil, err
}
voucherResults = append(voucherResults, datatransfer.TypedVoucher{Voucher: node, Type: encoded.Type})
voucherResults = append(voucherResults, datatransfer.TypedVoucher{Voucher: encoded.VoucherResult.Node, Type: encoded.Type})
}
return voucherResults, nil
return voucherResults
}

func (c channelState) SelfPeer() peer.ID {
Expand Down
28 changes: 5 additions & 23 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime/datamodel"
peer "github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"

versioning "github.com/filecoin-project/go-ds-versioning/pkg"
Expand All @@ -20,7 +19,6 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
"github.com/filecoin-project/go-data-transfer/v2/channels/internal"
"github.com/filecoin-project/go-data-transfer/v2/channels/internal/migrations"
ipldutils "github.com/filecoin-project/go-data-transfer/v2/ipldutils"
)

type Notifier func(datatransfer.Event, datatransfer.ChannelState)
Expand Down Expand Up @@ -119,28 +117,20 @@ func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, base
responder = dataSender
}
chid := datatransfer.ChannelID{Initiator: initiator, Responder: responder, ID: tid}
initialVoucher, err := ipldutils.NodeToDeferred(voucher.Voucher)
if err != nil {
return datatransfer.ChannelID{}, err
}
selBytes, err := ipldutils.NodeToBytes(selector)
if err != nil {
return datatransfer.ChannelID{}, err
}
err = c.stateMachines.Begin(chid, &internal.ChannelState{
err := c.stateMachines.Begin(chid, &internal.ChannelState{
SelfPeer: selfPeer,
TransferID: tid,
Initiator: initiator,
Responder: responder,
BaseCid: baseCid,
Selector: &cbg.Deferred{Raw: selBytes},
Selector: internal.CborGenCompatibleNode{Node: selector},
Sender: dataSender,
Recipient: dataReceiver,
Stages: &datatransfer.ChannelStages{},
Vouchers: []internal.EncodedVoucher{
{
Type: voucher.Type,
Voucher: initialVoucher,
Voucher: internal.CborGenCompatibleNode{voucher.Voucher},
},
},
Status: datatransfer.Requested,
Expand Down Expand Up @@ -278,20 +268,12 @@ func (c *Channels) ResumeResponder(chid datatransfer.ChannelID) error {

// NewVoucher records a new voucher for this channel
func (c *Channels) NewVoucher(chid datatransfer.ChannelID, voucher datatransfer.TypedVoucher) error {
voucherBytes, err := ipldutils.NodeToBytes(voucher.Voucher)
if err != nil {
return err
}
return c.send(chid, datatransfer.NewVoucher, voucher.Type, voucherBytes)
return c.send(chid, datatransfer.NewVoucher, voucher)
}

// NewVoucherResult records a new voucher result for this channel
func (c *Channels) NewVoucherResult(chid datatransfer.ChannelID, voucherResult datatransfer.TypedVoucher) error {
voucherResultBytes, err := ipldutils.NodeToBytes(voucherResult.Voucher)
if err != nil {
return err
}
return c.send(chid, datatransfer.NewVoucherResult, voucherResult.Type, voucherResultBytes)
return c.send(chid, datatransfer.NewVoucherResult, voucherResult)
}

// Complete indicates responder has completed sending/receiving data
Expand Down
9 changes: 4 additions & 5 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package channels

import (
logging "github.com/ipfs/go-log/v2"
cbg "github.com/whyrusleeping/cbor-gen"

"github.com/filecoin-project/go-statemachine/fsm"

Expand Down Expand Up @@ -148,15 +147,15 @@ var ChannelEvents = fsm.Events{
}),

fsm.Event(datatransfer.NewVoucher).FromAny().ToNoChange().
Action(func(chst *internal.ChannelState, vtype datatransfer.TypeIdentifier, voucherBytes []byte) error {
chst.Vouchers = append(chst.Vouchers, internal.EncodedVoucher{Type: vtype, Voucher: &cbg.Deferred{Raw: voucherBytes}})
Action(func(chst *internal.ChannelState, voucher datatransfer.TypedVoucher) error {
chst.Vouchers = append(chst.Vouchers, internal.EncodedVoucher{Type: voucher.Type, Voucher: internal.CborGenCompatibleNode{Node: voucher.Voucher}})
chst.AddLog("got new voucher")
return nil
}),
fsm.Event(datatransfer.NewVoucherResult).FromAny().ToNoChange().
Action(func(chst *internal.ChannelState, vtype datatransfer.TypeIdentifier, voucherResultBytes []byte) error {
Action(func(chst *internal.ChannelState, voucherResult datatransfer.TypedVoucher) error {
chst.VoucherResults = append(chst.VoucherResults,
internal.EncodedVoucherResult{Type: vtype, VoucherResult: &cbg.Deferred{Raw: voucherResultBytes}})
internal.EncodedVoucherResult{Type: voucherResult.Type, VoucherResult: internal.CborGenCompatibleNode{Node: voucherResult.Voucher}})
chst.AddLog("got new voucher result")
return nil
}),
Expand Down
30 changes: 10 additions & 20 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ func TestChannels(t *testing.T) {
require.NotEqual(t, channels.EmptyChannelState, state)
require.Equal(t, cids[0], state.BaseCID())
require.Equal(t, selector, state.Selector())
voucher, err := state.Voucher()
require.NoError(t, err)
voucher := state.Voucher()
require.True(t, fv1.Equals(voucher))
require.Equal(t, peers[0], state.Sender())
require.Equal(t, peers[1], state.Recipient())
Expand Down Expand Up @@ -303,47 +302,38 @@ func TestChannels(t *testing.T) {

state, err := channelList.GetByID(ctx, datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1})
require.NoError(t, err)
vouchers, err := state.Vouchers()
require.NoError(t, err)
vouchers := state.Vouchers()
require.Len(t, vouchers, 1)
require.True(t, fv1.Equals(vouchers[0]))
voucher, err := state.Voucher()
require.NoError(t, err)
voucher := state.Voucher()
require.True(t, fv1.Equals(voucher))
voucher, err = state.LastVoucher()
require.NoError(t, err)
voucher = state.LastVoucher()
require.True(t, fv1.Equals(voucher))

err = channelList.NewVoucher(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, fv3)
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.NewVoucher)
vouchers, err = state.Vouchers()
require.NoError(t, err)
vouchers = state.Vouchers()
require.Len(t, vouchers, 2)
require.True(t, fv1.Equals(vouchers[0]))
require.True(t, fv3.Equals(vouchers[1]))
voucher, err = state.Voucher()
require.NoError(t, err)
voucher = state.Voucher()
require.True(t, fv1.Equals(voucher))
voucher, err = state.LastVoucher()
require.NoError(t, err)
voucher = state.LastVoucher()
require.True(t, fv3.Equals(voucher))

state, err = channelList.GetByID(ctx, datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1})
require.NoError(t, err)
results, err := state.VoucherResults()
require.NoError(t, err)
results := state.VoucherResults()
require.Equal(t, []datatransfer.TypedVoucher{}, results)

err = channelList.NewVoucherResult(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, fvr1)
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.NewVoucherResult)
voucherResults, err := state.VoucherResults()
require.NoError(t, err)
voucherResults := state.VoucherResults()
require.Len(t, voucherResults, 1)
require.True(t, fvr1.Equals(voucherResults[0]))
voucherResult, err := state.LastVoucherResult()
require.NoError(t, err)
voucherResult := state.LastVoucherResult()
require.True(t, fvr1.Equals(voucherResult))
})

Expand Down
48 changes: 45 additions & 3 deletions channels/internal/internalchannel.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,73 @@
package internal

import (
"bytes"
"fmt"
"io"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/schema"
peer "github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"

datatransfer "github.com/filecoin-project/go-data-transfer/v2"
)

type CborGenCompatibleNode struct {
Node datamodel.Node
}

func (sn CborGenCompatibleNode) IsNull() bool {
return sn.Node == nil || sn.Node == datamodel.Null
}

// UnmarshalCBOR is for cbor-gen compatibility
func (sn *CborGenCompatibleNode) UnmarshalCBOR(r io.Reader) error {
// use cbg.Deferred.UnmarshalCBOR to figure out how much to pull
def := cbg.Deferred{}
if err := def.UnmarshalCBOR(r); err != nil {
return err
}
// convert it to a Node
na := basicnode.Prototype.Any.NewBuilder()
if err := dagcbor.Decode(na, bytes.NewReader(def.Raw)); err != nil {
return err
}
sn.Node = na.Build()
return nil
}

// MarshalCBOR is for cbor-gen compatibility
func (sn *CborGenCompatibleNode) MarshalCBOR(w io.Writer) error {
node := datamodel.Null
if sn != nil && sn.Node != nil {
node = sn.Node
if tn, ok := node.(schema.TypedNode); ok {
node = tn.Representation()
}
}
return dagcbor.Encode(node, w)
}

//go:generate cbor-gen-for --map-encoding ChannelState EncodedVoucher EncodedVoucherResult

// EncodedVoucher is how the voucher is stored on disk
type EncodedVoucher struct {
// Vouchers identifier for decoding
Type datatransfer.TypeIdentifier
// used to verify this channel
Voucher *cbg.Deferred
Voucher CborGenCompatibleNode
}

// EncodedVoucherResult is how the voucher result is stored on disk
type EncodedVoucherResult struct {
// Vouchers identifier for decoding
Type datatransfer.TypeIdentifier
// used to verify this channel
VoucherResult *cbg.Deferred
VoucherResult CborGenCompatibleNode
}

// ChannelState is the internal representation on disk for the channel fsm
Expand All @@ -41,7 +83,7 @@ type ChannelState struct {
// base CID for the piece being transferred
BaseCid cid.Cid
// portion of Piece to return, specified by an IPLD selector
Selector *cbg.Deferred
Selector CborGenCompatibleNode
// the party that is sending the data (not who initiated the request)
Sender peer.ID
// the party that is receiving the data (not who initiated the request)
Expand Down
Loading