Skip to content

Commit

Permalink
refactor(channelstate): use cborgencompatiblenode (#336)
Browse files Browse the repository at this point in the history
use cborgencomptaiblenode to simply channelstate interface
  • Loading branch information
hannahhoward committed Oct 7, 2022
1 parent f54bd1a commit f8d2cd0
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 142 deletions.
61 changes: 16 additions & 45 deletions channels/channel_state.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
package channels

import (
"bytes"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/datamodel"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
peer "github.com/libp2p/go-libp2p-core/peer"

datatransfer "github.com/filecoin-project/go-data-transfer/v2"
"github.com/filecoin-project/go-data-transfer/v2/channels/internal"
ipldutils "github.com/filecoin-project/go-data-transfer/v2/ipldutils"
)

// channelState is immutable channel data plus mutable state
Expand Down Expand Up @@ -43,25 +38,16 @@ func (c channelState) BaseCID() cid.Cid { return c.ic.BaseCid }
// Selector returns the IPLD selector for this data transfer (represented as
// an IPLD node)
func (c channelState) Selector() datamodel.Node {
builder := basicnode.Prototype.Any.NewBuilder()
reader := bytes.NewReader(c.ic.Selector.Raw)
err := dagcbor.Decode(builder, reader)
if err != nil {
log.Error(err)
}
return builder.Build()
return c.ic.Selector.Node
}

// Voucher returns the voucher for this data transfer
func (c channelState) Voucher() (datatransfer.TypedVoucher, error) {
func (c channelState) Voucher() datatransfer.TypedVoucher {
if len(c.ic.Vouchers) == 0 {
return datatransfer.TypedVoucher{}, nil
}
node, err := ipldutils.DeferredToNode(c.ic.Vouchers[0].Voucher)
if err != nil {
return datatransfer.TypedVoucher{}, err
return datatransfer.TypedVoucher{}
}
return datatransfer.TypedVoucher{Voucher: node, Type: c.ic.Vouchers[0].Type}, nil
ev := c.ic.Vouchers[0]
return datatransfer.TypedVoucher{Voucher: ev.Voucher.Node, Type: ev.Type}
}

// ReceivedCidsTotal returns the number of (non-unique) cids received so far
Expand Down Expand Up @@ -107,46 +93,31 @@ func (c channelState) Message() string {
return c.ic.Message
}

func (c channelState) Vouchers() ([]datatransfer.TypedVoucher, error) {
func (c channelState) Vouchers() []datatransfer.TypedVoucher {
vouchers := make([]datatransfer.TypedVoucher, 0, len(c.ic.Vouchers))
for _, encoded := range c.ic.Vouchers {
node, err := ipldutils.DeferredToNode(encoded.Voucher)
if err != nil {
return nil, err
}
vouchers = append(vouchers, datatransfer.TypedVoucher{Voucher: node, Type: encoded.Type})
vouchers = append(vouchers, datatransfer.TypedVoucher{Voucher: encoded.Voucher.Node, Type: encoded.Type})
}
return vouchers, nil
return vouchers
}

func (c channelState) LastVoucher() (datatransfer.TypedVoucher, error) {
func (c channelState) LastVoucher() datatransfer.TypedVoucher {
ev := c.ic.Vouchers[len(c.ic.Vouchers)-1]
node, err := ipldutils.DeferredToNode(ev.Voucher)
if err != nil {
return datatransfer.TypedVoucher{}, err
}
return datatransfer.TypedVoucher{Voucher: node, Type: ev.Type}, nil

return datatransfer.TypedVoucher{Voucher: ev.Voucher.Node, Type: ev.Type}
}

func (c channelState) LastVoucherResult() (datatransfer.TypedVoucher, error) {
func (c channelState) LastVoucherResult() datatransfer.TypedVoucher {
evr := c.ic.VoucherResults[len(c.ic.VoucherResults)-1]
node, err := ipldutils.DeferredToNode(evr.VoucherResult)
if err != nil {
return datatransfer.TypedVoucher{}, err
}
return datatransfer.TypedVoucher{Voucher: node, Type: evr.Type}, nil
return datatransfer.TypedVoucher{Voucher: evr.VoucherResult.Node, Type: evr.Type}
}

func (c channelState) VoucherResults() ([]datatransfer.TypedVoucher, error) {
func (c channelState) VoucherResults() []datatransfer.TypedVoucher {
voucherResults := make([]datatransfer.TypedVoucher, 0, len(c.ic.VoucherResults))
for _, encoded := range c.ic.VoucherResults {
node, err := ipldutils.DeferredToNode(encoded.VoucherResult)
if err != nil {
return nil, err
}
voucherResults = append(voucherResults, datatransfer.TypedVoucher{Voucher: node, Type: encoded.Type})
voucherResults = append(voucherResults, datatransfer.TypedVoucher{Voucher: encoded.VoucherResult.Node, Type: encoded.Type})
}
return voucherResults, nil
return voucherResults
}

func (c channelState) SelfPeer() peer.ID {
Expand Down
28 changes: 5 additions & 23 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime/datamodel"
peer "github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"

versioning "github.com/filecoin-project/go-ds-versioning/pkg"
Expand All @@ -20,7 +19,6 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
"github.com/filecoin-project/go-data-transfer/v2/channels/internal"
"github.com/filecoin-project/go-data-transfer/v2/channels/internal/migrations"
ipldutils "github.com/filecoin-project/go-data-transfer/v2/ipldutils"
)

type Notifier func(datatransfer.Event, datatransfer.ChannelState)
Expand Down Expand Up @@ -119,28 +117,20 @@ func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, base
responder = dataSender
}
chid := datatransfer.ChannelID{Initiator: initiator, Responder: responder, ID: tid}
initialVoucher, err := ipldutils.NodeToDeferred(voucher.Voucher)
if err != nil {
return datatransfer.ChannelID{}, err
}
selBytes, err := ipldutils.NodeToBytes(selector)
if err != nil {
return datatransfer.ChannelID{}, err
}
err = c.stateMachines.Begin(chid, &internal.ChannelState{
err := c.stateMachines.Begin(chid, &internal.ChannelState{
SelfPeer: selfPeer,
TransferID: tid,
Initiator: initiator,
Responder: responder,
BaseCid: baseCid,
Selector: &cbg.Deferred{Raw: selBytes},
Selector: internal.CborGenCompatibleNode{Node: selector},
Sender: dataSender,
Recipient: dataReceiver,
Stages: &datatransfer.ChannelStages{},
Vouchers: []internal.EncodedVoucher{
{
Type: voucher.Type,
Voucher: initialVoucher,
Voucher: internal.CborGenCompatibleNode{voucher.Voucher},
},
},
Status: datatransfer.Requested,
Expand Down Expand Up @@ -278,20 +268,12 @@ func (c *Channels) ResumeResponder(chid datatransfer.ChannelID) error {

// NewVoucher records a new voucher for this channel
func (c *Channels) NewVoucher(chid datatransfer.ChannelID, voucher datatransfer.TypedVoucher) error {
voucherBytes, err := ipldutils.NodeToBytes(voucher.Voucher)
if err != nil {
return err
}
return c.send(chid, datatransfer.NewVoucher, voucher.Type, voucherBytes)
return c.send(chid, datatransfer.NewVoucher, voucher)
}

// NewVoucherResult records a new voucher result for this channel
func (c *Channels) NewVoucherResult(chid datatransfer.ChannelID, voucherResult datatransfer.TypedVoucher) error {
voucherResultBytes, err := ipldutils.NodeToBytes(voucherResult.Voucher)
if err != nil {
return err
}
return c.send(chid, datatransfer.NewVoucherResult, voucherResult.Type, voucherResultBytes)
return c.send(chid, datatransfer.NewVoucherResult, voucherResult)
}

// Complete indicates responder has completed sending/receiving data
Expand Down
9 changes: 4 additions & 5 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package channels

import (
logging "github.com/ipfs/go-log/v2"
cbg "github.com/whyrusleeping/cbor-gen"

"github.com/filecoin-project/go-statemachine/fsm"

Expand Down Expand Up @@ -148,15 +147,15 @@ var ChannelEvents = fsm.Events{
}),

fsm.Event(datatransfer.NewVoucher).FromAny().ToNoChange().
Action(func(chst *internal.ChannelState, vtype datatransfer.TypeIdentifier, voucherBytes []byte) error {
chst.Vouchers = append(chst.Vouchers, internal.EncodedVoucher{Type: vtype, Voucher: &cbg.Deferred{Raw: voucherBytes}})
Action(func(chst *internal.ChannelState, voucher datatransfer.TypedVoucher) error {
chst.Vouchers = append(chst.Vouchers, internal.EncodedVoucher{Type: voucher.Type, Voucher: internal.CborGenCompatibleNode{Node: voucher.Voucher}})
chst.AddLog("got new voucher")
return nil
}),
fsm.Event(datatransfer.NewVoucherResult).FromAny().ToNoChange().
Action(func(chst *internal.ChannelState, vtype datatransfer.TypeIdentifier, voucherResultBytes []byte) error {
Action(func(chst *internal.ChannelState, voucherResult datatransfer.TypedVoucher) error {
chst.VoucherResults = append(chst.VoucherResults,
internal.EncodedVoucherResult{Type: vtype, VoucherResult: &cbg.Deferred{Raw: voucherResultBytes}})
internal.EncodedVoucherResult{Type: voucherResult.Type, VoucherResult: internal.CborGenCompatibleNode{Node: voucherResult.Voucher}})
chst.AddLog("got new voucher result")
return nil
}),
Expand Down
30 changes: 10 additions & 20 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ func TestChannels(t *testing.T) {
require.NotEqual(t, channels.EmptyChannelState, state)
require.Equal(t, cids[0], state.BaseCID())
require.Equal(t, selector, state.Selector())
voucher, err := state.Voucher()
require.NoError(t, err)
voucher := state.Voucher()
require.True(t, fv1.Equals(voucher))
require.Equal(t, peers[0], state.Sender())
require.Equal(t, peers[1], state.Recipient())
Expand Down Expand Up @@ -303,47 +302,38 @@ func TestChannels(t *testing.T) {

state, err := channelList.GetByID(ctx, datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1})
require.NoError(t, err)
vouchers, err := state.Vouchers()
require.NoError(t, err)
vouchers := state.Vouchers()
require.Len(t, vouchers, 1)
require.True(t, fv1.Equals(vouchers[0]))
voucher, err := state.Voucher()
require.NoError(t, err)
voucher := state.Voucher()
require.True(t, fv1.Equals(voucher))
voucher, err = state.LastVoucher()
require.NoError(t, err)
voucher = state.LastVoucher()
require.True(t, fv1.Equals(voucher))

err = channelList.NewVoucher(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, fv3)
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.NewVoucher)
vouchers, err = state.Vouchers()
require.NoError(t, err)
vouchers = state.Vouchers()
require.Len(t, vouchers, 2)
require.True(t, fv1.Equals(vouchers[0]))
require.True(t, fv3.Equals(vouchers[1]))
voucher, err = state.Voucher()
require.NoError(t, err)
voucher = state.Voucher()
require.True(t, fv1.Equals(voucher))
voucher, err = state.LastVoucher()
require.NoError(t, err)
voucher = state.LastVoucher()
require.True(t, fv3.Equals(voucher))

state, err = channelList.GetByID(ctx, datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1})
require.NoError(t, err)
results, err := state.VoucherResults()
require.NoError(t, err)
results := state.VoucherResults()
require.Equal(t, []datatransfer.TypedVoucher{}, results)

err = channelList.NewVoucherResult(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, fvr1)
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.NewVoucherResult)
voucherResults, err := state.VoucherResults()
require.NoError(t, err)
voucherResults := state.VoucherResults()
require.Len(t, voucherResults, 1)
require.True(t, fvr1.Equals(voucherResults[0]))
voucherResult, err := state.LastVoucherResult()
require.NoError(t, err)
voucherResult := state.LastVoucherResult()
require.True(t, fvr1.Equals(voucherResult))
})

Expand Down
48 changes: 45 additions & 3 deletions channels/internal/internalchannel.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,73 @@
package internal

import (
"bytes"
"fmt"
"io"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/schema"
peer "github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"

datatransfer "github.com/filecoin-project/go-data-transfer/v2"
)

type CborGenCompatibleNode struct {
Node datamodel.Node
}

func (sn CborGenCompatibleNode) IsNull() bool {
return sn.Node == nil || sn.Node == datamodel.Null
}

// UnmarshalCBOR is for cbor-gen compatibility
func (sn *CborGenCompatibleNode) UnmarshalCBOR(r io.Reader) error {
// use cbg.Deferred.UnmarshalCBOR to figure out how much to pull
def := cbg.Deferred{}
if err := def.UnmarshalCBOR(r); err != nil {
return err
}
// convert it to a Node
na := basicnode.Prototype.Any.NewBuilder()
if err := dagcbor.Decode(na, bytes.NewReader(def.Raw)); err != nil {
return err
}
sn.Node = na.Build()
return nil
}

// MarshalCBOR is for cbor-gen compatibility
func (sn *CborGenCompatibleNode) MarshalCBOR(w io.Writer) error {
node := datamodel.Null
if sn != nil && sn.Node != nil {
node = sn.Node
if tn, ok := node.(schema.TypedNode); ok {
node = tn.Representation()
}
}
return dagcbor.Encode(node, w)
}

//go:generate cbor-gen-for --map-encoding ChannelState EncodedVoucher EncodedVoucherResult

// EncodedVoucher is how the voucher is stored on disk
type EncodedVoucher struct {
// Vouchers identifier for decoding
Type datatransfer.TypeIdentifier
// used to verify this channel
Voucher *cbg.Deferred
Voucher CborGenCompatibleNode
}

// EncodedVoucherResult is how the voucher result is stored on disk
type EncodedVoucherResult struct {
// Vouchers identifier for decoding
Type datatransfer.TypeIdentifier
// used to verify this channel
VoucherResult *cbg.Deferred
VoucherResult CborGenCompatibleNode
}

// ChannelState is the internal representation on disk for the channel fsm
Expand All @@ -41,7 +83,7 @@ type ChannelState struct {
// base CID for the piece being transferred
BaseCid cid.Cid
// portion of Piece to return, specified by an IPLD selector
Selector *cbg.Deferred
Selector CborGenCompatibleNode
// the party that is sending the data (not who initiated the request)
Sender peer.ID
// the party that is receiving the data (not who initiated the request)
Expand Down
Loading

0 comments on commit f8d2cd0

Please sign in to comment.