Skip to content

Commit

Permalink
Merge pull request #15180 from serathius/linearizability-large-put
Browse files Browse the repository at this point in the history
tests: Implement LargePut requests
  • Loading branch information
serathius authored Jan 24, 2023
2 parents 91ec368 + ef0bdbe commit 3b612ce
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 39 deletions.
36 changes: 24 additions & 12 deletions tests/linearizability/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,36 @@ var (
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
traffic: readWriteSingleKey{keyCount: 4, leaseTTL: DefaultLeaseTTL, writes: []requestChance{
{operation: Put, chance: 50},
{operation: Delete, chance: 10},
{operation: PutWithLease, chance: 10},
{operation: LeaseRevoke, chance: 10},
{operation: CompareAndSet, chance: 10},
{operation: Defragment, chance: 10},
}},
traffic: traffic{
keyCount: 4,
leaseTTL: DefaultLeaseTTL,
largePutSize: 32769,
writes: []requestChance{
{operation: Put, chance: 50},
{operation: LargePut, chance: 5},
{operation: Delete, chance: 10},
{operation: PutWithLease, chance: 10},
{operation: LeaseRevoke, chance: 10},
{operation: CompareAndSet, chance: 10},
{operation: Defragment, chance: 5},
},
},
}
HighTraffic = trafficConfig{
name: "HighTraffic",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
traffic: readWriteSingleKey{keyCount: 4, leaseTTL: DefaultLeaseTTL, writes: []requestChance{
{operation: Put, chance: 90},
{operation: Defragment, chance: 10},
}},
traffic: traffic{
keyCount: 4,
largePutSize: 32769,
leaseTTL: DefaultLeaseTTL,
writes: []requestChance{
{operation: Put, chance: 90},
{operation: LargePut, chance: 5},
{operation: Defragment, chance: 5},
},
},
}
defaultTraffic = LowTraffic
trafficList = []trafficConfig{
Expand Down
8 changes: 4 additions & 4 deletions tests/linearizability/model/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func getRequest(key string) EtcdRequest {
}

func getResponse(value string, revision int64) EtcdResponse {
return EtcdResponse{Txn: &TxnResponse{OpsResult: []EtcdOperationResult{{Value: value}}}, Revision: revision}
return EtcdResponse{Txn: &TxnResponse{OpsResult: []EtcdOperationResult{{Value: ToValueOrHash(value)}}}, Revision: revision}
}

func failedResponse(err error) EtcdResponse {
Expand All @@ -225,7 +225,7 @@ func unknownResponse(revision int64) EtcdResponse {
}

func putRequest(key, value string) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Ops: []EtcdOperation{{Type: Put, Key: key, Value: value}}}}
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Ops: []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(value)}}}}
}

func putResponse(revision int64) EtcdResponse {
Expand All @@ -241,7 +241,7 @@ func deleteResponse(deleted int64, revision int64) EtcdResponse {
}

func txnRequest(key, expectValue, newValue string) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Conds: []EtcdCondition{{Key: key, ExpectedValue: expectValue}}, Ops: []EtcdOperation{{Type: Put, Key: key, Value: newValue}}}}
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Conds: []EtcdCondition{{Key: key, ExpectedValue: ToValueOrHash(expectValue)}}, Ops: []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(newValue)}}}}
}

func txnResponse(succeeded bool, revision int64) EtcdResponse {
Expand All @@ -253,7 +253,7 @@ func txnResponse(succeeded bool, revision int64) EtcdResponse {
}

func putWithLeaseRequest(key, value string, leaseID int64) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Ops: []EtcdOperation{{Type: Put, Key: key, Value: value, LeaseID: leaseID}}}}
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Ops: []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(value), LeaseID: leaseID}}}}
}

func leaseGrantRequest(leaseID int64) EtcdRequest {
Expand Down
53 changes: 39 additions & 14 deletions tests/linearizability/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"github.com/anishathalye/porcupine"
"hash/fnv"
"reflect"
"strings"
)
Expand Down Expand Up @@ -76,13 +77,13 @@ type TxnRequest struct {

type EtcdCondition struct {
Key string
ExpectedValue string
ExpectedValue ValueOrHash
}

type EtcdOperation struct {
Type OperationType
Key string
Value string
Value ValueOrHash
LeaseID int64
}

Expand Down Expand Up @@ -120,7 +121,7 @@ func Match(r1, r2 EtcdResponse) bool {
}

type EtcdOperationResult struct {
Value string
Value ValueOrHash
Deleted int64
}

Expand All @@ -134,11 +135,28 @@ type PossibleStates []EtcdState

type EtcdState struct {
Revision int64
KeyValues map[string]string
KeyValues map[string]ValueOrHash
KeyLeases map[string]int64
Leases map[int64]EtcdLease
}

type ValueOrHash struct {
Value string
Hash uint32
}

func ToValueOrHash(value string) ValueOrHash {
v := ValueOrHash{}
if len(value) < 20 {
v.Value = value
} else {
h := fnv.New32a()
h.Write([]byte(value))
v.Hash = h.Sum32()
}
return v
}

func describeEtcdRequestResponse(request EtcdRequest, response EtcdResponse) string {
return fmt.Sprintf("%s -> %s", describeEtcdRequest(request), describeEtcdResponse(request, response))
}
Expand Down Expand Up @@ -181,7 +199,7 @@ func describeEtcdRequest(request EtcdRequest) string {
func describeEtcdConditions(conds []EtcdCondition) string {
opsDescription := make([]string, len(conds))
for i := range conds {
opsDescription[i] = fmt.Sprintf("%s==%q", conds[i].Key, conds[i].ExpectedValue)
opsDescription[i] = fmt.Sprintf("%s==%s", conds[i].Key, describeValueOrHash(conds[i].ExpectedValue))
}
return strings.Join(opsDescription, " && ")
}
Expand Down Expand Up @@ -211,9 +229,9 @@ func describeEtcdOperation(op EtcdOperation) string {
return fmt.Sprintf("get(%q)", op.Key)
case Put:
if op.LeaseID != 0 {
return fmt.Sprintf("put(%q, %q, %d)", op.Key, op.Value, op.LeaseID)
return fmt.Sprintf("put(%q, %s, %d)", op.Key, describeValueOrHash(op.Value), op.LeaseID)
}
return fmt.Sprintf("put(%q, %q, nil)", op.Key, op.Value)
return fmt.Sprintf("put(%q, %s, nil)", op.Key, describeValueOrHash(op.Value))
case Delete:
return fmt.Sprintf("delete(%q)", op.Key)
default:
Expand All @@ -224,10 +242,7 @@ func describeEtcdOperation(op EtcdOperation) string {
func describeEtcdOperationResponse(op OperationType, resp EtcdOperationResult) string {
switch op {
case Get:
if resp.Value == "" {
return "nil"
}
return fmt.Sprintf("%q", resp.Value)
return describeValueOrHash(resp.Value)
case Put:
return fmt.Sprintf("ok")
case Delete:
Expand All @@ -237,6 +252,16 @@ func describeEtcdOperationResponse(op OperationType, resp EtcdOperationResult) s
}
}

func describeValueOrHash(value ValueOrHash) string {
if value.Hash != 0 {
return fmt.Sprintf("hash: %d", value.Hash)
}
if value.Value == "" {
return "nil"
}
return fmt.Sprintf("%q", value.Value)
}

func step(states PossibleStates, request EtcdRequest, response EtcdResponse) (bool, PossibleStates) {
if len(states) == 0 {
// states were not initialized
Expand All @@ -257,7 +282,7 @@ func step(states PossibleStates, request EtcdRequest, response EtcdResponse) (bo
func initState(request EtcdRequest, response EtcdResponse) EtcdState {
state := EtcdState{
Revision: response.Revision,
KeyValues: map[string]string{},
KeyValues: map[string]ValueOrHash{},
KeyLeases: map[string]int64{},
Leases: map[int64]EtcdLease{},
}
Expand All @@ -270,7 +295,7 @@ func initState(request EtcdRequest, response EtcdResponse) EtcdState {
opResp := response.Txn.OpsResult[i]
switch op.Type {
case Get:
if opResp.Value != "" {
if opResp.Value.Value != "" && opResp.Value.Hash == 0 {
state.KeyValues[op.Key] = opResp.Value
}
case Put:
Expand Down Expand Up @@ -319,7 +344,7 @@ func applyRequest(states PossibleStates, request EtcdRequest, response EtcdRespo

// applyRequestToSingleState handles a successful request, returning updated state and response it would generate.
func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, EtcdResponse) {
newKVs := map[string]string{}
newKVs := map[string]ValueOrHash{}
for k, v := range s.KeyValues {
newKVs[k] = v
}
Expand Down
21 changes: 21 additions & 0 deletions tests/linearizability/model/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ func TestModelStep(t *testing.T) {
{req: getRequest("key2"), resp: getResponse("12", 2)},
},
},
{
name: "Get response data should match large put",
operations: []testOperation{
{req: putRequest("key", "012345678901234567890"), resp: putResponse(1)},
{req: getRequest("key"), resp: getResponse("123456789012345678901", 1), failure: true},
{req: getRequest("key"), resp: getResponse("012345678901234567890", 1)},
{req: putRequest("key", "123456789012345678901"), resp: putResponse(2)},
{req: getRequest("key"), resp: getResponse("123456789012345678901", 2)},
{req: getRequest("key"), resp: getResponse("012345678901234567890", 2), failure: true},
},
},
{
name: "Put must increase revision by 1",
operations: []testOperation{
Expand Down Expand Up @@ -612,6 +623,11 @@ func TestModelDescribe(t *testing.T) {
resp: getResponse("2", 2),
expectDescribe: `get("key2") -> "2", rev: 2`,
},
{
req: getRequest("key2b"),
resp: getResponse("01234567890123456789", 2),
expectDescribe: `get("key2b") -> hash: 2945867837, rev: 2`,
},
{
req: putRequest("key3", "3"),
resp: putResponse(3),
Expand All @@ -622,6 +638,11 @@ func TestModelDescribe(t *testing.T) {
resp: putResponse(3),
expectDescribe: `put("key3b", "3b", 3) -> ok, rev: 3`,
},
{
req: putRequest("key3c", "01234567890123456789"),
resp: putResponse(3),
expectDescribe: `put("key3c", hash: 2945867837, nil) -> ok, rev: 3`,
},
{
req: putRequest("key4", "4"),
resp: failedResponse(errors.New("failed")),
Expand Down
30 changes: 22 additions & 8 deletions tests/linearizability/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math/rand"
"strings"
"time"

"golang.org/x/time/rate"
Expand All @@ -36,6 +37,7 @@ type TrafficRequestType string
const (
Get TrafficRequestType = "get"
Put TrafficRequestType = "put"
LargePut TrafficRequestType = "largePut"
Delete TrafficRequestType = "delete"
PutWithLease TrafficRequestType = "putWithLease"
LeaseRevoke TrafficRequestType = "leaseRevoke"
Expand All @@ -47,18 +49,19 @@ type Traffic interface {
Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage)
}

type readWriteSingleKey struct {
keyCount int
writes []requestChance
leaseTTL int64
type traffic struct {
keyCount int
writes []requestChance
leaseTTL int64
largePutSize int
}

type requestChance struct {
operation TrafficRequestType
chance int
}

func (t readWriteSingleKey) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage) {
func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage) {

for {
select {
Expand All @@ -77,7 +80,7 @@ func (t readWriteSingleKey) Run(ctx context.Context, clientId int, c *recordingC
}
}

func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string) ([]*mvccpb.KeyValue, error) {
func (t traffic) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string) ([]*mvccpb.KeyValue, error) {
getCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
resp, err := c.Get(getCtx, key)
cancel()
Expand All @@ -87,13 +90,15 @@ func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limite
return resp, err
}

func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, newValue string, lm identity.LeaseIdStorage, cid int, lastValues []*mvccpb.KeyValue) error {
func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, newValue string, lm identity.LeaseIdStorage, cid int, lastValues []*mvccpb.KeyValue) error {
writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)

var err error
switch t.pickWriteRequest() {
case Put:
err = c.Put(writeCtx, key, newValue)
case LargePut:
err = c.Put(writeCtx, key, randString(t.largePutSize))
case Delete:
err = c.Delete(writeCtx, key)
case CompareAndSet:
Expand Down Expand Up @@ -137,7 +142,7 @@ func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limit
return err
}

func (t readWriteSingleKey) pickWriteRequest() TrafficRequestType {
func (t traffic) pickWriteRequest() TrafficRequestType {
sum := 0
for _, op := range t.writes {
sum += op.chance
Expand All @@ -151,3 +156,12 @@ func (t readWriteSingleKey) pickWriteRequest() TrafficRequestType {
}
panic("unexpected")
}

func randString(size int) string {
data := strings.Builder{}
data.Grow(size)
for i := 0; i < size; i++ {
data.WriteByte(byte(int('a') + rand.Intn(26)))
}
return data.String()
}
2 changes: 1 addition & 1 deletion tests/linearizability/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func collectMemberWatchEvents(ctx context.Context, t *testing.T, c *clientv3.Cli
Op: model.EtcdOperation{
Type: op,
Key: string(event.Kv.Key),
Value: string(event.Kv.Value),
Value: model.ToValueOrHash(string(event.Kv.Value)),
},
})
}
Expand Down

0 comments on commit 3b612ce

Please sign in to comment.