Skip to content

Commit

Permalink
Allow custom configuration of transports (#57)
Browse files Browse the repository at this point in the history
* feat(graphsync): setup alternate stores

add a method to allow swapping of an alternate store for a given request

* refactor(datatransfer): move all interfaces to top

move interfaces to the top level package to avoid circular imports

* feat(impl): support transport configurer

Support a per voucher type transport configurer to allow custom configuration of a transport

* fix(lint): fix lint/mod-tidy

* fix(impl): fix integration test
  • Loading branch information
hannahhoward authored Jul 27, 2020
1 parent 34b009b commit 5bb7276
Show file tree
Hide file tree
Showing 34 changed files with 981 additions and 683 deletions.
26 changes: 26 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package datatransfer

type errorType string

func (e errorType) Error() string {
return string(e)
}

// ErrHandlerAlreadySet means an event handler was already set for this instance of
// hooks
const ErrHandlerAlreadySet = errorType("already set event handler")

// ErrHandlerNotSet means you cannot issue commands to this interface because the
// handler has not been set
const ErrHandlerNotSet = errorType("event handler has not been set")

// ErrChannelNotFound means the channel this command was issued for does not exist
const ErrChannelNotFound = errorType("channel not found")

// ErrPause is a special error that the DataReceived / DataSent hooks can
// use to pause the channel
const ErrPause = errorType("pause channel")

// ErrResume is a special error that the RequestReceived / ResponseReceived hooks can
// use to resume the channel
const ErrResume = errorType("resume channel")
94 changes: 94 additions & 0 deletions events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package datatransfer

import "time"

// EventCode is a name for an event that occurs on a data transfer channel
type EventCode int

const (
// Open is an event occurs when a channel is first opened
Open EventCode = iota

// Accept is an event that emits when the data transfer is first accepted
Accept

// Progress is an event that gets emitted every time more data is transferred
Progress

// Cancel indicates one side has cancelled the transfer
Cancel

// Error is an event that emits when an error occurs in a data transfer
Error

// CleanupComplete emits when a request is cleaned up
CleanupComplete

// NewVoucher means we have a new voucher on this channel
NewVoucher

// NewVoucherResult means we have a new voucher result on this channel
NewVoucherResult

// PauseInitiator emits when the data sender pauses transfer
PauseInitiator

// ResumeInitiator emits when the data sender resumes transfer
ResumeInitiator

// PauseResponder emits when the data receiver pauses transfer
PauseResponder

// ResumeResponder emits when the data receiver resumes transfer
ResumeResponder

// FinishTransfer emits when the initiator has completed sending/receiving data
FinishTransfer

// ResponderCompletes emits when the initiator receives a message that the responder is finished
ResponderCompletes

// ResponderBeginsFinalization emits when the initiator receives a message that the responder is finilizing
ResponderBeginsFinalization

// BeginFinalizing emits when the responder completes its operations but awaits a response from the
// initiator
BeginFinalizing

// Complete is emitted when a data transfer is complete
Complete
)

// Events are human readable names for data transfer events
var Events = map[EventCode]string{
Open: "Open",
Accept: "Accept",
Progress: "Progress",
Cancel: "Cancel",
Error: "Error",
CleanupComplete: "CleanupComplete",
NewVoucher: "NewVoucher",
NewVoucherResult: "NewVoucherResult",
PauseInitiator: "PauseInitiator",
ResumeInitiator: "ResumeInitiator",
PauseResponder: "PauseResponder",
ResumeResponder: "ResumeResponder",
FinishTransfer: "FinishTransfer",
ResponderBeginsFinalization: "ResponderBeginsFinalization",
ResponderCompletes: "ResponderCompletes",
BeginFinalizing: "BeginFinalizing",
Complete: "Complete",
}

// Event is a struct containing information about a data transfer event
type Event struct {
Code EventCode // What type of event it is
Message string // Any clarifying information about the event
Timestamp time.Time // when the event happened
}

// Subscriber is a callback that is called when events are emitted
type Subscriber func(event Event, channelState ChannelState)

// Unsubscribe is a function that gets called to unsubscribe from data transfer events
type Unsubscribe func()
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.5
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-graphsync v0.0.6-0.20200715204712-ef06b3d32e83
github.com/ipfs/go-graphsync v0.0.6-0.20200721211002-c376cbe14c0a
github.com/ipfs/go-ipfs-blockstore v0.1.4
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ github.com/ipfs/go-ds-badger v0.0.5/go.mod h1:g5AuuCGmr7efyzQhLL8MzwqcauPojGPUaH
github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-graphsync v0.0.6-0.20200715204712-ef06b3d32e83 h1:tkGDAwcZfzDFeBNyBWYOM02Qw0rGpA2UuCvq49T3K5o=
github.com/ipfs/go-graphsync v0.0.6-0.20200715204712-ef06b3d32e83/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
github.com/ipfs/go-graphsync v0.0.6-0.20200721211002-c376cbe14c0a h1:QViYKbSYNKtfivrYx69UFJiH7HfdE5APQBbIu5fCK3k=
github.com/ipfs/go-graphsync v0.0.6-0.20200721211002-c376cbe14c0a/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ=
Expand Down
39 changes: 19 additions & 20 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ import (

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/encoding"
"github.com/filecoin-project/go-data-transfer/message"
"github.com/filecoin-project/go-data-transfer/registry"
"github.com/filecoin-project/go-data-transfer/transport"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -51,7 +49,7 @@ func (m *manager) OnDataReceived(chid datatransfer.ChannelID, link ipld.Link, si
return nil
}

func (m *manager) OnDataSent(chid datatransfer.ChannelID, link ipld.Link, size uint64) (message.DataTransferMessage, error) {
func (m *manager) OnDataSent(chid datatransfer.ChannelID, link ipld.Link, size uint64) (datatransfer.Message, error) {
err := m.channels.IncrementSent(chid, size)
if err != nil {
return nil, err
Expand All @@ -71,7 +69,7 @@ func (m *manager) OnDataSent(chid datatransfer.ChannelID, link ipld.Link, size u
return nil, nil
}

func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request message.DataTransferRequest) (message.DataTransferResponse, error) {
func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request datatransfer.Request) (datatransfer.Response, error) {
if request.IsNew() {
return m.receiveNewRequest(chid.Initiator, request)
}
Expand All @@ -95,12 +93,12 @@ func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request message
}
if chst.Status() == datatransfer.ResponderPaused ||
chst.Status() == datatransfer.ResponderFinalizing {
return nil, transport.ErrPause
return nil, datatransfer.ErrPause
}
return nil, nil
}

func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response message.DataTransferResponse) error {
func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datatransfer.Response) error {
if response.IsCancel() {
return m.channels.Cancel(chid)
}
Expand Down Expand Up @@ -168,22 +166,18 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool)

func (m *manager) receiveNewRequest(
initiator peer.ID,
incoming message.DataTransferRequest) (message.DataTransferResponse, error) {
incoming datatransfer.Request) (datatransfer.Response, error) {
result, err := m.acceptRequest(initiator, incoming)
msg, msgErr := m.response(true, err, incoming.TransferID(), result)
if msgErr != nil {
return nil, msgErr
}
// convert to the transport error for pauses
if err == datatransfer.ErrPause {
err = transport.ErrPause
}
return msg, err
}

func (m *manager) acceptRequest(
initiator peer.ID,
incoming message.DataTransferRequest) (datatransfer.VoucherResult, error) {
incoming datatransfer.Request) (datatransfer.VoucherResult, error) {

stor, err := incoming.Selector()
if err != nil {
Expand Down Expand Up @@ -218,6 +212,11 @@ func (m *manager) acceptRequest(
if err := m.channels.Accept(chid); err != nil {
return result, err
}
processor, has := m.transportConfigurers.Processor(voucher.Type())
if has {
transportConfigurer := processor.(datatransfer.TransportConfigurer)
transportConfigurer(chid, voucher, m.transport)
}
m.dataTransferNetwork.Protect(initiator, chid.String())
if voucherErr == datatransfer.ErrPause {
err := m.channels.PauseResponder(chid)
Expand All @@ -235,7 +234,7 @@ func (m *manager) acceptRequest(
// * deserialization of selector fails
// * validation fails
func (m *manager) validateVoucher(sender peer.ID,
incoming message.DataTransferRequest,
incoming datatransfer.Request,
isPull bool,
baseCid cid.Cid,
stor ipld.Node) (datatransfer.Voucher, datatransfer.VoucherResult, error) {
Expand Down Expand Up @@ -263,7 +262,7 @@ func (m *manager) validateVoucher(sender peer.ID,
// * deserialization of selector fails
// * validation fails
func (m *manager) revalidateVoucher(chid datatransfer.ChannelID,
incoming message.DataTransferRequest) (datatransfer.Voucher, datatransfer.VoucherResult, error) {
incoming datatransfer.Request) (datatransfer.Voucher, datatransfer.VoucherResult, error) {
vouch, err := m.decodeVoucher(incoming, m.revalidators)
if err != nil {
return nil, nil, err
Expand All @@ -275,7 +274,7 @@ func (m *manager) revalidateVoucher(chid datatransfer.ChannelID,
return vouch, result, err
}

func (m *manager) processUpdateVoucher(chid datatransfer.ChannelID, request message.DataTransferRequest) (message.DataTransferResponse, error) {
func (m *manager) processUpdateVoucher(chid datatransfer.ChannelID, request datatransfer.Request) (datatransfer.Response, error) {
vouch, result, voucherErr := m.revalidateVoucher(chid, request)
if vouch != nil {
err := m.channels.NewVoucher(chid, vouch)
Expand All @@ -286,7 +285,7 @@ func (m *manager) processUpdateVoucher(chid datatransfer.ChannelID, request mess
return m.processRevalidationResult(chid, result, voucherErr)
}

func (m *manager) revalidationResponse(chid datatransfer.ChannelID, result datatransfer.VoucherResult, resultErr error) (message.DataTransferResponse, error) {
func (m *manager) revalidationResponse(chid datatransfer.ChannelID, result datatransfer.VoucherResult, resultErr error) (datatransfer.Response, error) {
chst, err := m.channels.GetByID(context.TODO(), chid)
if err != nil {
return nil, err
Expand All @@ -297,7 +296,7 @@ func (m *manager) revalidationResponse(chid datatransfer.ChannelID, result datat
return m.response(false, resultErr, chid.ID, result)
}

func (m *manager) processRevalidationResult(chid datatransfer.ChannelID, result datatransfer.VoucherResult, resultErr error) (message.DataTransferResponse, error) {
func (m *manager) processRevalidationResult(chid datatransfer.ChannelID, result datatransfer.VoucherResult, resultErr error) (datatransfer.Response, error) {
vresMessage, err := m.revalidationResponse(chid, result, resultErr)

if err != nil {
Expand All @@ -314,20 +313,20 @@ func (m *manager) processRevalidationResult(chid datatransfer.ChannelID, result
if err != nil {
return nil, err
}
return vresMessage, transport.ErrPause
return vresMessage, datatransfer.ErrPause
}

if resultErr == nil {
err = m.resume(chid)
if err != nil {
return nil, err
}
return vresMessage, transport.ErrResume
return vresMessage, datatransfer.ErrResume
}
return vresMessage, resultErr
}

func (m *manager) completeMessage(chid datatransfer.ChannelID) (message.DataTransferResponse, error) {
func (m *manager) completeMessage(chid datatransfer.ChannelID) (datatransfer.Response, error) {
var result datatransfer.VoucherResult
var resultErr error
_ = m.revalidators.Each(func(_ datatransfer.TypeIdentifier, _ encoding.Decoder, processor registry.Processor) error {
Expand Down
Loading

0 comments on commit 5bb7276

Please sign in to comment.