Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: drop read request if the leader is changed #10094

Merged
merged 5 commits into from
Sep 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions clientv3/integration/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,11 @@ func TestKVGetErrConnClosed(t *testing.T) {
cli := clus.Client(0)

donec := make(chan struct{})
if err := cli.Close(); err != nil {
t.Fatal(err)
}
clus.TakeClient(0)

go func() {
defer close(donec)
_, err := cli.Get(context.TODO(), "foo")
Expand All @@ -447,11 +452,6 @@ func TestKVGetErrConnClosed(t *testing.T) {
}
}()

if err := cli.Close(); err != nil {
t.Fatal(err)
}
clus.TakeClient(0)

select {
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("kv.Get took too long")
Expand Down
7 changes: 3 additions & 4 deletions clientv3/integration/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,9 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {

cli := clus.Client(0)
clus.TakeClient(0)
if err := cli.Close(); err != nil {
t.Fatal(err)
}

donec := make(chan struct{})
go func() {
Expand All @@ -303,10 +306,6 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
}
}()

if err := cli.Close(); err != nil {
t.Fatal(err)
}

select {
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("le.Grant took too long")
Expand Down
1 change: 1 addition & 0 deletions clientv3/integration/leasing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1644,6 +1644,7 @@ func TestLeasingReconnectTxn(t *testing.T) {
clus.Members[0].DropConnections()
time.Sleep(time.Millisecond)
}
time.Sleep(10 * time.Millisecond)
}()

_, lerr := lkv.Txn(context.TODO()).
Expand Down
52 changes: 52 additions & 0 deletions clientv3/integration/network_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/integration"
"go.etcd.io/etcd/pkg/testutil"
"google.golang.org/grpc"
Expand Down Expand Up @@ -265,3 +266,54 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
t.Fatal("took too long to detect leader lost")
}
}

// TestDropReadUnderNetworkPartition starts a 3-member cluster, connects a
// client to a single non-leader member, calls InjectPartition on the leader
// against the other two members, and then expects the first Get issued
// through that member to fail with rpctypes.ErrLeaderChanged and the
// following Get to succeed.
func TestDropReadUnderNetworkPartition(t *testing.T) {
	defer testutil.AfterTest(t)

	clus := integration.NewClusterV3(t, &integration.ClusterConfig{
		Size:               3,
		SkipCreatingClient: true,
	})
	defer clus.Terminate(t)

	leaderIndex := clus.WaitLeader(t)
	followerIndex := (leaderIndex + 1) % 3
	thirdIndex := (leaderIndex + 2) % 3

	// the client is only given the endpoint of one non-leader member
	eps := []string{clus.Members[followerIndex].GRPCAddr()}
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   eps,
		DialTimeout: 10 * time.Second,
		DialOptions: []grpc.DialOption{grpc.WithBlock()},
	})
	if err != nil {
		t.Fatal(err)
	}
	defer cli.Close()

	// block until eps[0] is pinned
	mustWaitPinReady(t, cli)

	// re-apply the same single-endpoint list, then pause before dialing
	cli.SetEndpoints(eps...)
	time.Sleep(time.Second * 2)

	conn, err := cli.Dial(clus.Members[followerIndex].GRPCAddr())
	if err != nil {
		t.Fatal(err)
	}
	defer conn.Close()

	clus.Members[leaderIndex].InjectPartition(t, clus.Members[followerIndex], clus.Members[thirdIndex])
	kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn), nil)

	// getOnce performs one Get of key "a" bounded by a 10s timeout and
	// releases the context before returning the resulting error.
	getOnce := func() error {
		ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
		_, gerr := kvc.Get(ctx, "a")
		cancel()
		return gerr
	}

	// first read after the partition is injected must report a leader change
	if err = getOnce(); err != rpctypes.ErrLeaderChanged {
		t.Fatalf("expected %v, got %v", rpctypes.ErrLeaderChanged, err)
	}

	// the next read must go through without error
	if err = getOnce(); err != nil {
		t.Fatalf("expected nil, got %v", err)
	}
}
3 changes: 3 additions & 0 deletions etcdserver/api/v3rpc/rpctypes/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var (

ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err()
ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err()
ErrGRPCLeaderChanged = status.New(codes.Unavailable, "etcdserver: leader changed").Err()
ErrGRPCNotCapable = status.New(codes.Unavailable, "etcdserver: not capable").Err()
ErrGRPCStopped = status.New(codes.Unavailable, "etcdserver: server stopped").Err()
ErrGRPCTimeout = status.New(codes.Unavailable, "etcdserver: request timed out").Err()
Expand Down Expand Up @@ -111,6 +112,7 @@ var (

ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
ErrorDesc(ErrGRPCNotLeader): ErrGRPCNotLeader,
ErrorDesc(ErrGRPCLeaderChanged): ErrGRPCLeaderChanged,
ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable,
ErrorDesc(ErrGRPCStopped): ErrGRPCStopped,
ErrorDesc(ErrGRPCTimeout): ErrGRPCTimeout,
Expand Down Expand Up @@ -163,6 +165,7 @@ var (

ErrNoLeader = Error(ErrGRPCNoLeader)
ErrNotLeader = Error(ErrGRPCNotLeader)
ErrLeaderChanged = Error(ErrGRPCLeaderChanged)
ErrNotCapable = Error(ErrGRPCNotCapable)
ErrStopped = Error(ErrGRPCStopped)
ErrTimeout = Error(ErrGRPCTimeout)
Expand Down
1 change: 1 addition & 0 deletions etcdserver/api/v3rpc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var toGRPCErrorMap = map[error]error{

etcdserver.ErrNoLeader: rpctypes.ErrGRPCNoLeader,
etcdserver.ErrNotLeader: rpctypes.ErrGRPCNotLeader,
etcdserver.ErrLeaderChanged: rpctypes.ErrGRPCLeaderChanged,
etcdserver.ErrStopped: rpctypes.ErrGRPCStopped,
etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout,
etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail,
Expand Down
1 change: 1 addition & 0 deletions etcdserver/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var (
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long")
ErrLeaderChanged = errors.New("etcdserver: leader changed")
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
ErrNoLeader = errors.New("etcdserver: no leader")
ErrNotLeader = errors.New("etcdserver: not leader")
Expand Down
7 changes: 7 additions & 0 deletions etcdserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ var (
Name: "slow_read_indexes_total",
Help: "The total number of pending read indexes not in sync with leader's or timed out read index requests.",
})
readIndexFailed = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "read_indexes_failed_total",
Help: "The total number of failed read indexes seen.",
})
leaseExpired = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd_debugging",
Subsystem: "server",
Expand Down Expand Up @@ -132,6 +138,7 @@ func init() {
prometheus.MustRegister(proposalsPending)
prometheus.MustRegister(proposalsFailed)
prometheus.MustRegister(slowReadIndex)
prometheus.MustRegister(readIndexFailed)
prometheus.MustRegister(leaseExpired)
prometheus.MustRegister(quotaBackendBytes)
prometheus.MustRegister(currentVersion)
Expand Down
18 changes: 17 additions & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ type EtcdServer struct {
stopping chan struct{}
// done is closed when all goroutines from start() complete.
done chan struct{}
// leaderChanged is used to notify the linearizable read loop to drop the old read requests.
leaderChanged chan struct{}
leaderChangedMu sync.RWMutex

errorc chan error
id types.ID
Expand Down Expand Up @@ -752,6 +755,7 @@ func (s *EtcdServer) start() {
s.ctx, s.cancel = context.WithCancel(context.Background())
s.readwaitc = make(chan struct{}, 1)
s.readNotifier = newNotifier()
s.leaderChanged = make(chan struct{})
if s.ClusterVersion() != nil {
if lg != nil {
lg.Info(
Expand Down Expand Up @@ -938,7 +942,13 @@ func (s *EtcdServer) run() {
s.compactor.Resume()
}
}

if newLeader {
s.leaderChangedMu.Lock()
lc := s.leaderChanged
s.leaderChanged = make(chan struct{})
close(lc)
s.leaderChangedMu.Unlock()
}
// TODO: remove the nil checking
// current test utility does not provide the stats
if s.stats != nil {
Expand Down Expand Up @@ -1688,6 +1698,12 @@ func (s *EtcdServer) getLead() uint64 {
return atomic.LoadUint64(&s.lead)
}

// leaderChangedNotify returns the current leaderChanged channel. The field is
// read while holding the read side of leaderChangedMu, so the caller gets a
// consistent snapshot of whichever channel was installed at call time.
func (s *EtcdServer) leaderChangedNotify() <-chan struct{} {
	s.leaderChangedMu.RLock()
	notifyc := s.leaderChanged
	s.leaderChangedMu.RUnlock()
	return notifyc
}

// RaftStatusGetter represents etcd server and Raft progress.
type RaftStatusGetter interface {
ID() types.ID
Expand Down
10 changes: 9 additions & 1 deletion etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,8 +634,10 @@ func (s *EtcdServer) linearizableReadLoop() {
ctxToSend := make([]byte, 8)
id1 := s.reqIDGen.Next()
binary.BigEndian.PutUint64(ctxToSend, id1)

leaderChangedNotifier := s.leaderChangedNotify()
select {
case <-leaderChangedNotifier:
continue
case <-s.readwaitc:
case <-s.stopping:
return
Expand All @@ -660,6 +662,7 @@ func (s *EtcdServer) linearizableReadLoop() {
} else {
plog.Errorf("failed to get read index from raft: %v", err)
}
readIndexFailed.Inc()
nr.notify(err)
continue
}
Expand Down Expand Up @@ -691,6 +694,11 @@ func (s *EtcdServer) linearizableReadLoop() {
}
slowReadIndex.Inc()
}
case <-leaderChangedNotifier:
timeout = true
readIndexFailed.Inc()
// return a retryable error.
nr.notify(ErrLeaderChanged)
case <-time.After(s.Cfg.ReqTimeout()):
if lg != nil {
lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout()))
Expand Down
3 changes: 2 additions & 1 deletion integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,8 @@ func TestV3LeaseSwitch(t *testing.T) {
// election timeout after it loses its quorum. And the new leader extends the TTL of
// the lease to at least TTL + election timeout.
func TestV3LeaseFailover(t *testing.T) {
defer testutil.AfterTest(t)

clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

Expand Down Expand Up @@ -568,7 +570,6 @@ func TestV3LeaseFailover(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer lac.CloseSend()

// send keep alive to old leader until the old leader starts
// to drop lease request.
Expand Down