diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go
index 33ed7ecba13c..f41b952976b7 100644
--- a/clientv3/integration/kv_test.go
+++ b/clientv3/integration/kv_test.go
@@ -825,53 +825,6 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
 	}
 }
 
-// TestKVGetResetLoneEndpoint ensures that if an endpoint resets and all other
-// endpoints are down, then it will reconnect.
-func TestKVGetResetLoneEndpoint(t *testing.T) {
-	defer testutil.AfterTest(t)
-	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, SkipCreatingClient: true})
-	defer clus.Terminate(t)
-
-	// get endpoint list
-	eps := make([]string, 2)
-	for i := range eps {
-		eps[i] = clus.Members[i].GRPCAddr()
-	}
-
-	cfg := clientv3.Config{Endpoints: eps, DialTimeout: 500 * time.Millisecond}
-	cli, err := clientv3.New(cfg)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer cli.Close()
-
-	// disconnect everything
-	clus.Members[0].Stop(t)
-	clus.Members[1].Stop(t)
-
-	// have Get try to reconnect
-	donec := make(chan struct{})
-	go func() {
-		// 3-second is the minimum interval between endpoint being marked
-		// as unhealthy and being removed from unhealthy, so possibly
-		// takes >5-second to unpin and repin an endpoint
-		// TODO: decrease timeout when balancer switch rewrite
-		ctx, cancel := context.WithTimeout(context.TODO(), 7*time.Second)
-		if _, err := cli.Get(ctx, "abc", clientv3.WithSerializable()); err != nil {
-			t.Fatal(err)
-		}
-		cancel()
-		close(donec)
-	}()
-	time.Sleep(500 * time.Millisecond)
-	clus.Members[0].Restart(t)
-	select {
-	case <-time.After(10 * time.Second):
-		t.Fatalf("timed out waiting for Get")
-	case <-donec:
-	}
-}
-
 // TestKVPutAtMostOnce ensures that a Put will only occur at most once
 // in the presence of network errors.
 func TestKVPutAtMostOnce(t *testing.T) {
diff --git a/clientv3/integration/server_shutdown_test.go b/clientv3/integration/server_shutdown_test.go
index 74780030f9cf..a56dee1341dd 100644
--- a/clientv3/integration/server_shutdown_test.go
+++ b/clientv3/integration/server_shutdown_test.go
@@ -235,3 +235,117 @@ func testBalancerUnderServerShutdownImmutable(t *testing.T, op func(*clientv3.Cl
 		t.Errorf("failed to finish range request in time %v (timeout %v)", err, timeout)
 	}
 }
+
+func TestBalancerUnderServerStopInflightLinearizableGetOnRestart(t *testing.T) {
+	tt := []pinTestOpt{
+		{pinLeader: true, stopPinFirst: true},
+		{pinLeader: true, stopPinFirst: false},
+		{pinLeader: false, stopPinFirst: true},
+		{pinLeader: false, stopPinFirst: false},
+	}
+	for i := range tt {
+		testBalancerUnderServerStopInflightRangeOnRestart(t, true, tt[i])
+	}
+}
+
+func TestBalancerUnderServerStopInflightSerializableGetOnRestart(t *testing.T) {
+	tt := []pinTestOpt{
+		{pinLeader: true, stopPinFirst: true},
+		{pinLeader: true, stopPinFirst: false},
+		{pinLeader: false, stopPinFirst: true},
+		{pinLeader: false, stopPinFirst: false},
+	}
+	for i := range tt {
+		testBalancerUnderServerStopInflightRangeOnRestart(t, false, tt[i])
+	}
+}
+
+type pinTestOpt struct {
+	pinLeader    bool
+	stopPinFirst bool
+}
+
+// testBalancerUnderServerStopInflightRangeOnRestart expects
+// inflight range request reconnects on server restart.
+func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizable bool, opt pinTestOpt) {
+	defer testutil.AfterTest(t)
+
+	cfg := &integration.ClusterConfig{
+		Size:               2,
+		SkipCreatingClient: true,
+	}
+	if linearizable {
+		cfg.Size = 3
+	}
+
+	clus := integration.NewClusterV3(t, cfg)
+	defer clus.Terminate(t)
+	eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()}
+	if linearizable {
+		eps = append(eps, clus.Members[2].GRPCAddr())
+	}
+
+	lead := clus.WaitLeader(t)
+
+	target := lead
+	if !opt.pinLeader {
+		target = (target + 1) % 2
+	}
+
+	// pin eps[target]
+	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[target]}})
+	if err != nil {
+		t.Errorf("failed to create client: %v", err)
+	}
+	defer cli.Close()
+
+	// wait for eps[target] to be pinned
+	mustWaitPinReady(t, cli)
+
+	// add all eps to the list, so that when the original pinned one fails
+	// the client can switch to other available eps
+	cli.SetEndpoints(eps...)
+
+	if opt.stopPinFirst {
+		clus.Members[target].Stop(t)
+		// give some time for balancer switch before stopping the other
+		time.Sleep(time.Second)
+		clus.Members[(target+1)%2].Stop(t)
+	} else {
+		clus.Members[(target+1)%2].Stop(t)
+		// balancer cannot pin other member since it's already stopped
+		clus.Members[target].Stop(t)
+	}
+
+	// 3 seconds is the minimum interval between an endpoint being marked
+	// unhealthy and being removed from the unhealthy list, so it can take
+	// >5 seconds to unpin and repin an endpoint
+	// TODO: decrease timeout when balancer switch rewrite
+	clientTimeout := 7 * time.Second
+
+	var gops []clientv3.OpOption
+	if !linearizable {
+		gops = append(gops, clientv3.WithSerializable())
+	}
+
+	donec, readyc := make(chan struct{}), make(chan struct{}, 1)
+	go func() {
+		defer close(donec)
+		ctx, cancel := context.WithTimeout(context.TODO(), clientTimeout)
+		readyc <- struct{}{}
+		_, err := cli.Get(ctx, "abc", gops...)
+		cancel()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	<-readyc
+	clus.Members[target].Restart(t)
+
+	select {
+	case <-time.After(clientTimeout + 3*time.Second):
+		t.Fatalf("timed out waiting for Get [linearizable: %v, opt: %+v]", linearizable, opt)
+	case <-donec:
+	}
+}
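
The helper mustWaitPinReady called above is not part of this diff; it is defined elsewhere in the clientv3/integration package. A minimal sketch of what such a helper can look like, assuming it simply issues a default (linearizable) Get that cannot succeed until the balancer has pinned a reachable endpoint; the timeout, key, and import path here are illustrative rather than the actual values:

package integration

import (
	"context"
	"testing"
	"time"

	"github.com/coreos/etcd/clientv3"
)

// mustWaitPinReady blocks until the client's balancer has pinned a reachable
// endpoint, by issuing a Get that can only succeed once a connection is up.
// Sketch only: the 10-second timeout and the "foo" key are placeholders.
func mustWaitPinReady(t *testing.T, cli *clientv3.Client) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if _, err := cli.Get(ctx, "foo"); err != nil {
		t.Fatal(err)
	}
}

Because a default Get is linearizable, it returns only once a connection is established and a leader is reachable, which makes it a convenient readiness probe before the test starts stopping members.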