diff --git a/errors.go b/errors.go new file mode 100644 index 00000000..38c77f46 --- /dev/null +++ b/errors.go @@ -0,0 +1,26 @@ +package datatransfer + +type errorType string + +func (e errorType) Error() string { + return string(e) +} + +// ErrHandlerAlreadySet means an event handler was already set for this instance of +// hooks +const ErrHandlerAlreadySet = errorType("already set event handler") + +// ErrHandlerNotSet means you cannot issue commands to this interface because the +// handler has not been set +const ErrHandlerNotSet = errorType("event handler has not been set") + +// ErrChannelNotFound means the channel this command was issued for does not exist +const ErrChannelNotFound = errorType("channel not found") + +// ErrPause is a special error that the DataReceived / DataSent hooks can +// use to pause the channel +const ErrPause = errorType("pause channel") + +// ErrResume is a special error that the RequestReceived / ResponseReceived hooks can +// use to resume the channel +const ErrResume = errorType("resume channel") diff --git a/events.go b/events.go new file mode 100644 index 00000000..0d4da792 --- /dev/null +++ b/events.go @@ -0,0 +1,94 @@ +package datatransfer + +import "time" + +// EventCode is a name for an event that occurs on a data transfer channel +type EventCode int + +const ( + // Open is an event occurs when a channel is first opened + Open EventCode = iota + + // Accept is an event that emits when the data transfer is first accepted + Accept + + // Progress is an event that gets emitted every time more data is transferred + Progress + + // Cancel indicates one side has cancelled the transfer + Cancel + + // Error is an event that emits when an error occurs in a data transfer + Error + + // CleanupComplete emits when a request is cleaned up + CleanupComplete + + // NewVoucher means we have a new voucher on this channel + NewVoucher + + // NewVoucherResult means we have a new voucher result on this channel + NewVoucherResult + + // PauseInitiator emits when the data sender pauses transfer + PauseInitiator + + // ResumeInitiator emits when the data sender resumes transfer + ResumeInitiator + + // PauseResponder emits when the data receiver pauses transfer + PauseResponder + + // ResumeResponder emits when the data receiver resumes transfer + ResumeResponder + + // FinishTransfer emits when the initiator has completed sending/receiving data + FinishTransfer + + // ResponderCompletes emits when the initiator receives a message that the responder is finished + ResponderCompletes + + // ResponderBeginsFinalization emits when the initiator receives a message that the responder is finilizing + ResponderBeginsFinalization + + // BeginFinalizing emits when the responder completes its operations but awaits a response from the + // initiator + BeginFinalizing + + // Complete is emitted when a data transfer is complete + Complete +) + +// Events are human readable names for data transfer events +var Events = map[EventCode]string{ + Open: "Open", + Accept: "Accept", + Progress: "Progress", + Cancel: "Cancel", + Error: "Error", + CleanupComplete: "CleanupComplete", + NewVoucher: "NewVoucher", + NewVoucherResult: "NewVoucherResult", + PauseInitiator: "PauseInitiator", + ResumeInitiator: "ResumeInitiator", + PauseResponder: "PauseResponder", + ResumeResponder: "ResumeResponder", + FinishTransfer: "FinishTransfer", + ResponderBeginsFinalization: "ResponderBeginsFinalization", + ResponderCompletes: "ResponderCompletes", + BeginFinalizing: "BeginFinalizing", + Complete: "Complete", +} + +// Event is a struct containing information about a data transfer event +type Event struct { + Code EventCode // What type of event it is + Message string // Any clarifying information about the event + Timestamp time.Time // when the event happened +} + +// Subscriber is a callback that is called when events are emitted +type Subscriber func(event Event, channelState ChannelState) + +// Unsubscribe is a function that gets called to unsubscribe from data transfer events +type Unsubscribe func() diff --git a/go.mod b/go.mod index 232f5918..810440e5 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/ipfs/go-blockservice v0.1.3 github.com/ipfs/go-cid v0.0.5 github.com/ipfs/go-datastore v0.4.4 - github.com/ipfs/go-graphsync v0.0.6-0.20200715204712-ef06b3d32e83 + github.com/ipfs/go-graphsync v0.0.6-0.20200721211002-c376cbe14c0a github.com/ipfs/go-ipfs-blockstore v0.1.4 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 diff --git a/go.sum b/go.sum index 3dbde6ec..21632bcd 100644 --- a/go.sum +++ b/go.sum @@ -156,8 +156,8 @@ github.com/ipfs/go-ds-badger v0.0.5/go.mod h1:g5AuuCGmr7efyzQhLL8MzwqcauPojGPUaH github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE= github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= -github.com/ipfs/go-graphsync v0.0.6-0.20200715204712-ef06b3d32e83 h1:tkGDAwcZfzDFeBNyBWYOM02Qw0rGpA2UuCvq49T3K5o= -github.com/ipfs/go-graphsync v0.0.6-0.20200715204712-ef06b3d32e83/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE= +github.com/ipfs/go-graphsync v0.0.6-0.20200721211002-c376cbe14c0a h1:QViYKbSYNKtfivrYx69UFJiH7HfdE5APQBbIu5fCK3k= +github.com/ipfs/go-graphsync v0.0.6-0.20200721211002-c376cbe14c0a/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw= github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ= diff --git a/impl/events.go b/impl/events.go index e2d6c4e4..ef0c7116 100644 --- a/impl/events.go +++ b/impl/events.go @@ -6,9 +6,7 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-data-transfer/encoding" - "github.com/filecoin-project/go-data-transfer/message" "github.com/filecoin-project/go-data-transfer/registry" - "github.com/filecoin-project/go-data-transfer/transport" "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" @@ -51,7 +49,7 @@ func (m *manager) OnDataReceived(chid datatransfer.ChannelID, link ipld.Link, si return nil } -func (m *manager) OnDataSent(chid datatransfer.ChannelID, link ipld.Link, size uint64) (message.DataTransferMessage, error) { +func (m *manager) OnDataSent(chid datatransfer.ChannelID, link ipld.Link, size uint64) (datatransfer.Message, error) { err := m.channels.IncrementSent(chid, size) if err != nil { return nil, err @@ -71,7 +69,7 @@ func (m *manager) OnDataSent(chid datatransfer.ChannelID, link ipld.Link, size u return nil, nil } -func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request message.DataTransferRequest) (message.DataTransferResponse, error) { +func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request datatransfer.Request) (datatransfer.Response, error) { if request.IsNew() { return m.receiveNewRequest(chid.Initiator, request) } @@ -95,12 +93,12 @@ func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request message } if chst.Status() == datatransfer.ResponderPaused || chst.Status() == datatransfer.ResponderFinalizing { - return nil, transport.ErrPause + return nil, datatransfer.ErrPause } return nil, nil } -func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response message.DataTransferResponse) error { +func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datatransfer.Response) error { if response.IsCancel() { return m.channels.Cancel(chid) } @@ -168,22 +166,18 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool) func (m *manager) receiveNewRequest( initiator peer.ID, - incoming message.DataTransferRequest) (message.DataTransferResponse, error) { + incoming datatransfer.Request) (datatransfer.Response, error) { result, err := m.acceptRequest(initiator, incoming) msg, msgErr := m.response(true, err, incoming.TransferID(), result) if msgErr != nil { return nil, msgErr } - // convert to the transport error for pauses - if err == datatransfer.ErrPause { - err = transport.ErrPause - } return msg, err } func (m *manager) acceptRequest( initiator peer.ID, - incoming message.DataTransferRequest) (datatransfer.VoucherResult, error) { + incoming datatransfer.Request) (datatransfer.VoucherResult, error) { stor, err := incoming.Selector() if err != nil { @@ -218,6 +212,11 @@ func (m *manager) acceptRequest( if err := m.channels.Accept(chid); err != nil { return result, err } + processor, has := m.transportConfigurers.Processor(voucher.Type()) + if has { + transportConfigurer := processor.(datatransfer.TransportConfigurer) + transportConfigurer(chid, voucher, m.transport) + } m.dataTransferNetwork.Protect(initiator, chid.String()) if voucherErr == datatransfer.ErrPause { err := m.channels.PauseResponder(chid) @@ -235,7 +234,7 @@ func (m *manager) acceptRequest( // * deserialization of selector fails // * validation fails func (m *manager) validateVoucher(sender peer.ID, - incoming message.DataTransferRequest, + incoming datatransfer.Request, isPull bool, baseCid cid.Cid, stor ipld.Node) (datatransfer.Voucher, datatransfer.VoucherResult, error) { @@ -263,7 +262,7 @@ func (m *manager) validateVoucher(sender peer.ID, // * deserialization of selector fails // * validation fails func (m *manager) revalidateVoucher(chid datatransfer.ChannelID, - incoming message.DataTransferRequest) (datatransfer.Voucher, datatransfer.VoucherResult, error) { + incoming datatransfer.Request) (datatransfer.Voucher, datatransfer.VoucherResult, error) { vouch, err := m.decodeVoucher(incoming, m.revalidators) if err != nil { return nil, nil, err @@ -275,7 +274,7 @@ func (m *manager) revalidateVoucher(chid datatransfer.ChannelID, return vouch, result, err } -func (m *manager) processUpdateVoucher(chid datatransfer.ChannelID, request message.DataTransferRequest) (message.DataTransferResponse, error) { +func (m *manager) processUpdateVoucher(chid datatransfer.ChannelID, request datatransfer.Request) (datatransfer.Response, error) { vouch, result, voucherErr := m.revalidateVoucher(chid, request) if vouch != nil { err := m.channels.NewVoucher(chid, vouch) @@ -286,7 +285,7 @@ func (m *manager) processUpdateVoucher(chid datatransfer.ChannelID, request mess return m.processRevalidationResult(chid, result, voucherErr) } -func (m *manager) revalidationResponse(chid datatransfer.ChannelID, result datatransfer.VoucherResult, resultErr error) (message.DataTransferResponse, error) { +func (m *manager) revalidationResponse(chid datatransfer.ChannelID, result datatransfer.VoucherResult, resultErr error) (datatransfer.Response, error) { chst, err := m.channels.GetByID(context.TODO(), chid) if err != nil { return nil, err @@ -297,7 +296,7 @@ func (m *manager) revalidationResponse(chid datatransfer.ChannelID, result datat return m.response(false, resultErr, chid.ID, result) } -func (m *manager) processRevalidationResult(chid datatransfer.ChannelID, result datatransfer.VoucherResult, resultErr error) (message.DataTransferResponse, error) { +func (m *manager) processRevalidationResult(chid datatransfer.ChannelID, result datatransfer.VoucherResult, resultErr error) (datatransfer.Response, error) { vresMessage, err := m.revalidationResponse(chid, result, resultErr) if err != nil { @@ -314,7 +313,7 @@ func (m *manager) processRevalidationResult(chid datatransfer.ChannelID, result if err != nil { return nil, err } - return vresMessage, transport.ErrPause + return vresMessage, datatransfer.ErrPause } if resultErr == nil { @@ -322,12 +321,12 @@ func (m *manager) processRevalidationResult(chid datatransfer.ChannelID, result if err != nil { return nil, err } - return vresMessage, transport.ErrResume + return vresMessage, datatransfer.ErrResume } return vresMessage, resultErr } -func (m *manager) completeMessage(chid datatransfer.ChannelID) (message.DataTransferResponse, error) { +func (m *manager) completeMessage(chid datatransfer.ChannelID) (datatransfer.Response, error) { var result datatransfer.VoucherResult var resultErr error _ = m.revalidators.Each(func(_ datatransfer.TypeIdentifier, _ encoding.Decoder, processor registry.Processor) error { diff --git a/impl/impl.go b/impl/impl.go index 50354051..75858135 100644 --- a/impl/impl.go +++ b/impl/impl.go @@ -19,7 +19,6 @@ import ( "github.com/filecoin-project/go-data-transfer/message" "github.com/filecoin-project/go-data-transfer/network" "github.com/filecoin-project/go-data-transfer/registry" - "github.com/filecoin-project/go-data-transfer/transport" "github.com/filecoin-project/go-storedcounter" "github.com/hannahhoward/go-pubsub" ) @@ -27,15 +26,16 @@ import ( var log = logging.Logger("dt-impl") type manager struct { - dataTransferNetwork network.DataTransferNetwork - validatedTypes *registry.Registry - resultTypes *registry.Registry - revalidators *registry.Registry - pubSub *pubsub.PubSub - channels *channels.Channels - peerID peer.ID - transport transport.Transport - storedCounter *storedcounter.StoredCounter + dataTransferNetwork network.DataTransferNetwork + validatedTypes *registry.Registry + resultTypes *registry.Registry + revalidators *registry.Registry + transportConfigurers *registry.Registry + pubSub *pubsub.PubSub + channels *channels.Channels + peerID peer.ID + transport datatransfer.Transport + storedCounter *storedcounter.StoredCounter } type internalEvent struct { @@ -57,16 +57,17 @@ func dispatcher(evt pubsub.Event, subscriberFn pubsub.SubscriberFn) error { } // NewDataTransfer initializes a new instance of a data transfer manager -func NewDataTransfer(ds datastore.Datastore, dataTransferNetwork network.DataTransferNetwork, transport transport.Transport, storedCounter *storedcounter.StoredCounter) (datatransfer.Manager, error) { +func NewDataTransfer(ds datastore.Datastore, dataTransferNetwork network.DataTransferNetwork, transport datatransfer.Transport, storedCounter *storedcounter.StoredCounter) (datatransfer.Manager, error) { m := &manager{ - dataTransferNetwork: dataTransferNetwork, - validatedTypes: registry.NewRegistry(), - resultTypes: registry.NewRegistry(), - revalidators: registry.NewRegistry(), - pubSub: pubsub.New(dispatcher), - peerID: dataTransferNetwork.ID(), - transport: transport, - storedCounter: storedCounter, + dataTransferNetwork: dataTransferNetwork, + validatedTypes: registry.NewRegistry(), + resultTypes: registry.NewRegistry(), + revalidators: registry.NewRegistry(), + transportConfigurers: registry.NewRegistry(), + pubSub: pubsub.New(dispatcher), + peerID: dataTransferNetwork.ID(), + transport: transport, + storedCounter: storedCounter, } channels, err := channels.New(ds, m.notifier, m.voucherDecoder, m.resultTypes.Decoder, &channelEnvironment{m}) if err != nil { @@ -129,6 +130,11 @@ func (m *manager) OpenPushDataChannel(ctx context.Context, requestTo peer.ID, vo if err != nil { return chid, err } + processor, has := m.transportConfigurers.Processor(voucher.Type()) + if has { + transportConfigurer := processor.(datatransfer.TransportConfigurer) + transportConfigurer(chid, voucher, m.transport) + } m.dataTransferNetwork.Protect(requestTo, chid.String()) if err := m.dataTransferNetwork.SendMessage(ctx, requestTo, req); err != nil { err = fmt.Errorf("Unable to send request: %w", err) @@ -151,6 +157,11 @@ func (m *manager) OpenPullDataChannel(ctx context.Context, requestTo peer.ID, vo if err != nil { return chid, err } + processor, has := m.transportConfigurers.Processor(voucher.Type()) + if has { + transportConfigurer := processor.(datatransfer.TransportConfigurer) + transportConfigurer(chid, voucher, m.transport) + } m.dataTransferNetwork.Protect(requestTo, chid.String()) if err := m.transport.OpenChannel(ctx, requestTo, chid, cidlink.Link{Cid: baseCid}, selector, req); err != nil { err = fmt.Errorf("Unable to send request: %w", err) @@ -204,7 +215,7 @@ func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfe // pause a running data transfer channel func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error { - pausable, ok := m.transport.(transport.PauseableTransport) + pausable, ok := m.transport.(datatransfer.PauseableTransport) if !ok { return errors.New("unsupported") } @@ -225,7 +236,7 @@ func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfe // resume a running data transfer channel func (m *manager) ResumeDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error { - pausable, ok := m.transport.(transport.PauseableTransport) + pausable, ok := m.transport.(datatransfer.PauseableTransport) if !ok { return errors.New("unsupported") } @@ -279,3 +290,13 @@ func (m *manager) RegisterVoucherResultType(resultType datatransfer.VoucherResul } return nil } + +// RegisterTransportConfigurer registers the given transport configurer to be run on requests with the given voucher +// type +func (m *manager) RegisterTransportConfigurer(voucherType datatransfer.Voucher, configurer datatransfer.TransportConfigurer) error { + err := m.transportConfigurers.Register(voucherType, configurer) + if err != nil { + return xerrors.Errorf("error registering transport configurer: %w", err) + } + return nil +} diff --git a/impl/initiating_test.go b/impl/initiating_test.go index 400383c5..fa4ccb15 100644 --- a/impl/initiating_test.go +++ b/impl/initiating_test.go @@ -42,7 +42,7 @@ func TestDataTransferInitiating(t *testing.T) { require.Equal(t, messageReceived.PeerID, h.peers[1]) received := messageReceived.Message require.True(t, received.IsRequest()) - receivedRequest, ok := received.(message.DataTransferRequest) + receivedRequest, ok := received.(datatransfer.Request) require.True(t, ok) require.Equal(t, receivedRequest.TransferID(), channelID.ID) require.Equal(t, receivedRequest.BaseCid(), h.baseCid) @@ -69,7 +69,7 @@ func TestDataTransferInitiating(t *testing.T) { require.Equal(t, openChannel.Root, cidlink.Link{Cid: h.baseCid}) require.Equal(t, openChannel.Selector, h.stor) require.True(t, openChannel.Message.IsRequest()) - receivedRequest, ok := openChannel.Message.(message.DataTransferRequest) + receivedRequest, ok := openChannel.Message.(datatransfer.Request) require.True(t, ok) require.Equal(t, receivedRequest.TransferID(), channelID.ID) require.Equal(t, receivedRequest.BaseCid(), h.baseCid) @@ -98,7 +98,7 @@ func TestDataTransferInitiating(t *testing.T) { require.Len(t, h.network.SentMessages, 2) received := h.network.SentMessages[1].Message require.True(t, received.IsRequest()) - receivedRequest, ok := received.(message.DataTransferRequest) + receivedRequest, ok := received.(datatransfer.Request) require.True(t, ok) require.True(t, receivedRequest.IsVoucher()) require.False(t, receivedRequest.IsCancel()) @@ -117,7 +117,7 @@ func TestDataTransferInitiating(t *testing.T) { require.Len(t, h.network.SentMessages, 1) received := h.network.SentMessages[0].Message require.True(t, received.IsRequest()) - receivedRequest, ok := received.(message.DataTransferRequest) + receivedRequest, ok := received.(datatransfer.Request) require.True(t, ok) require.False(t, receivedRequest.IsCancel()) require.True(t, receivedRequest.IsVoucher()) @@ -273,6 +273,46 @@ func TestDataTransferInitiating(t *testing.T) { require.Equal(t, cancelMessage.TransferID(), channelID.ID) }, }, + "customizing push transfer": { + expectedEvents: []datatransfer.EventCode{datatransfer.Open}, + verify: func(t *testing.T, h *harness) { + err := h.dt.RegisterTransportConfigurer(h.voucher, func(channelID datatransfer.ChannelID, voucher datatransfer.Voucher, transport datatransfer.Transport) { + ft, ok := transport.(*testutil.FakeTransport) + if !ok { + return + } + ft.RecordCustomizedTransfer(channelID, voucher) + }) + require.NoError(t, err) + channelID, err := h.dt.OpenPushDataChannel(h.ctx, h.peers[1], h.voucher, h.baseCid, h.stor) + require.NoError(t, err) + require.NotEmpty(t, channelID) + require.Len(t, h.transport.CustomizedTransfers, 1) + customizedTransfer := h.transport.CustomizedTransfers[0] + require.Equal(t, channelID, customizedTransfer.ChannelID) + require.Equal(t, h.voucher, customizedTransfer.Voucher) + }, + }, + "customizing pull transfer": { + expectedEvents: []datatransfer.EventCode{datatransfer.Open}, + verify: func(t *testing.T, h *harness) { + err := h.dt.RegisterTransportConfigurer(h.voucher, func(channelID datatransfer.ChannelID, voucher datatransfer.Voucher, transport datatransfer.Transport) { + ft, ok := transport.(*testutil.FakeTransport) + if !ok { + return + } + ft.RecordCustomizedTransfer(channelID, voucher) + }) + require.NoError(t, err) + channelID, err := h.dt.OpenPullDataChannel(h.ctx, h.peers[1], h.voucher, h.baseCid, h.stor) + require.NoError(t, err) + require.NotEmpty(t, channelID) + require.Len(t, h.transport.CustomizedTransfers, 1) + customizedTransfer := h.transport.CustomizedTransfers[0] + require.Equal(t, channelID, customizedTransfer.ChannelID) + require.Equal(t, h.voucher, customizedTransfer.Voucher) + }, + }, } for testCase, verify := range testCases { t.Run(testCase, func(t *testing.T) { diff --git a/impl/integration_test.go b/impl/integration_test.go index 3b3e34d1..a6c180a4 100644 --- a/impl/integration_test.go +++ b/impl/integration_test.go @@ -13,11 +13,19 @@ import ( "github.com/filecoin-project/go-data-transfer/message" "github.com/filecoin-project/go-data-transfer/network" "github.com/filecoin-project/go-data-transfer/testutil" - "github.com/filecoin-project/go-data-transfer/transport" tp "github.com/filecoin-project/go-data-transfer/transport/graphsync" "github.com/filecoin-project/go-data-transfer/transport/graphsync/extension" + "github.com/ipfs/go-blockservice" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + dss "github.com/ipfs/go-datastore/sync" "github.com/ipfs/go-graphsync" gsmsg "github.com/ipfs/go-graphsync/message" + "github.com/ipfs/go-graphsync/storeutil" + bstore "github.com/ipfs/go-ipfs-blockstore" + offline "github.com/ipfs/go-ipfs-exchange-offline" + ipldformat "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-merkledag" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/libp2p/go-libp2p-core/peer" @@ -27,11 +35,40 @@ import ( func TestRoundTrip(t *testing.T) { ctx := context.Background() - testCases := map[string]bool{ - "roundtrip for push requests": false, - "roundtrip for pull requests": true, + testCases := map[string]struct { + isPull bool + customSourceStore bool + customTargetStore bool + }{ + "roundtrip for push requests": {}, + "roundtrip for pull requests": { + isPull: true, + }, + "custom source, push": { + customSourceStore: true, + }, + "custom source, pull": { + isPull: true, + customSourceStore: true, + }, + "custom dest, push": { + customTargetStore: true, + }, + "custom dest, pull": { + isPull: true, + customTargetStore: true, + }, + "custom both sides, push": { + customSourceStore: true, + customTargetStore: true, + }, + "custom both sides, pull": { + isPull: true, + customSourceStore: true, + customTargetStore: true, + }, } - for testCase, isPull := range testCases { + for testCase, data := range testCases { t.Run(testCase, func(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() @@ -40,8 +77,6 @@ func TestRoundTrip(t *testing.T) { host1 := gsData.Host1 // initiator, data sender host2 := gsData.Host2 // data recipient - root := gsData.LoadUnixFSFile(t, false) - rootCid := root.(cidlink.Link).Cid tp1 := gsData.SetupGSTransportHost1() tp2 := gsData.SetupGSTransportHost2() @@ -53,6 +88,7 @@ func TestRoundTrip(t *testing.T) { require.NoError(t, err) err = dt2.Start(ctx) require.NoError(t, err) + finished := make(chan struct{}, 2) errChan := make(chan struct{}, 2) opened := make(chan struct{}, 2) @@ -81,8 +117,54 @@ func TestRoundTrip(t *testing.T) { voucher := testutil.FakeDTType{Data: "applesauce"} sv := testutil.NewStubbedValidator() + var sourceDagService ipldformat.DAGService + if data.customSourceStore { + ds := dss.MutexWrap(datastore.NewMapDatastore()) + bs := bstore.NewBlockstore(namespace.Wrap(ds, datastore.NewKey("blockstore"))) + loader := storeutil.LoaderForBlockstore(bs) + storer := storeutil.StorerForBlockstore(bs) + sourceDagService = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) + err := dt1.RegisterTransportConfigurer(&testutil.FakeDTType{}, func(channelID datatransfer.ChannelID, testVoucher datatransfer.Voucher, transport datatransfer.Transport) { + fv, ok := testVoucher.(*testutil.FakeDTType) + if ok && fv.Data == voucher.Data { + gsTransport, ok := transport.(*tp.Transport) + if ok { + err := gsTransport.UseStore(channelID, loader, storer) + require.NoError(t, err) + } + } + }) + require.NoError(t, err) + } else { + sourceDagService = gsData.DagService1 + } + root, origBytes := testutil.LoadUnixFSFile(ctx, t, sourceDagService) + rootCid := root.(cidlink.Link).Cid + + var destDagService ipldformat.DAGService + if data.customTargetStore { + ds := dss.MutexWrap(datastore.NewMapDatastore()) + bs := bstore.NewBlockstore(namespace.Wrap(ds, datastore.NewKey("blockstore"))) + loader := storeutil.LoaderForBlockstore(bs) + storer := storeutil.StorerForBlockstore(bs) + destDagService = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) + err := dt2.RegisterTransportConfigurer(&testutil.FakeDTType{}, func(channelID datatransfer.ChannelID, testVoucher datatransfer.Voucher, transport datatransfer.Transport) { + fv, ok := testVoucher.(*testutil.FakeDTType) + if ok && fv.Data == voucher.Data { + gsTransport, ok := transport.(*tp.Transport) + if ok { + err := gsTransport.UseStore(channelID, loader, storer) + require.NoError(t, err) + } + } + }) + require.NoError(t, err) + } else { + destDagService = gsData.DagService2 + } + var chid datatransfer.ChannelID - if isPull { + if data.isPull { sv.ExpectSuccessPull() require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv)) chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), &voucher, rootCid, gsData.AllSelector) @@ -113,8 +195,8 @@ func TestRoundTrip(t *testing.T) { } } require.Equal(t, sentIncrements, receivedIncrements) - gsData.VerifyFileTransferred(t, root, true) - if isPull { + testutil.VerifyHasFile(ctx, t, destDagService, root, origBytes) + if data.isPull { assert.Equal(t, chid.Initiator, host2.ID()) } else { assert.Equal(t, chid.Initiator, host1.ID()) @@ -600,7 +682,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) { t.Fatal("did not receive message sent") case messageReceived = <-r.messageReceived: } - requestReceived := messageReceived.message.(message.DataTransferRequest) + requestReceived := messageReceived.message.(datatransfer.Request) var buf bytes.Buffer response, err := message.NewResponse(requestReceived.TransferID(), true, false, voucherResult.Type(), voucherResult) @@ -702,10 +784,10 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) { //create network ctx := context.Background() testCases := map[string]struct { - test func(*testing.T, *testutil.GraphsyncTestingData, transport.Transport, ipld.Link, datatransfer.TransferID, *fakeGraphSyncReceiver) + test func(*testing.T, *testutil.GraphsyncTestingData, datatransfer.Transport, ipld.Link, datatransfer.TransferID, *fakeGraphSyncReceiver) }{ "When a pull request is initiated and validated": { - test: func(t *testing.T, gsData *testutil.GraphsyncTestingData, tp2 transport.Transport, link ipld.Link, id datatransfer.TransferID, gsr *fakeGraphSyncReceiver) { + test: func(t *testing.T, gsData *testutil.GraphsyncTestingData, tp2 datatransfer.Transport, link ipld.Link, id datatransfer.TransferID, gsr *fakeGraphSyncReceiver) { sv := testutil.NewStubbedValidator() sv.ExpectSuccessPull() @@ -737,7 +819,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) { }, }, "When request is initiated, but fails validation": { - test: func(t *testing.T, gsData *testutil.GraphsyncTestingData, tp2 transport.Transport, link ipld.Link, id datatransfer.TransferID, gsr *fakeGraphSyncReceiver) { + test: func(t *testing.T, gsData *testutil.GraphsyncTestingData, tp2 datatransfer.Transport, link ipld.Link, id datatransfer.TransferID, gsr *fakeGraphSyncReceiver) { sv := testutil.NewStubbedValidator() sv.ExpectErrorPull() dt1, err := NewDataTransfer(gsData.DtDs2, gsData.DtNet2, tp2, gsData.StoredCounter2) @@ -794,7 +876,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) { } type receivedMessage struct { - message message.DataTransferMessage + message datatransfer.Message sender peer.ID } @@ -806,7 +888,7 @@ type receiver struct { func (r *receiver) ReceiveRequest( ctx context.Context, sender peer.ID, - incoming message.DataTransferRequest) { + incoming datatransfer.Request) { select { case <-ctx.Done(): @@ -817,7 +899,7 @@ func (r *receiver) ReceiveRequest( func (r *receiver) ReceiveResponse( ctx context.Context, sender peer.ID, - incoming message.DataTransferResponse) { + incoming datatransfer.Response) { select { case <-ctx.Done(): diff --git a/impl/receiver.go b/impl/receiver.go index 0c87b4da..fb0192f9 100644 --- a/impl/receiver.go +++ b/impl/receiver.go @@ -7,8 +7,6 @@ import ( "github.com/libp2p/go-libp2p-core/peer" datatransfer "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/go-data-transfer/message" - "github.com/filecoin-project/go-data-transfer/transport" ) type receiver struct { @@ -20,24 +18,24 @@ type receiver struct { func (r *receiver) ReceiveRequest( ctx context.Context, initiator peer.ID, - incoming message.DataTransferRequest) { + incoming datatransfer.Request) { err := r.receiveRequest(ctx, initiator, incoming) if err != nil { log.Warn(err) } } -func (r *receiver) receiveRequest(ctx context.Context, initiator peer.ID, incoming message.DataTransferRequest) error { +func (r *receiver) receiveRequest(ctx context.Context, initiator peer.ID, incoming datatransfer.Request) error { chid := datatransfer.ChannelID{Initiator: initiator, Responder: r.manager.peerID, ID: incoming.TransferID()} response, receiveErr := r.manager.OnRequestReceived(chid, incoming) - if receiveErr == transport.ErrResume { + if receiveErr == datatransfer.ErrResume { chst, err := r.manager.channels.GetByID(ctx, chid) if err != nil { return err } if resumeTransportStatesResponder.Contains(chst.Status()) { - return r.manager.transport.(transport.PauseableTransport).ResumeChannel(ctx, response, chid) + return r.manager.transport.(datatransfer.PauseableTransport).ResumeChannel(ctx, response, chid) } receiveErr = nil } @@ -55,8 +53,8 @@ func (r *receiver) receiveRequest(ctx context.Context, initiator peer.ID, incomi } } - if receiveErr == transport.ErrPause { - return r.manager.transport.(transport.PauseableTransport).PauseChannel(ctx, chid) + if receiveErr == datatransfer.ErrPause { + return r.manager.transport.(datatransfer.PauseableTransport).PauseChannel(ctx, chid) } if receiveErr != nil { @@ -72,7 +70,7 @@ func (r *receiver) receiveRequest(ctx context.Context, initiator peer.ID, incomi func (r *receiver) ReceiveResponse( ctx context.Context, sender peer.ID, - incoming message.DataTransferResponse) { + incoming datatransfer.Response) { err := r.receiveResponse(ctx, sender, incoming) if err != nil { log.Error(err) @@ -81,11 +79,11 @@ func (r *receiver) ReceiveResponse( func (r *receiver) receiveResponse( ctx context.Context, sender peer.ID, - incoming message.DataTransferResponse) error { + incoming datatransfer.Response) error { chid := datatransfer.ChannelID{Initiator: r.manager.peerID, Responder: sender, ID: incoming.TransferID()} err := r.manager.OnResponseReceived(chid, incoming) - if err == transport.ErrPause { - return r.manager.transport.(transport.PauseableTransport).PauseChannel(ctx, chid) + if err == datatransfer.ErrPause { + return r.manager.transport.(datatransfer.PauseableTransport).PauseChannel(ctx, chid) } if err != nil { _ = r.manager.transport.CloseChannel(ctx, chid) diff --git a/impl/responding_test.go b/impl/responding_test.go index 66efcdaf..7ff9b4f9 100644 --- a/impl/responding_test.go +++ b/impl/responding_test.go @@ -19,7 +19,6 @@ import ( . "github.com/filecoin-project/go-data-transfer/impl" "github.com/filecoin-project/go-data-transfer/message" "github.com/filecoin-project/go-data-transfer/testutil" - "github.com/filecoin-project/go-data-transfer/transport" "github.com/filecoin-project/go-storedcounter" ) @@ -55,7 +54,7 @@ func TestDataTransferResponding(t *testing.T) { require.Equal(t, openChannel.Root, cidlink.Link{Cid: h.baseCid}) require.Equal(t, openChannel.Selector, h.stor) require.False(t, openChannel.Message.IsRequest()) - response, ok := openChannel.Message.(message.DataTransferResponse) + response, ok := openChannel.Message.(datatransfer.Response) require.True(t, ok) require.True(t, response.Accepted()) require.Equal(t, response.TransferID(), h.id) @@ -76,7 +75,7 @@ func TestDataTransferResponding(t *testing.T) { require.Len(t, h.network.SentMessages, 1) responseMessage := h.network.SentMessages[0].Message require.False(t, responseMessage.IsRequest()) - response, ok := responseMessage.(message.DataTransferResponse) + response, ok := responseMessage.(datatransfer.Response) require.True(t, ok) require.False(t, response.Accepted()) require.Equal(t, response.TransferID(), h.id) @@ -102,7 +101,7 @@ func TestDataTransferResponding(t *testing.T) { require.Equal(t, openChannel.Root, cidlink.Link{Cid: h.baseCid}) require.Equal(t, openChannel.Selector, h.stor) require.False(t, openChannel.Message.IsRequest()) - response, ok := openChannel.Message.(message.DataTransferResponse) + response, ok := openChannel.Message.(datatransfer.Response) require.True(t, ok) require.True(t, response.Accepted()) require.Equal(t, response.TransferID(), h.id) @@ -161,7 +160,7 @@ func TestDataTransferResponding(t *testing.T) { }, verify: func(t *testing.T, h *receiverHarness) { response, err := h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.pullRequest) - require.EqualError(t, err, transport.ErrPause.Error()) + require.EqualError(t, err, datatransfer.ErrPause.Error()) require.True(t, response.Accepted()) require.Equal(t, response.TransferID(), h.id) @@ -198,7 +197,7 @@ func TestDataTransferResponding(t *testing.T) { verify: func(t *testing.T, h *receiverHarness) { h.network.Delegate.ReceiveRequest(h.ctx, h.peers[1], h.pushRequest) _, err := h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.voucherUpdate) - require.EqualError(t, err, transport.ErrResume.Error()) + require.EqualError(t, err, datatransfer.ErrResume.Error()) }, }, "receive pause, unpause": { @@ -239,7 +238,7 @@ func TestDataTransferResponding(t *testing.T) { err = h.dt.PauseDataTransferChannel(h.ctx, channelID(h.id, h.peers)) require.NoError(t, err) _, err = h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.resumeUpdate) - require.EqualError(t, err, transport.ErrPause.Error()) + require.EqualError(t, err, datatransfer.ErrPause.Error()) }, }, "receive cancel": { @@ -284,9 +283,9 @@ func TestDataTransferResponding(t *testing.T) { channelID(h.id, h.peers), cidlink.Link{Cid: testutil.GenerateCids(1)[0]}, 12345) - require.EqualError(t, err, transport.ErrPause.Error()) + require.EqualError(t, err, datatransfer.ErrPause.Error()) require.Len(t, h.network.SentMessages, 1) - response, ok := h.network.SentMessages[0].Message.(message.DataTransferResponse) + response, ok := h.network.SentMessages[0].Message.(datatransfer.Response) require.True(t, ok) require.True(t, response.Accepted()) require.Equal(t, response.TransferID(), h.id) @@ -296,7 +295,7 @@ func TestDataTransferResponding(t *testing.T) { require.True(t, response.IsVoucherResult()) require.False(t, response.EmptyVoucherResult()) response, err = h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.voucherUpdate) - require.EqualError(t, err, transport.ErrResume.Error()) + require.EqualError(t, err, datatransfer.ErrResume.Error()) require.True(t, response.Accepted()) require.Equal(t, response.TransferID(), h.id) require.False(t, response.IsUpdate()) @@ -330,7 +329,7 @@ func TestDataTransferResponding(t *testing.T) { 12345) require.Error(t, err) require.Len(t, h.network.SentMessages, 1) - response, ok := h.network.SentMessages[0].Message.(message.DataTransferResponse) + response, ok := h.network.SentMessages[0].Message.(datatransfer.Response) require.True(t, ok) require.False(t, response.Accepted()) require.Equal(t, response.TransferID(), h.id) @@ -368,9 +367,9 @@ func TestDataTransferResponding(t *testing.T) { channelID(h.id, h.peers), cidlink.Link{Cid: testutil.GenerateCids(1)[0]}, 12345) - require.EqualError(t, err, transport.ErrPause.Error()) + require.EqualError(t, err, datatransfer.ErrPause.Error()) require.Len(t, h.network.SentMessages, 1) - response, ok := h.network.SentMessages[0].Message.(message.DataTransferResponse) + response, ok := h.network.SentMessages[0].Message.(datatransfer.Response) require.True(t, ok) require.True(t, response.Accepted()) require.Equal(t, response.TransferID(), h.id) @@ -419,8 +418,8 @@ func TestDataTransferResponding(t *testing.T) { channelID(h.id, h.peers), cidlink.Link{Cid: testutil.GenerateCids(1)[0]}, 12345) - require.EqualError(t, err, transport.ErrPause.Error()) - response, ok := msg.(message.DataTransferResponse) + require.EqualError(t, err, datatransfer.ErrPause.Error()) + response, ok := msg.(datatransfer.Response) require.True(t, ok) require.True(t, response.Accepted()) require.Equal(t, response.TransferID(), h.id) @@ -430,7 +429,7 @@ func TestDataTransferResponding(t *testing.T) { require.True(t, response.IsVoucherResult()) require.False(t, response.EmptyVoucherResult()) response, err = h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.voucherUpdate) - require.EqualError(t, err, transport.ErrResume.Error()) + require.EqualError(t, err, datatransfer.ErrResume.Error()) require.True(t, response.Accepted()) require.Equal(t, response.TransferID(), h.id) require.False(t, response.IsUpdate()) @@ -466,7 +465,7 @@ func TestDataTransferResponding(t *testing.T) { err = h.transport.EventHandler.OnChannelCompleted(channelID(h.id, h.peers), true) require.NoError(t, err) require.Len(t, h.network.SentMessages, 1) - response, ok := h.network.SentMessages[0].Message.(message.DataTransferResponse) + response, ok := h.network.SentMessages[0].Message.(datatransfer.Response) require.True(t, ok) require.True(t, response.Accepted()) require.Equal(t, response.TransferID(), h.id) @@ -476,12 +475,56 @@ func TestDataTransferResponding(t *testing.T) { require.True(t, response.IsVoucherResult()) require.False(t, response.EmptyVoucherResult()) response, err = h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.voucherUpdate) - require.EqualError(t, err, transport.ErrResume.Error()) + require.EqualError(t, err, datatransfer.ErrResume.Error()) require.Equal(t, response.TransferID(), h.id) require.True(t, response.IsVoucherResult()) require.False(t, response.IsPaused()) }, }, + "new push request, customized transport": { + expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.NewVoucherResult, datatransfer.Accept}, + configureValidator: func(sv *testutil.StubbedValidator) { + sv.ExpectSuccessPush() + sv.StubResult(testutil.NewFakeDTType()) + }, + verify: func(t *testing.T, h *receiverHarness) { + err := h.dt.RegisterTransportConfigurer(h.voucher, func(channelID datatransfer.ChannelID, voucher datatransfer.Voucher, transport datatransfer.Transport) { + ft, ok := transport.(*testutil.FakeTransport) + if !ok { + return + } + ft.RecordCustomizedTransfer(channelID, voucher) + }) + require.NoError(t, err) + h.network.Delegate.ReceiveRequest(h.ctx, h.peers[1], h.pushRequest) + require.Len(t, h.transport.CustomizedTransfers, 1) + customizedTransfer := h.transport.CustomizedTransfers[0] + require.Equal(t, channelID(h.id, h.peers), customizedTransfer.ChannelID) + require.Equal(t, h.voucher, customizedTransfer.Voucher) + }, + }, + "new pull request, customized transport": { + expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Accept}, + configureValidator: func(sv *testutil.StubbedValidator) { + sv.ExpectSuccessPull() + }, + verify: func(t *testing.T, h *receiverHarness) { + err := h.dt.RegisterTransportConfigurer(h.voucher, func(channelID datatransfer.ChannelID, voucher datatransfer.Voucher, transport datatransfer.Transport) { + ft, ok := transport.(*testutil.FakeTransport) + if !ok { + return + } + ft.RecordCustomizedTransfer(channelID, voucher) + }) + require.NoError(t, err) + _, err = h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.pullRequest) + require.NoError(t, err) + require.Len(t, h.transport.CustomizedTransfers, 1) + customizedTransfer := h.transport.CustomizedTransfers[0] + require.Equal(t, channelID(h.id, h.peers), customizedTransfer.ChannelID) + require.Equal(t, h.voucher, customizedTransfer.Voucher) + }, + }, } for testCase, verify := range testCases { t.Run(testCase, func(t *testing.T) { @@ -541,12 +584,12 @@ func TestDataTransferResponding(t *testing.T) { type receiverHarness struct { id datatransfer.TransferID - pushRequest message.DataTransferRequest - pullRequest message.DataTransferRequest - voucherUpdate message.DataTransferRequest - pauseUpdate message.DataTransferRequest - resumeUpdate message.DataTransferRequest - cancelUpdate message.DataTransferRequest + pushRequest datatransfer.Request + pullRequest datatransfer.Request + voucherUpdate datatransfer.Request + pauseUpdate datatransfer.Request + resumeUpdate datatransfer.Request + cancelUpdate datatransfer.Request ctx context.Context peers []peer.ID network *testutil.FakeNetwork diff --git a/impl/utils.go b/impl/utils.go index 7a07ca2b..508c2482 100644 --- a/impl/utils.go +++ b/impl/utils.go @@ -30,7 +30,7 @@ var resumeTransportStatesResponder = statusList{ } // newRequest encapsulates message creation -func (m *manager) newRequest(ctx context.Context, selector ipld.Node, isPull bool, voucher datatransfer.Voucher, baseCid cid.Cid, to peer.ID) (message.DataTransferRequest, error) { +func (m *manager) newRequest(ctx context.Context, selector ipld.Node, isPull bool, voucher datatransfer.Voucher, baseCid cid.Cid, to peer.ID) (datatransfer.Request, error) { next, err := m.storedCounter.Next() if err != nil { return nil, err @@ -39,7 +39,7 @@ func (m *manager) newRequest(ctx context.Context, selector ipld.Node, isPull boo return message.NewRequest(tid, isPull, voucher.Type(), voucher, baseCid, selector) } -func (m *manager) response(isNew bool, err error, tid datatransfer.TransferID, voucherResult datatransfer.VoucherResult) (message.DataTransferResponse, error) { +func (m *manager) response(isNew bool, err error, tid datatransfer.TransferID, voucherResult datatransfer.VoucherResult) (datatransfer.Response, error) { isAccepted := err == nil || err == datatransfer.ErrPause isPaused := err == datatransfer.ErrPause resultType := datatransfer.EmptyTypeIdentifier @@ -52,7 +52,7 @@ func (m *manager) response(isNew bool, err error, tid datatransfer.TransferID, v return message.VoucherResultResponse(tid, isAccepted, isPaused, resultType, voucherResult) } -func (m *manager) completeResponse(err error, tid datatransfer.TransferID, voucherResult datatransfer.VoucherResult) (message.DataTransferResponse, error) { +func (m *manager) completeResponse(err error, tid datatransfer.TransferID, voucherResult datatransfer.VoucherResult) (datatransfer.Response, error) { isAccepted := err == nil || err == datatransfer.ErrPause isPaused := err == datatransfer.ErrPause resultType := datatransfer.EmptyTypeIdentifier @@ -90,28 +90,28 @@ func (m *manager) pauseOther(chid datatransfer.ChannelID) error { return m.channels.PauseResponder(chid) } -func (m *manager) resumeMessage(chid datatransfer.ChannelID) message.DataTransferMessage { +func (m *manager) resumeMessage(chid datatransfer.ChannelID) datatransfer.Message { if chid.Initiator == m.peerID { return message.UpdateRequest(chid.ID, false) } return message.UpdateResponse(chid.ID, false) } -func (m *manager) pauseMessage(chid datatransfer.ChannelID) message.DataTransferMessage { +func (m *manager) pauseMessage(chid datatransfer.ChannelID) datatransfer.Message { if chid.Initiator == m.peerID { return message.UpdateRequest(chid.ID, true) } return message.UpdateResponse(chid.ID, true) } -func (m *manager) cancelMessage(chid datatransfer.ChannelID) message.DataTransferMessage { +func (m *manager) cancelMessage(chid datatransfer.ChannelID) datatransfer.Message { if chid.Initiator == m.peerID { return message.CancelRequest(chid.ID) } return message.CancelResponse(chid.ID) } -func (m *manager) decodeVoucherResult(response message.DataTransferResponse) (datatransfer.VoucherResult, error) { +func (m *manager) decodeVoucherResult(response datatransfer.Response) (datatransfer.VoucherResult, error) { vtypStr := datatransfer.TypeIdentifier(response.VoucherResultType()) decoder, has := m.resultTypes.Decoder(vtypStr) if !has { @@ -124,7 +124,7 @@ func (m *manager) decodeVoucherResult(response message.DataTransferResponse) (da return encodable.(datatransfer.Registerable), nil } -func (m *manager) decodeVoucher(request message.DataTransferRequest, registry *registry.Registry) (datatransfer.Voucher, error) { +func (m *manager) decodeVoucher(request datatransfer.Request, registry *registry.Registry) (datatransfer.Voucher, error) { vtypStr := datatransfer.TypeIdentifier(request.VoucherType()) decoder, has := registry.Decoder(vtypStr) if !has { diff --git a/manager.go b/manager.go new file mode 100644 index 00000000..1658f0c6 --- /dev/null +++ b/manager.go @@ -0,0 +1,111 @@ +package datatransfer + +import ( + "context" + + "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime" + "github.com/libp2p/go-libp2p-core/peer" +) + +// RequestValidator is an interface implemented by the client of the +// data transfer module to validate requests +type RequestValidator interface { + // ValidatePush validates a push request received from the peer that will send data + ValidatePush( + sender peer.ID, + voucher Voucher, + baseCid cid.Cid, + selector ipld.Node) (VoucherResult, error) + // ValidatePull validates a pull request received from the peer that will receive data + ValidatePull( + receiver peer.ID, + voucher Voucher, + baseCid cid.Cid, + selector ipld.Node) (VoucherResult, error) +} + +// Revalidator is a request validator revalidates in progress requests +// by requesting request additional vouchers, and resuming when it receives them +type Revalidator interface { + // Revalidate revalidates a request with a new voucher + Revalidate(channelID ChannelID, voucher Voucher) (VoucherResult, error) + // OnPullDataSent is called on the responder side when more bytes are sent + // for a given pull request. It should return a VoucherResult + ErrPause to + // request revalidation or nil to continue uninterrupted, + // other errors will terminate the request + OnPullDataSent(chid ChannelID, additionalBytesSent uint64) (VoucherResult, error) + // OnPushDataReceived is called on the responder side when more bytes are received + // for a given push request. It should return a VoucherResult + ErrPause to + // request revalidation or nil to continue uninterrupted, + // other errors will terminate the request + OnPushDataReceived(chid ChannelID, additionalBytesReceived uint64) (VoucherResult, error) + // OnComplete is called to make a final request for revalidation -- often for the + // purpose of settlement. + // if VoucherResult is non nil, the request will enter a settlement phase awaiting + // a final update + OnComplete(chid ChannelID) (VoucherResult, error) +} + +// TransportConfigurer provides a mechanism to provide transport specific configuration for a given voucher type +type TransportConfigurer func(chid ChannelID, voucher Voucher, transport Transport) + +// Manager is the core interface presented by all implementations of +// of the data transfer sub system +type Manager interface { + + // Start initializes data transfer processing + Start(ctx context.Context) error + + // Stop terminates all data transfers and ends processing + Stop() error + + // RegisterVoucherType registers a validator for the given voucher type + // will error if voucher type does not implement voucher + // or if there is a voucher type registered with an identical identifier + RegisterVoucherType(voucherType Voucher, validator RequestValidator) error + + // RegisterRevalidator registers a revalidator for the given voucher type + // Note: this is the voucher type used to revalidate. It can share a name + // with the initial validator type and CAN be the same type, or a different type. + // The revalidator can simply be the sampe as the original request validator, + // or a different validator that satisfies the revalidator interface. + RegisterRevalidator(voucherType Voucher, revalidator Revalidator) error + + // RegisterVoucherResultType allows deserialization of a voucher result, + // so that a listener can read the metadata + RegisterVoucherResultType(resultType VoucherResult) error + + // RegisterTransportConfigurer registers the given transport configurer to be run on requests with the given voucher + // type + RegisterTransportConfigurer(voucherType Voucher, configurer TransportConfigurer) error + + // open a data transfer that will send data to the recipient peer and + // transfer parts of the piece that match the selector + OpenPushDataChannel(ctx context.Context, to peer.ID, voucher Voucher, baseCid cid.Cid, selector ipld.Node) (ChannelID, error) + + // open a data transfer that will request data from the sending peer and + // transfer parts of the piece that match the selector + OpenPullDataChannel(ctx context.Context, to peer.ID, voucher Voucher, baseCid cid.Cid, selector ipld.Node) (ChannelID, error) + + // send an intermediate voucher as needed when the receiver sends a request for revalidation + SendVoucher(ctx context.Context, chid ChannelID, voucher Voucher) error + + // close an open channel (effectively a cancel) + CloseDataTransferChannel(ctx context.Context, chid ChannelID) error + + // pause a data transfer channel (only allowed if transport supports it) + PauseDataTransferChannel(ctx context.Context, chid ChannelID) error + + // resume a data transfer channel (only allowed if transport supports it) + ResumeDataTransferChannel(ctx context.Context, chid ChannelID) error + + // get status of a transfer + TransferChannelStatus(ctx context.Context, x ChannelID) Status + + // get notified when certain types of events happen + SubscribeToEvents(subscriber Subscriber) Unsubscribe + + // get all in progress transfers + InProgressChannels(ctx context.Context) (map[ChannelID]ChannelState, error) +} diff --git a/message.go b/message.go new file mode 100644 index 00000000..7ef4ce97 --- /dev/null +++ b/message.go @@ -0,0 +1,46 @@ +package datatransfer + +import ( + "io" + + "github.com/filecoin-project/go-data-transfer/encoding" + "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime" + cborgen "github.com/whyrusleeping/cbor-gen" +) + +// Message is a message for the data transfer protocol +// (either request or response) that can serialize to a protobuf +type Message interface { + IsRequest() bool + IsNew() bool + IsUpdate() bool + IsPaused() bool + IsCancel() bool + TransferID() TransferID + cborgen.CBORMarshaler + cborgen.CBORUnmarshaler + ToNet(w io.Writer) error +} + +// Request is a response message for the data transfer protocol +type Request interface { + Message + IsPull() bool + IsVoucher() bool + VoucherType() TypeIdentifier + Voucher(decoder encoding.Decoder) (encoding.Encodable, error) + BaseCid() cid.Cid + Selector() (ipld.Node, error) +} + +// Response is a response message for the data transfer protocol +type Response interface { + Message + IsVoucherResult() bool + IsComplete() bool + Accepted() bool + VoucherResultType() TypeIdentifier + VoucherResult(decoder encoding.Decoder) (encoding.Encodable, error) + EmptyVoucherResult() bool +} diff --git a/message/message.go b/message/message.go index f3d1d9f7..5f9700da 100644 --- a/message/message.go +++ b/message/message.go @@ -23,48 +23,8 @@ const ( voucherResultMessage ) -// Reference file: https://github.com/ipfs/go-graphsync/blob/master/message/message.go -// though here we have a simpler message type that serializes/deserializes to two -// different types that share an interface, and we serialize to CBOR and not Protobuf. - -// DataTransferMessage is a message for the data transfer protocol -// (either request or response) that can serialize to a protobuf -type DataTransferMessage interface { - IsRequest() bool - IsNew() bool - IsUpdate() bool - IsPaused() bool - IsCancel() bool - TransferID() datatransfer.TransferID - cborgen.CBORMarshaler - cborgen.CBORUnmarshaler - ToNet(w io.Writer) error -} - -// DataTransferRequest is a response message for the data transfer protocol -type DataTransferRequest interface { - DataTransferMessage - IsPull() bool - IsVoucher() bool - VoucherType() datatransfer.TypeIdentifier - Voucher(decoder encoding.Decoder) (encoding.Encodable, error) - BaseCid() cid.Cid - Selector() (ipld.Node, error) -} - -// DataTransferResponse is a response message for the data transfer protocol -type DataTransferResponse interface { - DataTransferMessage - IsVoucherResult() bool - IsComplete() bool - Accepted() bool - VoucherResultType() datatransfer.TypeIdentifier - VoucherResult(decoder encoding.Decoder) (encoding.Encodable, error) - EmptyVoucherResult() bool -} - // NewRequest generates a new request for the data transfer protocol -func NewRequest(id datatransfer.TransferID, isPull bool, vtype datatransfer.TypeIdentifier, voucher encoding.Encodable, baseCid cid.Cid, selector ipld.Node) (DataTransferRequest, error) { +func NewRequest(id datatransfer.TransferID, isPull bool, vtype datatransfer.TypeIdentifier, voucher encoding.Encodable, baseCid cid.Cid, selector ipld.Node) (datatransfer.Request, error) { vbytes, err := encoding.Encode(voucher) if err != nil { return nil, xerrors.Errorf("Creating request: %w", err) @@ -88,7 +48,7 @@ func NewRequest(id datatransfer.TransferID, isPull bool, vtype datatransfer.Type } // CancelRequest request generates a request to cancel an in progress request -func CancelRequest(id datatransfer.TransferID) DataTransferRequest { +func CancelRequest(id datatransfer.TransferID) datatransfer.Request { return &transferRequest{ Type: uint64(cancelMessage), XferID: uint64(id), @@ -96,7 +56,7 @@ func CancelRequest(id datatransfer.TransferID) DataTransferRequest { } // UpdateRequest generates a new request update -func UpdateRequest(id datatransfer.TransferID, isPaused bool) DataTransferRequest { +func UpdateRequest(id datatransfer.TransferID, isPaused bool) datatransfer.Request { return &transferRequest{ Type: uint64(updateMessage), Paus: isPaused, @@ -105,7 +65,7 @@ func UpdateRequest(id datatransfer.TransferID, isPaused bool) DataTransferReques } // VoucherRequest generates a new request for the data transfer protocol -func VoucherRequest(id datatransfer.TransferID, vtype datatransfer.TypeIdentifier, voucher encoding.Encodable) (DataTransferRequest, error) { +func VoucherRequest(id datatransfer.TransferID, vtype datatransfer.TypeIdentifier, voucher encoding.Encodable) (datatransfer.Request, error) { vbytes, err := encoding.Encode(voucher) if err != nil { return nil, xerrors.Errorf("Creating request: %w", err) @@ -119,7 +79,7 @@ func VoucherRequest(id datatransfer.TransferID, vtype datatransfer.TypeIdentifie } // NewResponse builds a new Data Transfer response -func NewResponse(id datatransfer.TransferID, accepted bool, isPaused bool, voucherResultType datatransfer.TypeIdentifier, voucherResult encoding.Encodable) (DataTransferResponse, error) { +func NewResponse(id datatransfer.TransferID, accepted bool, isPaused bool, voucherResultType datatransfer.TypeIdentifier, voucherResult encoding.Encodable) (datatransfer.Response, error) { vbytes, err := encoding.Encode(voucherResult) if err != nil { return nil, xerrors.Errorf("Creating request: %w", err) @@ -135,7 +95,7 @@ func NewResponse(id datatransfer.TransferID, accepted bool, isPaused bool, vouch } // VoucherResultResponse builds a new response for a voucher result -func VoucherResultResponse(id datatransfer.TransferID, accepted bool, isPaused bool, voucherResultType datatransfer.TypeIdentifier, voucherResult encoding.Encodable) (DataTransferResponse, error) { +func VoucherResultResponse(id datatransfer.TransferID, accepted bool, isPaused bool, voucherResultType datatransfer.TypeIdentifier, voucherResult encoding.Encodable) (datatransfer.Response, error) { vbytes, err := encoding.Encode(voucherResult) if err != nil { return nil, xerrors.Errorf("Creating request: %w", err) @@ -151,7 +111,7 @@ func VoucherResultResponse(id datatransfer.TransferID, accepted bool, isPaused b } // UpdateResponse returns a new update response -func UpdateResponse(id datatransfer.TransferID, isPaused bool) DataTransferResponse { +func UpdateResponse(id datatransfer.TransferID, isPaused bool) datatransfer.Response { return &transferResponse{ Type: uint64(updateMessage), Paus: isPaused, @@ -160,7 +120,7 @@ func UpdateResponse(id datatransfer.TransferID, isPaused bool) DataTransferRespo } // CancelResponse makes a new cancel response message -func CancelResponse(id datatransfer.TransferID) DataTransferResponse { +func CancelResponse(id datatransfer.TransferID) datatransfer.Response { return &transferResponse{ Type: uint64(cancelMessage), XferID: uint64(id), @@ -168,7 +128,7 @@ func CancelResponse(id datatransfer.TransferID) DataTransferResponse { } // CompleteResponse returns a new complete response message -func CompleteResponse(id datatransfer.TransferID, isAccepted bool, isPaused bool, voucherResultType datatransfer.TypeIdentifier, voucherResult encoding.Encodable) (DataTransferResponse, error) { +func CompleteResponse(id datatransfer.TransferID, isAccepted bool, isPaused bool, voucherResultType datatransfer.TypeIdentifier, voucherResult encoding.Encodable) (datatransfer.Response, error) { vbytes, err := encoding.Encode(voucherResult) if err != nil { return nil, xerrors.Errorf("Creating request: %w", err) @@ -184,7 +144,7 @@ func CompleteResponse(id datatransfer.TransferID, isAccepted bool, isPaused bool } // FromNet can read a network stream to deserialize a GraphSyncMessage -func FromNet(r io.Reader) (DataTransferMessage, error) { +func FromNet(r io.Reader) (datatransfer.Message, error) { tresp := transferMessage{} err := tresp.UnmarshalCBOR(r) if tresp.IsRequest() { diff --git a/message/message_test.go b/message/message_test.go index 94e81fae..513fb0c9 100644 --- a/message/message_test.go +++ b/message/message_test.go @@ -33,8 +33,8 @@ func TestNewRequest(t *testing.T) { receivedSelector, err := request.Selector() require.NoError(t, err) require.Equal(t, selector, receivedSelector) - // Sanity check to make sure we can cast to DataTransferMessage - msg, ok := request.(DataTransferMessage) + // Sanity check to make sure we can cast to datatransfer.Message + msg, ok := request.(datatransfer.Message) require.True(t, ok) assert.True(t, msg.IsRequest()) @@ -62,7 +62,7 @@ func TestTransferRequest_UnmarshalCBOR(t *testing.T) { assert.Equal(t, req.TransferID(), desMsg.TransferID()) assert.Equal(t, req.IsRequest(), desMsg.IsRequest()) - desReq := desMsg.(DataTransferRequest) + desReq := desMsg.(datatransfer.Request) assert.Equal(t, req.IsPull(), desReq.IsPull()) assert.Equal(t, req.IsCancel(), desReq.IsCancel()) assert.Equal(t, req.BaseCid(), desReq.BaseCid()) @@ -82,8 +82,8 @@ func TestResponses(t *testing.T) { assert.True(t, response.IsPaused()) assert.False(t, response.IsRequest()) testutil.AssertFakeDTVoucherResult(t, response, voucherResult) - // Sanity check to make sure we can cast to DataTransferMessage - msg, ok := response.(DataTransferMessage) + // Sanity check to make sure we can cast to datatransfer.Message + msg, ok := response.(datatransfer.Message) require.True(t, ok) assert.False(t, msg.IsRequest()) @@ -123,7 +123,7 @@ func TestTransferResponse_UnmarshalCBOR(t *testing.T) { assert.False(t, desMsg.IsPaused()) assert.Equal(t, id, desMsg.TransferID()) - desResp, ok := desMsg.(DataTransferResponse) + desResp, ok := desMsg.(datatransfer.Response) require.True(t, ok) assert.True(t, desResp.Accepted()) assert.True(t, desResp.IsNew()) @@ -146,7 +146,7 @@ func TestRequestCancel(t *testing.T) { deserialized, err := FromNet(wbuf) require.NoError(t, err) - deserializedRequest, ok := deserialized.(DataTransferRequest) + deserializedRequest, ok := deserialized.(datatransfer.Request) require.True(t, ok) require.Equal(t, deserializedRequest.TransferID(), req.TransferID()) require.Equal(t, deserializedRequest.IsCancel(), req.IsCancel()) @@ -169,7 +169,7 @@ func TestRequestUpdate(t *testing.T) { deserialized, err := FromNet(wbuf) require.NoError(t, err) - deserializedRequest, ok := deserialized.(DataTransferRequest) + deserializedRequest, ok := deserialized.(datatransfer.Request) require.True(t, ok) require.Equal(t, deserializedRequest.TransferID(), req.TransferID()) require.Equal(t, deserializedRequest.IsCancel(), req.IsCancel()) @@ -188,8 +188,8 @@ func TestUpdateResponse(t *testing.T) { assert.True(t, response.IsPaused()) assert.False(t, response.IsRequest()) - // Sanity check to make sure we can cast to DataTransferMessage - msg, ok := response.(DataTransferMessage) + // Sanity check to make sure we can cast to datatransfer.Message + msg, ok := response.(datatransfer.Message) require.True(t, ok) assert.False(t, msg.IsRequest()) @@ -207,8 +207,8 @@ func TestCancelResponse(t *testing.T) { assert.False(t, response.IsUpdate()) assert.True(t, response.IsCancel()) assert.False(t, response.IsRequest()) - // Sanity check to make sure we can cast to DataTransferMessage - msg, ok := response.(DataTransferMessage) + // Sanity check to make sure we can cast to datatransfer.Message + msg, ok := response.(datatransfer.Message) require.True(t, ok) assert.False(t, msg.IsRequest()) @@ -230,8 +230,8 @@ func TestCompleteResponse(t *testing.T) { assert.True(t, response.EmptyVoucherResult()) assert.True(t, response.IsComplete()) assert.False(t, response.IsRequest()) - // Sanity check to make sure we can cast to DataTransferMessage - msg, ok := response.(DataTransferMessage) + // Sanity check to make sure we can cast to datatransfer.Message + msg, ok := response.(datatransfer.Message) require.True(t, ok) assert.False(t, msg.IsRequest()) @@ -256,7 +256,7 @@ func TestToNetFromNetEquivalency(t *testing.T) { deserialized, err := FromNet(buf) require.NoError(t, err) - deserializedRequest, ok := deserialized.(DataTransferRequest) + deserializedRequest, ok := deserialized.(datatransfer.Request) require.True(t, ok) require.Equal(t, deserializedRequest.TransferID(), request.TransferID()) @@ -274,7 +274,7 @@ func TestToNetFromNetEquivalency(t *testing.T) { deserialized, err = FromNet(buf) require.NoError(t, err) - deserializedResponse, ok := deserialized.(DataTransferResponse) + deserializedResponse, ok := deserialized.(datatransfer.Response) require.True(t, ok) require.Equal(t, deserializedResponse.TransferID(), response.TransferID()) @@ -290,7 +290,7 @@ func TestToNetFromNetEquivalency(t *testing.T) { deserialized, err = FromNet(buf) require.NoError(t, err) - deserializedRequest, ok = deserialized.(DataTransferRequest) + deserializedRequest, ok = deserialized.(datatransfer.Request) require.True(t, ok) require.Equal(t, deserializedRequest.TransferID(), request.TransferID()) @@ -298,7 +298,7 @@ func TestToNetFromNetEquivalency(t *testing.T) { require.Equal(t, deserializedRequest.IsRequest(), request.IsRequest()) } -func NewTestTransferRequest() (DataTransferRequest, error) { +func NewTestTransferRequest() (datatransfer.Request, error) { bcid := testutil.GenerateCids(1)[0] selector := builder.NewSelectorSpecBuilder(basicnode.Style.Any).Matcher().Node() isPull := false diff --git a/message/transfer_message.go b/message/transfer_message.go index dc5c116a..303b29a3 100644 --- a/message/transfer_message.go +++ b/message/transfer_message.go @@ -3,7 +3,7 @@ package message import ( "io" - "github.com/filecoin-project/go-data-transfer" + datatransfer "github.com/filecoin-project/go-data-transfer" ) //go:generate cbor-gen-for transferMessage @@ -14,7 +14,7 @@ type transferMessage struct { Response *transferResponse } -// ========= DataTransferMessage interface +// ========= datatransfer.Message interface // IsRequest returns true if this message is a data request func (tm *transferMessage) IsRequest() bool { diff --git a/message/transfer_request.go b/message/transfer_request.go index 34258701..c3184245 100644 --- a/message/transfer_request.go +++ b/message/transfer_request.go @@ -16,7 +16,7 @@ import ( //go:generate cbor-gen-for transferRequest -// transferRequest is a struct that fulfills the DataTransferRequest interface. +// transferRequest is a struct that fulfills the datatransfer.Request interface. // its members are exported to be used by cbor-gen type transferRequest struct { BCid *cid.Cid @@ -55,7 +55,7 @@ func (trq *transferRequest) TransferID() datatransfer.TransferID { return datatransfer.TransferID(trq.XferID) } -// ========= DataTransferRequest interface +// ========= datatransfer.Request interface // IsPull returns true if this is a data pull request func (trq *transferRequest) IsPull() bool { return trq.Pull diff --git a/message/transfer_response.go b/message/transfer_response.go index 4ea22456..db0051b0 100644 --- a/message/transfer_response.go +++ b/message/transfer_response.go @@ -11,7 +11,7 @@ import ( //go:generate cbor-gen-for transferResponse -// transferResponse is a private struct that satisfies the DataTransferResponse interface +// transferResponse is a private struct that satisfies the datatransfer.Response interface type transferResponse struct { Type uint64 Acpt bool diff --git a/network/interface.go b/network/interface.go index 6bdbab3c..833ac134 100644 --- a/network/interface.go +++ b/network/interface.go @@ -6,7 +6,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" - "github.com/filecoin-project/go-data-transfer/message" + datatransfer "github.com/filecoin-project/go-data-transfer" ) var ( @@ -23,7 +23,7 @@ type DataTransferNetwork interface { SendMessage( context.Context, peer.ID, - message.DataTransferMessage) error + datatransfer.Message) error // SetDelegate registers the Reciver to handle messages received from the // network. @@ -41,12 +41,12 @@ type Receiver interface { ReceiveRequest( ctx context.Context, sender peer.ID, - incoming message.DataTransferRequest) + incoming datatransfer.Request) ReceiveResponse( ctx context.Context, sender peer.ID, - incoming message.DataTransferResponse) + incoming datatransfer.Response) ReceiveError(error) } diff --git a/network/libp2p_impl.go b/network/libp2p_impl.go index 76dc429c..2a84cdc3 100644 --- a/network/libp2p_impl.go +++ b/network/libp2p_impl.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-data-transfer/message" ) @@ -48,11 +49,11 @@ func (s *streamMessageSender) Reset() error { return s.s.Reset() } -func (s *streamMessageSender) SendMsg(ctx context.Context, msg message.DataTransferMessage) error { +func (s *streamMessageSender) SendMsg(ctx context.Context, msg datatransfer.Message) error { return msgToStream(ctx, s.s, msg) } -func msgToStream(ctx context.Context, s network.Stream, msg message.DataTransferMessage) error { +func msgToStream(ctx context.Context, s network.Stream, msg datatransfer.Message) error { if msg.IsRequest() { log.Debugf("Outgoing request message for transfer ID: %d", msg.TransferID()) } @@ -88,7 +89,7 @@ func (dtnet *libp2pDataTransferNetwork) newStreamToPeer(ctx context.Context, p p func (dtnet *libp2pDataTransferNetwork) SendMessage( ctx context.Context, p peer.ID, - outgoing message.DataTransferMessage) error { + outgoing datatransfer.Message) error { s, err := dtnet.newStreamToPeer(ctx, p) if err != nil { @@ -142,12 +143,12 @@ func (dtnet *libp2pDataTransferNetwork) handleNewStream(s network.Stream) { ctx := context.Background() log.Debugf("graphsync net handleNewStream from %s", s.Conn().RemotePeer()) if received.IsRequest() { - receivedRequest, ok := received.(message.DataTransferRequest) + receivedRequest, ok := received.(datatransfer.Request) if ok { dtnet.receiver.ReceiveRequest(ctx, p, receivedRequest) } } else { - receivedResponse, ok := received.(message.DataTransferResponse) + receivedResponse, ok := received.(datatransfer.Response) if ok { dtnet.receiver.ReceiveResponse(ctx, p, receivedResponse) } diff --git a/network/libp2p_impl_test.go b/network/libp2p_impl_test.go index da474d3a..2eb8c4b4 100644 --- a/network/libp2p_impl_test.go +++ b/network/libp2p_impl_test.go @@ -22,8 +22,8 @@ import ( // Receiver is an interface for receiving messages from the DataTransferNetwork. type receiver struct { messageReceived chan struct{} - lastRequest message.DataTransferRequest - lastResponse message.DataTransferResponse + lastRequest datatransfer.Request + lastResponse datatransfer.Response lastSender peer.ID connectedPeers chan peer.ID } @@ -31,7 +31,7 @@ type receiver struct { func (r *receiver) ReceiveRequest( ctx context.Context, sender peer.ID, - incoming message.DataTransferRequest) { + incoming datatransfer.Request) { r.lastSender = sender r.lastRequest = incoming select { @@ -43,7 +43,7 @@ func (r *receiver) ReceiveRequest( func (r *receiver) ReceiveResponse( ctx context.Context, sender peer.ID, - incoming message.DataTransferResponse) { + incoming datatransfer.Response) { r.lastSender = sender r.lastResponse = incoming select { diff --git a/statuses.go b/statuses.go new file mode 100644 index 00000000..6a4c89be --- /dev/null +++ b/statuses.go @@ -0,0 +1,83 @@ +package datatransfer + +// Status is the status of transfer for a given channel +type Status uint64 + +const ( + // Requested means a data transfer was requested by has not yet been approved + Requested Status = iota + + // Ongoing means the data transfer is in progress + Ongoing + + // TransferFinished indicates the initiator is done sending/receiving + // data but is awaiting confirmation from the responder + TransferFinished + + // ResponderCompleted indicates the initiator received a message from the + // responder that it's completed + ResponderCompleted + + // Finalizing means the responder is awaiting a final message from the initator to + // consider the transfer done + Finalizing + + // Completing just means we have some final cleanup for a completed request + Completing + + // Completed means the data transfer is completed successfully + Completed + + // Failing just means we have some final cleanup for a failed request + Failing + + // Failed means the data transfer failed + Failed + + // Cancelling just means we have some final cleanup for a cancelled request + Cancelling + + // Cancelled means the data transfer ended prematurely + Cancelled + + // InitiatorPaused means the data sender has paused the channel (only the sender can unpause this) + InitiatorPaused + + // ResponderPaused means the data receiver has paused the channel (only the receiver can unpause this) + ResponderPaused + + // BothPaused means both sender and receiver have paused the channel seperately (both must unpause) + BothPaused + + // ResponderFinalizing is a unique state where the responder is awaiting a final voucher + ResponderFinalizing + + // ResponderFinalizingTransferFinished is a unique state where the responder is awaiting a final voucher + // and we have received all data + ResponderFinalizingTransferFinished + + // ChannelNotFoundError means the searched for data transfer does not exist + ChannelNotFoundError +) + +// Statuses are human readable names for data transfer states +var Statuses = map[Status]string{ + // Requested means a data transfer was requested by has not yet been approved + Requested: "Requested", + Ongoing: "Ongoing", + TransferFinished: "TransferFinished", + ResponderCompleted: "ResponderCompleted", + Finalizing: "Finalizing", + Completing: "Completing", + Completed: "Completed", + Failing: "Failing", + Failed: "Failed", + Cancelling: "Cancelling", + Cancelled: "Cancelled", + InitiatorPaused: "InitiatorPaused", + ResponderPaused: "ResponderPaused", + BothPaused: "BothPaused", + ResponderFinalizing: "ResponderFinalizing", + ResponderFinalizingTransferFinished: "ResponderFinalizingTransferFinished", + ChannelNotFoundError: "ChannelNotFoundError", +} diff --git a/testutil/fakedttype.go b/testutil/fakedttype.go index 8316407d..9fde52f0 100644 --- a/testutil/fakedttype.go +++ b/testutil/fakedttype.go @@ -7,7 +7,6 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-data-transfer/encoding" - "github.com/filecoin-project/go-data-transfer/message" ) //go:generate cbor-gen-for FakeDTType @@ -23,7 +22,7 @@ func (ft FakeDTType) Type() datatransfer.TypeIdentifier { } // AssertFakeDTVoucher asserts that a data transfer requests contains the expected fake data transfer voucher type -func AssertFakeDTVoucher(t *testing.T, request message.DataTransferRequest, expected *FakeDTType) { +func AssertFakeDTVoucher(t *testing.T, request datatransfer.Request, expected *FakeDTType) { require.Equal(t, datatransfer.TypeIdentifier("FakeDTType"), request.VoucherType()) fakeDTDecoder, err := encoding.NewDecoder(&FakeDTType{}) require.NoError(t, err) @@ -33,7 +32,7 @@ func AssertFakeDTVoucher(t *testing.T, request message.DataTransferRequest, expe } // AssertEqualFakeDTVoucher asserts that two requests have the same fake data transfer voucher -func AssertEqualFakeDTVoucher(t *testing.T, expectedRequest message.DataTransferRequest, request message.DataTransferRequest) { +func AssertEqualFakeDTVoucher(t *testing.T, expectedRequest datatransfer.Request, request datatransfer.Request) { require.Equal(t, expectedRequest.VoucherType(), request.VoucherType()) fakeDTDecoder, err := encoding.NewDecoder(&FakeDTType{}) require.NoError(t, err) @@ -45,7 +44,7 @@ func AssertEqualFakeDTVoucher(t *testing.T, expectedRequest message.DataTransfer } // AssertFakeDTVoucherResult asserts that a data transfer response contains the expected fake data transfer voucher result type -func AssertFakeDTVoucherResult(t *testing.T, response message.DataTransferResponse, expected *FakeDTType) { +func AssertFakeDTVoucherResult(t *testing.T, response datatransfer.Response, expected *FakeDTType) { require.Equal(t, datatransfer.TypeIdentifier("FakeDTType"), response.VoucherResultType()) fakeDTDecoder, err := encoding.NewDecoder(&FakeDTType{}) require.NoError(t, err) @@ -55,7 +54,7 @@ func AssertFakeDTVoucherResult(t *testing.T, response message.DataTransferRespon } // AssertEqualFakeDTVoucherResult asserts that two responses have the same fake data transfer voucher result -func AssertEqualFakeDTVoucherResult(t *testing.T, expectedResponse message.DataTransferResponse, response message.DataTransferResponse) { +func AssertEqualFakeDTVoucherResult(t *testing.T, expectedResponse datatransfer.Response, response datatransfer.Response) { require.Equal(t, expectedResponse.VoucherResultType(), response.VoucherResultType()) fakeDTDecoder, err := encoding.NewDecoder(&FakeDTType{}) require.NoError(t, err) diff --git a/testutil/fakegraphsync.go b/testutil/fakegraphsync.go index b36c354f..10ab18c2 100644 --- a/testutil/fakegraphsync.go +++ b/testutil/fakegraphsync.go @@ -3,9 +3,12 @@ package testutil import ( "bytes" "context" + "errors" "math/rand" + "sync" "testing" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-data-transfer/message" "github.com/filecoin-project/go-data-transfer/transport/graphsync/extension" "github.com/ipfs/go-cid" @@ -17,7 +20,7 @@ import ( "github.com/stretchr/testify/require" ) -func matchDtMessage(t *testing.T, extensions []graphsync.ExtensionData) message.DataTransferMessage { +func matchDtMessage(t *testing.T, extensions []graphsync.ExtensionData) datatransfer.Message { var matchedExtension *graphsync.ExtensionData for _, ext := range extensions { if ext.Name == extension.ExtensionDataTransfer { @@ -41,7 +44,7 @@ type ReceivedGraphSyncRequest struct { } // DTMessage returns the data transfer message among the graphsync extensions sent with this request -func (gsRequest ReceivedGraphSyncRequest) DTMessage(t *testing.T) message.DataTransferMessage { +func (gsRequest ReceivedGraphSyncRequest) DTMessage(t *testing.T) datatransfer.Message { return matchDtMessage(t, gsRequest.Extensions) } @@ -55,7 +58,7 @@ type ResumeRequest struct { } // DTMessage returns the data transfer message among the graphsync extensions sent with this request -func (resumeRequest ResumeRequest) DTMessage(t *testing.T) message.DataTransferMessage { +func (resumeRequest ResumeRequest) DTMessage(t *testing.T) datatransfer.Message { return matchDtMessage(t, resumeRequest.Extensions) } @@ -71,7 +74,7 @@ type ResumeResponse struct { } // DTMessage returns the data transfer message among the graphsync extensions sent with this request -func (resumeResponse ResumeResponse) DTMessage(t *testing.T) message.DataTransferMessage { +func (resumeResponse ResumeResponse) DTMessage(t *testing.T) datatransfer.Message { return matchDtMessage(t, resumeResponse.Extensions) } @@ -80,6 +83,11 @@ type CancelResponse struct { RequestID graphsync.RequestID } +type PersistenceOption struct { + ipld.Loader + ipld.Storer +} + // FakeGraphSync implements a GraphExchange but does nothing type FakeGraphSync struct { requests chan ReceivedGraphSyncRequest // records calls to fakeGraphSync.Request @@ -88,6 +96,8 @@ type FakeGraphSync struct { pauseResponses chan PauseResponse resumeResponses chan ResumeResponse cancelResponses chan CancelResponse + persistenceOptionsLk sync.RWMutex + persistenceOptions map[string]PersistenceOption leaveRequestsOpen bool OutgoingRequestHook graphsync.OnOutgoingRequestHook IncomingBlockHook graphsync.OnIncomingBlockHook @@ -102,12 +112,13 @@ type FakeGraphSync struct { // NewFakeGraphSync returns a new fake graphsync implementation func NewFakeGraphSync() *FakeGraphSync { return &FakeGraphSync{ - requests: make(chan ReceivedGraphSyncRequest, 1), - pauseRequests: make(chan PauseRequest, 1), - resumeRequests: make(chan ResumeRequest, 1), - pauseResponses: make(chan PauseResponse, 1), - resumeResponses: make(chan ResumeResponse, 1), - cancelResponses: make(chan CancelResponse, 1), + requests: make(chan ReceivedGraphSyncRequest, 1), + pauseRequests: make(chan PauseRequest, 1), + resumeRequests: make(chan ResumeRequest, 1), + pauseResponses: make(chan PauseResponse, 1), + resumeResponses: make(chan ResumeResponse, 1), + cancelResponses: make(chan CancelResponse, 1), + persistenceOptions: make(map[string]PersistenceOption), } } @@ -211,6 +222,23 @@ func (fgs *FakeGraphSync) AssertCancelResponseReceived(ctx context.Context, t *t return cancelResponseReceived } +// AssertHasPersistenceOption verifies that a persistence option was registered +func (fgs *FakeGraphSync) AssertHasPersistenceOption(t *testing.T, name string) PersistenceOption { + fgs.persistenceOptionsLk.RLock() + defer fgs.persistenceOptionsLk.RUnlock() + option, ok := fgs.persistenceOptions[name] + require.Truef(t, ok, "persistence option %s should be registered", name) + return option +} + +// AssertDoesNotHavePersistenceOption verifies that a persistence option is not registered +func (fgs *FakeGraphSync) AssertDoesNotHavePersistenceOption(t *testing.T, name string) { + fgs.persistenceOptionsLk.RLock() + defer fgs.persistenceOptionsLk.RUnlock() + _, ok := fgs.persistenceOptions[name] + require.Falsef(t, ok, "persistence option %s should be registered", name) +} + // Request initiates a new GraphSync request to the given peer using the given selector spec. func (fgs *FakeGraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) { @@ -226,6 +254,21 @@ func (fgs *FakeGraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link // RegisterPersistenceOption registers an alternate loader/storer combo that can be substituted for the default func (fgs *FakeGraphSync) RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error { + fgs.persistenceOptionsLk.Lock() + defer fgs.persistenceOptionsLk.Unlock() + _, ok := fgs.persistenceOptions[name] + if ok { + return errors.New("already registered") + } + fgs.persistenceOptions[name] = PersistenceOption{Loader: loader, Storer: storer} + return nil +} + +// UnregisterPersistenceOption unregisters an existing loader/storer combo +func (fgs *FakeGraphSync) UnregisterPersistenceOption(name string) error { + fgs.persistenceOptionsLk.Lock() + defer fgs.persistenceOptionsLk.Unlock() + delete(fgs.persistenceOptions, name) return nil } @@ -419,9 +462,13 @@ func NewFakeResponse(id graphsync.RequestID, extensions map[graphsync.ExtensionN } } -type FakeOutgoingRequestHookActions struct{} +type FakeOutgoingRequestHookActions struct { + PersistenceOption string +} -func (fa *FakeOutgoingRequestHookActions) UsePersistenceOption(name string) {} +func (fa *FakeOutgoingRequestHookActions) UsePersistenceOption(name string) { + fa.PersistenceOption = name +} func (fa *FakeOutgoingRequestHookActions) UseLinkTargetNodeStyleChooser(_ traversal.LinkTargetNodeStyleChooser) { } @@ -468,17 +515,20 @@ func (fa *FakeOutgoingBlockHookActions) PauseResponse() { var _ graphsync.OutgoingBlockHookActions = &FakeOutgoingBlockHookActions{} type FakeIncomingRequestHookActions struct { - TerminationError error - Validated bool - SentExtension graphsync.ExtensionData - Paused bool + PersistenceOption string + TerminationError error + Validated bool + SentExtension graphsync.ExtensionData + Paused bool } func (fa *FakeIncomingRequestHookActions) SendExtensionData(ext graphsync.ExtensionData) { fa.SentExtension = ext } -func (fa *FakeIncomingRequestHookActions) UsePersistenceOption(name string) {} +func (fa *FakeIncomingRequestHookActions) UsePersistenceOption(name string) { + fa.PersistenceOption = name +} func (fa *FakeIncomingRequestHookActions) UseLinkTargetNodeStyleChooser(_ traversal.LinkTargetNodeStyleChooser) { } diff --git a/testutil/faketransport.go b/testutil/faketransport.go index 6e6869c5..5a01f9bd 100644 --- a/testutil/faketransport.go +++ b/testutil/faketransport.go @@ -4,8 +4,6 @@ import ( "context" datatransfer "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/go-data-transfer/message" - "github.com/filecoin-project/go-data-transfer/transport" "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" ) @@ -16,28 +14,35 @@ type OpenedChannel struct { ChannelID datatransfer.ChannelID Root ipld.Link Selector ipld.Node - Message message.DataTransferMessage + Message datatransfer.Message } // ResumedChannel records a call to resume a channel type ResumedChannel struct { ChannelID datatransfer.ChannelID - Message message.DataTransferMessage + Message datatransfer.Message +} + +// CustomizedTransfer is just a way to record calls made to transport configurer +type CustomizedTransfer struct { + ChannelID datatransfer.ChannelID + Voucher datatransfer.Voucher } // FakeTransport is a fake transport with mocked results type FakeTransport struct { - OpenedChannels []OpenedChannel - OpenChannelErr error - ClosedChannels []datatransfer.ChannelID - CloseChannelErr error - PausedChannels []datatransfer.ChannelID - PauseChannelErr error - ResumedChannels []ResumedChannel - ResumeChannelErr error - CleanedUpChannels []datatransfer.ChannelID - EventHandler transport.Events - SetEventHandlerErr error + OpenedChannels []OpenedChannel + OpenChannelErr error + ClosedChannels []datatransfer.ChannelID + CloseChannelErr error + PausedChannels []datatransfer.ChannelID + PauseChannelErr error + ResumedChannels []ResumedChannel + ResumeChannelErr error + CleanedUpChannels []datatransfer.ChannelID + CustomizedTransfers []CustomizedTransfer + EventHandler datatransfer.EventsHandler + SetEventHandlerErr error } // NewFakeTransport returns a new instance of FakeTransport @@ -50,7 +55,7 @@ func NewFakeTransport() *FakeTransport { // Note: from a data transfer symantic standpoint, it doesn't matter if the // request is push or pull -- OpenChannel is called by the party that is // intending to receive data -func (ft *FakeTransport) OpenChannel(ctx context.Context, dataSender peer.ID, channelID datatransfer.ChannelID, root ipld.Link, stor ipld.Node, msg message.DataTransferMessage) error { +func (ft *FakeTransport) OpenChannel(ctx context.Context, dataSender peer.ID, channelID datatransfer.ChannelID, root ipld.Link, stor ipld.Node, msg datatransfer.Message) error { ft.OpenedChannels = append(ft.OpenedChannels, OpenedChannel{dataSender, channelID, root, stor, msg}) return ft.OpenChannelErr } @@ -62,7 +67,7 @@ func (ft *FakeTransport) CloseChannel(ctx context.Context, chid datatransfer.Cha } // SetEventHandler sets the handler for events on channels -func (ft *FakeTransport) SetEventHandler(events transport.Events) error { +func (ft *FakeTransport) SetEventHandler(events datatransfer.EventsHandler) error { ft.EventHandler = events return ft.SetEventHandlerErr } @@ -74,7 +79,7 @@ func (ft *FakeTransport) PauseChannel(ctx context.Context, chid datatransfer.Cha } // ResumeChannel resumes the given channel -func (ft *FakeTransport) ResumeChannel(ctx context.Context, msg message.DataTransferMessage, chid datatransfer.ChannelID) error { +func (ft *FakeTransport) ResumeChannel(ctx context.Context, msg datatransfer.Message, chid datatransfer.ChannelID) error { ft.ResumedChannels = append(ft.ResumedChannels, ResumedChannel{chid, msg}) return ft.ResumeChannelErr } @@ -83,3 +88,7 @@ func (ft *FakeTransport) ResumeChannel(ctx context.Context, msg message.DataTran func (ft *FakeTransport) CleanupChannel(chid datatransfer.ChannelID) { ft.CleanedUpChannels = append(ft.CleanedUpChannels, chid) } + +func (ft *FakeTransport) RecordCustomizedTransfer(chid datatransfer.ChannelID, voucher datatransfer.Voucher) { + ft.CustomizedTransfers = append(ft.CustomizedTransfers, CustomizedTransfer{chid, voucher}) +} diff --git a/testutil/gstestdata.go b/testutil/gstestdata.go index 4a871d09..0694a41c 100644 --- a/testutil/gstestdata.go +++ b/testutil/gstestdata.go @@ -3,7 +3,6 @@ package testutil import ( "bytes" "context" - "errors" "io" "io/ioutil" "os" @@ -12,11 +11,10 @@ import ( "runtime" "testing" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-data-transfer/network" - "github.com/filecoin-project/go-data-transfer/transport" gstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync" "github.com/filecoin-project/go-storedcounter" - blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" @@ -24,6 +22,7 @@ import ( "github.com/ipfs/go-graphsync" gsimpl "github.com/ipfs/go-graphsync/impl" gsnet "github.com/ipfs/go-graphsync/network" + "github.com/ipfs/go-graphsync/storeutil" bstore "github.com/ipfs/go-ipfs-blockstore" chunker "github.com/ipfs/go-ipfs-chunker" offline "github.com/ipfs/go-ipfs-exchange-offline" @@ -85,38 +84,6 @@ func NewGraphsyncTestingData(ctx context.Context, t *testing.T) *GraphsyncTestin gsData := &GraphsyncTestingData{} gsData.Ctx = ctx - makeLoader := func(bs bstore.Blockstore) ipld.Loader { - return func(lnk ipld.Link, lnkCtx ipld.LinkContext) (io.Reader, error) { - c, ok := lnk.(cidlink.Link) - if !ok { - return nil, errors.New("Incorrect Link Type") - } - // read block from one store - block, err := bs.Get(c.Cid) - if err != nil { - return nil, err - } - return bytes.NewReader(block.RawData()), nil - } - } - - makeStorer := func(bs bstore.Blockstore) ipld.Storer { - return func(lnkCtx ipld.LinkContext) (io.Writer, ipld.StoreCommitter, error) { - var buf bytes.Buffer - var committer ipld.StoreCommitter = func(lnk ipld.Link) error { - c, ok := lnk.(cidlink.Link) - if !ok { - return errors.New("Incorrect Link Type") - } - block, err := blocks.NewBlockWithCid(buf.Bytes(), c.Cid) - if err != nil { - return err - } - return bs.Put(block) - } - return &buf, committer, nil - } - } ds1 := dss.MutexWrap(datastore.NewMapDatastore()) ds2 := dss.MutexWrap(datastore.NewMapDatastore()) @@ -135,12 +102,12 @@ func NewGraphsyncTestingData(ctx context.Context, t *testing.T) *GraphsyncTestin gsData.DagService2 = merkledag.NewDAGService(blockservice.New(gsData.Bs2, offline.Exchange(gsData.Bs2))) // setup an IPLD loader/storer for blockstore 1 - gsData.Loader1 = makeLoader(gsData.Bs1) - gsData.Storer1 = makeStorer(gsData.Bs1) + gsData.Loader1 = storeutil.LoaderForBlockstore(gsData.Bs1) + gsData.Storer1 = storeutil.StorerForBlockstore(gsData.Bs1) // setup an IPLD loader/storer for blockstore 2 - gsData.Loader2 = makeLoader(gsData.Bs2) - gsData.Storer2 = makeStorer(gsData.Bs2) + gsData.Loader2 = storeutil.LoaderForBlockstore(gsData.Bs2) + gsData.Storer2 = storeutil.StorerForBlockstore(gsData.Bs2) mn := mocknet.New(ctx) @@ -174,7 +141,7 @@ func (gsData *GraphsyncTestingData) SetupGraphsyncHost1() graphsync.GraphExchang } // SetupGSTransportHost1 sets up a new grapshync transport over real graphsync on the first host -func (gsData *GraphsyncTestingData) SetupGSTransportHost1() transport.Transport { +func (gsData *GraphsyncTestingData) SetupGSTransportHost1() datatransfer.Transport { // setup graphsync gs := gsData.SetupGraphsyncHost1() return gstransport.NewTransport(gsData.Host1.ID(), gs) @@ -187,7 +154,7 @@ func (gsData *GraphsyncTestingData) SetupGraphsyncHost2() graphsync.GraphExchang } // SetupGSTransportHost2 sets up a new grapshync transport over real graphsync on the second host -func (gsData *GraphsyncTestingData) SetupGSTransportHost2() transport.Transport { +func (gsData *GraphsyncTestingData) SetupGSTransportHost2() datatransfer.Transport { // setup graphsync gs := gsData.SetupGraphsyncHost2() return gstransport.NewTransport(gsData.Host2.ID(), gs) @@ -195,6 +162,22 @@ func (gsData *GraphsyncTestingData) SetupGSTransportHost2() transport.Transport // LoadUnixFSFile loads a fixtures file we can test dag transfer with func (gsData *GraphsyncTestingData) LoadUnixFSFile(t *testing.T, useSecondNode bool) ipld.Link { + // import to UnixFS + var dagService ipldformat.DAGService + if useSecondNode { + dagService = gsData.DagService2 + } else { + dagService = gsData.DagService1 + } + + link, origBytes := LoadUnixFSFile(gsData.Ctx, t, dagService) + gsData.OrigBytes = origBytes + return link +} + +// LoadUnixFSFile loads a fixtures file into the given DAG Service, returning an ipld.Link for the file +// and the original file bytes +func LoadUnixFSFile(ctx context.Context, t *testing.T, dagService ipldformat.DAGService) (ipld.Link, []byte) { _, curFile, _, ok := runtime.Caller(0) require.True(t, ok) @@ -209,13 +192,7 @@ func (gsData *GraphsyncTestingData) LoadUnixFSFile(t *testing.T, useSecondNode b file := files.NewReaderFile(tr) // import to UnixFS - var dagService ipldformat.DAGService - if useSecondNode { - dagService = gsData.DagService2 - } else { - dagService = gsData.DagService1 - } - bufferedDS := ipldformat.NewBufferedDAG(gsData.Ctx, dagService) + bufferedDS := ipldformat.NewBufferedDAG(ctx, dagService) params := ihelper.DagBuilderParams{ Maxlinks: unixfsLinksPerLevel, @@ -234,9 +211,7 @@ func (gsData *GraphsyncTestingData) LoadUnixFSFile(t *testing.T, useSecondNode b require.NoError(t, err) // save the original files bytes - gsData.OrigBytes = buf.Bytes() - - return cidlink.Link{Cid: nd.Cid()} + return cidlink.Link{Cid: nd.Cid()}, buf.Bytes() } // VerifyFileTransferred verifies all of the file was transfer to the given node @@ -248,14 +223,20 @@ func (gsData *GraphsyncTestingData) VerifyFileTransferred(t *testing.T, link ipl dagService = gsData.DagService1 } + VerifyHasFile(gsData.Ctx, t, dagService, link, gsData.OrigBytes) +} + +// VerifyHasFile verifies the presence of the given file with the given ipld.Link and file contents (fileBytes) +// exists in the given blockstore identified by dagService +func VerifyHasFile(ctx context.Context, t *testing.T, dagService ipldformat.DAGService, link ipld.Link, fileBytes []byte) { c := link.(cidlink.Link).Cid // load the root of the UnixFS DAG from the new blockstore - otherNode, err := dagService.Get(gsData.Ctx, c) + otherNode, err := dagService.Get(ctx, c) require.NoError(t, err) // Setup a UnixFS file reader - n, err := unixfile.NewUnixfsFile(gsData.Ctx, dagService, otherNode) + n, err := unixfile.NewUnixfsFile(ctx, dagService, otherNode) require.NoError(t, err) fn, ok := n.(files.File) @@ -266,5 +247,5 @@ func (gsData *GraphsyncTestingData) VerifyFileTransferred(t *testing.T, link ipl require.NoError(t, err) // verify original bytes match final bytes! - require.EqualValues(t, gsData.OrigBytes, finalBytes) + require.EqualValues(t, fileBytes, finalBytes) } diff --git a/testutil/message.go b/testutil/message.go index 0dfc1e39..75b19c0e 100644 --- a/testutil/message.go +++ b/testutil/message.go @@ -11,7 +11,7 @@ import ( ) // NewDTRequest makes a new DT Request message -func NewDTRequest(t *testing.T, transferID datatransfer.TransferID) message.DataTransferRequest { +func NewDTRequest(t *testing.T, transferID datatransfer.TransferID) datatransfer.Request { voucher := NewFakeDTType() baseCid := GenerateCids(1)[0] selector := builder.NewSelectorSpecBuilder(basicnode.Style.Any).Matcher().Node() @@ -21,7 +21,7 @@ func NewDTRequest(t *testing.T, transferID datatransfer.TransferID) message.Data } // NewDTResponse makes a new DT Request message -func NewDTResponse(t *testing.T, transferID datatransfer.TransferID) message.DataTransferResponse { +func NewDTResponse(t *testing.T, transferID datatransfer.TransferID) datatransfer.Response { vresult := NewFakeDTType() r, err := message.NewResponse(transferID, false, false, vresult.Type(), vresult) require.NoError(t, err) diff --git a/testutil/testnet.go b/testutil/testnet.go index 1ce20d30..1600e3de 100644 --- a/testutil/testnet.go +++ b/testutil/testnet.go @@ -3,7 +3,7 @@ package testutil import ( "context" - "github.com/filecoin-project/go-data-transfer/message" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-data-transfer/network" "github.com/libp2p/go-libp2p-core/peer" ) @@ -11,7 +11,7 @@ import ( // FakeSentMessage is a recording of a message sent on the FakeNetwork type FakeSentMessage struct { PeerID peer.ID - Message message.DataTransferMessage + Message datatransfer.Message } // FakeNetwork is a network that satisfies the DataTransferNetwork interface but @@ -28,7 +28,7 @@ func NewFakeNetwork(id peer.ID) *FakeNetwork { } // SendMessage sends a GraphSync message to a peer. -func (fn *FakeNetwork) SendMessage(ctx context.Context, p peer.ID, m message.DataTransferMessage) error { +func (fn *FakeNetwork) SendMessage(ctx context.Context, p peer.ID, m datatransfer.Message) error { fn.SentMessages = append(fn.SentMessages, FakeSentMessage{p, m}) return nil } diff --git a/testutil/testutil.go b/testutil/testutil.go index b63d82af..f7a2f859 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -4,7 +4,7 @@ import ( "bytes" "testing" - "github.com/filecoin-project/go-data-transfer/message" + datatransfer "github.com/filecoin-project/go-data-transfer" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" blocksutil "github.com/ipfs/go-ipfs-blocksutil" @@ -90,7 +90,7 @@ func ContainsBlock(blks []blocks.Block, block blocks.Block) bool { } // AssertEqualSelector asserts two requests have the same valid selector -func AssertEqualSelector(t *testing.T, expectedRequest message.DataTransferRequest, request message.DataTransferRequest) { +func AssertEqualSelector(t *testing.T, expectedRequest datatransfer.Request, request datatransfer.Request) { expectedSelector, err := expectedRequest.Selector() require.NoError(t, err) selector, err := request.Selector() diff --git a/transport/transport.go b/transport.go similarity index 56% rename from transport/transport.go rename to transport.go index 91a74278..cf133708 100644 --- a/transport/transport.go +++ b/transport.go @@ -1,68 +1,30 @@ -/* -Package transport defines interfaces for implementing a transport layer for data -transfer. Where the data transfer manager will coordinate setting up push and -pull requests, validation, etc, the transport layer is responsible for moving -data back and forth, and may be medium specific. For example, some transports -may have the ability to pause and resume requests, while others may not. -Some may support individual data events, while others may only support message -events. Some transport layers may opt to use the actual data transfer network -protocols directly while others may be able to encode messages in their own -data protocol. -*/ -package transport +package datatransfer import ( "context" - datatransfer "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/go-data-transfer/message" ipld "github.com/ipld/go-ipld-prime" peer "github.com/libp2p/go-libp2p-core/peer" ) -type errorType string - -func (e errorType) Error() string { - return string(e) -} - -// ErrHandlerAlreadySet means an event handler was already set for this instance of -// hooks -const ErrHandlerAlreadySet = errorType("already set event handler") - -// ErrHandlerNotSet means you cannot issue commands to this interface because the -// handler has not been set -const ErrHandlerNotSet = errorType("event handler has not been set") - -// ErrChannelNotFound means the channel this command was issued for does not exist -const ErrChannelNotFound = errorType("channel not found") - -// ErrPause is a special error that the DataReceived / DataSent hooks can -// use to pause the channel -const ErrPause = errorType("pause channel") - -// ErrResume is a special error that the RequestReceived / ResponseReceived hooks can -// use to resume the channel -const ErrResume = errorType("resume channel") - -// Events are semantic data transfer events that happen as a result of graphsync hooks -type Events interface { +// EventsHandler are semantic data transfer events that happen as a result of graphsync hooks +type EventsHandler interface { // OnChannelOpened is called when we ask the other peer to send us data on the // given channel ID // return values are: // - nil = this channel is recognized // - error = ignore incoming data for this channel - OnChannelOpened(chid datatransfer.ChannelID) error + OnChannelOpened(chid ChannelID) error // OnResponseReceived is called when we receive a response to a request // - nil = continue receiving data // - error = cancel this request - OnResponseReceived(chid datatransfer.ChannelID, msg message.DataTransferResponse) error + OnResponseReceived(chid ChannelID, msg Response) error // OnDataReceive is called when we receive data for the given channel ID // return values are: // - nil = proceed with sending data // - error = cancel this request // - err == ErrPause - pause this request - OnDataReceived(chid datatransfer.ChannelID, link ipld.Link, size uint64) error + OnDataReceived(chid ChannelID, link ipld.Link, size uint64) error // OnDataSent is called when we send data for the given channel ID // return values are: // message = data transfer message along with data @@ -70,7 +32,7 @@ type Events interface { // - nil = proceed with sending data // - error = cancel this request // - err == ErrPause - pause this request - OnDataSent(chid datatransfer.ChannelID, link ipld.Link, size uint64) (message.DataTransferMessage, error) + OnDataSent(chid ChannelID, link ipld.Link, size uint64) (Message, error) // OnRequestReceived is called when we receive a new request to send data // for the given channel ID // return values are: @@ -80,16 +42,26 @@ type Events interface { // - error = cancel this request // - err == ErrPause - pause this request (only for new requests) // - err == ErrResume - resume this request (only for update requests) - OnRequestReceived(chid datatransfer.ChannelID, msg message.DataTransferRequest) (message.DataTransferResponse, error) + OnRequestReceived(chid ChannelID, msg Request) (Response, error) // OnResponseCompleted is called when we finish sending data for the given channel ID // Error returns are logged but otherwise have not effect - OnChannelCompleted(chid datatransfer.ChannelID, success bool) error + OnChannelCompleted(chid ChannelID, success bool) error } -// Transport is the minimum interface that must be satisfied to serve as a datatransfer -// transport layer. Transports must be able to open -// (open is always called by the receiving peer) -// and close channels, and set at an event handler +/* +Transport defines the interface for a transport layer for data +transfer. Where the data transfer manager will coordinate setting up push and +pull requests, validation, etc, the transport layer is responsible for moving +data back and forth, and may be medium specific. For example, some transports +may have the ability to pause and resume requests, while others may not. +Some may support individual data events, while others may only support message +events. Some transport layers may opt to use the actual data transfer network +protocols directly while others may be able to encode messages in their own +data protocol. + +Transport is the minimum interface that must be satisfied to serve as a datatransfer +transport layer. Transports must be able to open (open is always called by the receiving peer) +and close channels, and set at an event handler */ type Transport interface { // OpenChannel initiates an outgoing request for the other peer to send data // to us on this channel @@ -98,18 +70,18 @@ type Transport interface { // intending to receive data OpenChannel(ctx context.Context, dataSender peer.ID, - channelID datatransfer.ChannelID, + channelID ChannelID, root ipld.Link, stor ipld.Node, - msg message.DataTransferMessage) error + msg Message) error // CloseChannel closes the given channel - CloseChannel(ctx context.Context, chid datatransfer.ChannelID) error + CloseChannel(ctx context.Context, chid ChannelID) error // SetEventHandler sets the handler for events on channels - SetEventHandler(events Events) error + SetEventHandler(events EventsHandler) error // CleanupChannel is called on the otherside of a cancel - removes any associated // data for the channel - CleanupChannel(chid datatransfer.ChannelID) + CleanupChannel(chid ChannelID) } // PauseableTransport is a transport that can also pause and resume channels @@ -117,11 +89,11 @@ type PauseableTransport interface { Transport // PauseChannel paused the given channel ID PauseChannel(ctx context.Context, - chid datatransfer.ChannelID, + chid ChannelID, ) error // ResumeChannel resumes the given channel ResumeChannel(ctx context.Context, - msg message.DataTransferMessage, - chid datatransfer.ChannelID, + msg Message, + chid ChannelID, ) error } diff --git a/transport/graphsync/extension/gsextension.go b/transport/graphsync/extension/gsextension.go index 9bd6e2f2..0d29ed71 100644 --- a/transport/graphsync/extension/gsextension.go +++ b/transport/graphsync/extension/gsextension.go @@ -3,6 +3,7 @@ package extension import ( "bytes" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-data-transfer/message" "github.com/ipfs/go-graphsync" ) @@ -13,7 +14,7 @@ const ( ) // ToExtensionData converts a message to a graphsync extension -func ToExtensionData(msg message.DataTransferMessage) (graphsync.ExtensionData, error) { +func ToExtensionData(msg datatransfer.Message) (graphsync.ExtensionData, error) { buf := new(bytes.Buffer) err := msg.ToNet(buf) if err != nil { @@ -35,7 +36,7 @@ type GsExtended interface { // * nil + nil if the extension is not found // * nil + error if the extendedData fails to unmarshal // * unmarshaled ExtensionDataTransferData + nil if all goes well -func GetTransferData(extendedData GsExtended) (message.DataTransferMessage, error) { +func GetTransferData(extendedData GsExtended) (datatransfer.Message, error) { data, ok := extendedData.Extension(ExtensionDataTransfer) if !ok { return nil, nil diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index 4b4d0249..85204be5 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -5,10 +5,7 @@ import ( "errors" "sync" - "github.com/filecoin-project/go-data-transfer/transport" - datatransfer "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/go-data-transfer/message" "github.com/filecoin-project/go-data-transfer/transport/graphsync/extension" "github.com/ipfs/go-graphsync" ipld "github.com/ipld/go-ipld-prime" @@ -31,7 +28,7 @@ type responseProgress struct { // Transport manages graphsync hooks for data transfer, translating from // graphsync hooks to semantic data transfer events type Transport struct { - events transport.Events + events datatransfer.EventsHandler gs graphsync.GraphExchange peerID peer.ID dataLock sync.RWMutex @@ -42,6 +39,7 @@ type Transport struct { requestorCancelledMap map[datatransfer.ChannelID]struct{} pendingExtensions map[datatransfer.ChannelID][]graphsync.ExtensionData responseProgressMap map[datatransfer.ChannelID]*responseProgress + stores map[datatransfer.ChannelID]struct{} } // NewTransport makes a new hooks manager with the given hook events interface @@ -56,6 +54,7 @@ func NewTransport(peerID peer.ID, gs graphsync.GraphExchange) *Transport { channelIDMap: make(map[datatransfer.ChannelID]graphsyncKey), responseProgressMap: make(map[datatransfer.ChannelID]*responseProgress), pending: make(map[datatransfer.ChannelID]chan struct{}), + stores: make(map[datatransfer.ChannelID]struct{}), } } @@ -69,9 +68,9 @@ func (t *Transport) OpenChannel(ctx context.Context, channelID datatransfer.ChannelID, root ipld.Link, stor ipld.Node, - msg message.DataTransferMessage) error { + msg datatransfer.Message) error { if t.events == nil { - return transport.ErrHandlerNotSet + return datatransfer.ErrHandlerNotSet } ext, err := extension.ToExtensionData(msg) if err != nil { @@ -123,11 +122,11 @@ func (t *Transport) gsKeyFromChannelID(ctx context.Context, chid datatransfer.Ch pending, hasPending := t.pending[chid] t.dataLock.RUnlock() if !hasPending { - return graphsyncKey{}, transport.ErrChannelNotFound + return graphsyncKey{}, datatransfer.ErrChannelNotFound } select { case <-ctx.Done(): - return graphsyncKey{}, transport.ErrChannelNotFound + return graphsyncKey{}, datatransfer.ErrChannelNotFound case <-pending: } } @@ -138,7 +137,7 @@ func (t *Transport) PauseChannel(ctx context.Context, chid datatransfer.ChannelID, ) error { if t.events == nil { - return transport.ErrHandlerNotSet + return datatransfer.ErrHandlerNotSet } gsKey, err := t.gsKeyFromChannelID(ctx, chid) if err != nil { @@ -158,11 +157,11 @@ func (t *Transport) PauseChannel(ctx context.Context, // ResumeChannel resumes the given channel func (t *Transport) ResumeChannel(ctx context.Context, - msg message.DataTransferMessage, + msg datatransfer.Message, chid datatransfer.ChannelID, ) error { if t.events == nil { - return transport.ErrHandlerNotSet + return datatransfer.ErrHandlerNotSet } gsKey, err := t.gsKeyFromChannelID(ctx, chid) if err != nil { @@ -193,7 +192,7 @@ func (t *Transport) ResumeChannel(ctx context.Context, // CloseChannel closes the given channel func (t *Transport) CloseChannel(ctx context.Context, chid datatransfer.ChannelID) error { if t.events == nil { - return transport.ErrHandlerNotSet + return datatransfer.ErrHandlerNotSet } gsKey, err := t.gsKeyFromChannelID(ctx, chid) if err != nil { @@ -204,7 +203,7 @@ func (t *Transport) CloseChannel(ctx context.Context, chid datatransfer.ChannelI cancelFn, ok := t.contextCancelMap[chid] t.dataLock.RUnlock() if !ok { - return transport.ErrChannelNotFound + return datatransfer.ErrChannelNotFound } cancelFn() return nil @@ -229,9 +228,9 @@ func (t *Transport) CleanupChannel(chid datatransfer.ChannelID) { } // SetEventHandler sets the handler for events on channels -func (t *Transport) SetEventHandler(events transport.Events) error { +func (t *Transport) SetEventHandler(events datatransfer.EventsHandler) error { if t.events != nil { - return transport.ErrHandlerAlreadySet + return datatransfer.ErrHandlerAlreadySet } t.events = events t.gs.RegisterIncomingRequestHook(t.gsReqRecdHook) @@ -245,6 +244,18 @@ func (t *Transport) SetEventHandler(events transport.Events) error { return nil } +// UseStore tells the graphsync transport to use the given loader and storer for this channelID +func (t *Transport) UseStore(channelID datatransfer.ChannelID, loader ipld.Loader, storer ipld.Storer) error { + err := t.gs.RegisterPersistenceOption("data-transfer-"+channelID.String(), loader, storer) + if err != nil { + return err + } + t.dataLock.Lock() + t.stores[channelID] = struct{}{} + t.dataLock.Unlock() + return nil +} + func (t *Transport) gsOutgoingRequestHook(p peer.ID, request graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) { message, _ := extension.GetTransferData(request) @@ -275,6 +286,10 @@ func (t *Transport) gsOutgoingRequestHook(p peer.ID, request graphsync.RequestDa close(pending) delete(t.pending, chid) } + _, ok := t.stores[chid] + if ok { + hookActions.UsePersistenceOption("data-transfer-" + chid.String()) + } t.dataLock.Unlock() } @@ -288,12 +303,12 @@ func (t *Transport) gsIncomingBlockHook(p peer.ID, response graphsync.ResponseDa } err := t.events.OnDataReceived(chid, block.Link(), block.BlockSize()) - if err != nil && err != transport.ErrPause { + if err != nil && err != datatransfer.ErrPause { hookActions.TerminateWithError(err) return } - if err == transport.ErrPause { + if err == datatransfer.ErrPause { hookActions.PauseRequest() } } @@ -314,12 +329,12 @@ func (t *Transport) gsOutgoingBlockHook(p peer.ID, request graphsync.RequestData rp.maximumSent = rp.currentSent msg, err := t.events.OnDataSent(chid, block.Link(), block.BlockSize()) - if err != nil && err != transport.ErrPause { + if err != nil && err != datatransfer.ErrPause { hookActions.TerminateWithError(err) return } - if err == transport.ErrPause { + if err == datatransfer.ErrPause { hookActions.PauseResponse() } @@ -350,16 +365,16 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook } var chid datatransfer.ChannelID - var responseMessage message.DataTransferMessage + var responseMessage datatransfer.Message if msg.IsRequest() { // when a DT request comes in on graphsync, it's a pull chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: p, Responder: t.peerID} - request := msg.(message.DataTransferRequest) + request := msg.(datatransfer.Request) responseMessage, err = t.events.OnRequestReceived(chid, request) } else { // when a DT response comes in on graphsync, it's a push chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: t.peerID, Responder: p} - response := msg.(message.DataTransferResponse) + response := msg.(datatransfer.Response) err = t.events.OnResponseReceived(chid, response) } @@ -372,12 +387,12 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook hookActions.SendExtensionData(extension) } - if err != nil && err != transport.ErrPause { + if err != nil && err != datatransfer.ErrPause { hookActions.TerminateWithError(err) return } - if err == transport.ErrPause { + if err == datatransfer.ErrPause { hookActions.PauseResponse() } @@ -399,8 +414,11 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook } else { t.responseProgressMap[chid] = &responseProgress{} } + _, ok := t.stores[chid] + if ok { + hookActions.UsePersistenceOption("data-transfer-" + chid.String()) + } t.dataLock.Unlock() - hookActions.ValidateRequest() } @@ -432,6 +450,14 @@ func (t *Transport) cleanupChannel(chid datatransfer.ChannelID, gsKey graphsyncK delete(t.responseProgressMap, chid) delete(t.pendingExtensions, chid) delete(t.requestorCancelledMap, chid) + _, ok := t.stores[chid] + if ok { + err := t.gs.UnregisterPersistenceOption("data-transfer-" + chid.String()) + if err != nil { + log.Error(err) + } + } + delete(t.stores, chid) } func (t *Transport) gsRequestUpdatedHook(p peer.ID, request graphsync.RequestData, update graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) { @@ -454,7 +480,7 @@ func (t *Transport) gsRequestUpdatedHook(p peer.ID, request graphsync.RequestDat hookActions.SendExtensionData(extension) } - if err != nil && err != transport.ErrPause { + if err != nil && err != datatransfer.ErrPause { hookActions.TerminateWithError(err) } @@ -487,7 +513,7 @@ func (t *Transport) gsIncomingResponseHook(p peer.ID, response graphsync.Respons } } -func (t *Transport) processExtension(chid datatransfer.ChannelID, gsMsg extension.GsExtended, p peer.ID) (message.DataTransferMessage, error) { +func (t *Transport) processExtension(chid datatransfer.ChannelID, gsMsg extension.GsExtended, p peer.ID) (datatransfer.Message, error) { // if this is a push request the sender is us. msg, err := extension.GetTransferData(gsMsg) @@ -506,7 +532,7 @@ func (t *Transport) processExtension(chid datatransfer.ChannelID, gsMsg extensio if (chid != datatransfer.ChannelID{ID: msg.TransferID(), Initiator: p, Responder: t.peerID}) { return nil, errors.New("received request on response channel") } - dtRequest := msg.(message.DataTransferRequest) + dtRequest := msg.(datatransfer.Request) return t.events.OnRequestReceived(chid, dtRequest) } @@ -515,7 +541,7 @@ func (t *Transport) processExtension(chid datatransfer.ChannelID, gsMsg extensio return nil, errors.New("received response on request channel") } - dtResponse := msg.(message.DataTransferResponse) + dtResponse := msg.(datatransfer.Response) return nil, t.events.OnResponseReceived(chid, dtResponse) } diff --git a/transport/graphsync/graphsync_test.go b/transport/graphsync/graphsync_test.go index 3ef210c6..f5d3ec54 100644 --- a/transport/graphsync/graphsync_test.go +++ b/transport/graphsync/graphsync_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "io" "math/rand" "testing" "time" @@ -11,11 +12,10 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-data-transfer/message" "github.com/filecoin-project/go-data-transfer/testutil" - "github.com/filecoin-project/go-data-transfer/transport" . "github.com/filecoin-project/go-data-transfer/transport/graphsync" "github.com/filecoin-project/go-data-transfer/transport/graphsync/extension" "github.com/ipfs/go-graphsync" - ipld "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" peer "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" @@ -347,7 +347,7 @@ func TestManager(t *testing.T) { }, "outgoing data send error == pause will pause request": { events: fakeEvents{ - OnDataSentError: transport.ErrPause, + OnDataSentError: datatransfer.ErrPause, }, action: func(gsData *harness) { gsData.incomingRequestHook() @@ -661,6 +661,44 @@ func TestManager(t *testing.T) { } }, }, + "UseStore can change store used for outgoing requests": { + action: func(gsData *harness) { + loader := func(ipld.Link, ipld.LinkContext) (io.Reader, error) { + return nil, nil + } + storer := func(ipld.LinkContext) (io.Writer, ipld.StoreCommitter, error) { + return nil, nil, nil + } + _ = gsData.transport.UseStore(datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.other, Initiator: gsData.self}, loader, storer) + gsData.outgoingRequestHook() + }, + check: func(t *testing.T, events *fakeEvents, gsData *harness) { + expectedChannel := "data-transfer-" + datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.other, Initiator: gsData.self}.String() + gsData.fgs.AssertHasPersistenceOption(t, expectedChannel) + require.Equal(t, expectedChannel, gsData.outgoingRequestHookActions.PersistenceOption) + gsData.transport.CleanupChannel(datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.other, Initiator: gsData.self}) + gsData.fgs.AssertDoesNotHavePersistenceOption(t, expectedChannel) + }, + }, + "UseStore can change store used for incoming requests": { + action: func(gsData *harness) { + loader := func(ipld.Link, ipld.LinkContext) (io.Reader, error) { + return nil, nil + } + storer := func(ipld.LinkContext) (io.Writer, ipld.StoreCommitter, error) { + return nil, nil, nil + } + _ = gsData.transport.UseStore(datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.self, Initiator: gsData.other}, loader, storer) + gsData.incomingRequestHook() + }, + check: func(t *testing.T, events *fakeEvents, gsData *harness) { + expectedChannel := "data-transfer-" + datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.self, Initiator: gsData.other}.String() + gsData.fgs.AssertHasPersistenceOption(t, expectedChannel) + require.Equal(t, expectedChannel, gsData.incomingRequestHookActions.PersistenceOption) + gsData.transport.CleanupChannel(datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.self, Initiator: gsData.other}) + gsData.fgs.AssertDoesNotHavePersistenceOption(t, expectedChannel) + }, + }, } ctx := context.Background() for testCase, data := range testCases { @@ -723,10 +761,10 @@ type fakeEvents struct { OnChannelCompletedCalled bool OnChannelCompletedErr error ChannelCompletedSuccess bool - DataSentMessage message.DataTransferMessage - RequestReceivedRequest message.DataTransferRequest - RequestReceivedResponse message.DataTransferResponse - ResponseReceivedResponse message.DataTransferResponse + DataSentMessage datatransfer.Message + RequestReceivedRequest datatransfer.Request + RequestReceivedResponse datatransfer.Response + ResponseReceivedResponse datatransfer.Response } func (fe *fakeEvents) OnChannelOpened(chid datatransfer.ChannelID) error { @@ -739,12 +777,12 @@ func (fe *fakeEvents) OnDataReceived(chid datatransfer.ChannelID, link ipld.Link return fe.OnDataReceivedError } -func (fe *fakeEvents) OnDataSent(chid datatransfer.ChannelID, link ipld.Link, size uint64) (message.DataTransferMessage, error) { +func (fe *fakeEvents) OnDataSent(chid datatransfer.ChannelID, link ipld.Link, size uint64) (datatransfer.Message, error) { fe.OnDataSentCalled = true return fe.DataSentMessage, fe.OnDataSentError } -func (fe *fakeEvents) OnRequestReceived(chid datatransfer.ChannelID, request message.DataTransferRequest) (message.DataTransferResponse, error) { +func (fe *fakeEvents) OnRequestReceived(chid datatransfer.ChannelID, request datatransfer.Request) (datatransfer.Response, error) { fe.OnRequestReceivedCallCount++ fe.RequestReceivedChannelID = chid fe.RequestReceivedRequest = request @@ -755,7 +793,7 @@ func (fe *fakeEvents) OnRequestReceived(chid datatransfer.ChannelID, request mes return fe.RequestReceivedResponse, err } -func (fe *fakeEvents) OnResponseReceived(chid datatransfer.ChannelID, response message.DataTransferResponse) error { +func (fe *fakeEvents) OnResponseReceived(chid datatransfer.ChannelID, response datatransfer.Response) error { fe.OnResponseReceivedCallCount++ fe.ResponseReceivedResponse = response fe.ResponseReceivedChannelID = chid @@ -773,8 +811,8 @@ func (fe *fakeEvents) OnChannelCompleted(chid datatransfer.ChannelID, success bo } type harness struct { - outgoing message.DataTransferRequest - incoming message.DataTransferResponse + outgoing datatransfer.Request + incoming datatransfer.Response ctx context.Context transport *Transport fgs *testutil.FakeGraphSync @@ -830,7 +868,7 @@ func (dtc *dtConfig) extensions(t *testing.T, transferID datatransfer.TransferID if dtc.dtExtensionMalformed { extensions[extension.ExtensionDataTransfer] = testutil.RandomBytes(100) } else { - var msg message.DataTransferMessage + var msg datatransfer.Message if dtc.dtIsResponse { msg = testutil.NewDTResponse(t, transferID) } else { @@ -878,14 +916,14 @@ func (grc *gsResponseConfig) makeResponse(t *testing.T, transferID datatransfer. return testutil.NewFakeResponse(requestID, extensions, grc.status) } -func assertDecodesToMessage(t *testing.T, data []byte, expected message.DataTransferMessage) { +func assertDecodesToMessage(t *testing.T, data []byte, expected datatransfer.Message) { buf := bytes.NewReader(data) actual, err := message.FromNet(buf) require.NoError(t, err) require.Equal(t, expected, actual) } -func assertHasOutgoingMessage(t *testing.T, extensions []graphsync.ExtensionData, expected message.DataTransferMessage) { +func assertHasOutgoingMessage(t *testing.T, extensions []graphsync.ExtensionData, expected datatransfer.Message) { buf := new(bytes.Buffer) err := expected.ToNet(buf) require.NoError(t, err) diff --git a/types.go b/types.go index 5f34d3b8..06c16a25 100644 --- a/types.go +++ b/types.go @@ -1,10 +1,7 @@ package datatransfer import ( - "context" - "errors" "fmt" - "time" "github.com/filecoin-project/go-data-transfer/encoding" "github.com/ipfs/go-cid" @@ -20,12 +17,6 @@ func (es errorString) Error() string { //go:generate cbor-gen-for ChannelID -// ErrChannelNotFound indicates the given channel does not exist -const ErrChannelNotFound = errorString("channel not found") - -// ErrPause can be returned from validators to pause a request -const ErrPause = errorString("data transfer pause request") - // TypeIdentifier is a unique string identifier for a type of encodable object in a // registry type TypeIdentifier string @@ -51,88 +42,6 @@ type Voucher Registerable // voucher being rejected or accepted type VoucherResult Registerable -// Status is the status of transfer for a given channel -type Status uint64 - -const ( - // Requested means a data transfer was requested by has not yet been approved - Requested Status = iota - - // Ongoing means the data transfer is in progress - Ongoing - - // TransferFinished indicates the initiator is done sending/receiving - // data but is awaiting confirmation from the responder - TransferFinished - - // ResponderCompleted indicates the initiator received a message from the - // responder that it's completed - ResponderCompleted - - // Finalizing means the responder is awaiting a final message from the initator to - // consider the transfer done - Finalizing - - // Completing just means we have some final cleanup for a completed request - Completing - - // Completed means the data transfer is completed successfully - Completed - - // Failing just means we have some final cleanup for a failed request - Failing - - // Failed means the data transfer failed - Failed - - // Cancelling just means we have some final cleanup for a cancelled request - Cancelling - - // Cancelled means the data transfer ended prematurely - Cancelled - - // InitiatorPaused means the data sender has paused the channel (only the sender can unpause this) - InitiatorPaused - - // ResponderPaused means the data receiver has paused the channel (only the receiver can unpause this) - ResponderPaused - - // BothPaused means both sender and receiver have paused the channel seperately (both must unpause) - BothPaused - - // ResponderFinalizing is a unique state where the responder is awaiting a final voucher - ResponderFinalizing - - // ResponderFinalizingTransferFinished is a unique state where the responder is awaiting a final voucher - // and we have received all data - ResponderFinalizingTransferFinished - - // ChannelNotFoundError means the searched for data transfer does not exist - ChannelNotFoundError -) - -// Statuses are human readable names for data transfer states -var Statuses = map[Status]string{ - // Requested means a data transfer was requested by has not yet been approved - Requested: "Requested", - Ongoing: "Ongoing", - TransferFinished: "TransferFinished", - ResponderCompleted: "ResponderCompleted", - Finalizing: "Finalizing", - Completing: "Completing", - Completed: "Completed", - Failing: "Failing", - Failed: "Failed", - Cancelling: "Cancelling", - Cancelled: "Cancelled", - InitiatorPaused: "InitiatorPaused", - ResponderPaused: "ResponderPaused", - BothPaused: "BothPaused", - ResponderFinalizing: "ResponderFinalizing", - ResponderFinalizingTransferFinished: "ResponderFinalizingTransferFinished", - ChannelNotFoundError: "ChannelNotFoundError", -} - // TransferID is an identifier for a data transfer, shared between // request/responder and unique to the requester type TransferID uint64 @@ -220,194 +129,3 @@ type ChannelState interface { // LastVoucherResult returns the last voucher result sent on the channel LastVoucherResult() VoucherResult } - -// EventCode is a name for an event that occurs on a data transfer channel -type EventCode int - -const ( - // Open is an event occurs when a channel is first opened - Open EventCode = iota - - // Accept is an event that emits when the data transfer is first accepted - Accept - - // Progress is an event that gets emitted every time more data is transferred - Progress - - // Cancel indicates one side has cancelled the transfer - Cancel - - // Error is an event that emits when an error occurs in a data transfer - Error - - // CleanupComplete emits when a request is cleaned up - CleanupComplete - - // NewVoucher means we have a new voucher on this channel - NewVoucher - - // NewVoucherResult means we have a new voucher result on this channel - NewVoucherResult - - // PauseInitiator emits when the data sender pauses transfer - PauseInitiator - - // ResumeInitiator emits when the data sender resumes transfer - ResumeInitiator - - // PauseResponder emits when the data receiver pauses transfer - PauseResponder - - // ResumeResponder emits when the data receiver resumes transfer - ResumeResponder - - // FinishTransfer emits when the initiator has completed sending/receiving data - FinishTransfer - - // ResponderCompletes emits when the initiator receives a message that the responder is finished - ResponderCompletes - - // ResponderBeginsFinalization emits when the initiator receives a message that the responder is finilizing - ResponderBeginsFinalization - - // BeginFinalizing emits when the responder completes its operations but awaits a response from the - // initiator - BeginFinalizing - - // Complete is emitted when a data transfer is complete - Complete -) - -// Events are human readable names for data transfer events -var Events = map[EventCode]string{ - Open: "Open", - Accept: "Accept", - Progress: "Progress", - Cancel: "Cancel", - Error: "Error", - CleanupComplete: "CleanupComplete", - NewVoucher: "NewVoucher", - NewVoucherResult: "NewVoucherResult", - PauseInitiator: "PauseInitiator", - ResumeInitiator: "ResumeInitiator", - PauseResponder: "PauseResponder", - ResumeResponder: "ResumeResponder", - FinishTransfer: "FinishTransfer", - ResponderBeginsFinalization: "ResponderBeginsFinalization", - ResponderCompletes: "ResponderCompletes", - BeginFinalizing: "BeginFinalizing", - Complete: "Complete", -} - -// Event is a struct containing information about a data transfer event -type Event struct { - Code EventCode // What type of event it is - Message string // Any clarifying information about the event - Timestamp time.Time // when the event happened -} - -// Subscriber is a callback that is called when events are emitted -type Subscriber func(event Event, channelState ChannelState) - -// Unsubscribe is a function that gets called to unsubscribe from data transfer events -type Unsubscribe func() - -// RequestValidator is an interface implemented by the client of the -// data transfer module to validate requests -type RequestValidator interface { - // ValidatePush validates a push request received from the peer that will send data - ValidatePush( - sender peer.ID, - voucher Voucher, - baseCid cid.Cid, - selector ipld.Node) (VoucherResult, error) - // ValidatePull validates a pull request received from the peer that will receive data - ValidatePull( - receiver peer.ID, - voucher Voucher, - baseCid cid.Cid, - selector ipld.Node) (VoucherResult, error) -} - -// ErrRetryValidation is a special error that the a revalidator can return -// for ValidatePush/ValidatePull that will not fail the request -// but send the voucher result back and await another attempt -var ErrRetryValidation = errors.New("Retry Revalidation") - -// Revalidator is a request validator revalidates in progress requests -// by requesting request additional vouchers, and resuming when it receives them -type Revalidator interface { - // Revalidate revalidates a request with a new voucher - Revalidate(channelID ChannelID, voucher Voucher) (VoucherResult, error) - // OnPullDataSent is called on the responder side when more bytes are sent - // for a given pull request. It should return a VoucherResult + ErrPause to - // request revalidation or nil to continue uninterrupted, - // other errors will terminate the request - OnPullDataSent(chid ChannelID, additionalBytesSent uint64) (VoucherResult, error) - // OnPushDataReceived is called on the responder side when more bytes are received - // for a given push request. It should return a VoucherResult + ErrPause to - // request revalidation or nil to continue uninterrupted, - // other errors will terminate the request - OnPushDataReceived(chid ChannelID, additionalBytesReceived uint64) (VoucherResult, error) - // OnComplete is called to make a final request for revalidation -- often for the - // purpose of settlement. - // if VoucherResult is non nil, the request will enter a settlement phase awaiting - // a final update - OnComplete(chid ChannelID) (VoucherResult, error) -} - -// Manager is the core interface presented by all implementations of -// of the data transfer sub system -type Manager interface { - - // Start initializes data transfer processing - Start(ctx context.Context) error - - // Stop terminates all data transfers and ends processing - Stop() error - - // RegisterVoucherType registers a validator for the given voucher type - // will error if voucher type does not implement voucher - // or if there is a voucher type registered with an identical identifier - RegisterVoucherType(voucherType Voucher, validator RequestValidator) error - - // RegisterRevalidator registers a revalidator for the given voucher type - // Note: this is the voucher type used to revalidate. It can share a name - // with the initial validator type and CAN be the same type, or a different type. - // The revalidator can simply be the sampe as the original request validator, - // or a different validator that satisfies the revalidator interface. - RegisterRevalidator(voucherType Voucher, revalidator Revalidator) error - - // RegisterVoucherResultType allows deserialization of a voucher result, - // so that a listener can read the metadata - RegisterVoucherResultType(resultType VoucherResult) error - - // open a data transfer that will send data to the recipient peer and - // transfer parts of the piece that match the selector - OpenPushDataChannel(ctx context.Context, to peer.ID, voucher Voucher, baseCid cid.Cid, selector ipld.Node) (ChannelID, error) - - // open a data transfer that will request data from the sending peer and - // transfer parts of the piece that match the selector - OpenPullDataChannel(ctx context.Context, to peer.ID, voucher Voucher, baseCid cid.Cid, selector ipld.Node) (ChannelID, error) - - // send an intermediate voucher as needed when the receiver sends a request for revalidation - SendVoucher(ctx context.Context, chid ChannelID, voucher Voucher) error - - // close an open channel (effectively a cancel) - CloseDataTransferChannel(ctx context.Context, chid ChannelID) error - - // pause a data transfer channel (only allowed if transport supports it) - PauseDataTransferChannel(ctx context.Context, chid ChannelID) error - - // resume a data transfer channel (only allowed if transport supports it) - ResumeDataTransferChannel(ctx context.Context, chid ChannelID) error - - // get status of a transfer - TransferChannelStatus(ctx context.Context, x ChannelID) Status - - // get notified when certain types of events happen - SubscribeToEvents(subscriber Subscriber) Unsubscribe - - // get all in progress transfers - InProgressChannels(ctx context.Context) (map[ChannelID]ChannelState, error) -}