diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go
index e28a1cdd3181..52a1db59b0b4 100644
--- a/clientv3/integration/kv_test.go
+++ b/clientv3/integration/kv_test.go
@@ -16,6 +16,7 @@ package integration
 
 import (
 	"bytes"
+	"fmt"
 	"math/rand"
 	"reflect"
 	"strings"
@@ -536,52 +537,56 @@ func TestKVCompact(t *testing.T) {
 
 func TestKVGetRetry(t *testing.T) {
 	defer testutil.AfterTest(t)
 
-	clusterSize := 3
-	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: clusterSize})
-	defer clus.Terminate(t)
+	for i := 0; ; i++ {
+		fmt.Println(i)
+		clusterSize := 3
+		clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: clusterSize})
 
-	// because killing leader and following election
-	// could give no other endpoints for client reconnection
-	fIdx := (clus.WaitLeader(t) + 1) % clusterSize
+		// because killing leader and following election
+		// could give no other endpoints for client reconnection
+		fIdx := (clus.WaitLeader(t) + 1) % clusterSize
 
-	kv := clientv3.NewKV(clus.Client(fIdx))
-	ctx := context.TODO()
+		kv := clientv3.NewKV(clus.Client(fIdx))
+		ctx := context.TODO()
 
-	if _, err := kv.Put(ctx, "foo", "bar"); err != nil {
-		t.Fatal(err)
-	}
+		if _, err := kv.Put(ctx, "foo", "bar"); err != nil {
+			t.Fatal(err)
+		}
 
-	clus.Members[fIdx].Stop(t)
+		clus.Members[fIdx].Stop(t)
 
-	donec := make(chan struct{})
-	go func() {
-		// Get will fail, but reconnect will trigger
-		gresp, gerr := kv.Get(ctx, "foo")
-		if gerr != nil {
-			t.Fatal(gerr)
-		}
-		wkvs := []*mvccpb.KeyValue{
-			{
-				Key:            []byte("foo"),
-				Value:          []byte("bar"),
-				CreateRevision: 2,
-				ModRevision:    2,
-				Version:        1,
-			},
-		}
-		if !reflect.DeepEqual(gresp.Kvs, wkvs) {
-			t.Fatalf("bad get: got %v, want %v", gresp.Kvs, wkvs)
-		}
-		donec <- struct{}{}
-	}()
+		donec := make(chan struct{})
+		go func() {
+			// Get will fail, but reconnect will trigger
+			gresp, gerr := kv.Get(ctx, "foo")
+			if gerr != nil {
+				t.Fatal(gerr)
+			}
+			wkvs := []*mvccpb.KeyValue{
+				{
+					Key:            []byte("foo"),
+					Value:          []byte("bar"),
+					CreateRevision: 2,
+					ModRevision:    2,
+					Version:        1,
+				},
+			}
+			if !reflect.DeepEqual(gresp.Kvs, wkvs) {
+				t.Fatalf("bad get: got %v, want %v", gresp.Kvs, wkvs)
+			}
+			donec <- struct{}{}
+		}()
 
-	time.Sleep(100 * time.Millisecond)
-	clus.Members[fIdx].Restart(t)
+		time.Sleep(100 * time.Millisecond)
+		clus.Members[fIdx].Restart(t)
 
-	select {
-	case <-time.After(5 * time.Second):
-		t.Fatalf("timed out waiting for get")
-	case <-donec:
+		select {
+		case <-time.After(5 * time.Second):
+			t.Fatalf("timed out waiting for get")
+		case <-donec:
+		}
+
+		clus.Terminate(t)
 	}
 }
diff --git a/etcdserver/raft.go b/etcdserver/raft.go
index 95e95fd22a4f..27170b602b59 100644
--- a/etcdserver/raft.go
+++ b/etcdserver/raft.go
@@ -103,6 +103,9 @@ type raftNode struct {
 	// a chan to send out apply
 	applyc chan apply
 
+	// a chan to send out readState
+	readStateC chan raft.ReadState
+
 	// TODO: remove the etcdserver related logic from raftNode
 	// TODO: add a state machine interface to apply the commit entries
 	// and do snapshot/recover
@@ -196,6 +199,14 @@ func (r *raftNode) start(s *EtcdServer) {
 				}
 			}
 
+			if len(rd.ReadStates) != 0 {
+				select {
+				case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
+				case <-r.stopped:
+					return
+				}
+			}
+
 			raftDone := make(chan struct{}, 1)
 			ap := apply{
 				entries: rd.CommittedEntries,
diff --git a/etcdserver/server.go b/etcdserver/server.go
index 3bc8d81d68ac..f5e58f51d6e3 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -177,12 +177,22 @@ type EtcdServer struct {
 	snapCount uint64
 
 	w wait.Wait
+
+	readMu sync.RWMutex
+	// read routine notifies etcd server that it waits for reading by sending an empty struct to
+	// readwaitc
+	readwaitc chan struct{}
+	// readNotifier is used to notify the read routine that it can process the request
+	// when there is no error
+	readNotifier *notifier
+
 	// stop signals the run goroutine should shutdown.
 	stop chan struct{}
 	// stopping is closed by run goroutine on shutdown.
 	stopping chan struct{}
 	// done is closed when all goroutines from start() complete.
-	done chan struct{}
+	done   chan struct{}
+	errorc chan error
 
 	id         types.ID
 	attributes membership.Attributes
@@ -391,6 +401,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 			ticker:      time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
 			raftStorage: s,
 			storage:     NewStorage(w, ss),
+			readStateC:  make(chan raft.ReadState, 1),
 		},
 		id:         id,
 		attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
@@ -478,6 +489,7 @@ func (s *EtcdServer) Start() {
 	s.goAttach(s.purgeFile)
 	s.goAttach(func() { monitorFileDescriptor(s.stopping) })
 	s.goAttach(s.monitorVersions)
+	s.goAttach(s.linearizableReadLoop)
 }
 
 // start prepares and starts server in a new goroutine. It is no longer safe to
@@ -493,6 +505,8 @@ func (s *EtcdServer) start() {
 	s.done = make(chan struct{})
 	s.stop = make(chan struct{})
 	s.stopping = make(chan struct{})
+	s.readwaitc = make(chan struct{}, 1)
+	s.readNotifier = newNotifier()
 	if s.ClusterVersion() != nil {
 		plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String()))
 	} else {
diff --git a/etcdserver/util.go b/etcdserver/util.go
index f189cd9463fa..66084ae12446 100644
--- a/etcdserver/util.go
+++ b/etcdserver/util.go
@@ -79,3 +79,19 @@ func longestConnected(tp rafthttp.Transporter, membs []types.ID) (types.ID, bool
 	}
 	return longest, true
 }
+
+type notifier struct {
+	c   chan struct{}
+	err error
+}
+
+func newNotifier() *notifier {
+	return &notifier{
+		c: make(chan struct{}, 0),
+	}
+}
+
+func (nc *notifier) notify(err error) {
+	nc.err = err
+	close(nc.c)
+}
diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go
index 4c534c88ff99..ec623a35225b 100644
--- a/etcdserver/v3_server.go
+++ b/etcdserver/v3_server.go
@@ -15,6 +15,8 @@
 package etcdserver
 
 import (
+	"bytes"
+	"encoding/binary"
 	"strconv"
 	"strings"
 	"time"
@@ -26,6 +28,9 @@ import (
 	"github.com/coreos/etcd/lease/leasehttp"
 	"github.com/coreos/etcd/lease/leasepb"
 	"github.com/coreos/etcd/mvcc"
+	"github.com/coreos/etcd/raft"
+
+	"github.com/coreos/go-semver/semver"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc/metadata"
 )
@@ -44,6 +49,10 @@ const (
 	maxGapBetweenApplyAndCommitIndex = 1000
 )
 
+var (
+	newRangeClusterVersion = *semver.Must(semver.NewVersion("3.1.0"))
+)
+
 type RaftKV interface {
 	Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
 	Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
@@ -86,6 +95,31 @@ type Authenticator interface {
 }
 
 func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
+	// TODO: remove this checking when we release etcd 3.2
+	if s.ClusterVersion() == nil || s.ClusterVersion().LessThan(newRangeClusterVersion) {
+		return s.legacyRange(ctx, r)
+	}
+
+	if !r.Serializable {
+		err := s.linearizableReadNotify(ctx)
+		if err != nil {
+			return nil, err
+		}
+	}
+	var resp *pb.RangeResponse
+	var err error
+	chk := func(ai *auth.AuthInfo) error {
+		return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
+	}
+	get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
+	if serr := s.doSerialize(ctx, chk, get); serr != nil {
+		return nil, serr
+	}
+	return resp, err
+}
+
+// TODO: remove this func when we release etcd 3.2
+func (s *EtcdServer) legacyRange(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
 	if r.Serializable {
 		var resp *pb.RangeResponse
 		var err error
@@ -641,3 +675,84 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern
 
 // Watchable returns a watchable interface attached to the etcdserver.
 func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }
+
+func (s *EtcdServer) linearizableReadLoop() {
+	var rs raft.ReadState
+
+	for {
+		ctx := make([]byte, 8)
+		binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next())
+
+		select {
+		case <-s.readwaitc:
+		case <-s.stopping:
+			return
+		}
+
+		nextnr := newNotifier()
+
+		s.readMu.Lock()
+		nr := s.readNotifier
+		s.readNotifier = nextnr
+		s.readMu.Unlock()
+
+		cctx, cancel := context.WithTimeout(context.Background(), time.Second)
+		if err := s.r.ReadIndex(cctx, ctx); err != nil {
+			cancel()
+			if err == raft.ErrStopped {
+				return
+			}
+			plog.Errorf("failed to get read index from raft: %v", err)
+			nr.notify(err)
+			continue
+		}
+		cancel()
+
+		select {
+		case rs = <-s.r.readStateC:
+			if !bytes.Equal(rs.RequestCtx, ctx) {
+				plog.Errorf("unexpected read index context value (want %v, got %v)", ctx, rs.RequestCtx)
+				nr.notify(ErrTimeoutDueToLeaderFail)
+				continue
+			}
+		case <-time.After(time.Second):
+			plog.Warningf("timed out waiting for read index response")
+			nr.notify(ErrTimeout)
+			continue
+		case <-s.stopping:
+			return
+		}
+
+		if ai := s.getAppliedIndex(); ai < rs.Index {
+			select {
+			case <-s.applyWait.Wait(rs.Index):
+			case <-s.stopping:
+				return
+			}
+		}
+		// unblock all l-reads requested at indices before rs.Index
+		nr.notify(nil)
+	}
+}
+
+func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
+	s.readMu.RLock()
+	nc := s.readNotifier
+	s.readMu.RUnlock()
+
+	// signal linearizable loop for current notify if it hasn't been already
+	select {
+	case s.readwaitc <- struct{}{}:
+	default:
+	}
+
+	// wait for read state notification
+	select {
+	case <-nc.c:
+		return nc.err
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-s.done:
+		return ErrStopped
+	}
+}
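
Note on the pattern: linearizableReadNotify and linearizableReadLoop above implement a batched broadcast. All in-flight reads share one notifier; the loop swaps in a fresh notifier before each ReadIndex round trip, so a single confirmation wakes every reader that registered before the swap, with close(chan) as the one-to-many wakeup. The following is a minimal, self-contained sketch of just that idiom; server, readLoop, confirm, and readNotify are illustrative stand-ins for EtcdServer, linearizableReadLoop, the ReadIndex-plus-apply-wait step, and linearizableReadNotify, not identifiers from this patch.

package main

import (
	"fmt"
	"sync"
	"time"
)

// notifier broadcasts one result to every waiter by closing c.
type notifier struct {
	c   chan struct{}
	err error
}

func newNotifier() *notifier { return &notifier{c: make(chan struct{})} }

func (n *notifier) notify(err error) {
	n.err = err
	close(n.c) // close is a broadcast: every <-n.c receiver unblocks
}

type server struct {
	mu       sync.RWMutex
	cur      *notifier
	readwait chan struct{} // 1-buffered; coalesces concurrent wakeup pokes
}

func (s *server) readLoop(confirm func() error) {
	for {
		<-s.readwait // at least one reader needs a confirmation round

		// Swap in a fresh notifier: readers arriving after this point
		// belong to the next round and are not woken by this one.
		s.mu.Lock()
		nr := s.cur
		s.cur = newNotifier()
		s.mu.Unlock()

		// One confirmation (ReadIndex + apply-wait in the patch) serves
		// every reader that grabbed nr before the swap.
		nr.notify(confirm())
	}
}

func (s *server) readNotify() error {
	s.mu.RLock()
	nc := s.cur
	s.mu.RUnlock()

	select { // poke the loop unless a poke is already pending
	case s.readwait <- struct{}{}:
	default:
	}

	<-nc.c // wait for this round's broadcast
	return nc.err
}

func main() {
	s := &server{cur: newNotifier(), readwait: make(chan struct{}, 1)}
	go s.readLoop(func() error { time.Sleep(10 * time.Millisecond); return nil })

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// The five concurrent readers share one or two round trips.
			fmt.Println("reader", i, "err:", s.readNotify())
		}(i)
	}
	wg.Wait()
}

The write to nc.err happens strictly before close(nc.c), so readers observe it without extra locking. This is also why notify must run at most once per notifier (a second close would panic), which the loop guarantees by handing out a fresh notifier each round.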
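
Likewise, the bytes.Equal check in linearizableReadLoop is a stale-response guard: each ReadIndex call is tagged with a unique 8-byte big-endian request id, and a ReadState carrying any other id must come from an earlier round that already timed out, so the loop fails that round (ErrTimeoutDueToLeaderFail) instead of serving reads at the wrong index. A small sketch of the tag-and-match step, under the assumption that only the two raft.ReadState fields the loop consumes matter (the ids below are made up):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// readState mirrors the two raft.ReadState fields the loop consumes.
type readState struct {
	Index      uint64
	RequestCtx []byte
}

// tag encodes a request id the same way linearizableReadLoop does:
// 8 bytes, big-endian.
func tag(id uint64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, id)
	return b
}

func main() {
	want := tag(42) // id sent with the current ReadIndex call

	for _, rs := range []readState{
		{Index: 7, RequestCtx: tag(41)}, // leftover from a timed-out round
		{Index: 9, RequestCtx: tag(42)}, // answer to the current round
	} {
		if bytes.Equal(rs.RequestCtx, want) {
			fmt.Println("serve reads once applied index reaches", rs.Index)
		} else {
			fmt.Println("stale id at index", rs.Index, "- fail the round, do not serve")
		}
	}
}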