diff --git a/tests/linearizability/client.go b/tests/linearizability/client.go index 4a4e0c675fd7..20f3abc9a8a9 100644 --- a/tests/linearizability/client.go +++ b/tests/linearizability/client.go @@ -18,6 +18,7 @@ import ( "context" "time" + "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -47,15 +48,15 @@ func (c *recordingClient) Close() error { return c.client.Close() } -func (c *recordingClient) Get(ctx context.Context, key string) error { +func (c *recordingClient) Get(ctx context.Context, key string) ([]*mvccpb.KeyValue, error) { callTime := time.Now() resp, err := c.client.Get(ctx, key) returnTime := time.Now() if err != nil { - return err + return nil, err } c.history.AppendGet(key, callTime, returnTime, resp) - return nil + return resp.Kvs, nil } func (c *recordingClient) Put(ctx context.Context, key, value string) error { @@ -73,3 +74,22 @@ func (c *recordingClient) Delete(ctx context.Context, key string) error { c.history.AppendDelete(key, callTime, returnTime, resp, err) return nil } + +func (c *recordingClient) Txn(ctx context.Context, key, expectedValue, newValue string) error { + callTime := time.Now() + txn := c.client.Txn(ctx) + var cmp clientv3.Cmp + if expectedValue == "" { + cmp = clientv3.Compare(clientv3.CreateRevision(key), "=", 0) + } else { + cmp = clientv3.Compare(clientv3.Value(key), "=", expectedValue) + } + resp, err := txn.If( + cmp, + ).Then( + clientv3.OpPut(key, newValue), + ).Commit() + returnTime := time.Now() + c.history.AppendTxn(key, expectedValue, newValue, callTime, returnTime, resp, err) + return err +} diff --git a/tests/linearizability/history.go b/tests/linearizability/history.go index 388bccd00666..0dfb0d394efb 100644 --- a/tests/linearizability/history.go +++ b/tests/linearizability/history.go @@ -94,6 +94,25 @@ func (h *appendableHistory) AppendDelete(key string, start, end time.Time, resp }) } +func (h *appendableHistory) AppendTxn(key, expectValue, newValue string, start, end time.Time, resp *clientv3.TxnResponse, err error) { + request := EtcdRequest{Op: Txn, Key: key, TxnExpectData: expectValue, TxnNewData: newValue} + if err != nil { + h.appendFailed(request, start, err) + return + } + var revision int64 + if resp != nil && resp.Header != nil { + revision = resp.Header.Revision + } + h.successful = append(h.successful, porcupine.Operation{ + ClientId: h.id, + Input: request, + Call: start.UnixNano(), + Output: EtcdResponse{Err: err, Revision: revision, TxnSucceeded: resp.Succeeded}, + Return: end.UnixNano(), + }) +} + func (h *appendableHistory) appendFailed(request EtcdRequest, start time.Time, err error) { h.failed = append(h.failed, porcupine.Operation{ ClientId: h.id, diff --git a/tests/linearizability/model.go b/tests/linearizability/model.go index 00fd2e3d3fe0..eba51f754d15 100644 --- a/tests/linearizability/model.go +++ b/tests/linearizability/model.go @@ -27,19 +27,23 @@ const ( Get Operation = "get" Put Operation = "put" Delete Operation = "delete" + Txn Operation = "txn" ) type EtcdRequest struct { - Op Operation - Key string - PutData string + Op Operation + Key string + PutData string + TxnExpectData string + TxnNewData string } type EtcdResponse struct { - GetData string - Revision int64 - Deleted int64 - Err error + GetData string + Revision int64 + Deleted int64 + TxnSucceeded bool + Err error } type EtcdState struct { @@ -89,6 +93,12 @@ var etcdModel = porcupine.Model{ } else { return fmt.Sprintf("delete(%q) -> ok, rev: %d deleted:%d", request.Key, response.Revision, response.Deleted) } + case Txn: + if response.Err != nil { + return fmt.Sprintf("txn(if(value(%q)=%q).then(put(%q, %q)) -> %s", request.Key, request.TxnExpectData, request.Key, request.TxnNewData, response.Err) + } else { + return fmt.Sprintf("txn(if(value(%q)=%q).then(put(%q, %q)) -> %v, rev: %d", request.Key, request.TxnExpectData, request.Key, request.TxnNewData, response.TxnSucceeded, response.Revision) + } default: return "" } @@ -145,6 +155,14 @@ func initValueRevision(request EtcdRequest, response EtcdResponse) (ok bool, v V Value: "", Revision: response.Revision, } + case Txn: + if response.TxnSucceeded { + return true, ValueRevision{ + Value: request.TxnNewData, + Revision: response.Revision, + } + } + return false, ValueRevision{} default: panic("Unknown operation") } @@ -164,6 +182,12 @@ func stepValue(v ValueRevision, request EtcdRequest) (ValueRevision, EtcdRespons v.Revision += 1 resp.Deleted = 1 } + case Txn: + if v.Value == request.TxnExpectData { + v.Value = request.TxnNewData + v.Revision += 1 + resp.TxnSucceeded = true + } default: panic("unsupported operation") } diff --git a/tests/linearizability/model_test.go b/tests/linearizability/model_test.go index 47dc45800690..06f9482b9808 100644 --- a/tests/linearizability/model_test.go +++ b/tests/linearizability/model_test.go @@ -42,6 +42,12 @@ func TestModel(t *testing.T) { {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Revision: 42}}, }, }, + { + name: "First Txn can start from non-zero revision", + operations: []testOperation{ + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "", TxnNewData: "42"}, resp: EtcdResponse{Revision: 42}}, + }, + }, { name: "Get response data should match put", operations: []testOperation{ @@ -70,7 +76,7 @@ func TestModel(t *testing.T) { }, }, { - name: "Put can fail and be lost", + name: "Put can fail and be lost before get", operations: []testOperation{ {req: EtcdRequest{Op: Put, Key: "key", PutData: "1"}, resp: EtcdResponse{Revision: 1}}, {req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}}, @@ -81,18 +87,38 @@ func TestModel(t *testing.T) { }, }, { - name: "Put can fail but be persisted and increase revision before put", + name: "Put can fail and be lost before put", operations: []testOperation{ - // One failed request, one persisted. {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}}, {req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}}, - {req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 3}}, - // Two failed request, two persisted. + {req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 2}}, + }, + }, + { + name: "Put can fail and be lost before delete", + operations: []testOperation{ + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Revision: 1}}, + }, + }, + { + name: "Put can fail and be lost before txn failed", + operations: []testOperation{ + // Txn failure + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{Revision: 1}}, + // Txn success + {req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Revision: 2}}, {req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Err: errors.New("failed")}}, - {req: EtcdRequest{Op: Put, Key: "key", PutData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}}, - {req: EtcdRequest{Op: Put, Key: "key", PutData: "6"}, resp: EtcdResponse{Revision: 6}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{TxnSucceeded: true, Revision: 3}}, }, }, + { + name: "Put can fail and be lost before txn success", + operations: []testOperation{}, + }, { name: "Put can fail but be persisted and increase revision before get", operations: []testOperation{ @@ -129,6 +155,21 @@ func TestModel(t *testing.T) { {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 10}}, }, }, + { + name: "Put can fail but be persisted before txn", + operations: []testOperation{ + // Txn success + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2"}, resp: EtcdResponse{TxnSucceeded: true, Revision: 2}, failure: true}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2"}, resp: EtcdResponse{TxnSucceeded: true, Revision: 3}}, + // Txn failure + {req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Revision: 4}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "5"}, resp: EtcdResponse{Revision: 4}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 5, GetData: "5"}}, + }, + }, { name: "Delete only increases revision on success", operations: []testOperation{ @@ -216,6 +257,141 @@ func TestModel(t *testing.T) { {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Revision: 4}}, }, }, + { + name: "Delete can fail but be persisted before txn", + operations: []testOperation{ + // Txn success + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "", TxnNewData: "1"}, resp: EtcdResponse{TxnSucceeded: true, Revision: 3}}, + // Txn failure + {req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Revision: 4}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "4", TxnNewData: "5"}, resp: EtcdResponse{TxnSucceeded: false, Revision: 5}}, + }, + }, + { + name: "Txn sets new value if value matches expected", + operations: []testOperation{ + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Revision: 1, TxnSucceeded: true}, failure: true}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Revision: 2, TxnSucceeded: false}, failure: true}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Revision: 1, TxnSucceeded: false}, failure: true}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Revision: 2, TxnSucceeded: true}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 2}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 1}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 2}}, + }, + }, + { + name: "Txn can expect on empty key", + operations: []testOperation{ + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "", TxnNewData: "2"}, resp: EtcdResponse{Revision: 2, TxnSucceeded: true}}, + }, + }, + { + name: "Txn doesn't do anything if value doesn't match expected", + operations: []testOperation{ + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{Revision: 2, TxnSucceeded: true}, failure: true}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{Revision: 1, TxnSucceeded: true}, failure: true}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{Revision: 2, TxnSucceeded: false}, failure: true}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{Revision: 1, TxnSucceeded: false}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 1}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "2", Revision: 2}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "3", Revision: 1}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "3", Revision: 2}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}}, + }, + }, + { + name: "Txn can fail and be lost before get", + operations: []testOperation{ + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 2, GetData: "2"}, failure: true}, + }, + }, + { + name: "Txn can fail and be lost before delete", + operations: []testOperation{ + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 2}}, + }, + }, + { + name: "Txn can fail and be lost before put", + operations: []testOperation{ + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 2}}, + }, + }, + { + name: "Txn can fail but be persisted before get", + operations: []testOperation{ + // One failed request, one persisted. + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1, GetData: "2"}, failure: true}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 2, GetData: "2"}}, + // Two failed request, two persisted. + {req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 3}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "3", TxnNewData: "4"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "4", TxnNewData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 5, GetData: "5"}}, + }, + }, + { + name: "Txn can fail but be persisted before put", + operations: []testOperation{ + // One failed request, one persisted. + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "3"}, resp: EtcdResponse{Revision: 3}}, + // Two failed request, two persisted. + {req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Revision: 4}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "4", TxnNewData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "5", TxnNewData: "6"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Put, Key: "key", PutData: "7"}, resp: EtcdResponse{Revision: 7}}, + }, + }, + { + name: "Txn can fail but be persisted before delete", + operations: []testOperation{ + // One failed request, one persisted. + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 3}}, + // Two failed request, two persisted. + {req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Revision: 4}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "4", TxnNewData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "5", TxnNewData: "6"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 7}}, + }, + }, + { + name: "Txn can fail but be persisted before txn", + operations: []testOperation{ + // One failed request, one persisted with success. + {req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{GetData: "1", Revision: 1}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "1", TxnNewData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "2", TxnNewData: "3"}, resp: EtcdResponse{Revision: 3, TxnSucceeded: true}}, + // Two failed request, two persisted with success. + {req: EtcdRequest{Op: Put, Key: "key", PutData: "4"}, resp: EtcdResponse{Revision: 4}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "4", TxnNewData: "5"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "5", TxnNewData: "6"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "6", TxnNewData: "7"}, resp: EtcdResponse{Revision: 7, TxnSucceeded: true}}, + // One failed request, one persisted with failure. + {req: EtcdRequest{Op: Put, Key: "key", PutData: "8"}, resp: EtcdResponse{Revision: 8}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "8", TxnNewData: "9"}, resp: EtcdResponse{Err: errors.New("failed")}}, + {req: EtcdRequest{Op: Txn, Key: "key", TxnExpectData: "8", TxnNewData: "10"}, resp: EtcdResponse{Revision: 9}}, + }, + }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { diff --git a/tests/linearizability/traffic.go b/tests/linearizability/traffic.go index 413f3b8e2e53..ecae608cc776 100644 --- a/tests/linearizability/traffic.go +++ b/tests/linearizability/traffic.go @@ -20,11 +20,12 @@ import ( "math/rand" "time" + "go.etcd.io/etcd/api/v3/mvccpb" "golang.org/x/time/rate" ) var ( - DefaultTraffic Traffic = readWriteSingleKey{key: "key", writes: []opChance{{operation: Put, chance: 90}, {operation: Delete, chance: 10}}} + DefaultTraffic Traffic = readWriteSingleKey{key: "key", writes: []opChance{{operation: Put, chance: 90}, {operation: Delete, chance: 5}, {operation: Txn, chance: 5}}} ) type Traffic interface { @@ -50,27 +51,27 @@ func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter default: } // Execute one read per one write to avoid operation history include too many failed writes when etcd is down. - err := t.Read(ctx, c, limiter) + resp, err := t.Read(ctx, c, limiter) if err != nil { continue } // Provide each write with unique id to make it easier to validate operation history. - t.Write(ctx, c, limiter, ids.RequestId()) + t.Write(ctx, c, limiter, ids.RequestId(), resp) } } -func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter) error { - getCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond) - err := c.Get(getCtx, t.key) +func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter) ([]*mvccpb.KeyValue, error) { + getCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) + resp, err := c.Get(getCtx, t.key) cancel() if err == nil { limiter.Wait(ctx) } - return err + return resp, err } -func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, id int) error { - putCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond) +func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, id int, kvs []*mvccpb.KeyValue) error { + putCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) var err error switch t.pickWriteOperation() { @@ -78,6 +79,12 @@ func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limit err = c.Put(putCtx, t.key, fmt.Sprintf("%d", id)) case Delete: err = c.Delete(putCtx, t.key) + case Txn: + var value string + if len(kvs) != 0 { + value = string(kvs[0].Value) + } + err = c.Txn(putCtx, t.key, value, fmt.Sprintf("%d", id)) default: panic("invalid operation") }