Skip to content

Commit

Permalink
Fix watch test
Browse files Browse the repository at this point in the history
  • Loading branch information
marco6 committed Sep 25, 2024
1 parent 522785f commit 8a31c94
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 161 deletions.
5 changes: 4 additions & 1 deletion pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,10 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig
? AS name,
0 AS created,
0 AS deleted,
create_revision,
CASE
WHEN kine.created THEN id
ELSE create_revision
END AS create_revision,
id AS prev_revision,
? AS lease,
? AS value,
Expand Down
4 changes: 4 additions & 0 deletions pkg/kine/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,10 @@ func scan(rows *sql.Rows, event *server.Event) error {
if event.Create {
event.KV.CreateRevision = event.KV.ModRevision
event.PrevKV = nil
} else {
event.PrevKV.Key = event.KV.Key
event.PrevKV.CreateRevision = event.KV.CreateRevision
event.PrevKV.Lease = event.KV.Lease
}

return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/kine/server/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func toEvent(event *Event) *mvccpb.Event {
}
if event.Delete {
e.Type = mvccpb.DELETE
e.Kv.Value = nil
} else {
e.Type = mvccpb.PUT
}
Expand Down
295 changes: 135 additions & 160 deletions test/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,21 @@ import (

"github.com/canonical/k8s-dqlite/pkg/kine/endpoint"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/types"
clientv3 "go.etcd.io/etcd/client/v3"
)

const (
// testWatchEventPollTimeout is the timeout for waiting to receive an event.
testWatchEventPollTimeout = 50 * time.Millisecond

// testWatchEventIdleTimeout is the amount of time to wait to ensure that no events
// are received when they should not.
testWatchEventIdleTimeout = 100 * time.Millisecond
)

// TestWatch is unit testing for the Watch operation.
func TestWatch(t *testing.T) {
var (
revAfterCreate int64
revAfterUpdate int64
revAfterDelete int64

key = "testKey"
value = "testValue"
updatedValue = "testUpdatedValue"
const (
// pollTimeout is the timeout for waiting to receive an event.
pollTimeout = 50 * time.Millisecond

// idleTimeout is the amount of time to wait to ensure that no events
// are received when they should not.
idleTimeout = 100 * time.Millisecond
)

for _, backendType := range []string{endpoint.SQLiteBackend, endpoint.DQLiteBackend} {
t.Run(backendType, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -38,171 +30,154 @@ func TestWatch(t *testing.T) {
kine := newKineServer(ctx, t, &kineOptions{backendType: backendType})

// start watching for events on key
watchCh := kine.client.Watch(ctx, key)
const prefix = "test/"
watchCh := kine.client.Watch(ctx, prefix)

t.Run("ReceiveNothingUntilActivity", func(t *testing.T) {
g := NewWithT(t)
g.Consistently(watchCh, testWatchEventIdleTimeout).ShouldNot(Receive())
g.Consistently(watchCh, idleTimeout).ShouldNot(Receive())
})

t.Run("Create", func(t *testing.T) {
g := NewWithT(t)

// create a key
createKey(ctx, g, kine.client, key, value)

// receive event
t.Run("Receive", func(t *testing.T) {
g := NewWithT(t)
g.Eventually(watchCh, testWatchEventPollTimeout).Should(Receive(Satisfy(func(v clientv3.WatchResponse) bool {
g.Expect(v.Events).To(HaveLen(1))
g.Expect(v.Events[0].Type).To(Equal(clientv3.EventTypePut))
g.Expect(v.Events[0].PrevKv).To(BeNil())
g.Expect(v.Events[0].Kv.Key).To(Equal([]byte(key)))
g.Expect(v.Events[0].Kv.Value).To(Equal([]byte(value)))
g.Expect(v.Events[0].Kv.Version).To(Equal(int64(0)))

revAfterCreate = v.Events[0].Kv.ModRevision

return true
})))
})

t.Run("ReceiveNothingUntilNewActivity", func(t *testing.T) {
g := NewWithT(t)
g.Consistently(watchCh, testWatchEventIdleTimeout).ShouldNot(Receive())
})
key := prefix + "createdKey"
value := "testValue"
rev := createKey(ctx, g, kine.client, key, value)

g.Eventually(watchCh, pollTimeout).Should(ReceiveEvents(g,
CreateEvent(g, key, value, rev),
))
g.Consistently(watchCh, idleTimeout).ShouldNot(Receive())
})

t.Run("Update", func(t *testing.T) {
g := NewWithT(t)

// update key
{
resp, err := kine.client.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", revAfterCreate)).
Then(clientv3.OpPut(key, string(updatedValue))).
Else(clientv3.OpGet(key)).
Commit()

g.Expect(err).To(BeNil())
g.Expect(resp.Succeeded).To(BeTrue())
}

t.Run("Receive", func(t *testing.T) {
g := NewWithT(t)

// receive event
g.Eventually(watchCh, testWatchEventPollTimeout).Should(Receive(Satisfy(func(v clientv3.WatchResponse) bool {
g.Expect(v.Events).To(HaveLen(1))
g.Expect(v.Events[0].Type).To(Equal(clientv3.EventTypePut))
g.Expect(v.Events[0].PrevKv).NotTo(BeNil())
g.Expect(v.Events[0].PrevKv.Value).To(Equal([]byte(value)))
g.Expect(v.Events[0].PrevKv.ModRevision).To(Equal(revAfterCreate))

g.Expect(v.Events[0].Kv.Key).To(Equal([]byte(key)))
g.Expect(v.Events[0].Kv.Value).To(Equal([]byte(updatedValue)))
g.Expect(v.Events[0].Kv.Version).To(Equal(int64(0)))
g.Expect(v.Events[0].Kv.ModRevision).To(BeNumerically(">", revAfterCreate))

revAfterUpdate = v.Events[0].Kv.ModRevision

return true
})))
})

t.Run("ReceiveNothingUntilNewActivity", func(t *testing.T) {
g := NewWithT(t)
g.Consistently(watchCh, testWatchEventIdleTimeout).ShouldNot(Receive())
})
key := prefix + "updatedKey"
createValue := "testValue1"
createRev := createKey(ctx, g, kine.client, key, createValue)
g.Eventually(watchCh, pollTimeout).Should(ReceiveEvents(g,
CreateEvent(g, key, createValue, createRev),
))

updateValue := "testValue2"
updateRev := updateRev(ctx, g, kine.client, key, createRev, updateValue)
g.Eventually(watchCh, pollTimeout).Should(ReceiveEvents(g,
UpdateEvent(g, key, createValue, updateValue, createRev, updateRev),
))

g.Consistently(watchCh, idleTimeout).ShouldNot(Receive())
})

t.Run("Delete", func(t *testing.T) {
g := NewWithT(t)

// delete key
{
resp, err := kine.client.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", revAfterUpdate)).
Then(clientv3.OpDelete(key)).
Else(clientv3.OpGet(key)).
Commit()

g.Expect(err).To(BeNil())
g.Expect(resp.Succeeded).To(BeTrue())
}

t.Run("Receive", func(t *testing.T) {
g := NewWithT(t)

// receive event
g.Eventually(watchCh, testWatchEventPollTimeout).Should(Receive(Satisfy(func(v clientv3.WatchResponse) bool {
g.Expect(v.Events).To(HaveLen(1))
g.Expect(v.Events[0].Type).To(Equal(clientv3.EventTypeDelete))
g.Expect(v.Events[0].PrevKv).NotTo(BeNil())
g.Expect(v.Events[0].PrevKv.Value).To(Equal([]byte(updatedValue)))
g.Expect(v.Events[0].PrevKv.ModRevision).To(Equal(revAfterUpdate))

g.Expect(v.Events[0].Kv).NotTo(BeNil())
g.Expect(v.Events[0].Kv.Key).To(Equal([]byte(key)))
g.Expect(v.Events[0].Kv.Value).To(Equal([]byte(updatedValue)))
g.Expect(v.Events[0].Kv.Version).To(Equal(int64(0)))
g.Expect(v.Events[0].Kv.ModRevision).To(BeNumerically(">", revAfterUpdate))

revAfterDelete = v.Events[0].Kv.ModRevision

return true
})))
})

t.Run("ReceiveNothingUntilNewActivity", func(t *testing.T) {
g := NewWithT(t)
g.Consistently(watchCh, testWatchEventIdleTimeout).ShouldNot(Receive())
})
key := prefix + "deletedKey"
createValue := "testValue"
createRev := createKey(ctx, g, kine.client, key, createValue)
g.Eventually(watchCh, pollTimeout).Should(ReceiveEvents(g,
CreateEvent(g, key, createValue, createRev),
))

deleteRev := deleteKey(ctx, g, kine.client, key, createRev)
g.Eventually(watchCh, pollTimeout).Should(ReceiveEvents(g,
DeleteEvent(g, key, createValue, createRev, deleteRev),
))

g.Consistently(watchCh, idleTimeout).ShouldNot(Receive())
})

t.Run("StartRevision", func(t *testing.T) {
watchAfterDeleteCh := kine.client.Watch(ctx, key, clientv3.WithRev(revAfterUpdate))

t.Run("Receive", func(t *testing.T) {
g := NewWithT(t)

g.Eventually(watchAfterDeleteCh, testWatchEventPollTimeout).Should(Receive(Satisfy(func(v clientv3.WatchResponse) bool {
// receive 2 events
g.Expect(v.Events).To(HaveLen(2))

// receive update event
g.Expect(v.Events[0].Type).To(Equal(clientv3.EventTypePut))
g.Expect(v.Events[0].PrevKv).NotTo(BeNil())
g.Expect(v.Events[0].PrevKv.Value).To(Equal([]byte(value)))
g.Expect(v.Events[0].PrevKv.ModRevision).To(Equal(revAfterCreate))

g.Expect(v.Events[0].Kv.Key).To(Equal([]byte(key)))
g.Expect(v.Events[0].Kv.Value).To(Equal([]byte(updatedValue)))
g.Expect(v.Events[0].Kv.Version).To(Equal(int64(0)))
g.Expect(v.Events[0].Kv.ModRevision).To(Equal(revAfterUpdate))

// receive delete event
g.Expect(v.Events[1].Type).To(Equal(clientv3.EventTypeDelete))
g.Expect(v.Events[1].PrevKv).NotTo(BeNil())
g.Expect(v.Events[1].PrevKv.Value).To(Equal([]byte(updatedValue)))
g.Expect(v.Events[1].PrevKv.ModRevision).To(Equal(revAfterUpdate))

g.Expect(v.Events[1].Kv).NotTo(BeNil())
g.Expect(v.Events[1].Kv.Key).To(Equal([]byte(key)))
g.Expect(v.Events[1].Kv.Value).To(Equal([]byte(updatedValue)))
g.Expect(v.Events[1].Kv.Version).To(Equal(int64(0)))
g.Expect(v.Events[1].Kv.ModRevision).To(Equal(revAfterDelete))

return true
})))
})

t.Run("OtherWatcherIdle", func(t *testing.T) {
g := NewWithT(t)
g.Consistently(watchCh, testWatchEventIdleTimeout).ShouldNot(Receive())
})
ctx, cancel := context.WithCancel(ctx)
defer cancel()
g := NewWithT(t)

key := prefix + "revisionKey"
createValue := "testValue1"
createRev := createKey(ctx, g, kine.client, key, createValue)

updateValue := "testValue2"
updateRev := updateRev(ctx, g, kine.client, key, createRev, updateValue)

deleteRev := deleteKey(ctx, g, kine.client, key, updateRev)

watchCh := kine.client.Watch(ctx, key, clientv3.WithRev(createRev))
g.Eventually(watchCh, pollTimeout).Should(ReceiveEvents(g,
CreateEvent(g, key, createValue, createRev),
UpdateEvent(g, key, createValue, updateValue, createRev, updateRev),
DeleteEvent(g, key, updateValue, updateRev, deleteRev),
))

g.Consistently(watchCh, idleTimeout).ShouldNot(Receive())
})
})
}
}

type EventMatcher func(*clientv3.Event) bool

func ReceiveEvents(g Gomega, checks ...EventMatcher) types.GomegaMatcher {
return Receive(Satisfy(func(watch clientv3.WatchResponse) bool {
ok := g.Expect(watch.Events).To(HaveLen(len(checks)))
for i := range checks {
event := watch.Events[i]
check := checks[i]
ok = check(event) && ok
}
return ok
}))
}

func CreateEvent(g Gomega, key, value string, revision int64) EventMatcher {
return func(event *clientv3.Event) bool {
ok := g.Expect(event.Type).To(Equal(clientv3.EventTypePut))
ok = g.Expect(event.Kv.Key).To(Equal([]byte(key))) && ok
ok = g.Expect(event.Kv.ModRevision).To(Equal(revision)) && ok
ok = g.Expect(event.Kv.CreateRevision).To(Equal(revision)) && ok
ok = g.Expect(event.Kv.Value).To(Equal([]byte(value))) && ok
ok = g.Expect(event.Kv.Version).To(Equal(int64(0))) && ok
ok = g.Expect(event.PrevKv).To(BeNil()) && ok
return ok
}
}

func UpdateEvent(g Gomega, key, prevValue, value string, prevRevision, updateRevision int64) EventMatcher {
return func(event *clientv3.Event) bool {
ok := g.Expect(event.Type).To(Equal(clientv3.EventTypePut))
ok = g.Expect(event.Kv.Key).To(Equal([]byte(key))) && ok
ok = g.Expect(event.Kv.ModRevision).To(Equal(updateRevision)) && ok
ok = g.Expect(event.Kv.CreateRevision).To(BeNumerically("<=", prevRevision)) && ok
ok = g.Expect(event.Kv.Value).To(Equal([]byte(value))) && ok
ok = g.Expect(event.Kv.Version).To(Equal(int64(0))) && ok

ok = g.Expect(event.PrevKv).NotTo(BeNil()) && ok
ok = g.Expect(event.PrevKv.Key).To(Equal([]byte(key))) && ok
ok = g.Expect(event.PrevKv.ModRevision).To(Equal(prevRevision)) && ok
ok = g.Expect(event.PrevKv.CreateRevision).To(Equal(event.Kv.CreateRevision)) && ok
ok = g.Expect(event.PrevKv.Value).To(Equal([]byte(prevValue))) && ok
ok = g.Expect(event.PrevKv.Version).To(Equal(int64(0))) && ok

return ok
}
}

func DeleteEvent(g Gomega, key, prevValue string, prevRevision, deleteRevision int64) EventMatcher {
return func(event *clientv3.Event) bool {
ok := g.Expect(event.Type).To(Equal(clientv3.EventTypeDelete))
ok = g.Expect(event.Kv.Key).To(Equal([]byte(key))) && ok
ok = g.Expect(event.Kv.ModRevision).To(Equal(deleteRevision)) && ok
ok = g.Expect(event.Kv.CreateRevision).To(BeNumerically("<=", prevRevision)) && ok
ok = g.Expect(event.Kv.Value).To(BeNil()) && ok
ok = g.Expect(event.Kv.Version).To(Equal(int64(0))) && ok

ok = g.Expect(event.PrevKv).NotTo(BeNil()) && ok
ok = g.Expect(event.PrevKv.Key).To(Equal([]byte(key))) && ok
ok = g.Expect(event.PrevKv.ModRevision).To(Equal(prevRevision)) && ok
ok = g.Expect(event.PrevKv.CreateRevision).To(Equal(event.Kv.CreateRevision)) && ok
ok = g.Expect(event.PrevKv.Value).To(Equal([]byte(prevValue))) && ok
ok = g.Expect(event.PrevKv.Version).To(Equal(int64(0))) && ok

return ok
}
}

0 comments on commit 8a31c94

Please sign in to comment.