Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.29 conformance fixes #92

Merged
merged 8 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 33 additions & 32 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,37 +35,27 @@ var (
ON maxkv.id = kv.id
WHERE
(kv.deleted = 0 OR ?)
ORDER BY kv.id ASC
ORDER BY kv.name ASC, kv.id ASC
`, columns)

// FIXME this query doesn't seem sound.
revisionAfterSQL = fmt.Sprintf(`
SELECT *
FROM (
SELECT %s
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) AS id
FROM kine AS mkv
WHERE mkv.name >= ? AND mkv.name < ?
AND mkv.id <= ?
AND mkv.id > (
SELECT ikv.id
FROM kine AS ikv
WHERE
ikv.name = ? AND
ikv.id <= ?
ORDER BY ikv.id DESC
LIMIT 1
)
GROUP BY mkv.name
) AS maxkv
ON maxkv.id = kv.id
WHERE
? OR kv.deleted = 0
) AS lkv
ORDER BY lkv.theid ASC
`, columns)
SELECT *
FROM (
SELECT %s
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) AS id
FROM kine AS mkv
WHERE mkv.name >= ? AND mkv.name < ?
AND mkv.id <= ?
GROUP BY mkv.name
) AS maxkv
ON maxkv.id = kv.id
WHERE
? OR kv.deleted = 0
) AS lkv
ORDER BY lkv.name ASC, lkv.theid ASC
`, columns)

revisionIntervalSQL = `
SELECT (
Expand Down Expand Up @@ -379,25 +369,31 @@ func (d *Generic) queryPrepared(ctx context.Context, txName, sql string, prepare
return r, err
}

func (d *Generic) CountCurrent(ctx context.Context, prefix string) (int64, int64, error) {
func (d *Generic) CountCurrent(ctx context.Context, prefix string, startKey string) (int64, int64, error) {
var (
rev sql.NullInt64
id int64
)

start, end := getPrefixRange(prefix)
if startKey != "" {
start = startKey + "\x01"
}
row := d.queryRowPrepared(ctx, "count_current", d.CountCurrentSQL, d.countCurrentSQLPrepared, start, end, false)
err := row.Scan(&rev, &id)
return rev.Int64, id, err
}

func (d *Generic) Count(ctx context.Context, prefix string, revision int64) (int64, int64, error) {
func (d *Generic) Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) {
var (
rev sql.NullInt64
id int64
)

start, end := getPrefixRange(prefix)
if startKey != "" {
start = startKey + "\x01"
}
row := d.queryRowPrepared(ctx, "count_revision", d.CountRevisionSQL, d.countRevisionSQLPrepared, start, end, revision, false)
err := row.Scan(&rev, &id)
return rev.Int64, id, err
Expand Down Expand Up @@ -502,13 +498,18 @@ func (d *Generic) DeleteRevision(ctx context.Context, revision int64) error {
return err
}

func (d *Generic) ListCurrent(ctx context.Context, prefix string, limit int64, includeDeleted bool) (*sql.Rows, error) {
func (d *Generic) ListCurrent(ctx context.Context, prefix, startKey string, limit int64, includeDeleted bool) (*sql.Rows, error) {
sql := d.GetCurrentSQL
start, end := getPrefixRange(prefix)
if limit > 0 {
sql = fmt.Sprintf("%s LIMIT %d", sql, limit)
}

// NOTE(neoaggelos): don't ignore startKey if set
if startKey != "" {
start = startKey + "\x01"
}

return d.query(ctx, "get_current_sql", sql, start, end, includeDeleted)
}

Expand All @@ -526,7 +527,7 @@ func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revi
if limit > 0 {
sql = fmt.Sprintf("%s LIMIT %d", sql, limit)
}
return d.query(ctx, "get_revision_after_sql", sql, start, end, revision, startKey, revision, includeDeleted)
return d.query(ctx, "get_revision_after_sql", sql, startKey+"\x01", end, revision, includeDeleted)
}

func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/kine/logstructured/logstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Log interface {
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error)
After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error)
Watch(ctx context.Context, prefix string) <-chan []*server.Event
Count(ctx context.Context, prefix string, revision int64) (int64, int64, error)
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error)
Append(ctx context.Context, event *server.Event) (int64, error)
DbSize(ctx context.Context) (int64, error)
DoCompact() error
Expand Down Expand Up @@ -189,11 +189,11 @@ func (l *LogStructured) List(ctx context.Context, prefix, startKey string, limit
return rev, kvs, nil
}

func (l *LogStructured) Count(ctx context.Context, prefix string, revision int64) (revRet int64, count int64, err error) {
func (l *LogStructured) Count(ctx context.Context, prefix, startKey string, revision int64) (revRet int64, count int64, err error) {
defer func() {
logrus.Debugf("COUNT %s => rev=%d, count=%d, err=%v", prefix, revRet, count, err)
logrus.Debugf("COUNT prefix=%s startKey=%s => rev=%d, count=%d, err=%v", prefix, startKey, revRet, count, err)
}()
rev, count, err := l.log.Count(ctx, prefix, revision)
rev, count, err := l.log.Count(ctx, prefix, startKey, revision)
if err != nil {
return 0, 0, err
}
Expand Down
24 changes: 15 additions & 9 deletions pkg/kine/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ func New(d Dialect) *SQLLog {
}

type Dialect interface {
ListCurrent(ctx context.Context, prefix string, limit int64, includeDeleted bool) (*sql.Rows, error)
ListCurrent(ctx context.Context, prefix, startKey string, limit int64, includeDeleted bool) (*sql.Rows, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error)
CountCurrent(ctx context.Context, prefix string) (int64, int64, error)
Count(ctx context.Context, prefix string, revision int64) (int64, int64, error)
CountCurrent(ctx context.Context, prefix, startKey string) (int64, int64, error)
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error)
CurrentRevision(ctx context.Context) (int64, error)
AfterPrefix(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error)
After(ctx context.Context, rev, limit int64) (*sql.Rows, error)
Expand Down Expand Up @@ -126,8 +126,14 @@ func (s *SQLLog) compactor(nextEnd int64) (int64, error) {

end := nextEnd
nextEnd = currentRev
// leave the last 1000
end = end - 1000

// NOTE(neoaggelos): Ignoring the last 1000 revisions causes the following CNCF conformance test to fail.
// This is because of low activity, where the created list is part of the last 1000 revisions and is not compacted.
// Link to failing test: https://github.com/kubernetes/kubernetes/blob/f2cfbf44b1fb482671aedbfff820ae2af256a389/test/e2e/apimachinery/chunking.go#L144
// To address this, we only ignore the last 100 revisions instead

// end = end - 1000
end = end - 100

savedCursor := cursor
// Purposefully start at the current and redo the current as
Expand Down Expand Up @@ -262,7 +268,7 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis
}

if revision == 0 {
rows, err = s.d.ListCurrent(ctx, prefix, limit, includeDeleted)
rows, err = s.d.ListCurrent(ctx, prefix, startKey, limit, includeDeleted)
} else {
rows, err = s.d.List(ctx, prefix, startKey, limit, revision, includeDeleted)
}
Expand Down Expand Up @@ -471,12 +477,12 @@ func canSkipRevision(rev, skip int64, skipTime time.Time) bool {
return rev == skip && time.Now().Sub(skipTime) > time.Second
}

func (s *SQLLog) Count(ctx context.Context, prefix string, revision int64) (int64, int64, error) {
func (s *SQLLog) Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) {
if revision == 0 {
return s.d.CountCurrent(ctx, prefix)
return s.d.CountCurrent(ctx, prefix, startKey)
}

return s.d.Count(ctx, prefix, revision)
return s.d.Count(ctx, prefix, startKey, revision)
}

func (s *SQLLog) Append(ctx context.Context, event *server.Event) (int64, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kine/server/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest)
revision := r.Revision

if r.CountOnly {
rev, count, err := l.backend.Count(ctx, prefix, revision)
rev, count, err := l.backend.Count(ctx, prefix, start, revision)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -60,7 +60,7 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest)
}

// count the actual number of results if there are more items in the db.
rev, resp.Count, err = l.backend.Count(ctx, prefix, revision)
rev, resp.Count, err = l.backend.Count(ctx, prefix, start, revision)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kine/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Backend interface {
Create(ctx context.Context, key string, value []byte, lease int64) (int64, error)
Delete(ctx context.Context, key string, revision int64) (int64, *KeyValue, bool, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error)
Count(ctx context.Context, prefix string, revision int64) (int64, int64, error)
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error)
Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, *KeyValue, bool, error)
Watch(ctx context.Context, key string, revision int64) <-chan []*Event
DbSize(ctx context.Context) (int64, error)
Expand Down
Loading
Loading