Skip to content

Commit

Permalink
tests: Add Txn operation to linearizability tests
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Dec 6, 2022
1 parent a4c6d1b commit 403f113
Show file tree
Hide file tree
Showing 5 changed files with 329 additions and 26 deletions.
29 changes: 26 additions & 3 deletions tests/linearizability/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,19 @@ func (c *recordingClient) Close() error {
return c.client.Close()
}

func (c *recordingClient) Get(ctx context.Context, key string) error {
func (c *recordingClient) Get(ctx context.Context, key string) (string, error) {
callTime := time.Now()
resp, err := c.client.Get(ctx, key)
returnTime := time.Now()
if err != nil {
return err
return "", err
}
c.history.AppendGet(key, callTime, returnTime, resp)
return nil
var value string
if len(resp.Kvs) > 0 {
value = string(resp.Kvs[0].Value)
}
return value, nil
}

func (c *recordingClient) Put(ctx context.Context, key, value string) error {
Expand All @@ -73,3 +77,22 @@ func (c *recordingClient) Delete(ctx context.Context, key string) error {
c.history.AppendDelete(key, callTime, returnTime, resp, err)
return nil
}

func (c *recordingClient) Txn(ctx context.Context, key, expectedValue, newValue string) error {
callTime := time.Now()
txn := c.client.Txn(ctx)
var cmp clientv3.Cmp
if expectedValue == "" {
cmp = clientv3.Compare(clientv3.CreateRevision(key), "=", 0)
} else {
cmp = clientv3.Compare(clientv3.Value(key), "=", expectedValue)
}
resp, err := txn.If(
cmp,
).Then(
clientv3.OpPut(key, newValue),
).Commit()
returnTime := time.Now()
c.history.AppendTxn(key, expectedValue, newValue, callTime, returnTime, resp, err)
return err
}
19 changes: 19 additions & 0 deletions tests/linearizability/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,25 @@ func (h *appendableHistory) AppendDelete(key string, start, end time.Time, resp
})
}

func (h *appendableHistory) AppendTxn(key, expectValue, newValue string, start, end time.Time, resp *clientv3.TxnResponse, err error) {
request := EtcdRequest{Op: Txn, Key: key, TxnExpectData: expectValue, TxnNewData: newValue}
if err != nil {
h.appendFailed(request, start, err)
return
}
var revision int64
if resp != nil && resp.Header != nil {
revision = resp.Header.Revision
}
h.successful = append(h.successful, porcupine.Operation{
ClientId: h.id,
Input: request,
Call: start.UnixNano(),
Output: EtcdResponse{Err: err, Revision: revision, TxnSucceeded: resp.Succeeded},
Return: end.UnixNano(),
})
}

func (h *appendableHistory) appendFailed(request EtcdRequest, start time.Time, err error) {
h.failed = append(h.failed, porcupine.Operation{
ClientId: h.id,
Expand Down
82 changes: 75 additions & 7 deletions tests/linearizability/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,23 @@ const (
Get Operation = "get"
Put Operation = "put"
Delete Operation = "delete"
Txn Operation = "txn"
)

type EtcdRequest struct {
Op Operation
Key string
PutData string
Op Operation
Key string
PutData string
TxnExpectData string
TxnNewData string
}

type EtcdResponse struct {
GetData string
Revision int64
Deleted int64
Err error
GetData string
Revision int64
Deleted int64
TxnSucceeded bool
Err error
}

type EtcdState struct {
Expand Down Expand Up @@ -86,6 +90,12 @@ var etcdModel = porcupine.Model{
} else {
return fmt.Sprintf("delete(%q) -> ok, rev: %d deleted:%d", request.Key, response.Revision, response.Deleted)
}
case Txn:
if response.Err != nil {
return fmt.Sprintf("txn(if(value(%q)=%q).then(put(%q, %q)) -> %s", request.Key, request.TxnExpectData, request.Key, request.TxnNewData, response.Err)
} else {
return fmt.Sprintf("txn(if(value(%q)=%q).then(put(%q, %q)) -> %v, rev: %d", request.Key, request.TxnExpectData, request.Key, request.TxnNewData, response.TxnSucceeded, response.Revision)
}
default:
return "<invalid>"
}
Expand All @@ -109,6 +119,8 @@ func step(state EtcdState, request EtcdRequest, response EtcdResponse) (bool, Et
return stepPut(state, request, response)
case Delete:
return stepDelete(state, request, response)
case Txn:
return stepTxn(state, request, response)
default:
panic("Unknown operation")
}
Expand All @@ -132,6 +144,14 @@ func initState(request EtcdRequest, response EtcdResponse) EtcdState {
if response.Err != nil {
state.FailedWrite = &request
}
case Txn:
if response.Err == nil {
if response.TxnSucceeded {
state.Value = request.TxnNewData
}
} else {
state.FailedWrite = &request
}
default:
panic("Unknown operation")
}
Expand All @@ -152,6 +172,8 @@ func stepGet(state EtcdState, request EtcdRequest, response EtcdResponse) (bool,
ok = response.GetData == state.FailedWrite.PutData
case Delete:
ok = response.GetData == ""
case Txn:
ok = response.GetData == state.FailedWrite.TxnNewData
default:
panic("Unknown operation")
}
Expand Down Expand Up @@ -209,5 +231,51 @@ func stepDelete(state EtcdState, request EtcdRequest, response EtcdResponse) (bo

state.Value = ""
state.LastRevision = response.Revision
state.FailedWrite = nil
return true, state
}

func stepTxn(state EtcdState, request EtcdRequest, response EtcdResponse) (bool, EtcdState) {
if response.Err != nil {
state.FailedWrite = &request
return true, state
}
// revision should never decrease
if response.Revision < state.LastRevision {
return false, state
}
valueMatch := state.Value == request.TxnExpectData

if state.FailedWrite == nil {
// transaction should succeed if value matches
if response.TxnSucceeded != valueMatch {
return false, state
}

// if transaction succeeded, revision must increase
if response.TxnSucceeded && response.Revision != state.LastRevision+1 {
return false, state
}
// if transaction failed, revision stays the same
if !response.TxnSucceeded && response.Revision != state.LastRevision {
return false, state
}
} else {
// if value matches, revision must increase as either transaction succeeded or failed request was persisted.
if valueMatch && response.Revision < state.LastRevision+1 {
return false, state
}

// if txn succeeded (+1 rev) without matching key, means that fail request was persisted (+1 rev).
if response.TxnSucceeded && !valueMatch && response.Revision < state.LastRevision+2 {
return false, state
}
}

if response.TxnSucceeded {
state.Value = request.TxnNewData
state.FailedWrite = nil
state.LastRevision = response.Revision
}
return true, state
}
Loading

0 comments on commit 403f113

Please sign in to comment.