diff --git a/go.mod b/go.mod index f64e64ce..c90bda4a 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,14 @@ module github.com/libp2p/go-libp2p-core go 1.17 require ( + github.com/benbjohnson/clock v1.3.0 github.com/btcsuite/btcd/btcec/v2 v2.1.3 github.com/coreos/go-semver v0.3.0 github.com/gogo/protobuf v1.3.1 github.com/ipfs/go-cid v0.2.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/libp2p/go-buffer-pool v0.0.2 - github.com/libp2p/go-flow-metrics v0.0.3 + github.com/libp2p/go-flow-metrics v0.0.4-0.20220708232054-e5893ddd3f08 github.com/libp2p/go-msgio v0.0.6 github.com/libp2p/go-openssl v0.0.7 github.com/minio/sha256-simd v1.0.0 @@ -38,5 +39,5 @@ require ( go.uber.org/zap v1.19.1 // indirect golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf // indirect golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect - gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index b799050e..689c221c 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,6 @@ -github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= +github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/btcsuite/btcd/btcec/v2 v2.1.3 h1:xM/n3yIhHAhHy04z4i43C8p4ehixJZMsnrVJkgl+MTE= github.com/btcsuite/btcd/btcec/v2 v2.1.3/go.mod h1:ctjw4H1kknNJmRN4iP1R7bTQ+v3GJkZBd6mui8ZsAZE= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.0/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= @@ -32,8 +33,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs= github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= -github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM= -github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= +github.com/libp2p/go-flow-metrics v0.0.4-0.20220708232054-e5893ddd3f08 h1:LMrPQC6zMpO3T1liGM6RTyBDZERcJHOCVdoDUoGtkmA= +github.com/libp2p/go-flow-metrics v0.0.4-0.20220708232054-e5893ddd3f08/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro= github.com/libp2p/go-msgio v0.0.6 h1:lQ7Uc0kS1wb1EfRxO2Eir/RJoHkHn7t6o+EiwsYIKJA= github.com/libp2p/go-msgio v0.0.6/go.mod h1:4ecVB6d9f4BDSL5fqvPiC4A3KivjWn+Venn/1ALLMWA= github.com/libp2p/go-openssl v0.0.7 h1:eCAzdLejcNVBzP/iZM9vqHnQm+XyCEbSSIheIPRGNsw= @@ -129,5 +130,6 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/metrics/bandwidth_test.go b/metrics/bandwidth_test.go index 685850d1..e0bd8d98 100644 --- a/metrics/bandwidth_test.go +++ b/metrics/bandwidth_test.go @@ -2,16 +2,25 @@ package metrics import ( "fmt" - "math" - "runtime" "sync" "testing" "time" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" + + "github.com/libp2p/go-flow-metrics" + + "github.com/benbjohnson/clock" + "github.com/stretchr/testify/require" ) +var cl = clock.NewMock() + +func init() { + flow.SetClock(cl) +} + func BenchmarkBandwidthCounter(b *testing.B) { b.StopTimer() b.ResetTimer() @@ -49,20 +58,13 @@ func round(bwc *BandwidthCounter, b *testing.B) { b.StopTimer() } -// Allow 1% errors for bw calculations. -const acceptableError = 0.01 - func TestBandwidthCounter(t *testing.T) { bwc := NewBandwidthCounter() - start := make(chan struct{}) - var wg sync.WaitGroup - wg.Add(200) - for i := 0; i < 100; i++ { - p := peer.ID(fmt.Sprintf("peer-%d", i)) - for j := 0; j < 2; j++ { - proto := protocol.ID(fmt.Sprintf("proto-%d", j)) - go func() { - defer wg.Done() + for i := 0; i < 40; i++ { + for i := 0; i < 100; i++ { + p := peer.ID(fmt.Sprintf("peer-%d", i)) + for j := 0; j < 2; j++ { + proto := protocol.ID(fmt.Sprintf("proto-%d", j)) // make sure the bandwidth counters are active bwc.LogSentMessage(100) @@ -70,27 +72,15 @@ func TestBandwidthCounter(t *testing.T) { bwc.LogSentMessageStream(100, proto, p) bwc.LogRecvMessageStream(50, proto, p) - <-start - - t := time.NewTicker(100 * time.Millisecond) - defer t.Stop() - - for i := 0; i < 39; i++ { - bwc.LogSentMessage(100) - bwc.LogRecvMessage(50) - bwc.LogSentMessageStream(100, proto, p) - bwc.LogRecvMessageStream(50, proto, p) - <-t.C - } - }() + // <-start + } } + cl.Add(100 * time.Millisecond) } assertProtocols := func(check func(Stats)) { byProtocol := bwc.GetBandwidthByProtocol() - if len(byProtocol) != 2 { - t.Errorf("expected 2 protocols, got %d", len(byProtocol)) - } + require.Len(t, byProtocol, 2, "expected 2 protocols") for i := 0; i < 2; i++ { p := protocol.ID(fmt.Sprintf("proto-%d", i)) for _, stats := range [...]Stats{bwc.GetBandwidthForProtocol(p), byProtocol[p]} { @@ -101,9 +91,7 @@ func TestBandwidthCounter(t *testing.T) { assertPeers := func(check func(Stats)) { byPeer := bwc.GetBandwidthByPeer() - if len(byPeer) != 100 { - t.Errorf("expected 100 peers, got %d", len(byPeer)) - } + require.Len(t, byPeer, 100, "expected 100 peers") for i := 0; i < 100; i++ { p := peer.ID(fmt.Sprintf("peer-%d", i)) for _, stats := range [...]Stats{bwc.GetBandwidthForPeer(p), byPeer[p]} { @@ -112,34 +100,22 @@ func TestBandwidthCounter(t *testing.T) { } } - time.Sleep(time.Second) - close(start) - - wg.Wait() - time.Sleep(1 * time.Second) - assertPeers(func(stats Stats) { - assertEq(t, 8000, stats.TotalOut) - assertEq(t, 4000, stats.TotalIn) + require.Equal(t, int64(8000), stats.TotalOut) + require.Equal(t, int64(4000), stats.TotalIn) }) assertProtocols(func(stats Stats) { - assertEq(t, 400000, stats.TotalOut) - assertEq(t, 200000, stats.TotalIn) + require.Equal(t, int64(400000), stats.TotalOut) + require.Equal(t, int64(200000), stats.TotalIn) }) - { - stats := bwc.GetBandwidthTotals() - assertEq(t, 800000, stats.TotalOut) - assertEq(t, 400000, stats.TotalIn) - } + stats := bwc.GetBandwidthTotals() + require.Equal(t, int64(800000), stats.TotalOut) + require.Equal(t, int64(400000), stats.TotalIn) } func TestResetBandwidthCounter(t *testing.T) { - if runtime.GOOS != "linux" { - // Specifically, it fails on MacOS because we need a high precision timer. - t.Skip("this test is highly timing dependent and only passes reliably on Linux") - } bwc := NewBandwidthCounter() p := peer.ID("peer-0") @@ -151,69 +127,44 @@ func TestResetBandwidthCounter(t *testing.T) { bwc.LogSentMessageStream(100, proto, p) bwc.LogRecvMessageStream(50, proto, p) - time.Sleep(1*time.Second + time.Millisecond) + time.Sleep(200 * time.Millisecond) // make sure the meters are registered with the sweeper + cl.Add(time.Second) bwc.LogSentMessage(42) bwc.LogRecvMessage(24) bwc.LogSentMessageStream(100, proto, p) bwc.LogRecvMessageStream(50, proto, p) - time.Sleep(1*time.Second + time.Millisecond) + cl.Add(time.Second) { stats := bwc.GetBandwidthTotals() - assertEq(t, 84, stats.TotalOut) - assertEq(t, 48, stats.TotalIn) + require.Equal(t, int64(84), stats.TotalOut) + require.Equal(t, int64(48), stats.TotalIn) } { stats := bwc.GetBandwidthByProtocol() - assertApproxEq(t, 1, float64(len(stats))) + require.Len(t, stats, 1) stat := stats[proto] - assertApproxEq(t, 100, stat.RateOut) - assertApproxEq(t, 50, stat.RateIn) + require.Equal(t, float64(100), stat.RateOut) + require.Equal(t, float64(50), stat.RateIn) } { stats := bwc.GetBandwidthByPeer() - assertApproxEq(t, 1, float64(len(stats))) + require.Len(t, stats, 1) stat := stats[p] - assertApproxEq(t, 100, stat.RateOut) - assertApproxEq(t, 50, stat.RateIn) + require.Equal(t, float64(100), stat.RateOut) + require.Equal(t, float64(50), stat.RateIn) } bwc.Reset() { stats := bwc.GetBandwidthTotals() - assertEq(t, 0, stats.TotalOut) - assertEq(t, 0, stats.TotalIn) - } - - { - byProtocol := bwc.GetBandwidthByProtocol() - if len(byProtocol) != 0 { - t.Errorf("expected 0 protocols, got %d", len(byProtocol)) - } - } - - { - byPeer := bwc.GetBandwidthByPeer() - if len(byPeer) != 0 { - t.Errorf("expected 0 peers, got %d", len(byPeer)) - } - } -} - -func assertEq(t *testing.T, expected, actual int64) { - if expected != actual { - t.Errorf("expected %d, got %d", expected, actual) - } -} - -func assertApproxEq(t *testing.T, expected, actual float64) { - t.Helper() - margin := expected * acceptableError - if !(math.Abs(expected-actual) <= margin) { - t.Errorf("expected %f (±%f), got %f", expected, margin, actual) + require.Zero(t, stats.TotalOut) + require.Zero(t, stats.TotalIn) + require.Empty(t, bwc.GetBandwidthByProtocol(), "expected 0 protocols") + require.Empty(t, bwc.GetBandwidthByPeer(), "expected 0 peers") } }