Skip to content

Commit

Permalink
feat: auto-restart connection for push data channels
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Dec 14, 2020
1 parent c03054f commit 21a9ac3
Show file tree
Hide file tree
Showing 7 changed files with 644 additions and 9 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.3
github.com/ipfs/go-graphsync v0.5.1-0.20201210094259-62ba246a15e7
github.com/ipfs/go-graphsync v0.5.2
github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-graphsync v0.5.0 h1:iaByvxq88Ys1KcaQzTS1wmRhNsNEo3SaUiSGqTSbGmM=
github.com/ipfs/go-graphsync v0.5.0/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-graphsync v0.5.1-0.20201210094259-62ba246a15e7 h1:BiBW+yEEtZmwX8h9zlEmBneJAkAh1dfRBoNpmm9zko0=
github.com/ipfs/go-graphsync v0.5.1-0.20201210094259-62ba246a15e7/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-graphsync v0.5.2 h1:USD+daaSC+7pLHCxROThSaF6SF7WYXF03sjrta0rCfA=
github.com/ipfs/go-graphsync v0.5.2/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ=
Expand Down
28 changes: 26 additions & 2 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-storedcounter"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channels"
"github.com/filecoin-project/go-data-transfer/cidlists"
"github.com/filecoin-project/go-data-transfer/encoding"
"github.com/filecoin-project/go-data-transfer/message"
"github.com/filecoin-project/go-data-transfer/network"
"github.com/filecoin-project/go-data-transfer/registry"
"github.com/filecoin-project/go-storedcounter"
)

var log = logging.Logger("dt-impl")
Expand All @@ -45,6 +44,7 @@ type manager struct {
reconnectsLk sync.RWMutex
reconnects map[datatransfer.ChannelID]chan struct{}
cidLists cidlists.CIDLists
pushChannelMonitor *pushChannelMonitor
}

type internalEvent struct {
Expand Down Expand Up @@ -88,6 +88,17 @@ func ChannelRemoveTimeout(timeout time.Duration) DataTransferOption {
}
}

// PushChannelRestartConfig sets the configuration options for automatically
// restarting push channels
func PushChannelRestartConfig(interval time.Duration, minBytesSent uint64) DataTransferOption {
return func(m *manager) {
m.pushChannelMonitor.setConfig(&pushMonitorConfig{
Interval: interval,
MinBytesSent: minBytesSent,
})
}
}

const defaultChannelRemoveTimeout = 1 * time.Hour

// NewDataTransfer initializes a new instance of a data transfer manager
Expand All @@ -106,6 +117,8 @@ func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetw
channelRemoveTimeout: defaultChannelRemoveTimeout,
reconnects: make(map[datatransfer.ChannelID]chan struct{}),
}
m.pushChannelMonitor = newPushChannelMonitor(m)

cidLists, err := cidlists.NewCIDLists(cidListsDir)
if err != nil {
return nil, err
Expand All @@ -116,9 +129,16 @@ func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetw
return nil, err
}
m.channels = channels

// Apply config options
for _, option := range options {
option(m)
}

// Start push channel monitor after applying config options as the config
// options may apply to the monitor
m.pushChannelMonitor.start()

return m, nil
}

Expand Down Expand Up @@ -161,6 +181,7 @@ func (m *manager) OnReady(ready datatransfer.ReadyFunc) {

// Stop terminates all data transfers and ends processing
func (m *manager) Stop(ctx context.Context) error {
m.pushChannelMonitor.shutdown()
return m.transport.Shutdown(ctx)
}

Expand Down Expand Up @@ -196,11 +217,14 @@ func (m *manager) OpenPushDataChannel(ctx context.Context, requestTo peer.ID, vo
transportConfigurer(chid, voucher, m.transport)
}
m.dataTransferNetwork.Protect(requestTo, chid.String())
monitor := m.pushChannelMonitor.add(chid)
if err := m.dataTransferNetwork.SendMessage(ctx, requestTo, req); err != nil {
err = fmt.Errorf("Unable to send request: %w", err)
_ = m.channels.Error(chid, err)
monitor.shutdown()
return chid, err
}

return chid, nil
}

Expand Down
93 changes: 93 additions & 0 deletions impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,99 @@ func TestManyReceiversAtOnce(t *testing.T) {
}
}

func TestPushRequestAutoRestart(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
netRetry := network.RetryParameters(time.Second, time.Second, 5, 1)
gsData.DtNet1 = network.NewFromLibp2pHost(gsData.Host1, netRetry)
host1 := gsData.Host1 // initiator, data sender
host2 := gsData.Host2 // data recipient

tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

restartConf := PushChannelRestartConfig(100*time.Millisecond, 1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1, restartConf)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)

received := make(chan struct{})
finished := make(chan struct{}, 2)
var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
//t.Logf("%s: %s\n", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()])

if event.Code == datatransfer.DataReceived {
received <- struct{}{}
}

if channelState.Status() == datatransfer.Completed {
finished <- struct{}{}
}
}
dt1.SubscribeToEvents(subscriber)
dt2.SubscribeToEvents(subscriber)
voucher := testutil.FakeDTType{Data: "applesauce"}
sv := testutil.NewStubbedValidator()

sourceDagService := gsData.DagService1
destDagService := gsData.DagService2

root, origBytes := testutil.LoadUnixFSFile(ctx, t, sourceDagService, loremFile)
rootCid := root.(cidlink.Link).Cid

require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv))
require.NoError(t, dt2.RegisterVoucherType(&testutil.FakeDTType{}, sv))
chid, err := dt1.OpenPushDataChannel(ctx, host2.ID(), &voucher, rootCid, gsData.AllSelector)
require.NoError(t, err)

// Wait for a block to be received
<-received

// Break connection
t.Logf("Breaking connection to peer")
require.NoError(t, gsData.Mn.UnlinkPeers(host1.ID(), host2.ID()))
require.NoError(t, gsData.Mn.DisconnectPeers(host1.ID(), host2.ID()))

t.Logf("Sleep for a second")
time.Sleep(1 * time.Second)

// Restore connection
t.Logf("Restore connection")
require.NoError(t, gsData.Mn.LinkAll())
time.Sleep(200 * time.Millisecond)
conn, err := gsData.Mn.ConnectPeers(host1.ID(), host2.ID())
require.NoError(t, err)
require.NotNil(t, conn)

t.Logf("Waiting for auto-restart on push channel %s", chid)

(func() {
finishedCount := 0
for {
select {
case <-ctx.Done():
t.Fatal("Did not complete succcessful data transfer")
return
case <-received:
case <-finished:
finishedCount++
if finishedCount == 2 {
return
}
}
}
})()

// Verify that the file was transferred to the destination node
testutil.VerifyHasFile(ctx, t, destDagService, root, origBytes)
}

func TestRoundTripCancelledRequest(t *testing.T) {
ctx := context.Background()
testCases := map[string]struct {
Expand Down
Loading

0 comments on commit 21a9ac3

Please sign in to comment.