From 973940ecb32d1b3b9867274a42896201bfd3a600 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 15 Feb 2018 21:28:59 +0100 Subject: [PATCH 1/7] Delay dials to realy addrs --- dial_delay.go | 167 ++++++++++++++++++++++++++++++++++++++++++++++++++ swarm_dial.go | 56 +++++++++++++---- 2 files changed, 211 insertions(+), 12 deletions(-) create mode 100644 dial_delay.go diff --git a/dial_delay.go b/dial_delay.go new file mode 100644 index 00000000..6b45ce9b --- /dev/null +++ b/dial_delay.go @@ -0,0 +1,167 @@ +package swarm + +import ( + "context" + "time" + + ma "github.com/multiformats/go-multiaddr" + mafmt "github.com/whyrusleeping/mafmt" +) + +const p_circuit = 290 + +const numTiers = 2 +const tierDelay = 2 * time.Second + +var relay = mafmt.Or(mafmt.Base(p_circuit), mafmt.And(mafmt.IPFS, mafmt.Base(p_circuit))) + +// delayDialAddrs returns a address channel sorted by priority, pushing the +// addresses with delay between them. The other channel can be used to trigger +// sending more addresses in case all previous failed +func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multiaddr, chan<- struct{}) { + out := make(chan ma.Multiaddr) + delay := time.NewTimer(tierDelay) + triggerNext := make(chan struct{}, 1) + + go func() { + defer delay.Stop() + defer close(out) + var pending [numTiers][]ma.Multiaddr + lastTier := 0 + + // put enqueues the mutliaddr + put := func(addr ma.Multiaddr) { + tier := getTier(addr) + pending[tier] = append(pending[tier], addr) + } + + // get gets the best (lowest tier) multiaddr available + get := func() (ma.Multiaddr, int) { + for i, tier := range pending[:] { + if len(tier) > 0 { + addr := tier[len(tier)-1] + tier[len(tier)-1] = nil + pending[i] = tier[:len(tier)-1] + return addr, i + } + } + return nil, -1 + } + + outer: + for { + fill: + for { + select { + case addr, ok := <-c: + if !ok { + break outer + } + put(addr) + default: + break fill + } + } + + next, tier := get() + + // Nothing? Block! + if next == nil { + select { + case addr, ok := <-c: + if !ok { + break outer + } + put(addr) + case <-ctx.Done(): + return + } + continue + } + + // Jumping a tier? + if tier > lastTier { + // Wait the delay (preempt with new addresses or when the dialer + // requests more addresses) + select { + case addr, ok := <-c: + if !ok { + break outer + } + put(addr) + continue + case <-delay.C: + delay.Reset(tierDelay) + case <-triggerNext: + if !delay.Stop() { + <-delay.C + } + delay.Reset(tierDelay) + case <-ctx.Done(): + return + } + } + + lastTier = tier + + select { + case addr, ok := <-c: + put(next) + if !ok { + break outer + } + put(addr) + continue + case out <- next: + // Always count the timeout since the last dial. + if !delay.Stop() { + <-delay.C + } + delay.Reset(tierDelay) + case <-ctx.Done(): + return + } + } + + // finish sending + for { + next, tier := get() + if next == nil { + return + } + if tier > lastTier { + select { + case <-delay.C: + case <-triggerNext: + if !delay.Stop() { + <-delay.C + } + delay.Reset(tierDelay) + case <-ctx.Done(): + return + } + } + tier = lastTier + select { + case out <- next: + delay.Stop() + delay.Reset(tierDelay) + case <-ctx.Done(): + return + } + } + }() + + return out, triggerNext +} + +// getTier returns the priority tier of the address. +// return value must be > 0 & < numTiers. +func getTier(addr ma.Multiaddr) int { + switch { + case relay.Matches(addr): + return 1 + default: + return 0 + } +} diff --git a/swarm_dial.go b/swarm_dial.go index 7125bc3c..e5589a0a 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -16,16 +16,39 @@ import ( ma "github.com/multiformats/go-multiaddr" ) -// Diagram of dial sync: +// Diagram of Dial // -// many callers of Dial() synched w. dials many addrs results to callers -// ----------------------\ dialsync use earliest /-------------- -// -----------------------\ |----------\ /---------------- -// ------------------------>------------<------- >---------<----------------- -// -----------------------| \----x \---------------- -// ----------------------| \-----x \--------------- -// any may fail if no addr at end -// retry dialAttempt x +// Concurrent Calls +// to Dial(peerId) dial finds dialable +// / addresses and passes +// v them to dialAddrs +// Dial(peerId) --------------------\ | +// Dial(peerId) -------------| v +// Dial(peerId) ------------>----> doDial() --> dial() --> dialAddrs() <- dialAddrs concurrently dials +// Dial(peerId) ------------| ^ __/\__ all known peer addresses +// Dial(peerId) ------/ | /||||||\ +// / |||||||| +// Synced with dialsync LLLLL--- <- Costly paths are delayed +// calls doDial DDDDD +// ||||| +// /--------------------------------> ||--- <- limitedDial (LD) limits +// Each connection is established || concurrency for transports +// by dialAddr, which starts a Some connection attempts -> x|- which require file descriptors +// network connection and sets may fail | || +// up the encryption layers | || --- <- if needed, dials through costly +// | || ||| paths (such as relay) will start +// | || LLL +// \-> xx--DDD +// ||||| +// eventually a connection will ||||| +// get established (C), other -> |C||| +// Connection setup will be attempts will get cancelled x|xxx +// performed with dialConnSetup. | +// It adds the connection to the ----------------------------------> | +// swarm and agrees on the | +// muxer to use /---\ +// The result is distributed -> ||||| +// to callers of Dial ||||| var ( // ErrDialBackoff is returned by the backoff code when a given peer has @@ -335,6 +358,8 @@ func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma. ctx, cancel := context.WithCancel(ctx) defer cancel() // cancel work when we exit func + sortedAddrs, triggerMore := delayDialAddrs(ctx, remoteAddrs) + // use a single response type instead of errs and conns, reduces complexity *a ton* respch := make(chan dialResult) @@ -344,11 +369,12 @@ func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma. defer s.limiter.clearAllPeerDials(p) var active int + for { select { - case addr, ok := <-remoteAddrs: + case addr, ok := <-sortedAddrs: if !ok { - remoteAddrs = nil + sortedAddrs = nil if active == 0 { return nil, exitErr } @@ -369,9 +395,15 @@ func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma. // Errors are normal, lots of dials will fail exitErr = resp.Err - if remoteAddrs == nil && active == 0 { + if sortedAddrs == nil && active == 0 { return nil, exitErr } + if active == 0 { + select { + case triggerMore <- struct{}{}: + default: + } + } } else if resp.Conn != nil { return resp.Conn, nil } From 2e22aa867dc647e07eb638ce9a48d29c3cfea60f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 18 Feb 2018 23:11:34 +0100 Subject: [PATCH 2/7] dial delay: tests and fixes --- dial_delay.go | 14 +- dial_delay_test.go | 396 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 404 insertions(+), 6 deletions(-) create mode 100644 dial_delay_test.go diff --git a/dial_delay.go b/dial_delay.go index 6b45ce9b..c7b589f4 100644 --- a/dial_delay.go +++ b/dial_delay.go @@ -11,9 +11,10 @@ import ( const p_circuit = 290 const numTiers = 2 -const tierDelay = 2 * time.Second -var relay = mafmt.Or(mafmt.Base(p_circuit), mafmt.And(mafmt.IPFS, mafmt.Base(p_circuit))) +var tierDelay = 2 * time.Second + +var relay = mafmt.Or(mafmt.And(mafmt.Base(p_circuit), mafmt.Base(ma.P_IPFS)), mafmt.And(mafmt.Base(ma.P_IPFS), mafmt.Base(p_circuit), mafmt.Base(ma.P_IPFS))) // delayDialAddrs returns a address channel sorted by priority, pushing the // addresses with delay between them. The other channel can be used to trigger @@ -27,7 +28,7 @@ func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multi defer delay.Stop() defer close(out) var pending [numTiers][]ma.Multiaddr - lastTier := 0 + lastTier := -1 // put enqueues the mutliaddr put := func(addr ma.Multiaddr) { @@ -80,11 +81,12 @@ func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multi } // Jumping a tier? - if tier > lastTier { + if tier > lastTier && lastTier != -1 { // Wait the delay (preempt with new addresses or when the dialer // requests more addresses) select { case addr, ok := <-c: + put(next) if !ok { break outer } @@ -129,7 +131,7 @@ func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multi if next == nil { return } - if tier > lastTier { + if tier > lastTier && lastTier != -1 { select { case <-delay.C: case <-triggerNext: @@ -141,7 +143,7 @@ func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multi return } } - tier = lastTier + lastTier = tier select { case out <- next: delay.Stop() diff --git a/dial_delay_test.go b/dial_delay_test.go new file mode 100644 index 00000000..9fb4bea1 --- /dev/null +++ b/dial_delay_test.go @@ -0,0 +1,396 @@ +package swarm + +import ( + "testing" + "time" + + "context" + ma "github.com/multiformats/go-multiaddr" + "sync" +) + +const ( + T0_A = "/ip4/127.0.0.1/tcp/4001" + T0_B = "/ip4/127.0.0.1/tcp/4002" + T0_C = "/ip4/127.0.0.1/tcp/4003" + T0_D = "/ip4/127.0.0.1/tcp/4004" + + T1_A = "/p2p-circuit/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu" + T1_B = "/p2p-circuit/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM" + T1_C = "/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu/p2p-circuit/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM" +) + +func prepare() { + var circuitProto = ma.Protocol{ + Code: p_circuit, + Size: 0, + Name: "p2p-circuit", + VCode: ma.CodeToVarint(p_circuit), + } + _ = ma.AddProtocol(circuitProto) + + tierDelay = 32 * time.Millisecond // 2x windows timer resolution +} + +func addrChan(t *testing.T, nsync int, addrDelays ...string) <-chan ma.Multiaddr { + out := make(chan ma.Multiaddr, nsync) + c := sync.NewCond(&sync.Mutex{}) + c.L.Lock() + + go func() { + defer close(out) + n := 0 + + for _, ad := range addrDelays { + if ad[0] != '/' { + delay, err := time.ParseDuration(ad) + if err != nil { + t.Fatal(err) + } + + time.Sleep(delay) + continue + } + + addr, err := ma.NewMultiaddr(ad) + if err != nil { + t.Fatal(err) + } + + out <- addr + + n++ + if n == nsync { + c.L.Lock() + c.Broadcast() + c.L.Unlock() + } + } + }() + + if nsync != 0 { + c.Wait() + } + c.L.Unlock() + + return out +} + +func TestRelayMatch(t *testing.T) { + prepare() + + addr, err := ma.NewMultiaddr(T0_A) + if err != nil { + t.Fatal(err) + } + + if relay.Matches(addr) { + t.Error("T0_A shouldn't match") + } + + addr, err = ma.NewMultiaddr(T1_A) + if err != nil { + t.Fatal(err) + } + + if !relay.Matches(addr) { + t.Error("T1_A should match") + } + + addr, err = ma.NewMultiaddr(T1_C) + if err != nil { + t.Fatal(err) + } + + if !relay.Matches(addr) { + t.Error("T1_C should match") + } +} + +func TestDelayBasic(t *testing.T) { + ctx := context.Background() + prepare() + + start := time.Now() + ch, _ := delayDialAddrs(ctx, addrChan(t, 0, T0_A)) + + if !recvPath(t, ch, T0_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) > 16*time.Millisecond { + t.Errorf("it took longer than expected to get the address (%s)", time.Now().Sub(start).String()) + } +} + +func TestDelaySingleT1(t *testing.T) { + ctx := context.Background() + prepare() + + start := time.Now() + ch, _ := delayDialAddrs(ctx, addrChan(t, 0, T1_A)) + + if !recvPath(t, ch, T1_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) > 16*time.Millisecond { + t.Error("it took longer than expected to get the address") + } +} + +func TestDelaySimpleT0T1(t *testing.T) { + ctx := context.Background() + prepare() + + start := time.Now() + ch, _ := delayDialAddrs(ctx, addrChan(t, 2, T0_A, T1_A)) + + if !recvPath(t, ch, T0_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) > 16*time.Millisecond { + t.Error("it took longer than expected to get the address") + } + + if !recvPath(t, ch, T1_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) < 16*time.Millisecond { + t.Error("it took shorter than expected to get the address") + } +} + +func TestDelaySimpleT1T0(t *testing.T) { + ctx := context.Background() + prepare() + + start := time.Now() + ch, _ := delayDialAddrs(ctx, addrChan(t, 2, T1_A, T0_A)) + + if !recvPath(t, ch, T0_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) > 16*time.Millisecond { + t.Error("it took longer than expected to get the address") + } + + if !recvPath(t, ch, T1_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) < 16*time.Millisecond { + t.Error("it took shorter than expected to get the address") + } +} + +func TestDelayedT1T0(t *testing.T) { + ctx := context.Background() + prepare() + + start := time.Now() + ch, _ := delayDialAddrs(ctx, addrChan(t, 0, T1_A, "16ms", T0_A)) + + if !recvPath(t, ch, T1_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) > 16*time.Millisecond { + t.Error("it took longer than expected to get the address") + } + + if !recvPath(t, ch, T0_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) < 16*time.Millisecond { + t.Error("it took shorter than expected to get the address") + } + + if time.Now().Sub(start) > 32*time.Millisecond { + t.Error("it took longer than expected to get the address") + } +} + +func TestDelayMoreT1T0(t *testing.T) { + ctx := context.Background() + prepare() + + start := time.Now() + ch, more := delayDialAddrs(ctx, addrChan(t, 2, T1_A, T0_A)) + + if !recvPath(t, ch, T0_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) > 16*time.Millisecond { + t.Error("it took longer than expected to get the address") + } + + more <- struct{}{} + + if !recvPath(t, ch, T1_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) > 16*time.Millisecond { + t.Error("it took longer than expected to get the address") + } +} + +func TestDelaySingleT0T1WaitT0(t *testing.T) { + ctx := context.Background() + prepare() + tierDelay = 64 * time.Millisecond // 4x windows timer resolution + + start := time.Now() + ch, _ := delayDialAddrs(ctx, addrChan(t, 2, T0_A, T1_A, "16ms", T0_B)) + + if !recvPath(t, ch, T0_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) > 16*time.Millisecond { + t.Error("it took longer than expected to get the address") + } + + if !recvPath(t, ch, T0_B) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) < 16*time.Millisecond { + t.Error("it took shorter than expected to get the address") + } + + if time.Now().Sub(start) > 32*time.Millisecond { + t.Error("it took longer than expected to get the address") + } + + if !recvPath(t, ch, T1_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) < 64*time.Millisecond { + t.Error("it took shorter than expected to get the address") + } +} + +func TestDelaySingleT0T1Wait(t *testing.T) { + ctx := context.Background() + prepare() + + start := time.Now() + ch, _ := delayDialAddrs(ctx, addrChan(t, 2, T0_A, T1_A, "16ms")) + + if !recvPath(t, ch, T0_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) > 16*time.Millisecond { + t.Error("it took longer than expected to get the address") + } + + if !recvPath(t, ch, T1_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) < 32*time.Millisecond { + t.Error("it took shorter than expected to get the address") + } +} + +func TestDelaySingleT0T1WaitLong(t *testing.T) { + ctx := context.Background() + prepare() + + start := time.Now() + ch, _ := delayDialAddrs(ctx, addrChan(t, 2, T0_A, T1_A, "64ms")) + + if !recvPath(t, ch, T0_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) > 16*time.Millisecond { + t.Error("it took longer than expected to get the address") + } + + if !recvPath(t, ch, T1_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) < 32*time.Millisecond { + t.Error("it took shorter than expected to get the address") + } +} + +func TestDelaySingleT0T1WaitTrigger(t *testing.T) { + ctx := context.Background() + prepare() + + start := time.Now() + ch, more := delayDialAddrs(ctx, addrChan(t, 2, T0_A, T1_A, "64ms")) + + if !recvPath(t, ch, T0_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) > 16*time.Millisecond { + t.Error("it took longer than expected to get the address") + } + + more <- struct{}{} + + if !recvPath(t, ch, T1_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) > 32*time.Millisecond { + t.Error("it took longer than expected to get the address") + } +} + +func TestDelaySingleT0WaitT1(t *testing.T) { + ctx := context.Background() + prepare() + + start := time.Now() + ch, _ := delayDialAddrs(ctx, addrChan(t, 0, T0_A, "16ms", T1_A)) + + time.Sleep(32 * time.Millisecond) + + if !recvPath(t, ch, T0_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) > 48*time.Millisecond { + t.Error("it took longer than expected to get the address") + } + + if !recvPath(t, ch, T1_A) { + t.Error("expected to recieve a path") + } + + if time.Now().Sub(start) < 32*time.Millisecond { + t.Error("it took shorter than expected to get the address") + } +} + +func recvPath(t *testing.T, c <-chan ma.Multiaddr, expect string) bool { + a, ok := <-c + if !ok { + return ok + } + + if a == nil { + t.Error("got nil path") + } + + if a.String() != expect { + t.Errorf("paths didn't match: %s - %s", expect, a.String()) + } + + return ok +} From 928ffbc40d558011d909c3b4f51b0c943cc3191a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 19 Feb 2018 03:17:27 +0100 Subject: [PATCH 3/7] dial delay: simplify the code --- dial_delay.go | 121 ++++++++++++++++++++++++++++++--------------- dial_delay_test.go | 6 +++ 2 files changed, 86 insertions(+), 41 deletions(-) diff --git a/dial_delay.go b/dial_delay.go index c7b589f4..de77d467 100644 --- a/dial_delay.go +++ b/dial_delay.go @@ -49,38 +49,38 @@ func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multi return nil, -1 } - outer: - for { - fill: + // fillBuckets reads pending addresses form the channel without blocking + fillBuckets := func() bool { for { select { case addr, ok := <-c: if !ok { - break outer + return false } put(addr) default: - break fill + return true } } + } - next, tier := get() - - // Nothing? Block! - if next == nil { - select { - case addr, ok := <-c: - if !ok { - break outer - } - put(addr) - case <-ctx.Done(): - return + // waitForMore woits for addresses from the channel + waitForMore := func() (bool, error) { + select { + case addr, ok := <-c: + if !ok { + return false, nil } - continue + put(addr) + case <-ctx.Done(): + return false, ctx.Err() } + return true, nil + } - // Jumping a tier? + // maybeJumpTier will check if the address tier is changing and optionally + // wait some time. + maybeJumpTier := func(tier int, next ma.Multiaddr) (cont bool, brk bool, err error) { if tier > lastTier && lastTier != -1 { // Wait the delay (preempt with new addresses or when the dialer // requests more addresses) @@ -88,10 +88,10 @@ func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multi case addr, ok := <-c: put(next) if !ok { - break outer + return false, true, nil } put(addr) - continue + return true, false, nil case <-delay.C: delay.Reset(tierDelay) case <-triggerNext: @@ -100,20 +100,24 @@ func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multi } delay.Reset(tierDelay) case <-ctx.Done(): - return + return false, false, ctx.Err() } } + // Note that we want to only update the tier after we've done the waiting + // or we were asked to finish early lastTier = tier + return false, false, nil + } + recvOrSend := func(next ma.Multiaddr) (brk bool, err error) { select { case addr, ok := <-c: put(next) if !ok { - break outer + return true, nil } put(addr) - continue case out <- next: // Always count the timeout since the last dial. if !delay.Stop() { @@ -121,34 +125,69 @@ func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multi } delay.Reset(tierDelay) case <-ctx.Done(): + return false, ctx.Err() + } + return false, nil + } + + // process the address stream + for { + if !fillBuckets() { + break // input channel closed + } + + next, tier := get() + + // Nothing? Block! + if next == nil { + ok, err := waitForMore() + if err != nil { + return + } + if !ok { + break // input channel closed + } + continue + } + + cont, brk, err := maybeJumpTier(tier, next) + if cont { + continue // received an address while waiting, in case it's lower tier + // look at it immediately + } + if brk { + break // input channel closed + } + if err != nil { + return + } + + brk, err = recvOrSend(next) + if brk { + break // input channel closed + } + if err != nil { return } } + // the channel is closed by now + c = nil + // finish sending for { next, tier := get() if next == nil { return } - if tier > lastTier && lastTier != -1 { - select { - case <-delay.C: - case <-triggerNext: - if !delay.Stop() { - <-delay.C - } - delay.Reset(tierDelay) - case <-ctx.Done(): - return - } + + _, _, err := maybeJumpTier(tier, next) + if err != nil { + return } - lastTier = tier - select { - case out <- next: - delay.Stop() - delay.Reset(tierDelay) - case <-ctx.Done(): + + _, err = recvOrSend(next) + if err != nil { return } } diff --git a/dial_delay_test.go b/dial_delay_test.go index 9fb4bea1..b5391d44 100644 --- a/dial_delay_test.go +++ b/dial_delay_test.go @@ -32,6 +32,12 @@ func prepare() { tierDelay = 32 * time.Millisecond // 2x windows timer resolution } +// addrChan creates a multiaddr channel with `nsync` size. If nsync is larger +// than 0, the entries will get pre-buffered in the channel. +// addrDelays is a set of addresses and delays between sending them. If a string +// starts with '/' it will be parsed as an address and sent to the channel. +// Otherwise it will get parsed as a time to sleep before sending next addresses +// or closing the channel func addrChan(t *testing.T, nsync int, addrDelays ...string) <-chan ma.Multiaddr { out := make(chan ma.Multiaddr, nsync) c := sync.NewCond(&sync.Mutex{}) From a945f494838acb1d5228b59d01c21ce2fd34e7ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 3 Jun 2018 21:54:17 +0200 Subject: [PATCH 4/7] dial delay: address review --- dial_delay.go | 15 ++++++++------- dial_delay_test.go | 4 ++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/dial_delay.go b/dial_delay.go index de77d467..35e7e7f1 100644 --- a/dial_delay.go +++ b/dial_delay.go @@ -12,7 +12,7 @@ const p_circuit = 290 const numTiers = 2 -var tierDelay = 2 * time.Second +var TierDelay = 1 * time.Second var relay = mafmt.Or(mafmt.And(mafmt.Base(p_circuit), mafmt.Base(ma.P_IPFS)), mafmt.And(mafmt.Base(ma.P_IPFS), mafmt.Base(p_circuit), mafmt.Base(ma.P_IPFS))) @@ -21,7 +21,7 @@ var relay = mafmt.Or(mafmt.And(mafmt.Base(p_circuit), mafmt.Base(ma.P_IPFS)), ma // sending more addresses in case all previous failed func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multiaddr, chan<- struct{}) { out := make(chan ma.Multiaddr) - delay := time.NewTimer(tierDelay) + delay := time.NewTimer(TierDelay) triggerNext := make(chan struct{}, 1) go func() { @@ -30,13 +30,14 @@ func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multi var pending [numTiers][]ma.Multiaddr lastTier := -1 - // put enqueues the mutliaddr + // put enqueues the multiaddr put := func(addr ma.Multiaddr) { tier := getTier(addr) pending[tier] = append(pending[tier], addr) } // get gets the best (lowest tier) multiaddr available + // note that within a single tier put/get behave like a stack (LIFO) get := func() (ma.Multiaddr, int) { for i, tier := range pending[:] { if len(tier) > 0 { @@ -64,7 +65,7 @@ func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multi } } - // waitForMore woits for addresses from the channel + // waitForMore waits for addresses from the channel waitForMore := func() (bool, error) { select { case addr, ok := <-c: @@ -93,12 +94,12 @@ func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multi put(addr) return true, false, nil case <-delay.C: - delay.Reset(tierDelay) + delay.Reset(TierDelay) case <-triggerNext: if !delay.Stop() { <-delay.C } - delay.Reset(tierDelay) + delay.Reset(TierDelay) case <-ctx.Done(): return false, false, ctx.Err() } @@ -123,7 +124,7 @@ func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multi if !delay.Stop() { <-delay.C } - delay.Reset(tierDelay) + delay.Reset(TierDelay) case <-ctx.Done(): return false, ctx.Err() } diff --git a/dial_delay_test.go b/dial_delay_test.go index b5391d44..cc55fc04 100644 --- a/dial_delay_test.go +++ b/dial_delay_test.go @@ -29,7 +29,7 @@ func prepare() { } _ = ma.AddProtocol(circuitProto) - tierDelay = 32 * time.Millisecond // 2x windows timer resolution + TierDelay = 32 * time.Millisecond // 2x windows timer resolution } // addrChan creates a multiaddr channel with `nsync` size. If nsync is larger @@ -250,7 +250,7 @@ func TestDelayMoreT1T0(t *testing.T) { func TestDelaySingleT0T1WaitT0(t *testing.T) { ctx := context.Background() prepare() - tierDelay = 64 * time.Millisecond // 4x windows timer resolution + TierDelay = 64 * time.Millisecond // 4x windows timer resolution start := time.Now() ch, _ := delayDialAddrs(ctx, addrChan(t, 2, T0_A, T1_A, "16ms", T0_B)) From 91fe548fda68d5592ba7b1ad48da612ca8a19a50 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 28 Sep 2018 18:38:42 +0300 Subject: [PATCH 5/7] dial delay: safer relay address filter Just the presence of /p2p-circuit in the addr is enough to warrant delayed dialing. This accepts both explicit relay addresses and is also future-proof for changes in go-libp2p-circuit#48 and specs#72. --- dial_delay.go | 12 +++++++----- dial_delay_test.go | 10 +++++----- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/dial_delay.go b/dial_delay.go index 35e7e7f1..7b6d1dc4 100644 --- a/dial_delay.go +++ b/dial_delay.go @@ -5,17 +5,14 @@ import ( "time" ma "github.com/multiformats/go-multiaddr" - mafmt "github.com/whyrusleeping/mafmt" ) -const p_circuit = 290 +const P_CIRCUIT = 290 const numTiers = 2 var TierDelay = 1 * time.Second -var relay = mafmt.Or(mafmt.And(mafmt.Base(p_circuit), mafmt.Base(ma.P_IPFS)), mafmt.And(mafmt.Base(ma.P_IPFS), mafmt.Base(p_circuit), mafmt.Base(ma.P_IPFS))) - // delayDialAddrs returns a address channel sorted by priority, pushing the // addresses with delay between them. The other channel can be used to trigger // sending more addresses in case all previous failed @@ -201,9 +198,14 @@ func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multi // return value must be > 0 & < numTiers. func getTier(addr ma.Multiaddr) int { switch { - case relay.Matches(addr): + case isRelayAddr(addr): return 1 default: return 0 } } + +func isRelayAddr(addr ma.Multiaddr) bool { + _, err := addr.ValueForProtocol(P_CIRCUIT) + return err == nil +} diff --git a/dial_delay_test.go b/dial_delay_test.go index cc55fc04..ababdb98 100644 --- a/dial_delay_test.go +++ b/dial_delay_test.go @@ -22,10 +22,10 @@ const ( func prepare() { var circuitProto = ma.Protocol{ - Code: p_circuit, + Code: P_CIRCUIT, Size: 0, Name: "p2p-circuit", - VCode: ma.CodeToVarint(p_circuit), + VCode: ma.CodeToVarint(P_CIRCUIT), } _ = ma.AddProtocol(circuitProto) @@ -90,7 +90,7 @@ func TestRelayMatch(t *testing.T) { t.Fatal(err) } - if relay.Matches(addr) { + if isRelayAddr(addr) { t.Error("T0_A shouldn't match") } @@ -99,7 +99,7 @@ func TestRelayMatch(t *testing.T) { t.Fatal(err) } - if !relay.Matches(addr) { + if isRelayAddr(addr) { t.Error("T1_A should match") } @@ -108,7 +108,7 @@ func TestRelayMatch(t *testing.T) { t.Fatal(err) } - if !relay.Matches(addr) { + if isRelayAddr(addr) { t.Error("T1_C should match") } } From 835519e35be94692d53e3a3804a0ab4b89ed8754 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 28 Sep 2018 19:18:37 +0300 Subject: [PATCH 6/7] dial delay: reify delay logic into top-level function --- dial_delay.go | 253 +++++++++++++++++++++++++------------------------- 1 file changed, 128 insertions(+), 125 deletions(-) diff --git a/dial_delay.go b/dial_delay.go index 7b6d1dc4..7d6741cf 100644 --- a/dial_delay.go +++ b/dial_delay.go @@ -18,180 +18,183 @@ var TierDelay = 1 * time.Second // sending more addresses in case all previous failed func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multiaddr, chan<- struct{}) { out := make(chan ma.Multiaddr) - delay := time.NewTimer(TierDelay) triggerNext := make(chan struct{}, 1) - go func() { - defer delay.Stop() - defer close(out) - var pending [numTiers][]ma.Multiaddr - lastTier := -1 + go doDelayDialAddrs(ctx, c, out, triggerNext) - // put enqueues the multiaddr - put := func(addr ma.Multiaddr) { - tier := getTier(addr) - pending[tier] = append(pending[tier], addr) - } + return out, triggerNext +} - // get gets the best (lowest tier) multiaddr available - // note that within a single tier put/get behave like a stack (LIFO) - get := func() (ma.Multiaddr, int) { - for i, tier := range pending[:] { - if len(tier) > 0 { - addr := tier[len(tier)-1] - tier[len(tier)-1] = nil - pending[i] = tier[:len(tier)-1] - return addr, i - } - } - return nil, -1 - } +func doDelayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr, out chan ma.Multiaddr, triggerNext chan struct{}) { + delay := time.NewTimer(TierDelay) - // fillBuckets reads pending addresses form the channel without blocking - fillBuckets := func() bool { - for { - select { - case addr, ok := <-c: - if !ok { - return false - } - put(addr) - default: - return true - } + defer delay.Stop() + defer close(out) + var pending [numTiers][]ma.Multiaddr + lastTier := -1 + + // put enqueues the multiaddr + put := func(addr ma.Multiaddr) { + tier := getTier(addr) + pending[tier] = append(pending[tier], addr) + } + + // get gets the best (lowest tier) multiaddr available + // note that within a single tier put/get behave like a stack (LIFO) + get := func() (ma.Multiaddr, int) { + for i, tier := range pending[:] { + if len(tier) > 0 { + addr := tier[len(tier)-1] + tier[len(tier)-1] = nil + pending[i] = tier[:len(tier)-1] + return addr, i } } + return nil, -1 + } - // waitForMore waits for addresses from the channel - waitForMore := func() (bool, error) { + // fillBuckets reads pending addresses form the channel without blocking + fillBuckets := func() bool { + for { select { case addr, ok := <-c: if !ok { - return false, nil + return false } put(addr) - case <-ctx.Done(): - return false, ctx.Err() + default: + return true } - return true, nil } + } - // maybeJumpTier will check if the address tier is changing and optionally - // wait some time. - maybeJumpTier := func(tier int, next ma.Multiaddr) (cont bool, brk bool, err error) { - if tier > lastTier && lastTier != -1 { - // Wait the delay (preempt with new addresses or when the dialer - // requests more addresses) - select { - case addr, ok := <-c: - put(next) - if !ok { - return false, true, nil - } - put(addr) - return true, false, nil - case <-delay.C: - delay.Reset(TierDelay) - case <-triggerNext: - if !delay.Stop() { - <-delay.C - } - delay.Reset(TierDelay) - case <-ctx.Done(): - return false, false, ctx.Err() - } + // waitForMore waits for addresses from the channel + waitForMore := func() (bool, error) { + select { + case addr, ok := <-c: + if !ok { + return false, nil } - - // Note that we want to only update the tier after we've done the waiting - // or we were asked to finish early - lastTier = tier - return false, false, nil + put(addr) + case <-ctx.Done(): + return false, ctx.Err() } + return true, nil + } - recvOrSend := func(next ma.Multiaddr) (brk bool, err error) { + // maybeJumpTier will check if the address tier is changing and optionally + // wait some time. + maybeJumpTier := func(tier int, next ma.Multiaddr) (cont bool, brk bool, err error) { + if tier > lastTier && lastTier != -1 { + // Wait the delay (preempt with new addresses or when the dialer + // requests more addresses) select { case addr, ok := <-c: put(next) if !ok { - return true, nil + return false, true, nil } put(addr) - case out <- next: - // Always count the timeout since the last dial. + return true, false, nil + case <-delay.C: + delay.Reset(TierDelay) + case <-triggerNext: if !delay.Stop() { <-delay.C } delay.Reset(TierDelay) case <-ctx.Done(): - return false, ctx.Err() + return false, false, ctx.Err() } - return false, nil } - // process the address stream - for { - if !fillBuckets() { - break // input channel closed + // Note that we want to only update the tier after we've done the waiting + // or we were asked to finish early + lastTier = tier + return false, false, nil + } + + recvOrSend := func(next ma.Multiaddr) (brk bool, err error) { + select { + case addr, ok := <-c: + put(next) + if !ok { + return true, nil + } + put(addr) + case out <- next: + // Always count the timeout since the last dial. + if !delay.Stop() { + <-delay.C } + delay.Reset(TierDelay) + case <-ctx.Done(): + return false, ctx.Err() + } + return false, nil + } - next, tier := get() + // process the address stream + for { + if !fillBuckets() { + break // input channel closed + } - // Nothing? Block! - if next == nil { - ok, err := waitForMore() - if err != nil { - return - } - if !ok { - break // input channel closed - } - continue - } + next, tier := get() - cont, brk, err := maybeJumpTier(tier, next) - if cont { - continue // received an address while waiting, in case it's lower tier - // look at it immediately - } - if brk { - break // input channel closed - } + // Nothing? Block! + if next == nil { + ok, err := waitForMore() if err != nil { return } - - brk, err = recvOrSend(next) - if brk { + if !ok { break // input channel closed } - if err != nil { - return - } + continue } - // the channel is closed by now - c = nil + cont, brk, err := maybeJumpTier(tier, next) + if cont { + continue // received an address while waiting, in case it's lower tier + // look at it immediately + } + if brk { + break // input channel closed + } + if err != nil { + return + } - // finish sending - for { - next, tier := get() - if next == nil { - return - } + brk, err = recvOrSend(next) + if brk { + break // input channel closed + } + if err != nil { + return + } + } - _, _, err := maybeJumpTier(tier, next) - if err != nil { - return - } + // the channel is closed by now + c = nil - _, err = recvOrSend(next) - if err != nil { - return - } + // finish sending + for { + next, tier := get() + if next == nil { + return } - }() - return out, triggerNext + _, _, err := maybeJumpTier(tier, next) + if err != nil { + return + } + + _, err = recvOrSend(next) + if err != nil { + return + } + } } // getTier returns the priority tier of the address. From 4be8cf4f1c5c9cd32ae40704cd6d9749e40044be Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 28 Sep 2018 19:22:48 +0300 Subject: [PATCH 7/7] dial delay: fix failing test --- dial_delay_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dial_delay_test.go b/dial_delay_test.go index ababdb98..6855b310 100644 --- a/dial_delay_test.go +++ b/dial_delay_test.go @@ -99,7 +99,7 @@ func TestRelayMatch(t *testing.T) { t.Fatal(err) } - if isRelayAddr(addr) { + if !isRelayAddr(addr) { t.Error("T1_A should match") } @@ -108,7 +108,7 @@ func TestRelayMatch(t *testing.T) { t.Fatal(err) } - if isRelayAddr(addr) { + if !isRelayAddr(addr) { t.Error("T1_C should match") } }