From 4de27039cbec29be4f1f0cb4c6a611797f0e0ea5 Mon Sep 17 00:00:00 2001 From: nolouch Date: Fri, 14 Sep 2018 13:33:57 +0800 Subject: [PATCH 1/5] server: drop read request if found leader changed --- clientv3/integration/leasing_test.go | 1 - .../integration/network_partition_test.go | 52 +++++++++++++++++++ etcdserver/api/v3rpc/rpctypes/error.go | 3 ++ etcdserver/api/v3rpc/util.go | 1 + etcdserver/errors.go | 1 + etcdserver/metrics.go | 7 +++ etcdserver/server.go | 7 ++- etcdserver/v3_server.go | 8 +++ 8 files changed, 78 insertions(+), 2 deletions(-) diff --git a/clientv3/integration/leasing_test.go b/clientv3/integration/leasing_test.go index a824726f0cd..78826b1898f 100644 --- a/clientv3/integration/leasing_test.go +++ b/clientv3/integration/leasing_test.go @@ -1790,7 +1790,6 @@ func TestLeasingTxnOwnerPutBranch(t *testing.T) { defer testutil.AfterTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) - lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") testutil.AssertNil(t, err) defer closeLKV() diff --git a/clientv3/integration/network_partition_test.go b/clientv3/integration/network_partition_test.go index 637d94d22dd..388eb4e0755 100644 --- a/clientv3/integration/network_partition_test.go +++ b/clientv3/integration/network_partition_test.go @@ -24,6 +24,7 @@ import ( "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/integration" "go.etcd.io/etcd/pkg/testutil" "google.golang.org/grpc" @@ -265,3 +266,54 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) { t.Fatal("took too long to detect leader lost") } } + +func TestDropReadUnderNetworkPartition(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{ + Size: 3, + SkipCreatingClient: true, + }) + defer clus.Terminate(t) + leaderIndex := clus.WaitLeader(t) + // get a follower endpoint + eps := []string{clus.Members[(leaderIndex+1)%3].GRPCAddr()} + ccfg := clientv3.Config{ + Endpoints: eps, + DialTimeout: 10 * time.Second, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + } + cli, err := clientv3.New(ccfg) + if err != nil { + t.Fatal(err) + } + defer cli.Close() + + // wait for eps[0] to be pinned + mustWaitPinReady(t, cli) + + // add other endpoints for later endpoint switch + cli.SetEndpoints(eps...) + time.Sleep(time.Second * 2) + conn, err := cli.Dial(clus.Members[(leaderIndex+1)%3].GRPCAddr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + clus.Members[leaderIndex].InjectPartition(t, clus.Members[(leaderIndex+1)%3], clus.Members[(leaderIndex+2)%3]) + kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn), nil) + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + _, err = kvc.Get(ctx, "a") + cancel() + if err != rpctypes.ErrLeaderChanged { + t.Fatalf("expected %v, got %v", rpctypes.ErrLeaderChanged, err) + } + + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + _, err = kvc.Get(ctx, "a") + cancel() + if err != nil { + t.Fatalf("expected nil, got %v", err) + } +} diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index 55eab38ef17..9e45cea5b6e 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -61,6 +61,7 @@ var ( ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err() ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err() + ErrGRPCLeaderChanged = status.New(codes.Unavailable, "etcdserver: leader changed").Err() ErrGRPCNotCapable = status.New(codes.Unavailable, "etcdserver: not capable").Err() ErrGRPCStopped = status.New(codes.Unavailable, "etcdserver: server stopped").Err() ErrGRPCTimeout = status.New(codes.Unavailable, "etcdserver: request timed out").Err() @@ -111,6 +112,7 @@ var ( ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader, ErrorDesc(ErrGRPCNotLeader): ErrGRPCNotLeader, + ErrorDesc(ErrGRPCLeaderChanged): ErrGRPCLeaderChanged, ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable, ErrorDesc(ErrGRPCStopped): ErrGRPCStopped, ErrorDesc(ErrGRPCTimeout): ErrGRPCTimeout, @@ -163,6 +165,7 @@ var ( ErrNoLeader = Error(ErrGRPCNoLeader) ErrNotLeader = Error(ErrGRPCNotLeader) + ErrLeaderChanged = Error(ErrGRPCLeaderChanged) ErrNotCapable = Error(ErrGRPCNotCapable) ErrStopped = Error(ErrGRPCStopped) ErrTimeout = Error(ErrGRPCTimeout) diff --git a/etcdserver/api/v3rpc/util.go b/etcdserver/api/v3rpc/util.go index 62c6c12bb52..5887dfeba44 100644 --- a/etcdserver/api/v3rpc/util.go +++ b/etcdserver/api/v3rpc/util.go @@ -44,6 +44,7 @@ var toGRPCErrorMap = map[error]error{ etcdserver.ErrNoLeader: rpctypes.ErrGRPCNoLeader, etcdserver.ErrNotLeader: rpctypes.ErrGRPCNotLeader, + etcdserver.ErrLeaderChanged: rpctypes.ErrGRPCLeaderChanged, etcdserver.ErrStopped: rpctypes.ErrGRPCStopped, etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout, etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail, diff --git a/etcdserver/errors.go b/etcdserver/errors.go index fb93c4b2a1d..8cec52a177b 100644 --- a/etcdserver/errors.go +++ b/etcdserver/errors.go @@ -27,6 +27,7 @@ var ( ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure") ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost") ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long") + ErrLeaderChanged = errors.New("etcdserver: leader changed") ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members") ErrNoLeader = errors.New("etcdserver: no leader") ErrNotLeader = errors.New("etcdserver: not leader") diff --git a/etcdserver/metrics.go b/etcdserver/metrics.go index 99dbea96f49..748e7edb5da 100644 --- a/etcdserver/metrics.go +++ b/etcdserver/metrics.go @@ -86,6 +86,12 @@ var ( Name: "slow_read_indexes_total", Help: "The total number of pending read indexes not in sync with leader's or timed out read index requests.", }) + readIndexFailed = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "read_indexes_failed_total", + Help: "The total number of failed read indexes seen.", + }) leaseExpired = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "etcd_debugging", Subsystem: "server", @@ -132,6 +138,7 @@ func init() { prometheus.MustRegister(proposalsPending) prometheus.MustRegister(proposalsFailed) prometheus.MustRegister(slowReadIndex) + prometheus.MustRegister(readIndexFailed) prometheus.MustRegister(leaseExpired) prometheus.MustRegister(quotaBackendBytes) prometheus.MustRegister(currentVersion) diff --git a/etcdserver/server.go b/etcdserver/server.go index 50e84592336..178861745a3 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -212,6 +212,8 @@ type EtcdServer struct { stopping chan struct{} // done is closed when all goroutines from start() complete. done chan struct{} + // leaderChanged is used to notify the linearizable read loop to drop the old read requests. + leaderChanged chan struct{} errorc chan error id types.ID @@ -752,6 +754,7 @@ func (s *EtcdServer) start() { s.ctx, s.cancel = context.WithCancel(context.Background()) s.readwaitc = make(chan struct{}, 1) s.readNotifier = newNotifier() + s.leaderChanged = make(chan struct{}, 1) if s.ClusterVersion() != nil { if lg != nil { lg.Info( @@ -938,7 +941,9 @@ func (s *EtcdServer) run() { s.compactor.Resume() } } - + if newLeader { + s.leaderChanged <- struct{}{} + } // TODO: remove the nil checking // current test utility does not provide the stats if s.stats != nil { diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 6fa89969c61..5bcb7fc32d0 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -636,6 +636,8 @@ func (s *EtcdServer) linearizableReadLoop() { binary.BigEndian.PutUint64(ctxToSend, id1) select { + case <-s.leaderChanged: + continue case <-s.readwaitc: case <-s.stopping: return @@ -660,6 +662,7 @@ func (s *EtcdServer) linearizableReadLoop() { } else { plog.Errorf("failed to get read index from raft: %v", err) } + readIndexFailed.Inc() nr.notify(err) continue } @@ -691,6 +694,11 @@ func (s *EtcdServer) linearizableReadLoop() { } slowReadIndex.Inc() } + case <-s.leaderChanged: + timeout = true + readIndexFailed.Inc() + // return a retryable error. + nr.notify(ErrLeaderChanged) case <-time.After(s.Cfg.ReqTimeout()): if lg != nil { lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout())) From f3f6427586e24a827140fb8e079d553f085b0af3 Mon Sep 17 00:00:00 2001 From: nolouch Date: Fri, 14 Sep 2018 16:07:12 +0800 Subject: [PATCH 2/5] server: prevent blocking --- clientv3/integration/leasing_test.go | 1 + etcdserver/server.go | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/clientv3/integration/leasing_test.go b/clientv3/integration/leasing_test.go index 78826b1898f..a824726f0cd 100644 --- a/clientv3/integration/leasing_test.go +++ b/clientv3/integration/leasing_test.go @@ -1790,6 +1790,7 @@ func TestLeasingTxnOwnerPutBranch(t *testing.T) { defer testutil.AfterTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") testutil.AssertNil(t, err) defer closeLKV() diff --git a/etcdserver/server.go b/etcdserver/server.go index 178861745a3..fc070ce51a7 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -942,7 +942,10 @@ func (s *EtcdServer) run() { } } if newLeader { - s.leaderChanged <- struct{}{} + select { + case s.leaderChanged <- struct{}{}: + default: + } } // TODO: remove the nil checking // current test utility does not provide the stats From fd5ef74b8008d0adfc96e95824f97067cf0a60eb Mon Sep 17 00:00:00 2001 From: nolouch Date: Fri, 14 Sep 2018 17:57:56 +0800 Subject: [PATCH 3/5] clientv3/integration: try to fix tests --- clientv3/integration/leasing_test.go | 1 + integration/v3_lease_test.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/clientv3/integration/leasing_test.go b/clientv3/integration/leasing_test.go index a824726f0cd..affb6642a21 100644 --- a/clientv3/integration/leasing_test.go +++ b/clientv3/integration/leasing_test.go @@ -1644,6 +1644,7 @@ func TestLeasingReconnectTxn(t *testing.T) { clus.Members[0].DropConnections() time.Sleep(time.Millisecond) } + time.Sleep(10 * time.Millisecond) }() _, lerr := lkv.Txn(context.TODO()). diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go index 756407a3784..8a3d93cdab7 100644 --- a/integration/v3_lease_test.go +++ b/integration/v3_lease_test.go @@ -539,6 +539,8 @@ func TestV3LeaseSwitch(t *testing.T) { // election timeout after it loses its quorum. And the new leader extends the TTL of // the lease to at least TTL + election timeout. func TestV3LeaseFailover(t *testing.T) { + defer testutil.AfterTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) From c15fb607f6b5c5d0b7f817065f0b3e61325e0655 Mon Sep 17 00:00:00 2001 From: nolouch Date: Mon, 17 Sep 2018 12:02:24 +0800 Subject: [PATCH 4/5] server: broadcast leader changed --- .../integration/network_partition_test.go | 4 ++-- etcdserver/server.go | 20 +++++++++++++------ etcdserver/v3_server.go | 6 +++--- integration/v3_lease_test.go | 6 ++++-- 4 files changed, 23 insertions(+), 13 deletions(-) diff --git a/clientv3/integration/network_partition_test.go b/clientv3/integration/network_partition_test.go index 388eb4e0755..2e9b9965d45 100644 --- a/clientv3/integration/network_partition_test.go +++ b/clientv3/integration/network_partition_test.go @@ -303,14 +303,14 @@ func TestDropReadUnderNetworkPartition(t *testing.T) { clus.Members[leaderIndex].InjectPartition(t, clus.Members[(leaderIndex+1)%3], clus.Members[(leaderIndex+2)%3]) kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn), nil) - ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) _, err = kvc.Get(ctx, "a") cancel() if err != rpctypes.ErrLeaderChanged { t.Fatalf("expected %v, got %v", rpctypes.ErrLeaderChanged, err) } - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) _, err = kvc.Get(ctx, "a") cancel() if err != nil { diff --git a/etcdserver/server.go b/etcdserver/server.go index fc070ce51a7..90473e666ce 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -213,7 +213,8 @@ type EtcdServer struct { // done is closed when all goroutines from start() complete. done chan struct{} // leaderChanged is used to notify the linearizable read loop to drop the old read requests. - leaderChanged chan struct{} + leaderChanged chan struct{} + leaderChangedMu sync.RWMutex errorc chan error id types.ID @@ -754,7 +755,7 @@ func (s *EtcdServer) start() { s.ctx, s.cancel = context.WithCancel(context.Background()) s.readwaitc = make(chan struct{}, 1) s.readNotifier = newNotifier() - s.leaderChanged = make(chan struct{}, 1) + s.leaderChanged = make(chan struct{}) if s.ClusterVersion() != nil { if lg != nil { lg.Info( @@ -942,10 +943,11 @@ func (s *EtcdServer) run() { } } if newLeader { - select { - case s.leaderChanged <- struct{}{}: - default: - } + s.leaderChangedMu.Lock() + lc := s.leaderChanged + s.leaderChanged = make(chan struct{}) + s.leaderChangedMu.Unlock() + close(lc) } // TODO: remove the nil checking // current test utility does not provide the stats @@ -1696,6 +1698,12 @@ func (s *EtcdServer) getLead() uint64 { return atomic.LoadUint64(&s.lead) } +func (s *EtcdServer) leaderChangedNotify() <-chan struct{} { + s.leaderChangedMu.RLock() + defer s.leaderChangedMu.RUnlock() + return s.leaderChanged +} + // RaftStatusGetter represents etcd server and Raft progress. type RaftStatusGetter interface { ID() types.ID diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 5bcb7fc32d0..d425b634d9a 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -634,9 +634,9 @@ func (s *EtcdServer) linearizableReadLoop() { ctxToSend := make([]byte, 8) id1 := s.reqIDGen.Next() binary.BigEndian.PutUint64(ctxToSend, id1) - + leaderChangedNotifier := s.leaderChangedNotify() select { - case <-s.leaderChanged: + case <-leaderChangedNotifier: continue case <-s.readwaitc: case <-s.stopping: @@ -694,7 +694,7 @@ func (s *EtcdServer) linearizableReadLoop() { } slowReadIndex.Inc() } - case <-s.leaderChanged: + case <-leaderChangedNotifier: timeout = true readIndexFailed.Inc() // return a retryable error. diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go index 8a3d93cdab7..92899b6707f 100644 --- a/integration/v3_lease_test.go +++ b/integration/v3_lease_test.go @@ -565,12 +565,14 @@ func TestV3LeaseFailover(t *testing.T) { md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) mctx := metadata.NewOutgoingContext(context.Background(), md) ctx, cancel := context.WithCancel(mctx) - defer cancel() lac, err := lc.LeaseKeepAlive(ctx) if err != nil { t.Fatal(err) } - defer lac.CloseSend() + defer func() { + lac.CloseSend() + cancel() + }() // send keep alive to old leader until the old leader starts // to drop lease request. From 6ea54195a666567deeff29145d4abee51a804269 Mon Sep 17 00:00:00 2001 From: nolouch Date: Mon, 17 Sep 2018 15:43:06 +0800 Subject: [PATCH 5/5] client/integration: try to fix tests --- clientv3/integration/kv_test.go | 10 +++++----- clientv3/integration/lease_test.go | 7 +++---- clientv3/integration/network_partition_test.go | 2 +- etcdserver/server.go | 2 +- integration/v3_lease_test.go | 5 +---- 5 files changed, 11 insertions(+), 15 deletions(-) diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index 525c1ab2e98..e8d22e538f0 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -439,6 +439,11 @@ func TestKVGetErrConnClosed(t *testing.T) { cli := clus.Client(0) donec := make(chan struct{}) + if err := cli.Close(); err != nil { + t.Fatal(err) + } + clus.TakeClient(0) + go func() { defer close(donec) _, err := cli.Get(context.TODO(), "foo") @@ -447,11 +452,6 @@ func TestKVGetErrConnClosed(t *testing.T) { } }() - if err := cli.Close(); err != nil { - t.Fatal(err) - } - clus.TakeClient(0) - select { case <-time.After(integration.RequestWaitTimeout): t.Fatal("kv.Get took too long") diff --git a/clientv3/integration/lease_test.go b/clientv3/integration/lease_test.go index 699862baaa5..6fd8f4306aa 100644 --- a/clientv3/integration/lease_test.go +++ b/clientv3/integration/lease_test.go @@ -291,6 +291,9 @@ func TestLeaseGrantErrConnClosed(t *testing.T) { cli := clus.Client(0) clus.TakeClient(0) + if err := cli.Close(); err != nil { + t.Fatal(err) + } donec := make(chan struct{}) go func() { @@ -303,10 +306,6 @@ func TestLeaseGrantErrConnClosed(t *testing.T) { } }() - if err := cli.Close(); err != nil { - t.Fatal(err) - } - select { case <-time.After(integration.RequestWaitTimeout): t.Fatal("le.Grant took too long") diff --git a/clientv3/integration/network_partition_test.go b/clientv3/integration/network_partition_test.go index 2e9b9965d45..a67b631acb5 100644 --- a/clientv3/integration/network_partition_test.go +++ b/clientv3/integration/network_partition_test.go @@ -310,7 +310,7 @@ func TestDropReadUnderNetworkPartition(t *testing.T) { t.Fatalf("expected %v, got %v", rpctypes.ErrLeaderChanged, err) } - ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel = context.WithTimeout(context.TODO(), 10*time.Second) _, err = kvc.Get(ctx, "a") cancel() if err != nil { diff --git a/etcdserver/server.go b/etcdserver/server.go index 90473e666ce..dafb0fad3f6 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -946,8 +946,8 @@ func (s *EtcdServer) run() { s.leaderChangedMu.Lock() lc := s.leaderChanged s.leaderChanged = make(chan struct{}) - s.leaderChangedMu.Unlock() close(lc) + s.leaderChangedMu.Unlock() } // TODO: remove the nil checking // current test utility does not provide the stats diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go index 92899b6707f..86b6f8d9998 100644 --- a/integration/v3_lease_test.go +++ b/integration/v3_lease_test.go @@ -565,14 +565,11 @@ func TestV3LeaseFailover(t *testing.T) { md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) mctx := metadata.NewOutgoingContext(context.Background(), md) ctx, cancel := context.WithCancel(mctx) + defer cancel() lac, err := lc.LeaseKeepAlive(ctx) if err != nil { t.Fatal(err) } - defer func() { - lac.CloseSend() - cancel() - }() // send keep alive to old leader until the old leader starts // to drop lease request.