Skip to content

Commit

Permalink
1.29 conformance fixes (#92)
Browse files Browse the repository at this point in the history
* use proper gomega for each test
* List results in alphabetical order
* extend list tests with more entries
* Do not ignore startKey in driver.ListCurrent()
* fix Count in list responses
* ignore last 100 revisions instead of 1000 during compaction
* Fix query list with revision
  • Loading branch information
neoaggelos authored Mar 6, 2024
1 parent 4bf13af commit 9fb964d
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 57 deletions.
65 changes: 33 additions & 32 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,37 +35,27 @@ var (
ON maxkv.id = kv.id
WHERE
(kv.deleted = 0 OR ?)
ORDER BY kv.id ASC
ORDER BY kv.name ASC, kv.id ASC
`, columns)

// FIXME this query doesn't seem sound.
revisionAfterSQL = fmt.Sprintf(`
SELECT *
FROM (
SELECT %s
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) AS id
FROM kine AS mkv
WHERE mkv.name >= ? AND mkv.name < ?
AND mkv.id <= ?
AND mkv.id > (
SELECT ikv.id
FROM kine AS ikv
WHERE
ikv.name = ? AND
ikv.id <= ?
ORDER BY ikv.id DESC
LIMIT 1
)
GROUP BY mkv.name
) AS maxkv
ON maxkv.id = kv.id
WHERE
? OR kv.deleted = 0
) AS lkv
ORDER BY lkv.theid ASC
`, columns)
SELECT *
FROM (
SELECT %s
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) AS id
FROM kine AS mkv
WHERE mkv.name >= ? AND mkv.name < ?
AND mkv.id <= ?
GROUP BY mkv.name
) AS maxkv
ON maxkv.id = kv.id
WHERE
? OR kv.deleted = 0
) AS lkv
ORDER BY lkv.name ASC, lkv.theid ASC
`, columns)

revisionIntervalSQL = `
SELECT (
Expand Down Expand Up @@ -379,25 +369,31 @@ func (d *Generic) queryPrepared(ctx context.Context, txName, sql string, prepare
return r, err
}

func (d *Generic) CountCurrent(ctx context.Context, prefix string) (int64, int64, error) {
func (d *Generic) CountCurrent(ctx context.Context, prefix string, startKey string) (int64, int64, error) {
var (
rev sql.NullInt64
id int64
)

start, end := getPrefixRange(prefix)
if startKey != "" {
start = startKey + "\x01"
}
row := d.queryRowPrepared(ctx, "count_current", d.CountCurrentSQL, d.countCurrentSQLPrepared, start, end, false)
err := row.Scan(&rev, &id)
return rev.Int64, id, err
}

func (d *Generic) Count(ctx context.Context, prefix string, revision int64) (int64, int64, error) {
func (d *Generic) Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) {
var (
rev sql.NullInt64
id int64
)

start, end := getPrefixRange(prefix)
if startKey != "" {
start = startKey + "\x01"
}
row := d.queryRowPrepared(ctx, "count_revision", d.CountRevisionSQL, d.countRevisionSQLPrepared, start, end, revision, false)
err := row.Scan(&rev, &id)
return rev.Int64, id, err
Expand Down Expand Up @@ -502,13 +498,18 @@ func (d *Generic) DeleteRevision(ctx context.Context, revision int64) error {
return err
}

func (d *Generic) ListCurrent(ctx context.Context, prefix string, limit int64, includeDeleted bool) (*sql.Rows, error) {
func (d *Generic) ListCurrent(ctx context.Context, prefix, startKey string, limit int64, includeDeleted bool) (*sql.Rows, error) {
sql := d.GetCurrentSQL
start, end := getPrefixRange(prefix)
if limit > 0 {
sql = fmt.Sprintf("%s LIMIT %d", sql, limit)
}

// NOTE(neoaggelos): don't ignore startKey if set
if startKey != "" {
start = startKey + "\x01"
}

return d.query(ctx, "get_current_sql", sql, start, end, includeDeleted)
}

Expand All @@ -526,7 +527,7 @@ func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revi
if limit > 0 {
sql = fmt.Sprintf("%s LIMIT %d", sql, limit)
}
return d.query(ctx, "get_revision_after_sql", sql, start, end, revision, startKey, revision, includeDeleted)
return d.query(ctx, "get_revision_after_sql", sql, startKey+"\x01", end, revision, includeDeleted)
}

func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/kine/logstructured/logstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Log interface {
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error)
After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error)
Watch(ctx context.Context, prefix string) <-chan []*server.Event
Count(ctx context.Context, prefix string, revision int64) (int64, int64, error)
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error)
Append(ctx context.Context, event *server.Event) (int64, error)
DbSize(ctx context.Context) (int64, error)
DoCompact() error
Expand Down Expand Up @@ -189,11 +189,11 @@ func (l *LogStructured) List(ctx context.Context, prefix, startKey string, limit
return rev, kvs, nil
}

func (l *LogStructured) Count(ctx context.Context, prefix string, revision int64) (revRet int64, count int64, err error) {
func (l *LogStructured) Count(ctx context.Context, prefix, startKey string, revision int64) (revRet int64, count int64, err error) {
defer func() {
logrus.Debugf("COUNT %s => rev=%d, count=%d, err=%v", prefix, revRet, count, err)
logrus.Debugf("COUNT prefix=%s startKey=%s => rev=%d, count=%d, err=%v", prefix, startKey, revRet, count, err)
}()
rev, count, err := l.log.Count(ctx, prefix, revision)
rev, count, err := l.log.Count(ctx, prefix, startKey, revision)
if err != nil {
return 0, 0, err
}
Expand Down
24 changes: 15 additions & 9 deletions pkg/kine/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ func New(d Dialect) *SQLLog {
}

type Dialect interface {
ListCurrent(ctx context.Context, prefix string, limit int64, includeDeleted bool) (*sql.Rows, error)
ListCurrent(ctx context.Context, prefix, startKey string, limit int64, includeDeleted bool) (*sql.Rows, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error)
CountCurrent(ctx context.Context, prefix string) (int64, int64, error)
Count(ctx context.Context, prefix string, revision int64) (int64, int64, error)
CountCurrent(ctx context.Context, prefix, startKey string) (int64, int64, error)
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error)
CurrentRevision(ctx context.Context) (int64, error)
AfterPrefix(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error)
After(ctx context.Context, rev, limit int64) (*sql.Rows, error)
Expand Down Expand Up @@ -126,8 +126,14 @@ func (s *SQLLog) compactor(nextEnd int64) (int64, error) {

end := nextEnd
nextEnd = currentRev
// leave the last 1000
end = end - 1000

// NOTE(neoaggelos): Ignoring the last 1000 revisions causes the following CNCF conformance test to fail.
// This is because of low activity, where the created list is part of the last 1000 revisions and is not compacted.
// Link to failing test: https://github.com/kubernetes/kubernetes/blob/f2cfbf44b1fb482671aedbfff820ae2af256a389/test/e2e/apimachinery/chunking.go#L144
// To address this, we only ignore the last 100 revisions instead

// end = end - 1000
end = end - 100

savedCursor := cursor
// Purposefully start at the current and redo the current as
Expand Down Expand Up @@ -262,7 +268,7 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis
}

if revision == 0 {
rows, err = s.d.ListCurrent(ctx, prefix, limit, includeDeleted)
rows, err = s.d.ListCurrent(ctx, prefix, startKey, limit, includeDeleted)
} else {
rows, err = s.d.List(ctx, prefix, startKey, limit, revision, includeDeleted)
}
Expand Down Expand Up @@ -471,12 +477,12 @@ func canSkipRevision(rev, skip int64, skipTime time.Time) bool {
return rev == skip && time.Now().Sub(skipTime) > time.Second
}

func (s *SQLLog) Count(ctx context.Context, prefix string, revision int64) (int64, int64, error) {
func (s *SQLLog) Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) {
if revision == 0 {
return s.d.CountCurrent(ctx, prefix)
return s.d.CountCurrent(ctx, prefix, startKey)
}

return s.d.Count(ctx, prefix, revision)
return s.d.Count(ctx, prefix, startKey, revision)
}

func (s *SQLLog) Append(ctx context.Context, event *server.Event) (int64, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kine/server/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest)
revision := r.Revision

if r.CountOnly {
rev, count, err := l.backend.Count(ctx, prefix, revision)
rev, count, err := l.backend.Count(ctx, prefix, start, revision)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -60,7 +60,7 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest)
}

// count the actual number of results if there are more items in the db.
rev, resp.Count, err = l.backend.Count(ctx, prefix, revision)
rev, resp.Count, err = l.backend.Count(ctx, prefix, start, revision)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kine/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Backend interface {
Create(ctx context.Context, key string, value []byte, lease int64) (int64, error)
Delete(ctx context.Context, key string, revision int64) (int64, *KeyValue, bool, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error)
Count(ctx context.Context, prefix string, revision int64) (int64, int64, error)
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error)
Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, *KeyValue, bool, error)
Watch(ctx context.Context, key string, revision int64) <-chan []*Event
DbSize(ctx context.Context) (int64, error)
Expand Down
Loading

0 comments on commit 9fb964d

Please sign in to comment.