From f2a912a4e62d92d9f1977fd6c639f61bf10cb47a Mon Sep 17 00:00:00 2001 From: lzhfromustc Date: Wed, 9 Dec 2020 22:23:21 -0500 Subject: [PATCH] test: change channel operations to avoid potential goroutine leaks In these unit tests, goroutines may leak if certain branches are chosen. This commit edits channel operations and buffer sizes, so no matter what branch is chosen, the test will end correctly. This commit doesn't change the semantics of unit tests. --- pkg/transport/timeout_dialer_test.go | 6 ++--- server/etcdserver/server_test.go | 2 +- .../recipes/v3_double_barrier_test.go | 24 ++++++++++--------- tests/integration/clientv3/kv_test.go | 2 +- tests/integration/v2_http_kv_test.go | 2 +- tests/integration/v3_watch_test.go | 2 +- 6 files changed, 20 insertions(+), 18 deletions(-) diff --git a/pkg/transport/timeout_dialer_test.go b/pkg/transport/timeout_dialer_test.go index ff188a78750..a2ff0021473 100644 --- a/pkg/transport/timeout_dialer_test.go +++ b/pkg/transport/timeout_dialer_test.go @@ -22,14 +22,14 @@ import ( func TestReadWriteTimeoutDialer(t *testing.T) { stop := make(chan struct{}) - defer func() { - stop <- struct{}{} - }() ln, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { t.Fatalf("unexpected listen error: %v", err) } + defer func() { + stop <- struct{}{} + }() ts := testBlockingServer{ln, 2, stop} go ts.Start(t) diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index bb2fd64b17e..5285b356b69 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -213,7 +213,7 @@ func TestApplyRepeat(t *testing.T) { // wait for conf change message act, err := n.Wait(1) // wait for stop message (async to avoid deadlock) - stopc := make(chan error) + stopc := make(chan error, 1) go func() { _, werr := n.Wait(1) stopc <- werr diff --git a/tests/integration/clientv3/experimental/recipes/v3_double_barrier_test.go b/tests/integration/clientv3/experimental/recipes/v3_double_barrier_test.go index 73ad8cd3d2f..feaf9a6f4db 100644 --- a/tests/integration/clientv3/experimental/recipes/v3_double_barrier_test.go +++ b/tests/integration/clientv3/experimental/recipes/v3_double_barrier_test.go @@ -36,6 +36,7 @@ func TestDoubleBarrier(t *testing.T) { b := recipe.NewDoubleBarrier(session, "test-barrier", waiters) donec := make(chan struct{}) + defer close(donec) for i := 0; i < waiters-1; i++ { go func() { session, err := concurrency.NewSession(clus.RandClient()) @@ -48,17 +49,17 @@ func TestDoubleBarrier(t *testing.T) { if err := bb.Enter(); err != nil { t.Errorf("could not enter on barrier (%v)", err) } - donec <- struct{}{} + <-donec if err := bb.Leave(); err != nil { t.Errorf("could not leave on barrier (%v)", err) } - donec <- struct{}{} + <-donec }() } time.Sleep(10 * time.Millisecond) select { - case <-donec: + case donec <- struct{}{}: t.Fatalf("barrier did not enter-wait") default: } @@ -72,13 +73,13 @@ func TestDoubleBarrier(t *testing.T) { select { case <-timerC: t.Fatalf("barrier enter timed out") - case <-donec: + case donec <- struct{}{}: } } time.Sleep(10 * time.Millisecond) select { - case <-donec: + case donec <- struct{}{}: t.Fatalf("barrier did not leave-wait") default: } @@ -89,7 +90,7 @@ func TestDoubleBarrier(t *testing.T) { select { case <-timerC: t.Fatalf("barrier leave timed out") - case <-donec: + case donec <- struct{}{}: } } } @@ -100,6 +101,7 @@ func TestDoubleBarrierFailover(t *testing.T) { waiters := 10 donec := make(chan struct{}) + defer close(donec) s0, err := concurrency.NewSession(clus.Client(0)) if err != nil { @@ -118,7 +120,7 @@ func TestDoubleBarrierFailover(t *testing.T) { if berr := b.Enter(); berr != nil { t.Errorf("could not enter on barrier (%v)", berr) } - donec <- struct{}{} + <-donec }() for i := 0; i < waiters-1; i++ { @@ -127,16 +129,16 @@ func TestDoubleBarrierFailover(t *testing.T) { if berr := b.Enter(); berr != nil { t.Errorf("could not enter on barrier (%v)", berr) } - donec <- struct{}{} + <-donec b.Leave() - donec <- struct{}{} + <-donec }() } // wait for barrier enter to unblock for i := 0; i < waiters; i++ { select { - case <-donec: + case donec <- struct{}{}: case <-time.After(10 * time.Second): t.Fatalf("timed out waiting for enter, %d", i) } @@ -148,7 +150,7 @@ func TestDoubleBarrierFailover(t *testing.T) { // join on rest of waiters for i := 0; i < waiters-1; i++ { select { - case <-donec: + case donec <- struct{}{}: case <-time.After(10 * time.Second): t.Fatalf("timed out waiting for leave, %d", i) } diff --git a/tests/integration/clientv3/kv_test.go b/tests/integration/clientv3/kv_test.go index 1b87515eaca..2a3cd011866 100644 --- a/tests/integration/clientv3/kv_test.go +++ b/tests/integration/clientv3/kv_test.go @@ -779,7 +779,7 @@ func TestKVPutFailGetRetry(t *testing.T) { t.Fatalf("got success on disconnected put, wanted error") } - donec := make(chan struct{}) + donec := make(chan struct{}, 1) go func() { // Get will fail, but reconnect will trigger gresp, gerr := kv.Get(context.TODO(), "foo") diff --git a/tests/integration/v2_http_kv_test.go b/tests/integration/v2_http_kv_test.go index 76d523ea46e..daba250e84a 100644 --- a/tests/integration/v2_http_kv_test.go +++ b/tests/integration/v2_http_kv_test.go @@ -954,7 +954,7 @@ func TestV2WatchKeyInDir(t *testing.T) { tc := NewTestClient() var body map[string]interface{} - c := make(chan bool) + c := make(chan bool, 1) // Create an expiring directory v := url.Values{} diff --git a/tests/integration/v3_watch_test.go b/tests/integration/v3_watch_test.go index 140ce250c31..ba09e3a6e22 100644 --- a/tests/integration/v3_watch_test.go +++ b/tests/integration/v3_watch_test.go @@ -1097,7 +1097,7 @@ func TestV3WatchWithFilter(t *testing.T) { t.Fatal(err) } - recv := make(chan *pb.WatchResponse) + recv := make(chan *pb.WatchResponse, 1) go func() { // check received PUT resp, rerr := ws.Recv()