Skip to content

Commit

Permalink
Merge pull request #86 from canonical/fix/cncf-1.29
Browse files Browse the repository at this point in the history
Implement changes for v1.29 conformance
  • Loading branch information
eaudetcobello authored Feb 21, 2024
2 parents 54151ef + 2bb2ba2 commit 9571bd9
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 27 deletions.
59 changes: 41 additions & 18 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,10 @@ type Generic struct {
RevisionSQL string
ListRevisionStartSQL string
GetRevisionAfterSQL string
CountSQL string
countSQLPrepared *sql.Stmt
CountCurrentSQL string
countCurrentSQLPrepared *sql.Stmt
CountRevisionSQL string
countRevisionSQLPrepared *sql.Stmt
AfterSQLPrefix string
afterSQLPrefixPrepared *sql.Stmt
AfterSQL string
Expand Down Expand Up @@ -206,12 +208,18 @@ func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter
ListRevisionStartSQL: q(fmt.Sprintf(listSQL, "AND mkv.id <= ?"), paramCharacter, numbered),
GetRevisionAfterSQL: q(revisionAfterSQL, paramCharacter, numbered),

CountSQL: q(fmt.Sprintf(`
CountCurrentSQL: q(fmt.Sprintf(`
SELECT (%s), COUNT(*)
FROM (
%s
) c`, revSQL, fmt.Sprintf(listSQL, "")), paramCharacter, numbered),

CountRevisionSQL: q(fmt.Sprintf(`
SELECT (%s), COUNT(c.theid)
FROM (
%s
) c`, revSQL, fmt.Sprintf(listSQL, "AND kv.id <= ?")), paramCharacter, numbered),

AfterSQLPrefix: q(fmt.Sprintf(`
SELECT %s
FROM kine AS kv
Expand Down Expand Up @@ -256,7 +264,12 @@ func (d *Generic) Prepare() error {
return err
}

d.countSQLPrepared, err = d.DB.Prepare(d.CountSQL)
d.countCurrentSQLPrepared, err = d.DB.Prepare(d.CountCurrentSQL)
if err != nil {
return err
}

d.countRevisionSQLPrepared, err = d.DB.Prepare(d.CountRevisionSQL)
if err != nil {
return err
}
Expand Down Expand Up @@ -366,6 +379,30 @@ func (d *Generic) queryPrepared(ctx context.Context, txName, sql string, prepare
return r, err
}

func (d *Generic) CountCurrent(ctx context.Context, prefix string) (int64, int64, error) {
var (
rev sql.NullInt64
id int64
)

start, end := getPrefixRange(prefix)
row := d.queryRowPrepared(ctx, "count_current", d.CountCurrentSQL, d.countCurrentSQLPrepared, start, end, false)
err := row.Scan(&rev, &id)
return rev.Int64, id, err
}

func (d *Generic) Count(ctx context.Context, prefix string, revision int64) (int64, int64, error) {
var (
rev sql.NullInt64
id int64
)

start, end := getPrefixRange(prefix)
row := d.queryRowPrepared(ctx, "count_revision", d.CountRevisionSQL, d.countRevisionSQLPrepared, start, end, revision, false)
err := row.Scan(&rev, &id)
return rev.Int64, id, err
}

func (d *Generic) queryRow(ctx context.Context, txName, sql string, args ...interface{}) (result *sql.Row) {
logrus.Tracef("QUERY ROW %v : %s", args, Stripped(sql))
start := time.Now()
Expand Down Expand Up @@ -492,20 +529,6 @@ func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revi
return d.query(ctx, "get_revision_after_sql", sql, start, end, revision, startKey, revision, includeDeleted)
}

func (d *Generic) Count(ctx context.Context, prefix string) (int64, int64, error) {
var (
rev sql.NullInt64
id int64
)

start, end := getPrefixRange(prefix)

row := d.queryRowPrepared(ctx, "count_sql", d.CountSQL, d.countSQLPrepared, start, end, false)
err := row.Scan(&rev, &id)

return rev.Int64, id, err
}

func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) {
var id int64
var err error
Expand Down
6 changes: 3 additions & 3 deletions pkg/kine/logstructured/logstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Log interface {
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error)
After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error)
Watch(ctx context.Context, prefix string) <-chan []*server.Event
Count(ctx context.Context, prefix string) (int64, int64, error)
Count(ctx context.Context, prefix string, revision int64) (int64, int64, error)
Append(ctx context.Context, event *server.Event) (int64, error)
DbSize(ctx context.Context) (int64, error)
DoCompact() error
Expand Down Expand Up @@ -189,11 +189,11 @@ func (l *LogStructured) List(ctx context.Context, prefix, startKey string, limit
return rev, kvs, nil
}

func (l *LogStructured) Count(ctx context.Context, prefix string) (revRet int64, count int64, err error) {
func (l *LogStructured) Count(ctx context.Context, prefix string, revision int64) (revRet int64, count int64, err error) {
defer func() {
logrus.Debugf("COUNT %s => rev=%d, count=%d, err=%v", prefix, revRet, count, err)
}()
rev, count, err := l.log.Count(ctx, prefix)
rev, count, err := l.log.Count(ctx, prefix, revision)
if err != nil {
return 0, 0, err
}
Expand Down
15 changes: 12 additions & 3 deletions pkg/kine/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func New(d Dialect) *SQLLog {
type Dialect interface {
ListCurrent(ctx context.Context, prefix string, limit int64, includeDeleted bool) (*sql.Rows, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error)
Count(ctx context.Context, prefix string) (int64, int64, error)
CountCurrent(ctx context.Context, prefix string) (int64, int64, error)
Count(ctx context.Context, prefix string, revision int64) (int64, int64, error)
CurrentRevision(ctx context.Context) (int64, error)
AfterPrefix(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error)
After(ctx context.Context, rev, limit int64) (*sql.Rows, error)
Expand Down Expand Up @@ -470,8 +471,16 @@ func canSkipRevision(rev, skip int64, skipTime time.Time) bool {
return rev == skip && time.Now().Sub(skipTime) > time.Second
}

func (s *SQLLog) Count(ctx context.Context, prefix string) (int64, int64, error) {
return s.d.Count(ctx, prefix)
func (s *SQLLog) Count(ctx context.Context, prefix string, revision int64) (int64, int64, error) {
if strings.HasSuffix(prefix, "/") {
prefix += "%"
}

if revision == 0 {
return s.d.CountCurrent(ctx, prefix)
}

return s.d.Count(ctx, prefix, revision)
}

func (s *SQLLog) Append(ctx context.Context, event *server.Event) (int64, error) {
Expand Down
20 changes: 18 additions & 2 deletions pkg/kine/server/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strings"

"github.com/sirupsen/logrus"
"go.etcd.io/etcd/api/v3/etcdserverpb"
)

Expand All @@ -19,12 +20,14 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest)
prefix = prefix + "/"
}
start := string(bytes.TrimRight(r.Key, "\x00"))
revision := r.Revision

if r.CountOnly {
rev, count, err := l.backend.Count(ctx, prefix)
rev, count, err := l.backend.Count(ctx, prefix, revision)
if err != nil {
return nil, err
}
logrus.Tracef("LIST COUNT key=%s, end=%s, revision=%d, currentRev=%d count=%d", r.Key, r.RangeEnd, revision, rev, count)
return &RangeResponse{
Header: txnHeader(rev),
Count: count,
Expand All @@ -36,7 +39,7 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest)
limit++
}

rev, kvs, err := l.backend.List(ctx, prefix, start, limit, r.Revision)
rev, kvs, err := l.backend.List(ctx, prefix, start, limit, revision)
if err != nil {
return nil, err
}
Expand All @@ -47,9 +50,22 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest)
Kvs: kvs,
}

// count the actual number of results if there are more items in the db.
if limit > 0 && resp.Count > r.Limit {
resp.More = true
resp.Kvs = kvs[0 : limit-1]

if revision == 0 {
revision = rev
}

// count the actual number of results if there are more items in the db.
rev, resp.Count, err = l.backend.Count(ctx, prefix, revision)
if err != nil {
return nil, err
}
logrus.Tracef("LIST COUNT key=%s, end=%s, revision=%d, currentRev=%d count=%d", r.Key, r.RangeEnd, revision, rev, resp.Count)
resp.Header = txnHeader(rev)
}

return resp, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/kine/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Backend interface {
Create(ctx context.Context, key string, value []byte, lease int64) (int64, error)
Delete(ctx context.Context, key string, revision int64) (int64, *KeyValue, bool, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error)
Count(ctx context.Context, prefix string) (int64, int64, error)
Count(ctx context.Context, prefix string, revision int64) (int64, int64, error)
Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, *KeyValue, bool, error)
Watch(ctx context.Context, key string, revision int64) <-chan []*Event
DbSize(ctx context.Context) (int64, error)
Expand Down

0 comments on commit 9571bd9

Please sign in to comment.