Skip to content

Commit

Permalink
Merge pull request #15080 from geetasg/lease_support_refactor
Browse files Browse the repository at this point in the history
Add support for lease api to linearizability tests
  • Loading branch information
serathius authored Jan 11, 2023
2 parents ef917f1 + 5b84526 commit 3306639
Show file tree
Hide file tree
Showing 7 changed files with 409 additions and 13 deletions.
29 changes: 29 additions & 0 deletions tests/linearizability/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,32 @@ func (c *recordingClient) Txn(ctx context.Context, key, expectedValue, newValue
c.history.AppendTxn(key, expectedValue, newValue, callTime, returnTime, resp, err)
return err
}

func (c *recordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, error) {
callTime := time.Now()
resp, err := c.client.Lease.Grant(ctx, ttl)
returnTime := time.Now()
c.history.AppendLeaseGrant(callTime, returnTime, resp, err)
var leaseId int64
if resp != nil {
leaseId = int64(resp.ID)
}
return leaseId, err
}

func (c *recordingClient) LeaseRevoke(ctx context.Context, leaseId int64) error {
callTime := time.Now()
resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseId))
returnTime := time.Now()
c.history.AppendLeaseRevoke(leaseId, callTime, returnTime, resp, err)
return err
}

func (c *recordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) error {
callTime := time.Now()
opts := clientv3.WithLease(clientv3.LeaseID(leaseId))
resp, err := c.client.Put(ctx, key, value, opts)
returnTime := time.Now()
c.history.AppendPutWithLease(key, value, int64(leaseId), callTime, returnTime, resp, err)
return err
}
81 changes: 81 additions & 0 deletions tests/linearizability/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,67 @@ func (h *appendableHistory) AppendPut(key, value string, start, end time.Time, r
})
}

func (h *appendableHistory) AppendPutWithLease(key, value string, leaseID int64, start, end time.Time, resp *clientv3.PutResponse, err error) {
request := putWithLeaseRequest(key, value, leaseID)
if err != nil {
h.appendFailed(request, start, err)
return
}
var revision int64
if resp != nil && resp.Header != nil {
revision = resp.Header.Revision
}
h.successful = append(h.successful, porcupine.Operation{
ClientId: h.id,
Input: request,
Call: start.UnixNano(),
Output: putResponse(revision),
Return: end.UnixNano(),
})
}

func (h *appendableHistory) AppendLeaseGrant(start, end time.Time, resp *clientv3.LeaseGrantResponse, err error) {
var leaseID int64
if resp != nil {
leaseID = int64(resp.ID)
}
request := leaseGrantRequest(leaseID)
if err != nil {
h.appendFailed(request, start, err)
return
}
var revision int64
if resp != nil && resp.ResponseHeader != nil {
revision = resp.ResponseHeader.Revision
}
h.successful = append(h.successful, porcupine.Operation{
ClientId: h.id,
Input: request,
Call: start.UnixNano(),
Output: leaseGrantResponse(revision),
Return: end.UnixNano(),
})
}

func (h *appendableHistory) AppendLeaseRevoke(id int64, start time.Time, end time.Time, resp *clientv3.LeaseRevokeResponse, err error) {
request := leaseRevokeRequest(id)
if err != nil {
h.appendFailed(request, start, err)
return
}
var revision int64
if resp != nil && resp.Header != nil {
revision = resp.Header.Revision
}
h.successful = append(h.successful, porcupine.Operation{
ClientId: h.id,
Input: request,
Call: start.UnixNano(),
Output: leaseRevokeResponse(revision),
Return: end.UnixNano(),
})
}

func (h *appendableHistory) AppendDelete(key string, start, end time.Time, resp *clientv3.DeleteResponse, err error) {
request := deleteRequest(key)
if err != nil {
Expand Down Expand Up @@ -171,6 +232,26 @@ func txnResponse(succeeded bool, revision int64) EtcdResponse {
return EtcdResponse{Result: result, TxnFailure: !succeeded, Revision: revision}
}

func putWithLeaseRequest(key, value string, leaseID int64) EtcdRequest {
return EtcdRequest{Ops: []EtcdOperation{{Type: PutWithLease, Key: key, Value: value, LeaseID: leaseID}}}
}

func leaseGrantRequest(leaseID int64) EtcdRequest {
return EtcdRequest{Ops: []EtcdOperation{{Type: LeaseGrant, LeaseID: leaseID}}}
}

func leaseGrantResponse(revision int64) EtcdResponse {
return EtcdResponse{Result: []EtcdOperationResult{{}}, Revision: revision}
}

func leaseRevokeRequest(leaseID int64) EtcdRequest {
return EtcdRequest{Ops: []EtcdOperation{{Type: LeaseRevoke, LeaseID: leaseID}}}
}

func leaseRevokeResponse(revision int64) EtcdResponse {
return EtcdResponse{Result: []EtcdOperationResult{{}}, Revision: revision}
}

type history struct {
successful []porcupine.Operation
// failed requests are kept separate as we don't know return time of failed operations.
Expand Down
53 changes: 53 additions & 0 deletions tests/linearizability/lease_ids.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package linearizability

import (
"sync"
)

type clientId2LeaseIdMapper interface {
LeaseId(int) int64
AddLeaseId(int, int64)
RemoveLeaseId(int)
}

func newClientId2LeaseIdMapper() clientId2LeaseIdMapper {
return &atomicClientId2LeaseIdMapper{m: map[int]int64{}}
}

type atomicClientId2LeaseIdMapper struct {
sync.RWMutex
// m is used to store clientId to leaseId mapping.
m map[int]int64
}

func (lm *atomicClientId2LeaseIdMapper) LeaseId(clientId int) int64 {
lm.RLock()
defer lm.RUnlock()
return lm.m[clientId]
}

func (lm *atomicClientId2LeaseIdMapper) AddLeaseId(clientId int, leaseId int64) {
lm.Lock()
defer lm.Unlock()
lm.m[clientId] = leaseId
}

func (lm *atomicClientId2LeaseIdMapper) RemoveLeaseId(clientId int) {
lm.Lock()
defer lm.Unlock()
delete(lm.m, clientId)
}
3 changes: 2 additions & 1 deletion tests/linearizability/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu
endpoints := clus.EndpointsV3()

ids := newIdProvider()
lm := newClientId2LeaseIdMapper()
h := history{}
limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200)

Expand All @@ -186,7 +187,7 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu
defer wg.Done()
defer c.Close()

config.traffic.Run(ctx, c, limiter, ids)
config.traffic.Run(ctx, c, limiter, ids, lm)
mux.Lock()
h = h.Merge(c.history.history)
mux.Unlock()
Expand Down
110 changes: 103 additions & 7 deletions tests/linearizability/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ import (
type OperationType string

const (
Get OperationType = "get"
Put OperationType = "put"
Delete OperationType = "delete"
Txn OperationType = "txn"
Get OperationType = "get"
Put OperationType = "put"
Delete OperationType = "delete"
Txn OperationType = "txn"
PutWithLease OperationType = "putWithLease"
LeaseGrant OperationType = "leaseGrant"
LeaseRevoke OperationType = "leaseRevoke"
)

type EtcdRequest struct {
Expand All @@ -43,9 +46,10 @@ type EtcdCondition struct {
}

type EtcdOperation struct {
Type OperationType
Key string
Value string
Type OperationType
Key string
Value string
LeaseID int64
}

type EtcdResponse struct {
Expand All @@ -60,11 +64,20 @@ type EtcdOperationResult struct {
Deleted int64
}

var leased = struct{}{}

type EtcdLease struct {
LeaseID int64
Keys map[string]struct{}
}

type PossibleStates []EtcdState

type EtcdState struct {
Revision int64
KeyValues map[string]string
KeyLeases map[string]int64
Leases map[int64]EtcdLease
}

var etcdModel = porcupine.Model{
Expand Down Expand Up @@ -139,6 +152,12 @@ func describeEtcdOperation(op EtcdOperation) string {
return fmt.Sprintf("delete(%q)", op.Key)
case Txn:
return "<! unsupported: nested transaction !>"
case LeaseGrant:
return fmt.Sprintf("leaseGrant(%d)", op.LeaseID)
case LeaseRevoke:
return fmt.Sprintf("leaseRevoke(%d)", op.LeaseID)
case PutWithLease:
return fmt.Sprintf("putWithLease(%q, %q, %d)", op.Key, op.Value, op.LeaseID)
default:
return fmt.Sprintf("<! unknown op: %q !>", op.Type)
}
Expand All @@ -157,6 +176,12 @@ func describeEtcdOperationResponse(op OperationType, resp EtcdOperationResult) s
return fmt.Sprintf("deleted: %d", resp.Deleted)
case Txn:
return "<! unsupported: nested transaction !>"
case LeaseGrant:
return fmt.Sprintf("ok")
case LeaseRevoke:
return fmt.Sprintf("ok")
case PutWithLease:
return fmt.Sprintf("ok")
default:
return fmt.Sprintf("<! unknown op: %q !>", op)
}
Expand All @@ -183,6 +208,8 @@ func initState(request EtcdRequest, response EtcdResponse) EtcdState {
state := EtcdState{
Revision: response.Revision,
KeyValues: map[string]string{},
KeyLeases: map[string]int64{},
Leases: map[int64]EtcdLease{},
}
if response.TxnFailure {
return state
Expand All @@ -197,6 +224,24 @@ func initState(request EtcdRequest, response EtcdResponse) EtcdState {
case Put:
state.KeyValues[op.Key] = op.Value
case Delete:
case PutWithLease:
if _, ok := state.Leases[op.LeaseID]; ok {
state.KeyValues[op.Key] = op.Value
//detach from old lease id but we dont expect that at init
if _, ok := state.KeyLeases[op.Key]; ok {
panic("old lease id found at init")
}
//attach to new lease id
state.KeyLeases[op.Key] = op.LeaseID
state.Leases[op.LeaseID].Keys[op.Key] = leased
}
case LeaseGrant:
lease := EtcdLease{
LeaseID: op.LeaseID,
Keys: map[string]struct{}{},
}
state.Leases[op.LeaseID] = lease
case LeaseRevoke:
default:
panic("Unknown operation")
}
Expand Down Expand Up @@ -244,25 +289,76 @@ func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, Etc
s.KeyValues = newKVs
opResp := make([]EtcdOperationResult, len(request.Ops))
increaseRevision := false

for i, op := range request.Ops {
switch op.Type {
case Get:
opResp[i].Value = s.KeyValues[op.Key]
case Put:
s.KeyValues[op.Key] = op.Value
increaseRevision = true
s = detachFromOldLease(s, op)
case Delete:
if _, ok := s.KeyValues[op.Key]; ok {
delete(s.KeyValues, op.Key)
increaseRevision = true
s = detachFromOldLease(s, op)
opResp[i].Deleted = 1
}
case PutWithLease:
if _, ok := s.Leases[op.LeaseID]; ok {
//handle put op.
s.KeyValues[op.Key] = op.Value
increaseRevision = true
s = detachFromOldLease(s, op)
s = attachToNewLease(s, op)
}
case LeaseRevoke:
//Delete the keys attached to the lease
keyDeleted := false
for key, _ := range s.Leases[op.LeaseID].Keys {
//same as delete.
if _, ok := s.KeyValues[key]; ok {
if !keyDeleted {
keyDeleted = true
}
delete(s.KeyValues, key)
delete(s.KeyLeases, key)
}
}
//delete the lease
delete(s.Leases, op.LeaseID)
if keyDeleted {
increaseRevision = true
}
case LeaseGrant:
lease := EtcdLease{
LeaseID: op.LeaseID,
Keys: map[string]struct{}{},
}
s.Leases[op.LeaseID] = lease
default:
panic("unsupported operation")
}
}

if increaseRevision {
s.Revision += 1
}

return s, EtcdResponse{Result: opResp, Revision: s.Revision}
}

func detachFromOldLease(s EtcdState, op EtcdOperation) EtcdState {
if oldLeaseId, ok := s.KeyLeases[op.Key]; ok {
delete(s.Leases[oldLeaseId].Keys, op.Key)
delete(s.KeyLeases, op.Key)
}
return s
}

func attachToNewLease(s EtcdState, op EtcdOperation) EtcdState {
s.KeyLeases[op.Key] = op.LeaseID
s.Leases[op.LeaseID].Keys[op.Key] = leased
return s
}
Loading

0 comments on commit 3306639

Please sign in to comment.