Skip to content

Commit

Permalink
cherry-pick from etcd-io#12343
Browse files Browse the repository at this point in the history
  • Loading branch information
chaochn47 committed Oct 11, 2021
1 parent 33fd6f3 commit 754fe88
Show file tree
Hide file tree
Showing 25 changed files with 976 additions and 26 deletions.
149 changes: 136 additions & 13 deletions client/v3/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@ package clientv3

import (
"context"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"io"
"time"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"

"google.golang.org/grpc"
)

type (
CompactResponse pb.CompactionResponse
PutResponse pb.PutResponse
GetResponse pb.RangeResponse
DeleteResponse pb.DeleteRangeResponse
TxnResponse pb.TxnResponse
CompactResponse pb.CompactionResponse
PutResponse pb.PutResponse
GetResponse pb.RangeResponse
GetStreamResponse pb.RangeResponse
DeleteResponse pb.DeleteRangeResponse
TxnResponse pb.TxnResponse
)

type KV interface {
Expand All @@ -47,6 +51,8 @@ type KV interface {
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

GetStream(ctx context.Context, key string, opts ...OpOption) (*GetStreamResponse, error)

// Delete deletes a key, or optionally using WithRange(end), [key, end).
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)

Expand All @@ -65,23 +71,28 @@ type KV interface {
}

type OpResponse struct {
put *PutResponse
get *GetResponse
del *DeleteResponse
txn *TxnResponse
put *PutResponse
get *GetResponse
getStream *GetStreamResponse
del *DeleteResponse
txn *TxnResponse
}

func (op OpResponse) Put() *PutResponse { return op.put }
func (op OpResponse) Get() *GetResponse { return op.get }
func (op OpResponse) Del() *DeleteResponse { return op.del }
func (op OpResponse) Txn() *TxnResponse { return op.txn }
func (op OpResponse) Put() *PutResponse { return op.put }
func (op OpResponse) Get() *GetResponse { return op.get }
func (op OpResponse) GetStream() *GetStreamResponse { return op.getStream }
func (op OpResponse) Del() *DeleteResponse { return op.del }
func (op OpResponse) Txn() *TxnResponse { return op.txn }

func (resp *PutResponse) OpResponse() OpResponse {
return OpResponse{put: resp}
}
func (resp *GetResponse) OpResponse() OpResponse {
return OpResponse{get: resp}
}
func (resp *GetStreamResponse) OpResponse() OpResponse {
return OpResponse{getStream: resp}
}
func (resp *DeleteResponse) OpResponse() OpResponse {
return OpResponse{del: resp}
}
Expand Down Expand Up @@ -120,6 +131,11 @@ func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetRespon
return r.get, toErr(ctx, err)
}

func (kv *kv) GetStream(ctx context.Context, key string, opts ...OpOption) (*GetStreamResponse, error) {
r, err := kv.Do(ctx, OpGetStream(key, opts...))
return r.getStream, toErr(ctx, err)
}

func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) {
r, err := kv.Do(ctx, OpDelete(key, opts...))
return r.del, toErr(ctx, err)
Expand Down Expand Up @@ -150,6 +166,14 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
if err == nil {
return OpResponse{get: (*GetResponse)(resp)}, nil
}
case tRangeStream:
var rangeStreamClient pb.KV_RangeStreamClient
var resp *pb.RangeResponse
rangeStreamClient, err = kv.openRangeStreamClient(ctx, op.toRangeStreamRequest(), kv.callOpts...)
resp, err = kv.serveRangeStream(ctx, rangeStreamClient)
if err == nil {
return OpResponse{getStream: (*GetStreamResponse)(resp)}, nil
}
case tPut:
var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
Expand All @@ -175,3 +199,102 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
}
return OpResponse{}, toErr(ctx, err)
}

// openRangeStreamClient retries opening a rangeStream client until success or halt.
// manually retry in case "rsc==nil && err==nil"
// TODO: remove FailFast=false
func (kv *kv) openRangeStreamClient(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (rsc pb.KV_RangeStreamClient, err error) {
backoff := time.Millisecond
for {
select {
case <-ctx.Done():
if err == nil {
return nil, ctx.Err()
}
return nil, err
default:
}
if rsc, err = kv.remote.RangeStream(ctx, in, opts...); rsc != nil && err == nil {
break
}
if isHaltErr(ctx, err) {
return nil, v3rpc.Error(err)
}
if isUnavailableErr(ctx, err) {
// retry, but backoff
if backoff < maxBackoff {
// 25% backoff factor
backoff = backoff + backoff/4
if backoff > maxBackoff {
backoff = maxBackoff
}
}
time.Sleep(backoff)
}
}
return rsc, nil
}

func (kv *kv) serveRangeStream(ctx context.Context, rsc pb.KV_RangeStreamClient) (*pb.RangeResponse, error) {
rspC := make(chan *pb.RangeResponse)
errC := make(chan error)

mainRSP := &pb.RangeResponse{}
mainRSP.Header = &pb.ResponseHeader{}

go kv.handleRangeStream(ctx, rsc, rspC, errC)

Loop:
for {
select {
case subRsp := <-rspC:
if subRsp == nil {
break Loop
}

mainRSP.Kvs = append(mainRSP.Kvs, subRsp.Kvs...)
mainRSP.Count = subRsp.Count
mainRSP.Header = subRsp.Header
case err := <-errC:
return nil, err
case <-ctx.Done():
return nil, ctx.Err()
}
}

return mainRSP, nil
}

func (kv *kv) handleRangeStream(ctx context.Context, rsc pb.KV_RangeStreamClient, rspC chan *pb.RangeResponse, errC chan error) {
defer func() {
close(rspC)
close(errC)
}()

for {
resp, err := rsc.Recv()
if err != nil {
if err == io.EOF {
select {
case rspC <- nil:
case <-ctx.Done():
return
}
break
}

select {
case errC <- err:
case <-ctx.Done():
}
return
}

select {
case rspC <- resp:
case <-ctx.Done():
return
}
}
return
}
54 changes: 54 additions & 0 deletions client/v3/leasing/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,18 @@ func (lc *leaseCache) Add(key string, resp *v3.GetResponse, op v3.Op) *v3.GetRes
return ret
}

func (lc *leaseCache) AddStream(key string, resp *v3.GetStreamResponse, op v3.Op) *v3.GetStreamResponse {
lk := &leaseKey{(*v3.GetResponse)(resp), resp.Header.Revision, closedCh}
lc.mu.Lock()
if lc.header == nil || lc.header.Revision < resp.Header.Revision {
lc.header = resp.Header
}
lc.entries[key] = lk
ret := lk.getStream(op)
lc.mu.Unlock()
return ret
}

func (lc *leaseCache) Update(key, val []byte, respHeader *v3pb.ResponseHeader) {
li := lc.entries[string(key)]
if li == nil {
Expand Down Expand Up @@ -216,6 +228,27 @@ func (lc *leaseCache) Get(ctx context.Context, op v3.Op) (*v3.GetResponse, bool)
return ret, true
}

func (lc *leaseCache) GetStream(ctx context.Context, op v3.Op) (*v3.GetStreamResponse, bool) {
if isBadOp(op) {
return nil, false
}
key := string(op.KeyBytes())
li, wc := lc.notify(key)
if li == nil {
return nil, true
}
select {
case <-wc:
case <-ctx.Done():
return nil, true
}
lc.mu.RLock()
lk := *li
ret := lk.getStream(op)
lc.mu.RUnlock()
return ret, true
}

func (lk *leaseKey) get(op v3.Op) *v3.GetResponse {
ret := *lk.response
ret.Header = copyHeader(ret.Header)
Expand All @@ -239,6 +272,27 @@ func (lk *leaseKey) get(op v3.Op) *v3.GetResponse {
return &ret
}

func (lk *leaseKey) getStream(op v3.Op) *v3.GetStreamResponse {
ret := *lk.response
ret.Header = copyHeader(ret.Header)
empty := len(ret.Kvs) == 0
if empty {
ret.Kvs = nil
} else {
kv := *ret.Kvs[0]
kv.Key = make([]byte, len(kv.Key))
copy(kv.Key, ret.Kvs[0].Key)
if !op.IsKeysOnly() {
kv.Value = make([]byte, len(kv.Value))
copy(kv.Value, ret.Kvs[0].Value)
}
ret.Kvs = []*mvccpb.KeyValue{&kv}
}

retNew := (v3.GetStreamResponse)(ret)
return &retNew
}

func (lc *leaseCache) notify(key string) (*leaseKey, <-chan struct{}) {
lc.mu.RLock()
defer lc.mu.RUnlock()
Expand Down
81 changes: 81 additions & 0 deletions client/v3/leasing/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func (lkv *leasingKV) Get(ctx context.Context, key string, opts ...v3.OpOption)
return lkv.get(ctx, v3.OpGet(key, opts...))
}

func (lkv *leasingKV) GetStream(ctx context.Context, key string, opts ...v3.OpOption) (*v3.GetStreamResponse, error) {
return lkv.getStream(ctx, v3.OpGetStream(key, opts...))
}

func (lkv *leasingKV) Put(ctx context.Context, key, val string, opts ...v3.OpOption) (*v3.PutResponse, error) {
return lkv.put(ctx, v3.OpPut(key, val, opts...))
}
Expand All @@ -99,6 +103,9 @@ func (lkv *leasingKV) Do(ctx context.Context, op v3.Op) (v3.OpResponse, error) {
case op.IsGet():
resp, err := lkv.get(ctx, op)
return resp.OpResponse(), err
case op.IsGetStream():
resp, err := lkv.getStream(ctx, op)
return resp.OpResponse(), err
case op.IsPut():
resp, err := lkv.put(ctx, op)
return resp.OpResponse(), err
Expand Down Expand Up @@ -331,6 +338,80 @@ func (lkv *leasingKV) get(ctx context.Context, op v3.Op) (*v3.GetResponse, error
return getResp, nil
}

func (lkv *leasingKV) acquireStream(ctx context.Context, key string, op v3.Op) (*v3.TxnResponse, error) {
for ctx.Err() == nil {
if err := lkv.waitSession(ctx); err != nil {
return nil, err
}
lcmp := v3.Cmp{Key: []byte(key), Target: pb.Compare_LEASE}
resp, err := lkv.kv.Txn(ctx).If(
v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", 0),
v3.Compare(lcmp, "=", 0)).
Then(
op,
v3.OpPut(lkv.pfx+key, "", v3.WithLease(lkv.leaseID()))).
Else(
op,
v3.OpGetStream(lkv.pfx+key),
).Commit()
if err == nil {
if !resp.Succeeded {
kvs := resp.Responses[1].GetResponseRange().Kvs
// if txn failed since already owner, lease is acquired
resp.Succeeded = len(kvs) > 0 && v3.LeaseID(kvs[0].Lease) == lkv.leaseID()
}
return resp, nil
}
// retry if transient error
if _, ok := err.(rpctypes.EtcdError); ok {
return nil, err
}
if ev, ok := status.FromError(err); ok && ev.Code() != codes.Unavailable {
return nil, err
}
}
return nil, ctx.Err()
}

func (lkv *leasingKV) getStream(ctx context.Context, op v3.Op) (*v3.GetStreamResponse, error) {
do := func() (*v3.GetStreamResponse, error) {
r, err := lkv.kv.Do(ctx, op)
return r.GetStream(), err
}
if !lkv.readySession() {
return do()
}

if resp, ok := lkv.leases.GetStream(ctx, op); resp != nil {
return resp, nil
} else if !ok || op.IsSerializable() {
// must be handled by server or can skip linearization
return do()
}

key := string(op.KeyBytes())
if !lkv.leases.MayAcquire(key) {
resp, err := lkv.kv.Do(ctx, op)
return resp.GetStream(), err
}

resp, err := lkv.acquireStream(ctx, key, v3.OpGetStream(key))
if err != nil {
return nil, err
}
getResp := (*v3.GetStreamResponse)(resp.Responses[0].GetResponseRange())
getResp.Header = resp.Header
if resp.Succeeded {
getResp = lkv.leases.AddStream(key, getResp, op)
lkv.wg.Add(1)
go func() {
defer lkv.wg.Done()
lkv.monitorLease(ctx, key, resp.Header.Revision)
}()
}
return getResp, nil
}

func (lkv *leasingKV) deleteRangeRPC(ctx context.Context, maxLeaseRev int64, key, end string) (*v3.DeleteResponse, error) {
lkey, lend := lkv.pfx+key, lkv.pfx+end
resp, err := lkv.kv.Txn(ctx).If(
Expand Down
4 changes: 4 additions & 0 deletions client/v3/mock/mockserver/mockserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ func (m *mockKVServer) Range(context.Context, *pb.RangeRequest) (*pb.RangeRespon
return &pb.RangeResponse{}, nil
}

func (m *mockKVServer) RangeStream(r *pb.RangeRequest, rss pb.KV_RangeStreamServer) error {
return nil
}

func (m *mockKVServer) Put(context.Context, *pb.PutRequest) (*pb.PutResponse, error) {
return &pb.PutResponse{}, nil
}
Expand Down
Loading

0 comments on commit 754fe88

Please sign in to comment.