Skip to content

Commit

Permalink
cherry-pick from etcd-io#12343
Browse files Browse the repository at this point in the history
  • Loading branch information
chaochn47 committed Oct 11, 2021
1 parent 33fd6f3 commit 8fc7fd1
Show file tree
Hide file tree
Showing 25 changed files with 859 additions and 4 deletions.
21 changes: 21 additions & 0 deletions client/v3/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type (
CompactResponse pb.CompactionResponse
PutResponse pb.PutResponse
GetResponse pb.RangeResponse
GetStreamResponse pb.RangeResponse
DeleteResponse pb.DeleteRangeResponse
TxnResponse pb.TxnResponse
)
Expand All @@ -47,6 +48,8 @@ type KV interface {
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

GetStream(ctx context.Context, key string, opts ...OpOption) (*GetStreamResponse, error)

// Delete deletes a key, or optionally using WithRange(end), [key, end).
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)

Expand All @@ -67,12 +70,14 @@ type KV interface {
type OpResponse struct {
put *PutResponse
get *GetResponse
getStream *GetStreamResponse
del *DeleteResponse
txn *TxnResponse
}

func (op OpResponse) Put() *PutResponse { return op.put }
func (op OpResponse) Get() *GetResponse { return op.get }
func (op OpResponse) GetStream() *GetStreamResponse { return op.getStream }
func (op OpResponse) Del() *DeleteResponse { return op.del }
func (op OpResponse) Txn() *TxnResponse { return op.txn }

Expand All @@ -82,6 +87,9 @@ func (resp *PutResponse) OpResponse() OpResponse {
func (resp *GetResponse) OpResponse() OpResponse {
return OpResponse{get: resp}
}
func (resp *GetStreamResponse) OpResponse() OpResponse {
return OpResponse{getStream: resp}
}
func (resp *DeleteResponse) OpResponse() OpResponse {
return OpResponse{del: resp}
}
Expand Down Expand Up @@ -120,6 +128,11 @@ func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetRespon
return r.get, toErr(ctx, err)
}

func (kv *kv) GetStream(ctx context.Context, key string, opts ...OpOption) (*GetStreamResponse, error) {
r, err := kv.Do(ctx, OpGetStream(key, opts...))
return r.getStream, toErr(ctx, err)
}

func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) {
r, err := kv.Do(ctx, OpDelete(key, opts...))
return r.del, toErr(ctx, err)
Expand Down Expand Up @@ -150,6 +163,14 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
if err == nil {
return OpResponse{get: (*GetResponse)(resp)}, nil
}
case tRangeStream:
var rangeStreamClient pb.KV_RangeStreamClient
var resp *pb.RangeResponse
rangeStreamClient, err = kv.openRangeStreamClient(ctx, op.toRangeStreamRequest(), kv.callOpts...)
resp, err = kv.serveRangeStream(ctx, rangeStreamClient)
if err == nil {
return OpResponse{getStream: (*GetStreamResponse)(resp)}, nil
}
case tPut:
var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
Expand Down
55 changes: 55 additions & 0 deletions client/v3/leasing/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,18 @@ func (lc *leaseCache) Add(key string, resp *v3.GetResponse, op v3.Op) *v3.GetRes
return ret
}

func (lc *leaseCache) AddStream(key string, resp *v3.GetStreamResponse, op v3.Op) *v3.GetStreamResponse {
lk := &leaseKey{(*v3.GetResponse)(resp), resp.Header.Revision, closedCh}
lc.mu.Lock()
if lc.header == nil || lc.header.Revision < resp.Header.Revision {
lc.header = resp.Header
}
lc.entries[key] = lk
ret := lk.getStream(op)
lc.mu.Unlock()
return ret
}

func (lc *leaseCache) Update(key, val []byte, respHeader *v3pb.ResponseHeader) {
li := lc.entries[string(key)]
if li == nil {
Expand Down Expand Up @@ -216,6 +228,28 @@ func (lc *leaseCache) Get(ctx context.Context, op v3.Op) (*v3.GetResponse, bool)
return ret, true
}


func (lc *leaseCache) GetStream(ctx context.Context, op v3.Op) (*v3.GetStreamResponse, bool) {
if isBadOp(op) {
return nil, false
}
key := string(op.KeyBytes())
li, wc := lc.notify(key)
if li == nil {
return nil, true
}
select {
case <-wc:
case <-ctx.Done():
return nil, true
}
lc.mu.RLock()
lk := *li
ret := lk.getStream(op)
lc.mu.RUnlock()
return ret, true
}

func (lk *leaseKey) get(op v3.Op) *v3.GetResponse {
ret := *lk.response
ret.Header = copyHeader(ret.Header)
Expand All @@ -239,6 +273,27 @@ func (lk *leaseKey) get(op v3.Op) *v3.GetResponse {
return &ret
}

func (lk *leaseKey) getStream(op v3.Op) *v3.GetStreamResponse {
ret := *lk.response
ret.Header = copyHeader(ret.Header)
empty := len(ret.Kvs) == 0
if empty {
ret.Kvs = nil
} else {
kv := *ret.Kvs[0]
kv.Key = make([]byte, len(kv.Key))
copy(kv.Key, ret.Kvs[0].Key)
if !op.IsKeysOnly() {
kv.Value = make([]byte, len(kv.Value))
copy(kv.Value, ret.Kvs[0].Value)
}
ret.Kvs = []*mvccpb.KeyValue{&kv}
}

retNew := (v3.GetStreamResponse)(ret)
return &retNew
}

func (lc *leaseCache) notify(key string) (*leaseKey, <-chan struct{}) {
lc.mu.RLock()
defer lc.mu.RUnlock()
Expand Down
81 changes: 81 additions & 0 deletions client/v3/leasing/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func (lkv *leasingKV) Get(ctx context.Context, key string, opts ...v3.OpOption)
return lkv.get(ctx, v3.OpGet(key, opts...))
}

func (lkv *leasingKV) GetStream(ctx context.Context, key string, opts ...v3.OpOption) (*v3.GetStreamResponse, error) {
return lkv.getStream(ctx, v3.OpGetStream(key, opts...))
}

func (lkv *leasingKV) Put(ctx context.Context, key, val string, opts ...v3.OpOption) (*v3.PutResponse, error) {
return lkv.put(ctx, v3.OpPut(key, val, opts...))
}
Expand All @@ -99,6 +103,9 @@ func (lkv *leasingKV) Do(ctx context.Context, op v3.Op) (v3.OpResponse, error) {
case op.IsGet():
resp, err := lkv.get(ctx, op)
return resp.OpResponse(), err
case op.IsGetStream():
resp, err := lkv.getStream(ctx, op)
return resp.OpResponse(), err
case op.IsPut():
resp, err := lkv.put(ctx, op)
return resp.OpResponse(), err
Expand Down Expand Up @@ -331,6 +338,80 @@ func (lkv *leasingKV) get(ctx context.Context, op v3.Op) (*v3.GetResponse, error
return getResp, nil
}

func (lkv *leasingKV) acquireStream(ctx context.Context, key string, op v3.Op) (*v3.TxnResponse, error) {
for ctx.Err() == nil {
if err := lkv.waitSession(ctx); err != nil {
return nil, err
}
lcmp := v3.Cmp{Key: []byte(key), Target: pb.Compare_LEASE}
resp, err := lkv.kv.Txn(ctx).If(
v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", 0),
v3.Compare(lcmp, "=", 0)).
Then(
op,
v3.OpPut(lkv.pfx+key, "", v3.WithLease(lkv.leaseID()))).
Else(
op,
v3.OpGetStream(lkv.pfx+key),
).Commit()
if err == nil {
if !resp.Succeeded {
kvs := resp.Responses[1].GetResponseRange().Kvs
// if txn failed since already owner, lease is acquired
resp.Succeeded = len(kvs) > 0 && v3.LeaseID(kvs[0].Lease) == lkv.leaseID()
}
return resp, nil
}
// retry if transient error
if _, ok := err.(rpctypes.EtcdError); ok {
return nil, err
}
if ev, ok := status.FromError(err); ok && ev.Code() != codes.Unavailable {
return nil, err
}
}
return nil, ctx.Err()
}

func (lkv *leasingKV) getStream(ctx context.Context, op v3.Op) (*v3.GetStreamResponse, error) {
do := func() (*v3.GetStreamResponse, error) {
r, err := lkv.kv.Do(ctx, op)
return r.GetStream(), err
}
if !lkv.readySession() {
return do()
}

if resp, ok := lkv.leases.GetStream(ctx, op); resp != nil {
return resp, nil
} else if !ok || op.IsSerializable() {
// must be handled by server or can skip linearization
return do()
}

key := string(op.KeyBytes())
if !lkv.leases.MayAcquire(key) {
resp, err := lkv.kv.Do(ctx, op)
return resp.GetStream(), err
}

resp, err := lkv.acquireStream(ctx, key, v3.OpGetStream(key))
if err != nil {
return nil, err
}
getResp := (*v3.GetStreamResponse)(resp.Responses[0].GetResponseRange())
getResp.Header = resp.Header
if resp.Succeeded {
getResp = lkv.leases.AddStream(key, getResp, op)
lkv.wg.Add(1)
go func() {
defer lkv.wg.Done()
lkv.monitorLease(ctx, key, resp.Header.Revision)
}()
}
return getResp, nil
}

func (lkv *leasingKV) deleteRangeRPC(ctx context.Context, maxLeaseRev int64, key, end string) (*v3.DeleteResponse, error) {
lkey, lend := lkv.pfx+key, lkv.pfx+end
resp, err := lkv.kv.Txn(ctx).If(
Expand Down
4 changes: 4 additions & 0 deletions client/v3/mock/mockserver/mockserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ func (m *mockKVServer) Range(context.Context, *pb.RangeRequest) (*pb.RangeRespon
return &pb.RangeResponse{}, nil
}

func (m *mockKVServer) RangeStream(r *pb.RangeRequest, rss pb.KV_RangeStreamServer) error {
return nil
}

func (m *mockKVServer) Put(context.Context, *pb.PutRequest) (*pb.PutResponse, error) {
return &pb.PutResponse{}, nil
}
Expand Down
22 changes: 22 additions & 0 deletions client/v3/namespace/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ func (kv *kvPrefix) Get(ctx context.Context, key string, opts ...clientv3.OpOpti
return get, nil
}


func (kv *kvPrefix) GetStream(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetStreamResponse, error) {
if len(key) == 0 {
return nil, rpctypes.ErrEmptyKey
}
r, err := kv.KV.Do(ctx, kv.prefixOp(clientv3.OpGetStream(key, opts...)))
if err != nil {
return nil, err
}
get := r.GetStream()
kv.unprefixGetStreamResponse(get)
return get, nil
}

func (kv *kvPrefix) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
if len(key) == 0 && !(clientv3.IsOptsWithFromKey(opts) || clientv3.IsOptsWithPrefix(opts)) {
return nil, rpctypes.ErrEmptyKey
Expand All @@ -84,6 +98,8 @@ func (kv *kvPrefix) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse
switch {
case r.Get() != nil:
kv.unprefixGetResponse(r.Get())
case r.GetStream() != nil:
kv.unprefixGetStreamResponse(r.GetStream())
case r.Put() != nil:
kv.unprefixPutResponse(r.Put())
case r.Del() != nil:
Expand Down Expand Up @@ -144,6 +160,12 @@ func (kv *kvPrefix) unprefixGetResponse(resp *clientv3.GetResponse) {
}
}

func (kv *kvPrefix) unprefixGetStreamResponse(resp *clientv3.GetStreamResponse) {
for i := range resp.Kvs {
resp.Kvs[i].Key = resp.Kvs[i].Key[len(kv.pfx):]
}
}

func (kv *kvPrefix) unprefixPutResponse(resp *clientv3.PutResponse) {
if resp.PrevKv != nil {
resp.PrevKv.Key = resp.PrevKv.Key[len(kv.pfx):]
Expand Down
32 changes: 31 additions & 1 deletion client/v3/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
tPut
tDeleteRange
tTxn
tRangeStream
)

var noPrefixEnd = []byte{0}
Expand Down Expand Up @@ -112,6 +113,8 @@ func (op Op) IsPut() bool { return op.t == tPut }
// IsGet returns true iff the operation is a Get.
func (op Op) IsGet() bool { return op.t == tRange }

func (op Op) IsGetStream() bool { return op.t == tRangeStream }

// IsDelete returns true iff the operation is a Delete.
func (op Op) IsDelete() bool { return op.t == tDeleteRange }

Expand Down Expand Up @@ -169,6 +172,21 @@ func (op Op) toRangeRequest() *pb.RangeRequest {
return r
}

func (op Op) toRangeStreamRequest() *pb.RangeRequest {
if op.t != tRangeStream {
panic("op.t != tRangeStream")
}
r := &pb.RangeRequest{
Key: op.key,
RangeEnd: op.end,
Limit: op.limit,
Revision: op.rev,
Serializable: op.serializable,
KeysOnly: op.keysOnly,
}
return r
}

func (op Op) toTxnRequest() *pb.TxnRequest {
thenOps := make([]*pb.RequestOp, len(op.thenOps))
for i, tOp := range op.thenOps {
Expand All @@ -189,6 +207,8 @@ func (op Op) toRequestOp() *pb.RequestOp {
switch op.t {
case tRange:
return &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: op.toRangeRequest()}}
case tRangeStream:
return &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: op.toRangeStreamRequest()}}
case tPut:
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
return &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: r}}
Expand Down Expand Up @@ -216,7 +236,7 @@ func (op Op) isWrite() bool {
}
return false
}
return op.t != tRange
return op.t != tRange && op.t != tRangeStream
}

func NewOp() *Op {
Expand All @@ -234,6 +254,16 @@ func OpGet(key string, opts ...OpOption) Op {
return ret
}

func OpGetStream(key string, opts ...OpOption) Op {
// WithPrefix and WithFromKey are not supported together
if IsOptsWithPrefix(opts) && IsOptsWithFromKey(opts) {
panic("`WithPrefix` and `WithFromKey` cannot be set at the same time, choose one")
}
ret := Op{t: tRangeStream, key: []byte(key)}
ret.applyOpts(opts)
return ret
}

// OpDelete returns "delete" operation based on given key and operation options.
func OpDelete(key string, opts ...OpOption) Op {
// WithPrefix and WithFromKey are not supported together
Expand Down
Loading

0 comments on commit 8fc7fd1

Please sign in to comment.