Automatically restart push channel #127

Merged: 5 commits, Dec 16, 2020
2 changes: 1 addition & 1 deletion go.mod
```diff
@@ -13,7 +13,7 @@ require (
 	github.com/ipfs/go-cid v0.0.7
 	github.com/ipfs/go-datastore v0.4.5
 	github.com/ipfs/go-ds-badger v0.2.3
-	github.com/ipfs/go-graphsync v0.5.0
+	github.com/ipfs/go-graphsync v0.5.2
 	github.com/ipfs/go-ipfs-blockstore v1.0.1
 	github.com/ipfs/go-ipfs-blocksutil v0.0.1
 	github.com/ipfs/go-ipfs-chunker v0.0.5
```
4 changes: 2 additions & 2 deletions go.sum
```diff
@@ -228,8 +228,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR
 github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
 github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
 github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
-github.com/ipfs/go-graphsync v0.5.0 h1:iaByvxq88Ys1KcaQzTS1wmRhNsNEo3SaUiSGqTSbGmM=
-github.com/ipfs/go-graphsync v0.5.0/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
+github.com/ipfs/go-graphsync v0.5.2 h1:USD+daaSC+7pLHCxROThSaF6SF7WYXF03sjrta0rCfA=
+github.com/ipfs/go-graphsync v0.5.2/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
 github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
 github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
 github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ=
```
74 changes: 59 additions & 15 deletions impl/impl.go
```diff
@@ -24,27 +24,30 @@ import (
 	"github.com/filecoin-project/go-data-transfer/encoding"
 	"github.com/filecoin-project/go-data-transfer/message"
 	"github.com/filecoin-project/go-data-transfer/network"
+	"github.com/filecoin-project/go-data-transfer/pushchannelmonitor"
 	"github.com/filecoin-project/go-data-transfer/registry"
 )

 var log = logging.Logger("dt-impl")

 type manager struct {
-	dataTransferNetwork  network.DataTransferNetwork
-	validatedTypes       *registry.Registry
-	resultTypes          *registry.Registry
-	revalidators         *registry.Registry
-	transportConfigurers *registry.Registry
-	pubSub               *pubsub.PubSub
-	readySub             *pubsub.PubSub
-	channels             *channels.Channels
-	peerID               peer.ID
-	transport            datatransfer.Transport
-	storedCounter        *storedcounter.StoredCounter
-	channelRemoveTimeout time.Duration
-	reconnectsLk         sync.RWMutex
-	reconnects           map[datatransfer.ChannelID]chan struct{}
-	cidLists             cidlists.CIDLists
+	dataTransferNetwork   network.DataTransferNetwork
+	validatedTypes        *registry.Registry
+	resultTypes           *registry.Registry
+	revalidators          *registry.Registry
+	transportConfigurers  *registry.Registry
+	pubSub                *pubsub.PubSub
+	readySub              *pubsub.PubSub
+	channels              *channels.Channels
+	peerID                peer.ID
+	transport             datatransfer.Transport
+	storedCounter         *storedcounter.StoredCounter
+	channelRemoveTimeout  time.Duration
+	reconnectsLk          sync.RWMutex
+	reconnects            map[datatransfer.ChannelID]chan struct{}
+	cidLists              cidlists.CIDLists
+	pushChannelMonitor    *pushchannelmonitor.Monitor
+	pushChannelMonitorCfg *pushchannelmonitor.Config
 }

 type internalEvent struct {
@@ -88,6 +91,28 @@ func ChannelRemoveTimeout(timeout time.Duration) DataTransferOption {
 	}
 }

+// PushChannelRestartConfig sets the configuration options for automatically
+// restarting push channels
+// - interval is the time over which minBytesSent must have been sent
+// - checksPerInterval is the number of times to check per interval
+// - minBytesSent is the minimum amount of data that must have been sent over the interval
+// - restartBackoff is the time to wait before checking again for restarts
+func PushChannelRestartConfig(
+	interval time.Duration,
+	checksPerInterval uint32,
+	minBytesSent uint64,
+	restartBackoff time.Duration,
+) DataTransferOption {
+	return func(m *manager) {
+		m.pushChannelMonitorCfg = &pushchannelmonitor.Config{
+			Interval:          interval,
+			ChecksPerInterval: checksPerInterval,
+			MinBytesSent:      minBytesSent,
+			RestartBackoff:    restartBackoff,
+		}
+	}
+}
+
 const defaultChannelRemoveTimeout = 1 * time.Hour

 // NewDataTransfer initializes a new instance of a data transfer manager
@@ -106,6 +131,7 @@ func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetw
 		channelRemoveTimeout: defaultChannelRemoveTimeout,
 		reconnects:           make(map[datatransfer.ChannelID]chan struct{}),
 	}
+
 	cidLists, err := cidlists.NewCIDLists(cidListsDir)
 	if err != nil {
 		return nil, err
@@ -116,9 +142,17 @@ func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetw
 		return nil, err
 	}
 	m.channels = channels
+
+	// Apply config options
 	for _, option := range options {
 		option(m)
 	}
+
+	// Start push channel monitor after applying config options as the config
+	// options may apply to the monitor
+	m.pushChannelMonitor = pushchannelmonitor.NewMonitor(m, m.pushChannelMonitorCfg)
+	m.pushChannelMonitor.Start()
+
 	return m, nil
 }

@@ -161,6 +195,7 @@ func (m *manager) OnReady(ready datatransfer.ReadyFunc) {

 // Stop terminates all data transfers and ends processing
 func (m *manager) Stop(ctx context.Context) error {
+	m.pushChannelMonitor.Shutdown()
 	return m.transport.Shutdown(ctx)
 }

@@ -196,11 +231,20 @@ func (m *manager) OpenPushDataChannel(ctx context.Context, requestTo peer.ID, vo
 		transportConfigurer(chid, voucher, m.transport)
 	}
 	m.dataTransferNetwork.Protect(requestTo, chid.String())
+	monitoredChan := m.pushChannelMonitor.AddChannel(chid)
 	if err := m.dataTransferNetwork.SendMessage(ctx, requestTo, req); err != nil {
 		err = fmt.Errorf("Unable to send request: %w", err)
 		_ = m.channels.Error(chid, err)
+
+		// If push channel monitoring is enabled, shutdown the monitor as it
+		// wasn't possible to start the data transfer
+		if monitoredChan != nil {
+			monitoredChan.Shutdown()
+		}
+
 		return chid, err
 	}
+
 	return chid, nil
 }
```
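Taken together, the impl.go changes make restarts opt-in at construction time: the option only populates `pushChannelMonitorCfg`, and the monitor is created after all options run. A minimal caller-side sketch, mirroring the argument order used in the integration test below (`ds`, `cidListsDir`, `dtNet`, `tp`, and `storedCounter` are stand-ins for the caller's usual `NewDataTransfer` inputs, and the threshold values are purely illustrative):

```go
// Sketch only: enabling automatic push channel restarts via the new option.
restart := PushChannelRestartConfig(
	time.Minute,    // interval: window over which the data rate is judged
	10,             // checksPerInterval: samples taken per window
	1<<20,          // minBytesSent: restart if under 1 MiB moved in a window
	30*time.Second, // restartBackoff: wait before re-checking a restarted channel
)
dt, err := NewDataTransfer(ds, cidListsDir, dtNet, tp, storedCounter, restart)
if err != nil {
	return err
}
```

If the option is omitted, no restart config is set; the nil-check on `monitoredChan` in `OpenPushDataChannel` suggests `AddChannel` returns nil in that case, so monitoring is effectively disabled.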
107 changes: 101 additions & 6 deletions impl/integration_test.go
```diff
@@ -209,7 +209,7 @@ func TestRoundTrip(t *testing.T) {
 	for opens < 2 || completes < 2 || len(sentIncrements) < 21 || len(receivedIncrements) < 21 {
 		select {
 		case <-ctx.Done():
-			t.Fatal("Did not complete succcessful data transfer")
+			t.Fatal("Did not complete successful data transfer")
 		case <-finished:
 			completes++
 		case <-opened:
@@ -343,7 +343,7 @@ func TestMultipleRoundTripMultipleStores(t *testing.T) {
 	for opens < 2*data.requestCount || completes < 2*data.requestCount {
 		select {
 		case <-ctx.Done():
-			t.Fatal("Did not complete succcessful data transfer")
+			t.Fatal("Did not complete successful data transfer")
 		case <-finished:
 			completes++
 		case <-opened:
@@ -481,7 +481,7 @@ func TestManyReceiversAtOnce(t *testing.T) {
 	for opens < 2*data.receiverCount || completes < 2*data.receiverCount {
 		select {
 		case <-ctx.Done():
-			t.Fatal("Did not complete succcessful data transfer")
+			t.Fatal("Did not complete successful data transfer")
 		case <-finished:
 			completes++
 		case <-opened:
@@ -497,6 +497,101 @@ func TestManyReceiversAtOnce(t *testing.T) {
 	}
 }

+// TestPushRequestAutoRestart tests that if the connection for a push request
+// goes down, it will automatically restart (given the right config options)
+func TestPushRequestAutoRestart(t *testing.T) {
+	ctx := context.Background()
+	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+
+	gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
+	netRetry := network.RetryParameters(time.Second, time.Second, 5, 1)
+	gsData.DtNet1 = network.NewFromLibp2pHost(gsData.Host1, netRetry)
+	host1 := gsData.Host1 // initiator, data sender
+	host2 := gsData.Host2 // data recipient
+
+	tp1 := gsData.SetupGSTransportHost1()
+	tp2 := gsData.SetupGSTransportHost2()
+
+	restartConf := PushChannelRestartConfig(100*time.Millisecond, 1, 10, 200*time.Millisecond)
+	dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1, restartConf)
+	require.NoError(t, err)
+	testutil.StartAndWaitForReady(ctx, t, dt1)
+	dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
+	require.NoError(t, err)
+	testutil.StartAndWaitForReady(ctx, t, dt2)
+
+	received := make(chan struct{})
+	finished := make(chan struct{}, 2)
+	var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
+		//t.Logf("%s: %s\n", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()])
+
+		if event.Code == datatransfer.DataReceived {
+			received <- struct{}{}
+		}
+
+		if channelState.Status() == datatransfer.Completed {
+			finished <- struct{}{}
+		}
+	}
+	dt1.SubscribeToEvents(subscriber)
+	dt2.SubscribeToEvents(subscriber)
+	voucher := testutil.FakeDTType{Data: "applesauce"}
+	sv := testutil.NewStubbedValidator()
+
+	sourceDagService := gsData.DagService1
+	destDagService := gsData.DagService2
+
+	root, origBytes := testutil.LoadUnixFSFile(ctx, t, sourceDagService, loremFile)
+	rootCid := root.(cidlink.Link).Cid
+
+	require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv))
+	require.NoError(t, dt2.RegisterVoucherType(&testutil.FakeDTType{}, sv))
+	chid, err := dt1.OpenPushDataChannel(ctx, host2.ID(), &voucher, rootCid, gsData.AllSelector)
+	require.NoError(t, err)
+
+	// Wait for a block to be received
+	<-received
+
+	// Break connection
+	t.Logf("Breaking connection to peer")
+	require.NoError(t, gsData.Mn.UnlinkPeers(host1.ID(), host2.ID()))
+	require.NoError(t, gsData.Mn.DisconnectPeers(host1.ID(), host2.ID()))
+
+	t.Logf("Sleep for a second")
+	time.Sleep(1 * time.Second)
+
+	// Restore connection
+	t.Logf("Restore connection")
+	require.NoError(t, gsData.Mn.LinkAll())
+	time.Sleep(200 * time.Millisecond)
+	conn, err := gsData.Mn.ConnectPeers(host1.ID(), host2.ID())
+	require.NoError(t, err)
+	require.NotNil(t, conn)
+
+	t.Logf("Waiting for auto-restart on push channel %s", chid)
+
+	(func() {
+		finishedCount := 0
+		for {
+			select {
+			case <-ctx.Done():
+				t.Fatal("Did not complete successful data transfer")
+				return
+			case <-received:
+			case <-finished:
+				finishedCount++
+				if finishedCount == 2 {
+					return
+				}
+			}
+		}
+	})()
+
+	// Verify that the file was transferred to the destination node
+	testutil.VerifyHasFile(ctx, t, destDagService, root, origBytes)
+}
+
 func TestRoundTripCancelledRequest(t *testing.T) {
 	ctx := context.Background()
 	testCases := map[string]struct {
@@ -751,7 +846,7 @@ func TestSimulatedRetrievalFlow(t *testing.T) {
 	for providerFinished != nil || clientFinished != nil {
 		select {
 		case <-ctx.Done():
-			t.Fatal("Did not complete succcessful data transfer")
+			t.Fatal("Did not complete successful data transfer")
 		case <-providerFinished:
 			providerFinished = nil
 		case <-clientFinished:
@@ -868,7 +963,7 @@ func TestPauseAndResume(t *testing.T) {
 		pauseInitiators < 1 || pauseResponders < 1 || resumeInitiators < 1 || resumeResponders < 1 {
 		select {
 		case <-ctx.Done():
-			t.Fatal("Did not complete succcessful data transfer")
+			t.Fatal("Did not complete successful data transfer")
 		case <-finished:
 			completes++
 		case <-opened:
@@ -968,7 +1063,7 @@ func TestUnrecognizedVoucherRoundTrip(t *testing.T) {
 	for opens < 1 || finishes < 1 {
 		select {
 		case <-ctx.Done():
-			t.Fatal("Did not complete succcessful data transfer")
+			t.Fatal("Did not complete successful data transfer")
 		case <-finished:
 			finishes++
 		case <-opened:
```
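The pushchannelmonitor package itself is not among this PR's visible hunks, but the test above pins down the intended semantics: with `PushChannelRestartConfig(100*time.Millisecond, 1, 10, 200*time.Millisecond)`, a channel that moves fewer than 10 bytes per 100ms window gets restarted. For intuition only, here is a hypothetical sketch of that kind of rate check; the types, names, and structure below are assumptions, not the package's actual code:

```go
// Hypothetical sketch of a data-rate check driven by the four config
// values. NOT the real pushchannelmonitor implementation.
package sketch

import "time"

// Channel is the minimal surface the check needs; both methods are
// stand-ins, not the real monitored-channel API.
type Channel interface {
	BytesSent() uint64 // total bytes sent on the channel so far
	Restart()          // ask the manager to restart the channel
}

// watch samples BytesSent checksPerInterval times per interval and
// restarts the channel whenever fewer than minBytesSent bytes moved
// over the most recent full interval.
func watch(ch Channel, interval time.Duration, checksPerInterval uint32,
	minBytesSent uint64, restartBackoff time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval / time.Duration(checksPerInterval))
	defer ticker.Stop()

	samples := make([]uint64, 0, checksPerInterval+1)
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			samples = append(samples, ch.BytesSent())
			if uint32(len(samples)) <= checksPerInterval {
				continue // not enough history for a full interval yet
			}
			// Oldest retained sample is exactly one interval old.
			sent := samples[len(samples)-1] - samples[0]
			samples = samples[1:]
			if sent < minBytesSent {
				ch.Restart()
				time.Sleep(restartBackoff) // let the restart take effect
				samples = samples[:0]      // start measuring afresh
			}
		}
	}
}
```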
6 changes: 3 additions & 3 deletions network/libp2p_impl.go
```diff
@@ -113,7 +113,7 @@ func (impl *libp2pDataTransferNetwork) openStream(ctx context.Context, id peer.I
 		}

 		d := b.Duration()
-		log.Warnf("failed to open stream to %s on attempt %g of %g, waiting %s to try again, err: %w",
+		log.Warnf("failed to open stream to %s on attempt %g of %g, waiting %s to try again, err: %s",
 			id, nAttempts, impl.maxStreamOpenAttempts, d, err)

 		select {
@@ -183,14 +183,14 @@ func (dtnet *libp2pDataTransferNetwork) handleNewStream(s network.Stream) {
 		if err != io.EOF {
 			s.Reset() // nolint: errcheck,gosec
 			go dtnet.receiver.ReceiveError(err)
-			log.Debugf("graphsync net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
+			log.Debugf("net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
 		}
 		return
 	}

 	p := s.Conn().RemotePeer()
 	ctx := context.Background()
-	log.Debugf("graphsync net handleNewStream from %s", s.Conn().RemotePeer())
+	log.Debugf("net handleNewStream from %s", s.Conn().RemotePeer())

 	if received.IsRequest() {
 		receivedRequest, ok := received.(datatransfer.Request)
```