Skip to content

Commit

Permalink
fixup! feat(ipld): new data-transfer ipld vouchers + bindnode
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Jun 3, 2022
1 parent 2a8436f commit 43e2645
Show file tree
Hide file tree
Showing 15 changed files with 58 additions and 74 deletions.
4 changes: 2 additions & 2 deletions retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,15 +404,15 @@ func (c *clientDealEnvironment) OpenDataTransfer(ctx context.Context, to peer.ID
if proposal.SelectorSpecified() {
sel = proposal.Selector.Node
}
vouch, err := shared.ToNode(proposal)
vouch, err := shared.TypeToNode(proposal)
if err != nil {
return datatransfer.ChannelID{}, err
}
return c.c.dataTransfer.OpenPullDataChannel(ctx, to, datatransfer.TypedVoucher{Voucher: vouch, Type: proposal.Type()}, proposal.PayloadCID, sel)
}

func (c *clientDealEnvironment) SendDataTransferVoucher(ctx context.Context, channelID datatransfer.ChannelID, payment *retrievalmarket.DealPayment) error {
vouch, err := shared.ToNode(payment)
vouch, err := shared.TypeToNode(payment)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions retrievalmarket/impl/dtutils/dtutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

func TestProviderDataTransferSubscriber(t *testing.T) {
dealProposal := shared_testutil.MakeTestDealProposal()
node, err := shared.ToNode(dealProposal)
node, err := shared.TypeToNode(dealProposal)
require.NoError(t, err)
dealProposalVoucher := datatransfer.TypedVoucher{Voucher: node, Type: dealProposal.Type()}
testPeers := shared_testutil.GeneratePeers(2)
Expand Down Expand Up @@ -113,11 +113,11 @@ func TestProviderDataTransferSubscriber(t *testing.T) {
}
func TestClientDataTransferSubscriber(t *testing.T) {
dealProposal := shared_testutil.MakeTestDealProposal()
node, err := shared.ToNode(dealProposal)
node, err := shared.TypeToNode(dealProposal)
require.NoError(t, err)
dealProposalVoucher := datatransfer.TypedVoucher{Voucher: node, Type: dealProposal.Type()}
dealResponseVoucher := func(dealResponse retrievalmarket.DealResponse) datatransfer.TypedVoucher {
node, err := shared.ToNode(&dealResponse)
node, err := shared.TypeToNode(&dealResponse)
require.NoError(t, err)
return datatransfer.TypedVoucher{Voucher: node, Type: dealResponse.Type()}
}
Expand Down Expand Up @@ -326,7 +326,7 @@ func TestTransportConfigurer(t *testing.T) {
thisPeer := expectedChannelID.Initiator
expectedPeer := expectedChannelID.Responder
dealProposalVoucher := func(proposal rm.DealProposal) datatransfer.TypedVoucher {
node, err := shared.ToNode(&proposal)
node, err := shared.TypeToNode(&proposal)
require.NoError(t, err)
return datatransfer.TypedVoucher{Voucher: node, Type: proposal.Type()}
}
Expand Down
12 changes: 6 additions & 6 deletions retrievalmarket/impl/ipld_compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestIpldCompat_DealResponse(t *testing.T) {
nb := basicnode.Prototype.Any.NewBuilder()
assert.Nil(t, dagcbor.Decode(nb, &originalBuf))
node := nb.Build()
drBindnodeIface, err := shared.FromNode(node, &retrievalmarket.DealResponse{})
drBindnodeIface, err := shared.TypeFromNode(node, &retrievalmarket.DealResponse{})
assert.Nil(t, err)
drBindnode, ok := drBindnodeIface.(*retrievalmarket.DealResponse)
assert.True(t, ok)
Expand All @@ -71,7 +71,7 @@ func TestIpldCompat_DealResponse(t *testing.T) {
compareDealResponse(t, testCase.dr, *drBindnode)

// encode the new DealResponse with bindnode to bytes
node, err = shared.ToNode(drBindnode)
node, err = shared.TypeToNode(drBindnode)
assert.Nil(t, err)
var bindnodeBuf bytes.Buffer
dagcbor.Encode(node.(schema.TypedNode).Representation(), &bindnodeBuf)
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestIpldCompat_DealProposal(t *testing.T) {
nb := basicnode.Prototype.Any.NewBuilder()
assert.Nil(t, dagcbor.Decode(nb, &originalBuf))
node := nb.Build()
dpBindnodeIface, err := shared.FromNode(node, &retrievalmarket.DealProposal{})
dpBindnodeIface, err := shared.TypeFromNode(node, &retrievalmarket.DealProposal{})
assert.Nil(t, err)
dpBindnode, ok := dpBindnodeIface.(*retrievalmarket.DealProposal)
assert.True(t, ok)
Expand All @@ -163,7 +163,7 @@ func TestIpldCompat_DealProposal(t *testing.T) {
compareDealProposal(t, testCase.dp, *dpBindnode)

// encode the new DealProposal with bindnode to bytes
node, err = shared.ToNode(dpBindnode)
node, err = shared.TypeToNode(dpBindnode)
assert.Nil(t, err)
var bindnodeBuf bytes.Buffer
dagcbor.Encode(node.(schema.TypedNode).Representation(), &bindnodeBuf)
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestIpldCompat_DealPayment(t *testing.T) {
nb := basicnode.Prototype.Any.NewBuilder()
assert.Nil(t, dagcbor.Decode(nb, &originalBuf))
node := nb.Build()
dpBindnodeIface, err := shared.FromNode(node, &retrievalmarket.DealPayment{})
dpBindnodeIface, err := shared.TypeFromNode(node, &retrievalmarket.DealPayment{})
assert.Nil(t, err)
dpBindnode, ok := dpBindnodeIface.(*retrievalmarket.DealPayment)
assert.True(t, ok)
Expand All @@ -284,7 +284,7 @@ func TestIpldCompat_DealPayment(t *testing.T) {
compareDealPayment(t, testCase.dp, *dpBindnode)

// encode the new DealPayment with bindnode to bytes
node, err = shared.ToNode(dpBindnode)
node, err = shared.TypeToNode(dpBindnode)
assert.Nil(t, err)
var bindnodeBuf bytes.Buffer
dagcbor.Encode(node.(schema.TypedNode).Representation(), &bindnodeBuf)
Expand Down
4 changes: 2 additions & 2 deletions retrievalmarket/impl/providerstates/provider_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func updateFunding(ctx fsm.Context,
DataLimit: deal.Params.NextInterval(totalPaid),
}
if voucherResult != nil {
node, err := shared.ToNode(voucherResult)
node, err := shared.TypeToNode(voucherResult)
if err != nil {
log.Errorf("failed to convert DealResponse to Node: %s", err.Error())
}
Expand Down Expand Up @@ -186,7 +186,7 @@ func errorDealResponse(dealID rm.ProviderDealIdentifier, errMsg error) datatrans
Message: errMsg.Error(),
Status: rm.DealStatusErrored,
}
node, err := shared.ToNode(&dr)
node, err := shared.TypeToNode(&dr)
if err != nil {
log.Errorf("failed to convert DealResponse to Node: %s", err.Error())
return datatransfer.ValidationResult{Accepted: false}
Expand Down
6 changes: 3 additions & 3 deletions retrievalmarket/impl/providerstates/provider_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,15 @@ func TestUnpauseDeal(t *testing.T) {
func TestUpdateFunding(t *testing.T) {
ctx := context.Background()
emptyDealPayment := rm.DealPayment{}
emptyDealPaymentNode, err := shared.ToNode(&emptyDealPayment)
emptyDealPaymentNode, err := shared.TypeToNode(&emptyDealPayment)
require.NoError(t, err)
emptyDealPaymentVoucher := datatransfer.TypedVoucher{Voucher: emptyDealPaymentNode, Type: emptyDealPayment.Type()}
emptyDealProposal := rm.DealProposal{}
emptyDealProposalNode, err := shared.ToNode(&emptyDealProposal)
emptyDealProposalNode, err := shared.TypeToNode(&emptyDealProposal)
require.NoError(t, err)
emptyDealProposalVoucher := datatransfer.TypedVoucher{Voucher: emptyDealProposalNode, Type: emptyDealProposal.Type()}
dealResponseVoucher := func(resp rm.DealResponse) *datatransfer.TypedVoucher {
node, err := shared.ToNode(&resp)
node, err := shared.TypeToNode(&resp)
require.NoError(t, err)
return &datatransfer.TypedVoucher{Voucher: node, Type: resp.Type()}
}
Expand Down
6 changes: 3 additions & 3 deletions retrievalmarket/impl/requestvalidation/requestvalidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func rejectProposal(proposal *rm.DealProposal, status rm.DealStatus, reason stri
Status: status,
Message: reason,
}
node, err := shared.ToNode(&dr)
node, err := shared.TypeToNode(&dr)
if err != nil {
return datatransfer.ValidationResult{}, err
}
Expand Down Expand Up @@ -156,7 +156,7 @@ func (rv *ProviderRequestValidator) validatePull(receiver peer.ID, proposal *rm.
Status: status,
PaymentOwed: deal.Params.OutstandingBalance(big.Zero(), 0, false),
}
node, err := shared.ToNode(&dr)
node, err := shared.TypeToNode(&dr)
if err != nil {
return datatransfer.ValidationResult{}, nil
}
Expand Down Expand Up @@ -213,7 +213,7 @@ func errorDealResponse(dealID rm.ProviderDealIdentifier, err error) (datatransfe
Message: err.Error(),
Status: rm.DealStatusErrored,
}
node, err := shared.ToNode(&dr)
node, err := shared.TypeToNode(&dr)
if err != nil {
return datatransfer.ValidationResult{}, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestValidatePush(t *testing.T) {
fve := &fakeValidationEnvironment{}
sender := shared_testutil.GeneratePeers(1)[0]
testDp := shared_testutil.MakeTestDealProposal()
voucher, err := shared.ToNode(testDp)
voucher, err := shared.TypeToNode(testDp)
require.NoError(t, err)
requestValidator := requestvalidation.NewProviderRequestValidator(fve)
validationResult, err := requestValidator.ValidatePush(datatransfer.ChannelID{}, sender, voucher, testDp.PayloadCID, selectorparse.CommonSelector_ExploreAllRecursively)
Expand All @@ -45,14 +45,14 @@ func dealResponseToVoucher(t *testing.T, status rm.DealStatus, id rm.DealID, mes
if owed != nil {
dr.PaymentOwed = *owed
}
node, err := shared.ToNode(&dr)
node, err := shared.TypeToNode(&dr)
require.NoError(t, err)
return &datatransfer.TypedVoucher{Voucher: node, Type: dr.Type()}
}

func TestValidatePull(t *testing.T) {
proposal := shared_testutil.MakeTestDealProposal()
node, err := shared.ToNode(proposal)
node, err := shared.TypeToNode(proposal)
require.NoError(t, err)
proposalVoucher := datatransfer.TypedVoucher{Voucher: node, Type: proposal.Type()}
zero := big.Zero()
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestValidateRestart(t *testing.T) {
ID: dealID,
Params: params,
}
node, err := shared.ToNode(&proposal)
node, err := shared.TypeToNode(&proposal)
require.NoError(t, err)
proposalVoucher := datatransfer.TypedVoucher{Voucher: node, Type: proposal.Type()}

Expand Down
6 changes: 3 additions & 3 deletions retrievalmarket/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func DealProposalFromNode(node datamodel.Node) (*DealProposal, error) {
if node == nil {
return nil, fmt.Errorf("empty voucher")
}
dpIface, err := shared.FromNode(node, &DealProposal{})
dpIface, err := shared.TypeFromNode(node, &DealProposal{})
if err != nil {
return nil, xerrors.Errorf("invalid DealProposal: %w", err)
}
Expand Down Expand Up @@ -439,7 +439,7 @@ func DealResponseFromNode(node datamodel.Node) (*DealResponse, error) {
if node == nil {
return nil, fmt.Errorf("empty voucher")
}
dpIface, err := shared.FromNode(node, &DealResponse{})
dpIface, err := shared.TypeFromNode(node, &DealResponse{})
if err != nil {
return nil, xerrors.Errorf("invalid DealResponse: %w", err)
}
Expand Down Expand Up @@ -502,7 +502,7 @@ func DealPaymentFromNode(node datamodel.Node) (*DealPayment, error) {
if node == nil {
return nil, fmt.Errorf("empty voucher")
}
dpIface, err := shared.FromNode(node, &DealPayment{})
dpIface, err := shared.TypeFromNode(node, &DealPayment{})
if err != nil {
return nil, xerrors.Errorf("invalid DealPayment: %w", err)
}
Expand Down
44 changes: 14 additions & 30 deletions shared/ipldutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@ type typeWithBindnodeSchema interface {
BindnodeSchema() string
}

// TODO: remove this I think
type typeWithBindnodePostDecode interface {
BindnodePostDecode() error
}

// We use the prototype map to store TypedPrototype and Type information
// mapped against Go type names so we only have to run the schema parse once.
// Currently there's not much additional benefit of storing this but there
Expand Down Expand Up @@ -96,29 +91,23 @@ func schemaTypeFor(typeName string, ptrType interface{}) (schema.Type, error) {

// FromReader deserializes DAG-CBOR from a Reader and instantiates the Go type
// that's provided as a pointer via the ptrValue argument.
func FromReader(r io.Reader, ptrValue interface{}) (interface{}, error) {
func TypeFromReader(r io.Reader, ptrValue interface{}) (interface{}, error) {
name := typeName(ptrValue)
proto, err := prototypeFor(name, ptrValue)
if err != nil {
return nil, err
}
builder := proto.Representation().NewBuilder()
if err := dagcbor.Decode(builder, r); err != nil {
node, err := ipld.DecodeStreamingUsingPrototype(r, dagcbor.Decode, proto)
if err != nil {
return nil, err
}
typ := bindnode.Unwrap(builder.Build())
if twpd, ok := typ.(typeWithBindnodePostDecode); ok {
// we have some more work to do
if err = twpd.BindnodePostDecode(); err != nil {
return nil, err
}
}
typ := bindnode.Unwrap(node)
return typ, nil
}

// FromNode converts an datamodel.Node into an appropriate Go type that's provided as
// a pointer via the ptrValue argument
func FromNode(node datamodel.Node, ptrValue interface{}) (interface{}, error) {
func TypeFromNode(node datamodel.Node, ptrValue interface{}) (interface{}, error) {
name := typeName(ptrValue)
proto, err := prototypeFor(name, ptrValue)
if err != nil {
Expand All @@ -133,18 +122,12 @@ func FromNode(node datamodel.Node, ptrValue interface{}) (interface{}, error) {
return nil, err
}
typ := bindnode.Unwrap(builder.Build())
if twpd, ok := typ.(typeWithBindnodePostDecode); ok {
// we have some more work to do
if err = twpd.BindnodePostDecode(); err != nil {
return nil, err
}
}
return typ, nil
}

// ToNode converts a Go type that's provided as a pointer via the ptrValue
// argument to an datamodel.Node.
func ToNode(ptrValue interface{}) (datamodel.Node, error) {
// argument to an schema.TypedNode.
func TypeToNode(ptrValue interface{}) (schema.TypedNode, error) {
name := typeName(ptrValue)
proto, err := prototypeFor(name, ptrValue)
if err != nil {
Expand All @@ -153,13 +136,14 @@ func ToNode(ptrValue interface{}) (datamodel.Node, error) {
return bindnode.Wrap(ptrValue, proto.Type(), bindnodeOptions...), err
}

// NodeToWriter is a utility method that serializes an datamodel.Node as DAG-CBOR to
// a Writer
func NodeToWriter(node datamodel.Node, w io.Writer) error {
if typedNode, ok := node.(schema.TypedNode); ok {
node = typedNode.Representation()
// TypeToWriter is a utility method that serializes a Go type that's provided as a
// pointer via the ptrValue argument as DAG-CBOR to a Writer
func TypeToWriter(ptrValue interface{}, w io.Writer) error {
node, err := TypeToNode(ptrValue)
if err != nil {
return err
}
return dagcbor.Encode(node, w)
return ipld.EncodeStreaming(w, node, dagcbor.Encode)
}

// CborGenCompatibleNode is for cbor-gen / go-ipld-prime compatibility, to
Expand Down
2 changes: 1 addition & 1 deletion storagemarket/impl/clientstates/client_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func InitiateDataTransfer(ctx fsm.Context, environment ClientDealEnvironment, de
log.Infof("sending data for a deal %s", deal.ProposalCid)

voucher := requestvalidation.StorageDataTransferVoucher{Proposal: deal.ProposalCid}
node, err := shared.ToNode(&voucher)
node, err := shared.TypeToNode(&voucher)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions storagemarket/impl/dtutils/dtutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber
channelState.ChannelID())
return
}
voucherIface, err := shared.FromNode(node.Voucher, &requestvalidation.StorageDataTransferVoucher{})
voucherIface, err := shared.TypeFromNode(node.Voucher, &requestvalidation.StorageDataTransferVoucher{})
// if this event is for a transfer not related to storage, ignore
if err != nil {
log.Debugw("ignoring data-transfer event as it's not storage related", "event", datatransfer.Events[event.Code], "channelID",
Expand Down Expand Up @@ -107,7 +107,7 @@ func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
channelState.ChannelID())
return
}
voucherIface, err := shared.FromNode(node.Voucher, &requestvalidation.StorageDataTransferVoucher{})
voucherIface, err := shared.TypeFromNode(node.Voucher, &requestvalidation.StorageDataTransferVoucher{})
// if this event is for a transfer not related to storage, ignore
if err != nil {
log.Debugw("ignoring data-transfer event as it's not storage related", "event", datatransfer.Events[event.Code], "channelID",
Expand Down Expand Up @@ -171,7 +171,7 @@ func TransportConfigurer(storeGetter StoreGetter) datatransfer.TransportConfigur
log.Errorf("attempting to configure data store, empty voucher")
return
}
voucherIface, err := shared.FromNode(voucher.Voucher, &requestvalidation.StorageDataTransferVoucher{})
voucherIface, err := shared.TypeFromNode(voucher.Voucher, &requestvalidation.StorageDataTransferVoucher{})
// if this event is for a transfer not related to storage, ignore
if err != nil {
log.Errorf("attempting to configure data store, bad voucher: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion storagemarket/impl/dtutils/dtutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func storageDataTransferVoucher(t *testing.T, proposalCid cid.Cid) datatransfer.
sdtv := requestvalidation.StorageDataTransferVoucher{
Proposal: proposalCid,
}
node, err := shared.ToNode(&sdtv)
node, err := shared.TypeToNode(&sdtv)
require.NoError(t, err)
return datatransfer.TypedVoucher{Voucher: node, Type: sdtv.Type()}
}
Expand Down
4 changes: 2 additions & 2 deletions storagemarket/impl/ipld_compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestIpldCompat_StorageDataTransferVoucher(t *testing.T) {
nb := basicnode.Prototype.Any.NewBuilder()
dagcbor.Decode(nb, &originalBuf)
node := nb.Build()
sdtvBindnodeIface, err := shared.FromNode(node, &requestvalidation.StorageDataTransferVoucher{})
sdtvBindnodeIface, err := shared.TypeFromNode(node, &requestvalidation.StorageDataTransferVoucher{})
assert.Nil(t, err)
sdtvBindnode, ok := sdtvBindnodeIface.(*requestvalidation.StorageDataTransferVoucher)
assert.True(t, ok)
Expand All @@ -45,7 +45,7 @@ func TestIpldCompat_StorageDataTransferVoucher(t *testing.T) {
assert.Equal(t, sdtv.Proposal, sdtvBindnode.Proposal)

// encode the new StorageDataTransferVoucher with bindnode to bytes
node, err = shared.ToNode(sdtvBindnode)
node, err = shared.TypeToNode(sdtvBindnode)
assert.Nil(t, err)
var bindnodeBuf bytes.Buffer
dagcbor.Encode(node.(schema.TypedNode).Representation(), &bindnodeBuf)
Expand Down
4 changes: 2 additions & 2 deletions storagemarket/impl/requestvalidation/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func ValidatePush(
baseCid cid.Cid,
Selector datamodel.Node) error {

dealVoucherIface, err := shared.FromNode(voucher, &StorageDataTransferVoucher{})
dealVoucherIface, err := shared.TypeFromNode(voucher, &StorageDataTransferVoucher{})
if err != nil {
return xerrors.Errorf("could not decode StorageDataTransferVoucher: %w", err)
}
Expand Down Expand Up @@ -58,7 +58,7 @@ func ValidatePull(
baseCid cid.Cid,
Selector datamodel.Node) error {

dealVoucherIface, err := shared.FromNode(voucher, &StorageDataTransferVoucher{})
dealVoucherIface, err := shared.TypeFromNode(voucher, &StorageDataTransferVoucher{})
if err != nil {
return xerrors.Errorf("could not decode StorageDataTransferVoucher: %w", err)
}
Expand Down
Loading

0 comments on commit 43e2645

Please sign in to comment.