From 1e31d705331dce904c8f0202d45e0178eff7a919 Mon Sep 17 00:00:00 2001 From: Sukun Date: Wed, 28 Jun 2023 01:10:06 +0530 Subject: [PATCH] swarm: implement blackhole detection (#2320) * swarm: implement blackhole detection * address review comments --- dashboards/swarm/swarm.json | 254 ++++++++++++++++++++- p2p/net/swarm/black_hole_detector.go | 257 ++++++++++++++++++++++ p2p/net/swarm/black_hole_detector_test.go | 179 +++++++++++++++ p2p/net/swarm/dial_worker.go | 11 +- p2p/net/swarm/swarm.go | 6 + p2p/net/swarm/swarm_dial.go | 13 ++ p2p/net/swarm/swarm_dial_test.go | 43 ++++ p2p/net/swarm/swarm_metrics.go | 40 ++++ p2p/net/swarm/swarm_metrics_test.go | 11 + 9 files changed, 810 insertions(+), 4 deletions(-) create mode 100644 p2p/net/swarm/black_hole_detector.go create mode 100644 p2p/net/swarm/black_hole_detector_test.go diff --git a/dashboards/swarm/swarm.json b/dashboards/swarm/swarm.json index d7db1c8075..ec64784139 100644 --- a/dashboards/swarm/swarm.json +++ b/dashboards/swarm/swarm.json @@ -35,6 +35,18 @@ "name": "Prometheus", "version": "1.0.0" }, + { + "type": "panel", + "id": "stat", + "name": "Stat", + "version": "" + }, + { + "type": "panel", + "id": "state-timeline", + "name": "State timeline", + "version": "" + }, { "type": "panel", "id": "timeseries", @@ -3026,8 +3038,248 @@ ], "title": "Dials per connection", "type": "piechart" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 84 + }, + "id": 44, + "panels": [], + "title": "Black Hole Detection", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "fixed" + }, + "custom": { + "fillOpacity": 76, + "lineWidth": 0, + "spanNulls": true + }, + "mappings": [ + { + "options": { + "0": { + "color": "blue", + "index": 0, + "text": "Probing" + }, + "1": { + "color": "green", + "index": 1, + "text": "Allowed" + }, + "2": { + "color": "purple", + "index": 2, + "text": "Blocked" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 24, + "x": 0, + "y": 85 + }, + "id": 46, + "options": { + "alignValue": "center", + "legend": { + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "mergeValues": true, + "rowHeight": 0.9, + "showValue": "always", + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "libp2p_swarm_black_hole_filter_state{instance=~\"$instance\"}", + "legendFormat": "{{instance}} {{name}}", + "range": true, + "refId": "A" + } + ], + "title": "Black Hole Filter State", + "type": "state-timeline" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "fixedColor": "purple", + "mode": "fixed" + }, + "mappings": [ + { + "options": { + "0": { + "index": 0, + "text": "-" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 91 + }, + "id": 49, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "auto", + "orientation": "horizontal", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "value_and_name" + }, + "pluginVersion": "9.3.6", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "libp2p_swarm_black_hole_filter_next_request_allowed_after{instance=~\"$instance\"}", + "legendFormat": "{{instance}}: {{name}}", + "range": true, + "refId": "A" + } + ], + "title": "Black Hole Filter Requests Till Next Probe", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "max": 10, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "purple", + "value": null + }, + { + "color": "green", + "value": 5 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 91 + }, + "id": 47, + "options": { + "orientation": "vertical", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showThresholdLabels": false, + "showThresholdMarkers": true + }, + "pluginVersion": "9.3.6", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "exemplar": false, + "expr": "libp2p_swarm_black_hole_filter_success_fraction{instance=~\"$instance\"} * 100", + "instant": true, + "legendFormat": "{{instance}} {{name}}", + "range": false, + "refId": "A" + } + ], + "title": "Black Hole Filter Success Percentage", + "type": "gauge" } ], + "refresh": false, "schemaVersion": 37, "style": "dark", "tags": [], @@ -3075,6 +3327,6 @@ "timezone": "", "title": "libp2p Swarm", "uid": "a15PyhO4z", - "version": 6, + "version": 7, "weekStart": "" } diff --git a/p2p/net/swarm/black_hole_detector.go b/p2p/net/swarm/black_hole_detector.go new file mode 100644 index 0000000000..0c415080e0 --- /dev/null +++ b/p2p/net/swarm/black_hole_detector.go @@ -0,0 +1,257 @@ +package swarm + +import ( + "fmt" + "sync" + + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +type blackHoleState int + +const ( + blackHoleStateProbing blackHoleState = iota + blackHoleStateAllowed + blackHoleStateBlocked +) + +func (st blackHoleState) String() string { + switch st { + case blackHoleStateProbing: + return "Probing" + case blackHoleStateAllowed: + return "Allowed" + case blackHoleStateBlocked: + return "Blocked" + default: + return fmt.Sprintf("Unknown %d", st) + } +} + +type blackHoleResult int + +const ( + blackHoleResultAllowed blackHoleResult = iota + blackHoleResultProbing + blackHoleResultBlocked +) + +// blackHoleFilter provides black hole filtering for dials. This filter should be used in +// concert with a UDP of IPv6 address filter to detect UDP or IPv6 black hole. In a black +// holed environments dial requests are blocked and only periodic probes to check the +// state of the black hole are allowed. +// +// Requests are blocked if the number of successes in the last n dials is less than +// minSuccesses. If a request succeeds in Blocked state, the filter state is reset and n +// subsequent requests are allowed before reevaluating black hole state. Dials cancelled +// when some other concurrent dial succeeded are counted as failures. A sufficiently large +// n prevents false negatives in such cases. +type blackHoleFilter struct { + // n serves the dual purpose of being the minimum number of requests after which we + // probe the state of the black hole in blocked state and the minimum number of + // completed dials required before evaluating black hole state. + n int + // minSuccesses is the minimum number of Success required in the last n dials + // to consider we are not blocked. + minSuccesses int + // name for the detector. + name string + + // requests counts number of dial requests to peers. We handle request at a peer + // level and record results at individual address dial level. + requests int + // dialResults of the last `n` dials. A successful dial is true. + dialResults []bool + // successes is the count of successful dials in outcomes + successes int + // state is the current state of the detector + state blackHoleState + + mu sync.Mutex + metricsTracer MetricsTracer +} + +// RecordResult records the outcome of a dial. A successful dial will change the state +// of the filter to Allowed. A failed dial only blocks subsequent requests if the success +// fraction over the last n outcomes is less than the minSuccessFraction of the filter. +func (b *blackHoleFilter) RecordResult(success bool) { + b.mu.Lock() + defer b.mu.Unlock() + + if b.state == blackHoleStateBlocked && success { + // If the call succeeds in a blocked state we reset to allowed. + // This is better than slowly accumulating values till we cross the minSuccessFraction + // threshold since a blackhole is a binary property. + b.reset() + return + } + + if success { + b.successes++ + } + b.dialResults = append(b.dialResults, success) + + if len(b.dialResults) > b.n { + if b.dialResults[0] { + b.successes-- + } + b.dialResults = b.dialResults[1:] + } + + b.updateState() + b.trackMetrics() +} + +// HandleRequest returns the result of applying the black hole filter for the request. +func (b *blackHoleFilter) HandleRequest() blackHoleResult { + b.mu.Lock() + defer b.mu.Unlock() + + b.requests++ + + b.trackMetrics() + + if b.state == blackHoleStateAllowed { + return blackHoleResultAllowed + } else if b.state == blackHoleStateProbing || b.requests%b.n == 0 { + return blackHoleResultProbing + } else { + return blackHoleResultBlocked + } +} + +func (b *blackHoleFilter) reset() { + b.successes = 0 + b.dialResults = b.dialResults[:0] + b.requests = 0 + b.updateState() +} + +func (b *blackHoleFilter) updateState() { + st := b.state + + if len(b.dialResults) < b.n { + b.state = blackHoleStateProbing + } else if b.successes >= b.minSuccesses { + b.state = blackHoleStateAllowed + } else { + b.state = blackHoleStateBlocked + } + + if st != b.state { + log.Debugf("%s blackHoleDetector state changed from %s to %s", b.name, st, b.state) + } +} + +func (b *blackHoleFilter) trackMetrics() { + if b.metricsTracer == nil { + return + } + + nextRequestAllowedAfter := 0 + if b.state == blackHoleStateBlocked { + nextRequestAllowedAfter = b.n - (b.requests % b.n) + } + + successFraction := 0.0 + if len(b.dialResults) > 0 { + successFraction = float64(b.successes) / float64(len(b.dialResults)) + } + + b.metricsTracer.UpdatedBlackHoleFilterState( + b.name, + b.state, + nextRequestAllowedAfter, + successFraction, + ) +} + +// blackHoleDetector provides UDP and IPv6 black hole detection using a `blackHoleFilter` +// for each. For details of the black hole detection logic see `blackHoleFilter`. +// +// black hole filtering is done at a peer dial level to ensure that periodic probes to +// detect change of the black hole state are actually dialed and are not skipped +// because of dial prioritisation logic. +type blackHoleDetector struct { + udp, ipv6 *blackHoleFilter +} + +// FilterAddrs filters the peer's addresses removing black holed addresses +func (d *blackHoleDetector) FilterAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { + hasUDP, hasIPv6 := false, false + for _, a := range addrs { + if !manet.IsPublicAddr(a) { + continue + } + if isProtocolAddr(a, ma.P_UDP) { + hasUDP = true + } + if isProtocolAddr(a, ma.P_IP6) { + hasIPv6 = true + } + } + + udpRes := blackHoleResultAllowed + if d.udp != nil && hasUDP { + udpRes = d.udp.HandleRequest() + } + + ipv6Res := blackHoleResultAllowed + if d.ipv6 != nil && hasIPv6 { + ipv6Res = d.ipv6.HandleRequest() + } + + return ma.FilterAddrs( + addrs, + func(a ma.Multiaddr) bool { + if !manet.IsPublicAddr(a) { + return true + } + // allow all UDP addresses while probing irrespective of IPv6 black hole state + if udpRes == blackHoleResultProbing && isProtocolAddr(a, ma.P_UDP) { + return true + } + // allow all IPv6 addresses while probing irrespective of UDP black hole state + if ipv6Res == blackHoleResultProbing && isProtocolAddr(a, ma.P_IP6) { + return true + } + + if udpRes == blackHoleResultBlocked && isProtocolAddr(a, ma.P_UDP) { + return false + } + if ipv6Res == blackHoleResultBlocked && isProtocolAddr(a, ma.P_IP6) { + return false + } + return true + }, + ) +} + +// RecordResult updates the state of the relevant `blackHoleFilter`s for addr +func (d *blackHoleDetector) RecordResult(addr ma.Multiaddr, success bool) { + if !manet.IsPublicAddr(addr) { + return + } + if d.udp != nil && isProtocolAddr(addr, ma.P_UDP) { + d.udp.RecordResult(success) + } + if d.ipv6 != nil && isProtocolAddr(addr, ma.P_IP6) { + d.ipv6.RecordResult(success) + } +} + +func newBlackHoleDetector(detectUDP, detectIPv6 bool, mt MetricsTracer) *blackHoleDetector { + d := &blackHoleDetector{} + + // A black hole is a binary property. On a network if UDP dials are blocked or there is + // no IPv6 connectivity, all dials will fail. So a low success rate of 5 out 100 dials + // is good enough. + if detectUDP { + d.udp = &blackHoleFilter{n: 100, minSuccesses: 5, name: "UDP", metricsTracer: mt} + } + if detectIPv6 { + d.ipv6 = &blackHoleFilter{n: 100, minSuccesses: 5, name: "IPv6", metricsTracer: mt} + } + return d +} diff --git a/p2p/net/swarm/black_hole_detector_test.go b/p2p/net/swarm/black_hole_detector_test.go new file mode 100644 index 0000000000..564fc07767 --- /dev/null +++ b/p2p/net/swarm/black_hole_detector_test.go @@ -0,0 +1,179 @@ +package swarm + +import ( + "fmt" + "testing" + + ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" +) + +func TestBlackHoleFilterReset(t *testing.T) { + n := 10 + bhf := &blackHoleFilter{n: n, minSuccesses: 2, name: "test"} + var i = 0 + // calls up to n should be probing + for i = 1; i <= n; i++ { + if bhf.HandleRequest() != blackHoleResultProbing { + t.Fatalf("expected calls up to n to be probes") + } + bhf.RecordResult(false) + } + + // after threshold calls every nth call should be a probe + for i = n + 1; i < 42; i++ { + result := bhf.HandleRequest() + if (i%n == 0 && result != blackHoleResultProbing) || (i%n != 0 && result != blackHoleResultBlocked) { + t.Fatalf("expected every nth dial to be a probe") + } + } + + bhf.RecordResult(true) + // check if calls up to n are probes again + for i = 0; i < n; i++ { + if bhf.HandleRequest() != blackHoleResultProbing { + t.Fatalf("expected black hole detector state to reset after success") + } + bhf.RecordResult(false) + } + + // next call should be blocked + if bhf.HandleRequest() != blackHoleResultBlocked { + t.Fatalf("expected dial to be blocked") + } +} + +func TestBlackHoleFilterSuccessFraction(t *testing.T) { + n := 10 + tests := []struct { + minSuccesses, successes int + result blackHoleResult + }{ + {minSuccesses: 5, successes: 5, result: blackHoleResultAllowed}, + {minSuccesses: 3, successes: 3, result: blackHoleResultAllowed}, + {minSuccesses: 5, successes: 4, result: blackHoleResultBlocked}, + {minSuccesses: 5, successes: 7, result: blackHoleResultAllowed}, + {minSuccesses: 3, successes: 1, result: blackHoleResultBlocked}, + {minSuccesses: 0, successes: 0, result: blackHoleResultAllowed}, + {minSuccesses: 10, successes: 10, result: blackHoleResultAllowed}, + } + for i, tc := range tests { + t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) { + bhf := blackHoleFilter{n: n, minSuccesses: tc.minSuccesses} + for i := 0; i < tc.successes; i++ { + bhf.RecordResult(true) + } + for i := 0; i < n-tc.successes; i++ { + bhf.RecordResult(false) + } + got := bhf.HandleRequest() + if got != tc.result { + t.Fatalf("expected %d got %d", tc.result, got) + } + }) + } +} + +func TestBlackHoleDetectorInApplicableAddress(t *testing.T) { + bhd := newBlackHoleDetector(true, true, nil) + addrs := []ma.Multiaddr{ + ma.StringCast("/ip4/1.2.3.4/tcp/1234"), + ma.StringCast("/ip4/1.2.3.4/tcp/1233"), + ma.StringCast("/ip6/::1/udp/1234/quic-v1"), + ma.StringCast("/ip4/192.168.1.5/udp/1234/quic-v1"), + } + for i := 0; i < 1000; i++ { + filteredAddrs := bhd.FilterAddrs(addrs) + require.ElementsMatch(t, addrs, filteredAddrs) + for j := 0; j < len(addrs); j++ { + bhd.RecordResult(addrs[j], false) + } + } +} + +func TestBlackHoleDetectorUDPDisabled(t *testing.T) { + bhd := newBlackHoleDetector(false, true, nil) + publicAddr := ma.StringCast("/ip4/1.2.3.4/udp/1234/quic-v1") + privAddr := ma.StringCast("/ip4/192.168.1.5/udp/1234/quic-v1") + for i := 0; i < 100; i++ { + bhd.RecordResult(publicAddr, false) + } + addrs := []ma.Multiaddr{publicAddr, privAddr} + require.ElementsMatch(t, addrs, bhd.FilterAddrs(addrs)) +} + +func TestBlackHoleDetectorIPv6Disabled(t *testing.T) { + bhd := newBlackHoleDetector(true, false, nil) + publicAddr := ma.StringCast("/ip6/1::1/tcp/1234") + privAddr := ma.StringCast("/ip6/::1/tcp/1234") + addrs := []ma.Multiaddr{publicAddr, privAddr} + for i := 0; i < 100; i++ { + bhd.RecordResult(publicAddr, false) + } + require.ElementsMatch(t, addrs, bhd.FilterAddrs(addrs)) +} + +func TestBlackHoleDetectorProbes(t *testing.T) { + bhd := &blackHoleDetector{ + udp: &blackHoleFilter{n: 2, minSuccesses: 1, name: "udp"}, + ipv6: &blackHoleFilter{n: 3, minSuccesses: 1, name: "ipv6"}, + } + udp6Addr := ma.StringCast("/ip6/1::1/udp/1234/quic-v1") + addrs := []ma.Multiaddr{udp6Addr} + for i := 0; i < 3; i++ { + bhd.RecordResult(udp6Addr, false) + } + for i := 1; i < 100; i++ { + filteredAddrs := bhd.FilterAddrs(addrs) + if i%2 == 0 || i%3 == 0 { + if len(filteredAddrs) == 0 { + t.Fatalf("expected probe to be allowed irrespective of the state of other black hole filter") + } + } else { + if len(filteredAddrs) != 0 { + t.Fatalf("expected dial to be blocked %s", filteredAddrs) + } + } + } + +} + +func TestBlackHoleDetectorAddrFiltering(t *testing.T) { + udp6Pub := ma.StringCast("/ip6/1::1/udp/1234/quic-v1") + udp6Pri := ma.StringCast("/ip6/::1/udp/1234/quic-v1") + upd4Pub := ma.StringCast("/ip4/1.2.3.4/udp/1234/quic-v1") + udp4Pri := ma.StringCast("/ip4/192.168.1.5/udp/1234/quic-v1") + tcp6Pub := ma.StringCast("/ip6/1::1/tcp/1234/quic-v1") + tcp6Pri := ma.StringCast("/ip6/::1/tcp/1234/quic-v1") + tcp4Pub := ma.StringCast("/ip4/1.2.3.4/tcp/1234/quic-v1") + tcp4Pri := ma.StringCast("/ip4/192.168.1.5/tcp/1234/quic-v1") + + makeBHD := func(udpBlocked, ipv6Blocked bool) *blackHoleDetector { + bhd := &blackHoleDetector{ + udp: &blackHoleFilter{n: 100, minSuccesses: 10, name: "udp"}, + ipv6: &blackHoleFilter{n: 100, minSuccesses: 10, name: "ipv6"}, + } + for i := 0; i < 100; i++ { + bhd.RecordResult(upd4Pub, !udpBlocked) + } + for i := 0; i < 100; i++ { + bhd.RecordResult(tcp6Pub, !ipv6Blocked) + } + return bhd + } + + allInput := []ma.Multiaddr{udp6Pub, udp6Pri, upd4Pub, udp4Pri, tcp6Pub, tcp6Pri, + tcp4Pub, tcp4Pri} + + udpBlockedOutput := []ma.Multiaddr{udp6Pri, udp4Pri, tcp6Pub, tcp6Pri, tcp4Pub, tcp4Pri} + bhd := makeBHD(true, false) + require.ElementsMatch(t, udpBlockedOutput, bhd.FilterAddrs(allInput)) + + ip6BlockedOutput := []ma.Multiaddr{udp6Pri, upd4Pub, udp4Pri, tcp6Pri, tcp4Pub, tcp4Pri} + bhd = makeBHD(false, true) + require.ElementsMatch(t, ip6BlockedOutput, bhd.FilterAddrs(allInput)) + + bothBlockedOutput := []ma.Multiaddr{udp6Pri, udp4Pri, tcp6Pri, tcp4Pub, tcp4Pri} + bhd = makeBHD(true, true) + require.ElementsMatch(t, bothBlockedOutput, bhd.FilterAddrs(allInput)) +} diff --git a/p2p/net/swarm/dial_worker.go b/p2p/net/swarm/dial_worker.go index 5688494f49..0334ac863e 100644 --- a/p2p/net/swarm/dial_worker.go +++ b/p2p/net/swarm/dial_worker.go @@ -295,8 +295,8 @@ loop: ad.dialRankingDelay = now.Sub(ad.createdAt) err := w.s.dialNextAddr(ad.ctx, w.peer, ad.addr, w.resch) if err != nil { - // the actual dial happens in a different go routine. An err here - // only happens in case of backoff. handle that. + // Errored without attempting a dial. This happens in case of + // backoff or black hole. w.dispatchError(ad, err) } else { // the dial was successful. update inflight dials @@ -358,11 +358,16 @@ loop: } // it must be an error -- add backoff if applicable and dispatch - if res.Err != context.Canceled && !w.connected { + // ErrDialRefusedBlackHole shouldn't end up here, just a safety check + if res.Err != ErrDialRefusedBlackHole && res.Err != context.Canceled && !w.connected { // we only add backoff if there has not been a successful connection // for consistency with the old dialer behavior. w.s.backf.AddBackoff(w.peer, res.Addr) + } else if res.Err == ErrDialRefusedBlackHole { + log.Errorf("SWARM BUG: unexpected ErrDialRefusedBlackHole while dialing peer %s to addr %s", + w.peer, res.Addr) } + w.dispatchError(ad, res.Err) // Only schedule next dial on error. // If we scheduleNextDial on success, we will end up making one dial more than diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 194d012758..c0e8f1cf12 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -173,6 +173,8 @@ type Swarm struct { metricsTracer MetricsTracer dialRanker network.DialRanker + + bhd *blackHoleDetector } // NewSwarm constructs a Swarm. @@ -209,8 +211,12 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts } s.dsync = newDialSync(s.dialWorkerLoop) + s.limiter = newDialLimiter(s.dialAddr) s.backf.init(s.ctx) + + s.bhd = newBlackHoleDetector(true, true, s.metricsTracer) + return s, nil } diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index f0c941320e..83799f56f7 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -39,6 +39,9 @@ var ( // been dialed too frequently ErrDialBackoff = errors.New("dial backoff") + // ErrDialRefusedBlackHole is returned when we are in a black holed environment + ErrDialRefusedBlackHole = errors.New("dial refused because of black hole") + // ErrDialToSelf is returned if we attempt to dial our own peer ErrDialToSelf = errors.New("dial to self attempted") @@ -434,9 +437,13 @@ func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) []ma.Mul // filter addresses we cannot dial addrs = ma.FilterAddrs(addrs, s.canDial) + // filter low priority addresses among the addresses we can dial addrs = filterLowPriorityAddresses(addrs) + // remove black holed addrs + addrs = s.bhd.FilterAddrs(addrs) + return ma.FilterAddrs(addrs, func(addr ma.Multiaddr) bool { return !ma.Contains(ourAddrs, addr) }, // TODO: Consider allowing link-local addresses @@ -484,6 +491,12 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (tra start := time.Now() connC, err := tpt.Dial(ctx, addr, p) + + // We're recording any error as a failure here. + // Notably, this also applies to cancelations (i.e. if another dial attempt was faster). + // This is ok since the black hole detector uses a very low threshold (5%). + s.bhd.RecordResult(addr, err == nil) + if err != nil { if s.metricsTracer != nil { s.metricsTracer.FailedDialing(addr, err) diff --git a/p2p/net/swarm/swarm_dial_test.go b/p2p/net/swarm/swarm_dial_test.go index f22144ee8c..2f6b3f8c4d 100644 --- a/p2p/net/swarm/swarm_dial_test.go +++ b/p2p/net/swarm/swarm_dial_test.go @@ -330,3 +330,46 @@ func TestAddrsForDialFiltering(t *testing.T) { }) } } + +func TestBlackHoledAddrBlocked(t *testing.T) { + resolver, err := madns.NewResolver() + if err != nil { + t.Fatal(err) + } + s := newTestSwarmWithResolver(t, resolver) + defer s.Close() + + n := 3 + s.bhd.ipv6 = &blackHoleFilter{n: n, minSuccesses: 1, name: "IPv6"} + + // all dials to the address will fail. RFC6666 Discard Prefix + addr := ma.StringCast("/ip6/0100::1/tcp/54321/") + + p, err := test.RandPeerID() + if err != nil { + t.Error(err) + } + s.Peerstore().AddAddr(p, addr, peerstore.PermanentAddrTTL) + + // do 1 extra dial to ensure that the blackHoleDetector state is updated since it + // happens in a different goroutine + for i := 0; i < n+1; i++ { + s.backf.Clear(p) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + conn, err := s.DialPeer(ctx, p) + if err == nil || conn != nil { + t.Fatalf("expected dial to fail") + } + cancel() + } + s.backf.Clear(p) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + conn, err := s.DialPeer(ctx, p) + if conn != nil { + t.Fatalf("expected dial to be blocked") + } + if err != ErrNoGoodAddresses { + t.Fatalf("expected to receive an error of type *DialError, got %s of type %T", err, err) + } +} diff --git a/p2p/net/swarm/swarm_metrics.go b/p2p/net/swarm/swarm_metrics.go index 3110217f81..28564e9e54 100644 --- a/p2p/net/swarm/swarm_metrics.go +++ b/p2p/net/swarm/swarm_metrics.go @@ -85,6 +85,30 @@ var ( Buckets: []float64{0.001, 0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.75, 1, 2}, }, ) + blackHoleFilterState = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: metricNamespace, + Name: "black_hole_filter_state", + Help: "State of the black hole filter", + }, + []string{"name"}, + ) + blackHoleFilterSuccessFraction = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: metricNamespace, + Name: "black_hole_filter_success_fraction", + Help: "Fraction of successful dials among the last n requests", + }, + []string{"name"}, + ) + blackHoleFilterNextRequestAllowedAfter = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: metricNamespace, + Name: "black_hole_filter_next_request_allowed_after", + Help: "Number of requests after which the next request will be allowed", + }, + []string{"name"}, + ) collectors = []prometheus.Collector{ connsOpened, keyTypes, @@ -94,6 +118,9 @@ var ( connHandshakeLatency, dialsPerPeer, dialRankingDelay, + blackHoleFilterSuccessFraction, + blackHoleFilterState, + blackHoleFilterNextRequestAllowedAfter, } ) @@ -104,6 +131,7 @@ type MetricsTracer interface { FailedDialing(ma.Multiaddr, error) DialCompleted(success bool, totalDials int) DialRankingDelay(d time.Duration) + UpdatedBlackHoleFilterState(name string, state blackHoleState, nextProbeAfter int, successFraction float64) } type metricsTracer struct{} @@ -235,3 +263,15 @@ func (m *metricsTracer) DialCompleted(success bool, totalDials int) { func (m *metricsTracer) DialRankingDelay(d time.Duration) { dialRankingDelay.Observe(d.Seconds()) } + +func (m *metricsTracer) UpdatedBlackHoleFilterState(name string, state blackHoleState, + nextProbeAfter int, successFraction float64) { + tags := metricshelper.GetStringSlice() + defer metricshelper.PutStringSlice(tags) + + *tags = append(*tags, name) + + blackHoleFilterState.WithLabelValues(*tags...).Set(float64(state)) + blackHoleFilterSuccessFraction.WithLabelValues(*tags...).Set(successFraction) + blackHoleFilterNextRequestAllowedAfter.WithLabelValues(*tags...).Set(float64(nextProbeAfter)) +} diff --git a/p2p/net/swarm/swarm_metrics_test.go b/p2p/net/swarm/swarm_metrics_test.go index 0e13048a99..25e13f3213 100644 --- a/p2p/net/swarm/swarm_metrics_test.go +++ b/p2p/net/swarm/swarm_metrics_test.go @@ -78,6 +78,9 @@ func TestMetricsNoAllocNoCover(t *testing.T) { ma.StringCast("/ip4/1.2.3.4/udp/2345"), } + bhfNames := []string{"udp", "ipv6", "tcp", "icmp"} + bhfState := []blackHoleState{blackHoleStateAllowed, blackHoleStateBlocked} + tests := map[string]func(){ "OpenedConnection": func() { mt.OpenedConnection(randItem(directions), randItem(keys), randItem(connections), randItem(addrs)) @@ -91,6 +94,14 @@ func TestMetricsNoAllocNoCover(t *testing.T) { "FailedDialing": func() { mt.FailedDialing(randItem(addrs), randItem(errors)) }, "DialCompleted": func() { mt.DialCompleted(mrand.Intn(2) == 1, mrand.Intn(10)) }, "DialRankingDelay": func() { mt.DialRankingDelay(time.Duration(mrand.Intn(1e10))) }, + "UpdatedBlackHoleFilterState": func() { + mt.UpdatedBlackHoleFilterState( + randItem(bhfNames), + randItem(bhfState), + mrand.Intn(100), + mrand.Float64(), + ) + }, } for method, f := range tests {