Skip to content

Commit

Permalink
etcdserver: support read index
Browse files Browse the repository at this point in the history
Use read index to achieve l-read.
  • Loading branch information
xiang90 committed Sep 13, 2016
1 parent cfe717e commit 8d14c11
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 16 deletions.
15 changes: 15 additions & 0 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package etcdserver
import (
"encoding/json"
"expvar"
"fmt"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -103,6 +104,9 @@ type raftNode struct {
// a chan to send out apply
applyc chan apply

// a chan to send out readState
readStateC chan raft.ReadState

// TODO: remove the etcdserver related logic from raftNode
// TODO: add a state machine interface to apply the commit entries
// and do snapshot/recover
Expand Down Expand Up @@ -196,6 +200,17 @@ func (r *raftNode) start(s *EtcdServer) {
}
}

if len(rd.ReadStates) != 0 {
if len(rd.ReadStates) > 1 {
fmt.Println("drop len(rd.ReadStates)")
}
select {
case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
default:
fmt.Println("drop ri")
}
}

raftDone := make(chan struct{}, 1)
ap := apply{
entries: rd.CommittedEntries,
Expand Down
13 changes: 12 additions & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,15 @@ type EtcdServer struct {

snapCount uint64

w wait.Wait
w wait.Wait

readMu sync.RWMutex
// read routine notifies etcd server that it waits for reading by sending an empty struct to
// readwaitc
readwaitc chan struct{}
// etcdserver notifies read routine that it can process the request by closing the readNotifyc
readNotifyc chan struct{}

stop chan struct{}
done chan struct{}
errorc chan error
Expand Down Expand Up @@ -384,6 +392,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
ticker: time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
raftStorage: s,
storage: NewStorage(w, ss),
readStateC: make(chan raft.ReadState, 1),
},
id: id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
Expand Down Expand Up @@ -471,6 +480,7 @@ func (s *EtcdServer) Start() {
go s.purgeFile()
go monitorFileDescriptor(s.done)
go s.monitorVersions()
go s.linearizableReadLoop()
}

// start prepares and starts server in a new goroutine. It is no longer safe to
Expand All @@ -485,6 +495,7 @@ func (s *EtcdServer) start() {
s.applyWait = wait.NewTimeList()
s.done = make(chan struct{})
s.stop = make(chan struct{})
s.readwaitc = make(chan struct{}, 1024)
if s.ClusterVersion() != nil {
plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String()))
} else {
Expand Down
112 changes: 97 additions & 15 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package etcdserver

import (
"bytes"
"encoding/binary"
"strconv"
"strings"
"time"
Expand All @@ -26,6 +28,7 @@ import (
"github.com/coreos/etcd/lease/leasehttp"
"github.com/coreos/etcd/lease/leasepb"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/raft"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)
Expand Down Expand Up @@ -86,26 +89,28 @@ type Authenticator interface {
}

func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
if r.Serializable {
var resp *pb.RangeResponse
var err error
chk := func(ai *auth.AuthInfo) error {
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
if !r.Serializable {
notify, err := s.linearizableReadNotify(ctx)
if err != nil {
return nil, err
}
get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr

select {
case <-notify:
case <-ctx.Done():
return nil, ctx.Err()
}
return resp, err
}
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r})
if err != nil {
return nil, err
var resp *pb.RangeResponse
var err error
chk := func(ai *auth.AuthInfo) error {
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}
if result.err != nil {
return nil, result.err
get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
}
return result.resp.(*pb.RangeResponse), nil
return resp, err
}

func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
Expand Down Expand Up @@ -641,3 +646,80 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern

// Watchable returns a watchable interface attached to the etcdserver.
func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }

// linearizableReadLoop batches pending linearizable read requests and
// releases each batch once the applied index has caught up with a read index
// obtained from raft. It runs forever in its own goroutine.
//
// Each iteration:
//  1. blocks until at least one request is queued on readwaitc, then keeps
//     collecting while more are queued, up to 256;
//  2. under readMu, drains whatever is still queued and swaps readNotifyc
//     for a fresh channel, so every request registered so far holds the
//     old channel;
//  3. issues ReadIndex tagged with a fresh 8-byte request ID and waits up to
//     one second for the matching ReadState on readStateC;
//  4. waits until the applied index reaches the returned read index and
//     closes the old notify channel to unblock the batch.
//
// NOTE(review): when ReadIndex fails, the returned context does not match,
// or the wait times out, the batch's notify channel is never closed, so
// those callers are released only by their own context. The loop also has
// no shutdown path. Both behaviors are unchanged here.
func (s *EtcdServer) linearizableReadLoop() {
	var (
		rs       raft.ReadState
		requests int
	)
	ctx := make([]byte, 8)

	// readNotifyc is read by request goroutines under readMu, so publish the
	// initial channel under the same lock.
	// NOTE(review): a request that registers before this point would still
	// observe a nil channel; initializing the field before the goroutine is
	// started would close that window — confirm against the caller.
	s.readMu.Lock()
	s.readNotifyc = make(chan struct{})
	s.readMu.Unlock()

	for {
		binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next())

		// Block for the first request, then keep collecting while more are
		// already queued, up to 256 per batch.
		for (len(s.readwaitc) > 0 && requests < 256) || (len(s.readwaitc) == 0 && requests == 0) {
			<-s.readwaitc
			requests++
		}

		s.readMu.Lock()
		var drained bool
		for !drained {
			select {
			// drain all pending requests
			case <-s.readwaitc:
			default:
				drained = true
			}
		}
		requests = 0
		// Requests registered from now on get the new channel and are served
		// by a later iteration.
		notifyc := s.readNotifyc
		s.readNotifyc = make(chan struct{})
		s.readMu.Unlock()

		if err := s.r.ReadIndex(context.Background(), ctx); err != nil {
			continue
		}
		select {
		case rs = <-s.r.readStateC:
			if !bytes.Equal(rs.RequestCtx, ctx) {
				// ctx is the value we sent (want); rs.RequestCtx is what came back (got).
				plog.Errorf("unexpected read index context value (want %v, got %v)", ctx, rs.RequestCtx)
				continue
			}
		case <-time.After(time.Second):
			plog.Warningf("time out waiting for read index response")
			continue
		}

		// The batch may only be released once the state machine has applied
		// everything up to the read index, so wait on rs.Index rather than on
		// the index that is already applied.
		if ai := s.getAppliedIndex(); ai < rs.Index {
			<-s.applyWait.Wait(rs.Index)
		}
		// unblock all the l-read that happened before we requested the
		// read index
		close(notifyc)
	}
}

// linearizableReadNotify registers the caller as a pending linearizable read
// by sending an empty struct on readwaitc, and returns the readNotifyc value
// that was current at registration time. Both steps happen under the read
// lock of readMu.
//
// If readwaitc is full, the call sleeps 10ms and retries. It returns
// ctx.Err() if ctx is observed as done during an attempt.
func (s *EtcdServer) linearizableReadNotify(ctx context.Context) (chan struct{}, error) {
	for {
		var (
			notifyc    chan struct{}
			err        error
			registered bool
		)

		s.readMu.RLock()
		select {
		case s.readwaitc <- struct{}{}:
			notifyc, registered = s.readNotifyc, true
		case <-ctx.Done():
			err = ctx.Err()
		default:
			// readwaitc is full and ctx is not done; fall through to retry.
		}
		s.readMu.RUnlock()

		if registered {
			return notifyc, nil
		}
		if err != nil {
			return nil, err
		}
		// retry after draining
		time.Sleep(10 * time.Millisecond)
	}
}

0 comments on commit 8d14c11

Please sign in to comment.