diff --git a/pkg/kine/drivers/generic/generic.go b/pkg/kine/drivers/generic/generic.go index 3f1daebe..f11c02e2 100644 --- a/pkg/kine/drivers/generic/generic.go +++ b/pkg/kine/drivers/generic/generic.go @@ -35,37 +35,27 @@ var ( ON maxkv.id = kv.id WHERE (kv.deleted = 0 OR ?) - ORDER BY kv.id ASC + ORDER BY kv.name ASC, kv.id ASC `, columns) - // FIXME this query doesn't seem sound. revisionAfterSQL = fmt.Sprintf(` - SELECT * - FROM ( - SELECT %s - FROM kine AS kv - JOIN ( - SELECT MAX(mkv.id) AS id - FROM kine AS mkv - WHERE mkv.name >= ? AND mkv.name < ? - AND mkv.id <= ? - AND mkv.id > ( - SELECT ikv.id - FROM kine AS ikv - WHERE - ikv.name = ? AND - ikv.id <= ? - ORDER BY ikv.id DESC - LIMIT 1 - ) - GROUP BY mkv.name - ) AS maxkv - ON maxkv.id = kv.id - WHERE - ? OR kv.deleted = 0 - ) AS lkv - ORDER BY lkv.theid ASC - `, columns) + SELECT * + FROM ( + SELECT %s + FROM kine AS kv + JOIN ( + SELECT MAX(mkv.id) AS id + FROM kine AS mkv + WHERE mkv.name >= ? AND mkv.name < ? + AND mkv.id <= ? + GROUP BY mkv.name + ) AS maxkv + ON maxkv.id = kv.id + WHERE + ? OR kv.deleted = 0 + ) AS lkv + ORDER BY lkv.name ASC, lkv.theid ASC + `, columns) revisionIntervalSQL = ` SELECT ( @@ -379,25 +369,31 @@ func (d *Generic) queryPrepared(ctx context.Context, txName, sql string, prepare return r, err } -func (d *Generic) CountCurrent(ctx context.Context, prefix string) (int64, int64, error) { +func (d *Generic) CountCurrent(ctx context.Context, prefix string, startKey string) (int64, int64, error) { var ( rev sql.NullInt64 id int64 ) start, end := getPrefixRange(prefix) + if startKey != "" { + start = startKey + "\x01" + } row := d.queryRowPrepared(ctx, "count_current", d.CountCurrentSQL, d.countCurrentSQLPrepared, start, end, false) err := row.Scan(&rev, &id) return rev.Int64, id, err } -func (d *Generic) Count(ctx context.Context, prefix string, revision int64) (int64, int64, error) { +func (d *Generic) Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) { var ( rev sql.NullInt64 id int64 ) start, end := getPrefixRange(prefix) + if startKey != "" { + start = startKey + "\x01" + } row := d.queryRowPrepared(ctx, "count_revision", d.CountRevisionSQL, d.countRevisionSQLPrepared, start, end, revision, false) err := row.Scan(&rev, &id) return rev.Int64, id, err @@ -502,13 +498,18 @@ func (d *Generic) DeleteRevision(ctx context.Context, revision int64) error { return err } -func (d *Generic) ListCurrent(ctx context.Context, prefix string, limit int64, includeDeleted bool) (*sql.Rows, error) { +func (d *Generic) ListCurrent(ctx context.Context, prefix, startKey string, limit int64, includeDeleted bool) (*sql.Rows, error) { sql := d.GetCurrentSQL start, end := getPrefixRange(prefix) if limit > 0 { sql = fmt.Sprintf("%s LIMIT %d", sql, limit) } + // NOTE(neoaggelos): don't ignore startKey if set + if startKey != "" { + start = startKey + "\x01" + } + return d.query(ctx, "get_current_sql", sql, start, end, includeDeleted) } @@ -526,7 +527,7 @@ func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revi if limit > 0 { sql = fmt.Sprintf("%s LIMIT %d", sql, limit) } - return d.query(ctx, "get_revision_after_sql", sql, start, end, revision, startKey, revision, includeDeleted) + return d.query(ctx, "get_revision_after_sql", sql, startKey+"\x01", end, revision, includeDeleted) } func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) { diff --git a/pkg/kine/logstructured/logstructured.go b/pkg/kine/logstructured/logstructured.go index 71a6478a..635d6e1a 100644 --- a/pkg/kine/logstructured/logstructured.go +++ b/pkg/kine/logstructured/logstructured.go @@ -15,7 +15,7 @@ type Log interface { List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) Watch(ctx context.Context, prefix string) <-chan []*server.Event - Count(ctx context.Context, prefix string, revision int64) (int64, int64, error) + Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) Append(ctx context.Context, event *server.Event) (int64, error) DbSize(ctx context.Context) (int64, error) DoCompact() error @@ -189,11 +189,11 @@ func (l *LogStructured) List(ctx context.Context, prefix, startKey string, limit return rev, kvs, nil } -func (l *LogStructured) Count(ctx context.Context, prefix string, revision int64) (revRet int64, count int64, err error) { +func (l *LogStructured) Count(ctx context.Context, prefix, startKey string, revision int64) (revRet int64, count int64, err error) { defer func() { - logrus.Debugf("COUNT %s => rev=%d, count=%d, err=%v", prefix, revRet, count, err) + logrus.Debugf("COUNT prefix=%s startKey=%s => rev=%d, count=%d, err=%v", prefix, startKey, revRet, count, err) }() - rev, count, err := l.log.Count(ctx, prefix, revision) + rev, count, err := l.log.Count(ctx, prefix, startKey, revision) if err != nil { return 0, 0, err } diff --git a/pkg/kine/logstructured/sqllog/sql.go b/pkg/kine/logstructured/sqllog/sql.go index 7afbb622..23482191 100644 --- a/pkg/kine/logstructured/sqllog/sql.go +++ b/pkg/kine/logstructured/sqllog/sql.go @@ -28,10 +28,10 @@ func New(d Dialect) *SQLLog { } type Dialect interface { - ListCurrent(ctx context.Context, prefix string, limit int64, includeDeleted bool) (*sql.Rows, error) + ListCurrent(ctx context.Context, prefix, startKey string, limit int64, includeDeleted bool) (*sql.Rows, error) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error) - CountCurrent(ctx context.Context, prefix string) (int64, int64, error) - Count(ctx context.Context, prefix string, revision int64) (int64, int64, error) + CountCurrent(ctx context.Context, prefix, startKey string) (int64, int64, error) + Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) CurrentRevision(ctx context.Context) (int64, error) AfterPrefix(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error) After(ctx context.Context, rev, limit int64) (*sql.Rows, error) @@ -126,8 +126,14 @@ func (s *SQLLog) compactor(nextEnd int64) (int64, error) { end := nextEnd nextEnd = currentRev - // leave the last 1000 - end = end - 1000 + + // NOTE(neoaggelos): Ignoring the last 1000 revisions causes the following CNCF conformance test to fail. + // This is because of low activity, where the created list is part of the last 1000 revisions and is not compacted. + // Link to failing test: https://github.com/kubernetes/kubernetes/blob/f2cfbf44b1fb482671aedbfff820ae2af256a389/test/e2e/apimachinery/chunking.go#L144 + // To address this, we only ignore the last 100 revisions instead + + // end = end - 1000 + end = end - 100 savedCursor := cursor // Purposefully start at the current and redo the current as @@ -262,7 +268,7 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis } if revision == 0 { - rows, err = s.d.ListCurrent(ctx, prefix, limit, includeDeleted) + rows, err = s.d.ListCurrent(ctx, prefix, startKey, limit, includeDeleted) } else { rows, err = s.d.List(ctx, prefix, startKey, limit, revision, includeDeleted) } @@ -471,12 +477,12 @@ func canSkipRevision(rev, skip int64, skipTime time.Time) bool { return rev == skip && time.Now().Sub(skipTime) > time.Second } -func (s *SQLLog) Count(ctx context.Context, prefix string, revision int64) (int64, int64, error) { +func (s *SQLLog) Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) { if revision == 0 { - return s.d.CountCurrent(ctx, prefix) + return s.d.CountCurrent(ctx, prefix, startKey) } - return s.d.Count(ctx, prefix, revision) + return s.d.Count(ctx, prefix, startKey, revision) } func (s *SQLLog) Append(ctx context.Context, event *server.Event) (int64, error) { diff --git a/pkg/kine/server/list.go b/pkg/kine/server/list.go index f9b97ea8..07802060 100644 --- a/pkg/kine/server/list.go +++ b/pkg/kine/server/list.go @@ -23,7 +23,7 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest) revision := r.Revision if r.CountOnly { - rev, count, err := l.backend.Count(ctx, prefix, revision) + rev, count, err := l.backend.Count(ctx, prefix, start, revision) if err != nil { return nil, err } @@ -60,7 +60,7 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest) } // count the actual number of results if there are more items in the db. - rev, resp.Count, err = l.backend.Count(ctx, prefix, revision) + rev, resp.Count, err = l.backend.Count(ctx, prefix, start, revision) if err != nil { return nil, err } diff --git a/pkg/kine/server/types.go b/pkg/kine/server/types.go index d7a1916b..eccdd478 100644 --- a/pkg/kine/server/types.go +++ b/pkg/kine/server/types.go @@ -17,7 +17,7 @@ type Backend interface { Create(ctx context.Context, key string, value []byte, lease int64) (int64, error) Delete(ctx context.Context, key string, revision int64) (int64, *KeyValue, bool, error) List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error) - Count(ctx context.Context, prefix string, revision int64) (int64, int64, error) + Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, *KeyValue, bool, error) Watch(ctx context.Context, key string, revision int64) <-chan []*Event DbSize(ctx context.Context) (int64, error) diff --git a/test/list_test.go b/test/list_test.go index 0de24a74..f5fc66ba 100644 --- a/test/list_test.go +++ b/test/list_test.go @@ -3,6 +3,7 @@ package test import ( "context" "fmt" + "math/rand" "testing" . "github.com/onsi/gomega" @@ -18,7 +19,7 @@ func TestList(t *testing.T) { g := NewWithT(t) // Create some keys - keys := []string{"/key/1", "/key/2", "/key/3"} + keys := shuffleList([]string{"/key/1", "/key/2", "/key/3", "/key/4", "/key/5"}) for _, key := range keys { resp, err := client.Txn(ctx). If(clientv3.Compare(clientv3.ModRevision(key), "=", 0)). @@ -31,20 +32,75 @@ func TestList(t *testing.T) { } t.Run("ListAll", func(t *testing.T) { + g := NewWithT(t) // Get a list of all the keys resp, err := client.Get(ctx, "/key", clientv3.WithPrefix()) g.Expect(err).To(BeNil()) - g.Expect(resp.Kvs).To(HaveLen(3)) + g.Expect(resp.Kvs).To(HaveLen(5)) g.Expect(resp.Header.Revision).ToNot(BeZero()) g.Expect(resp.Kvs[0].Key).To(Equal([]byte("/key/1"))) g.Expect(resp.Kvs[1].Key).To(Equal([]byte("/key/2"))) g.Expect(resp.Kvs[2].Key).To(Equal([]byte("/key/3"))) + g.Expect(resp.Kvs[3].Key).To(Equal([]byte("/key/4"))) + g.Expect(resp.Kvs[4].Key).To(Equal([]byte("/key/5"))) + }) + + t.Run("ListAllLimit", func(t *testing.T) { + var revision int64 + t.Run("FirstPage", func(t *testing.T) { + g := NewWithT(t) + + resp, err := client.Get(ctx, "/key", clientv3.WithPrefix(), clientv3.WithLimit(2)) + + g.Expect(err).To(BeNil()) + g.Expect(resp.Kvs).To(HaveLen(2)) + g.Expect(resp.More).To(BeTrue()) + g.Expect(resp.Count).To(Equal(int64(5))) + g.Expect(resp.Header.Revision).ToNot(BeZero()) + g.Expect(resp.Kvs[0].Key).To(Equal([]byte("/key/1"))) + g.Expect(resp.Kvs[1].Key).To(Equal([]byte("/key/2"))) + + revision = resp.Header.Revision + }) + + t.Run("SecondPage", func(t *testing.T) { + g := NewWithT(t) + + // Inspired from https://github.com/kubernetes/kubernetes/blob/3f4d3b67682335db510f85deb65b322127a3a0a1/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go#L788-L793 + // Key is "last_key" + "\x00", and we use the prefix range end + resp, err := client.Get(ctx, "/key/2\x00", clientv3.WithRange(clientv3.GetPrefixRangeEnd("/key")), clientv3.WithLimit(2), clientv3.WithRev(revision)) + + g.Expect(err).To(BeNil()) + g.Expect(resp.Kvs).To(HaveLen(2)) + g.Expect(resp.More).To(BeTrue()) + g.Expect(resp.Count).To(Equal(int64(3))) + g.Expect(resp.Header.Revision).ToNot(BeZero()) + g.Expect(resp.Kvs[0].Key).To(Equal([]byte("/key/3"))) + g.Expect(resp.Kvs[1].Key).To(Equal([]byte("/key/4"))) + + revision = resp.Header.Revision + }) + + t.Run("ThirdPage", func(t *testing.T) { + g := NewWithT(t) + + // Get a list of all the keys + resp, err := client.Get(ctx, "/key/4\x00", clientv3.WithRange(clientv3.GetPrefixRangeEnd("/key")), clientv3.WithLimit(2), clientv3.WithRev(revision)) + + g.Expect(err).To(BeNil()) + g.Expect(resp.Kvs).To(HaveLen(1)) + g.Expect(resp.More).To(BeFalse()) + g.Expect(resp.Count).To(Equal(int64(1))) + g.Expect(resp.Header.Revision).ToNot(BeZero()) + g.Expect(resp.Kvs[0].Key).To(Equal([]byte("/key/5"))) + }) }) t.Run("ListPrefix", func(t *testing.T) { + g := NewWithT(t) // Create some keys - keys := []string{"key/sub/1", "key/sub/2", "key/other/1"} + keys := []string{"key/sub/2", "key/sub/1", "key/other/1"} for _, key := range keys { resp, err := client.Txn(ctx). If(clientv3.Compare(clientv3.ModRevision(key), "=", 0)). @@ -60,11 +116,13 @@ func TestList(t *testing.T) { resp, err := client.Get(ctx, "/key", clientv3.WithPrefix()) g.Expect(err).To(BeNil()) - g.Expect(resp.Kvs).To(HaveLen(3)) + g.Expect(resp.Kvs).To(HaveLen(5)) g.Expect(resp.Header.Revision).ToNot(BeZero()) g.Expect(resp.Kvs[0].Key).To(Equal([]byte("/key/1"))) g.Expect(resp.Kvs[1].Key).To(Equal([]byte("/key/2"))) g.Expect(resp.Kvs[2].Key).To(Equal([]byte("/key/3"))) + g.Expect(resp.Kvs[3].Key).To(Equal([]byte("/key/4"))) + g.Expect(resp.Kvs[4].Key).To(Equal([]byte("/key/5"))) // Get a list of all the keys sice they have '/key/sub' prefix resp, err = client.Get(ctx, "key/sub", clientv3.WithPrefix()) @@ -85,6 +143,8 @@ func TestList(t *testing.T) { }) t.Run("ListRange", func(t *testing.T) { + g := NewWithT(t) + // Get a list of with key/1, as only key/1 falls within the specified range. resp, err := client.Get(ctx, "/key/1", clientv3.WithRange("")) @@ -96,6 +156,8 @@ func TestList(t *testing.T) { t.Run("ListRevision", func(t *testing.T) { t.Run("Create", func(t *testing.T) { + g := NewWithT(t) + // Create some keys keys := []string{"/revkey/1"} for _, key := range keys { @@ -113,7 +175,7 @@ func TestList(t *testing.T) { t.Run("Update", func(t *testing.T) { g := NewWithT(t) var rev int64 - for rev < 30 { + for rev < 50 { get, err := client.Get(ctx, "/revkey/1", clientv3.WithRange("")) g.Expect(err).To(BeNil()) g.Expect(get.Kvs).To(HaveLen(1)) @@ -136,15 +198,16 @@ func TestList(t *testing.T) { resp, err := client.Get(ctx, "/revkey/", clientv3.WithPrefix()) g.Expect(err).To(BeNil()) g.Expect(resp.Kvs).To(HaveLen(1)) - g.Expect(resp.Kvs[0].ModRevision).To(Equal(int64(31))) + g.Expect(resp.Kvs[0].ModRevision).To(Equal(int64(51))) g.Expect(resp.Count).To(Equal(int64(1))) }) + t.Run("OldRevision", func(t *testing.T) { g := NewWithT(t) - resp, err := client.Get(ctx, "/revkey/", clientv3.WithPrefix(), clientv3.WithRev(10)) + resp, err := client.Get(ctx, "/revkey/", clientv3.WithPrefix(), clientv3.WithRev(30)) g.Expect(err).To(BeNil()) g.Expect(resp.Kvs).To(HaveLen(1)) - g.Expect(resp.Kvs[0].ModRevision).To(Equal(int64(10))) + g.Expect(resp.Kvs[0].ModRevision).To(Equal(int64(30))) g.Expect(resp.Count).To(Equal(int64(1))) }) t.Run("LaterRevision", func(t *testing.T) { @@ -152,7 +215,7 @@ func TestList(t *testing.T) { resp, err := client.Get(ctx, "/revkey/", clientv3.WithPrefix(), clientv3.WithRev(100)) g.Expect(err).To(BeNil()) g.Expect(resp.Kvs).To(HaveLen(1)) - g.Expect(resp.Kvs[0].ModRevision).To(Equal(int64(31))) + g.Expect(resp.Kvs[0].ModRevision).To(Equal(int64(51))) g.Expect(resp.Count).To(Equal(int64(1))) }) }) @@ -190,3 +253,17 @@ func BenchmarkList(b *testing.B) { } }) } + +func shuffleList[T any](vals []T) []T { + if len(vals) == 0 { + return vals + } + + perm := rand.Perm(len(vals)) + shuffled := make([]T, 0, len(vals)) + for _, i := range perm { + shuffled = append(shuffled, vals[perm[i]]) + } + + return shuffled +}