Skip to content

Commit

Permalink
server: add 500ms retries to ReadIndex requests for l-reads
Browse files Browse the repository at this point in the history
This is the second approach (the first being #12762) to solve #12680.
  • Loading branch information
wpedrak committed Mar 16, 2021
1 parent 4b21e38 commit e977923
Showing 1 changed file with 22 additions and 12 deletions.
34 changes: 22 additions & 12 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
// We should stop accepting new proposals if the gap grows to a certain point.
maxGapBetweenApplyAndCommitIndex = 5000
traceThreshold = 100 * time.Millisecond
readIndexRetryTime = 500 * time.Millisecond
)

type RaftKV interface {
Expand Down Expand Up @@ -768,19 +769,16 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{},
return 0, err
}

confirmedIndex, err := s.readReadIndexResponse(leaderChangedNotifier, requestId)
if err != nil {
return 0, err
}
return confirmedIndex, nil
}

func (s *EtcdServer) readReadIndexResponse(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) {
lg := s.Logger()
errorTimer := time.NewTimer(s.Cfg.ReqTimeout())
defer errorTimer.Stop()
retryTimer := time.NewTimer(readIndexRetryTime)
defer retryTimer.Stop()

for {
select {
case rs := <-s.r.readStateC:
requestIdBytes := unit64ToBigEndianBytes(requestId)
requestIdBytes := uint64ToBigEndianBytes(requestId)
gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes)
if !gotOwnResponse {
// a previous request might time out. now we should ignore the response of it and
Expand All @@ -802,7 +800,19 @@ func (s *EtcdServer) readReadIndexResponse(leaderChangedNotifier <-chan struct{}
readIndexFailed.Inc()
// return a retryable error.
return 0, ErrLeaderChanged
case <-time.After(s.Cfg.ReqTimeout()):
case <-retryTimer.C:
lg.Warn(
"waiting for ReadIndex response took too long, retrying",
zap.Uint64("sent-request-id", requestId),
zap.Duration("retry-timeout", readIndexRetryTime),
)
err := s.sendReadIndex(requestId)
if err != nil {
return 0, err
}
retryTimer.Reset(readIndexRetryTime)
continue
case <-errorTimer.C:
lg.Warn(
"timed out waiting for read index response (local node might have slow network)",
zap.Duration("timeout", s.Cfg.ReqTimeout()),
Expand All @@ -815,14 +825,14 @@ func (s *EtcdServer) readReadIndexResponse(leaderChangedNotifier <-chan struct{}
}
}

func unit64ToBigEndianBytes(number uint64) []byte {
// uint64ToBigEndianBytes returns number encoded as a newly allocated 8-byte
// slice in big-endian byte order. In this file the result is used as the
// ReadIndex request context and compared against the context of incoming
// read states via bytes.Equal.
func uint64ToBigEndianBytes(number uint64) []byte {
// A uint64 always occupies exactly 8 bytes.
byteResult := make([]byte, 8)
binary.BigEndian.PutUint64(byteResult, number)
return byteResult
}

func (s *EtcdServer) sendReadIndex(requestIndex uint64) error {
ctxToSend := unit64ToBigEndianBytes(requestIndex)
ctxToSend := uint64ToBigEndianBytes(requestIndex)

cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
err := s.r.ReadIndex(cctx, ctxToSend)
Expand Down

0 comments on commit e977923

Please sign in to comment.