Skip to content

Commit

Permalink
Delete Query (#175)
Browse files Browse the repository at this point in the history
  • Loading branch information
louiseschmidtgen authored Sep 25, 2024
1 parent 0520ad5 commit eefb3f7
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 111 deletions.
2 changes: 2 additions & 0 deletions pkg/kine/drivers/generic/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
var writeQueries = map[string]bool{
"update_compact_sql": true,
"delete_sql": true,
"update_sql": true,
"delete_rev_sql": true,
"fill_sql": true,
"insert_last_insert_id_sql": true,
"insert_sql": true,
Expand Down
58 changes: 52 additions & 6 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ var (
otelTracer trace.Tracer
otelMeter metric.Meter
compactCnt metric.Int64Counter
getRevisionCnt metric.Int64Counter
deleteRevCnt metric.Int64Counter
deleteCnt metric.Int64Counter
currentRevCnt metric.Int64Counter
getCompactRevCnt metric.Int64Counter
)
Expand All @@ -43,11 +43,11 @@ func init() {
if err != nil {
logrus.WithError(err).Warning("Otel failed to create create counter")
}
getRevisionCnt, err = otelMeter.Int64Counter(fmt.Sprintf("%s.get_revision", otelName), metric.WithDescription("Number of get revision requests"))
deleteRevCnt, err = otelMeter.Int64Counter(fmt.Sprintf("%s.delete_rev", otelName), metric.WithDescription("Number of delete revision requests"))
if err != nil {
logrus.WithError(err).Warning("Otel failed to create create counter")
}
deleteRevCnt, err = otelMeter.Int64Counter(fmt.Sprintf("%s.delete_revision", otelName), metric.WithDescription("Number of delete revision requests"))
deleteCnt, err = otelMeter.Int64Counter(fmt.Sprintf("%s.delete", otelName), metric.WithDescription("Number of delete requests"))
if err != nil {
logrus.WithError(err).Warning("Otel failed to create create counter")
}
Expand Down Expand Up @@ -142,9 +142,10 @@ type Generic struct {
CountRevisionSQL string
AfterSQLPrefix string
AfterSQL string
DeleteSQL string
DeleteRevSQL string
CompactSQL string
UpdateCompactSQL string
DeleteSQL string
InsertSQL string
FillSQL string
InsertLastInsertIDSQL string
Expand Down Expand Up @@ -280,7 +281,7 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig
ORDER BY kv.id ASC
`, columns), paramCharacter, numbered),

DeleteSQL: q(`
DeleteRevSQL: q(`
DELETE FROM kine
WHERE id = ?`, paramCharacter, numbered),

Expand All @@ -295,6 +296,24 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig
InsertSQL: q(`INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value)
VALUES(?, ?, ?, ?, ?, ?, ?, ?) RETURNING id`, paramCharacter, numbered),

DeleteSQL: q(`
INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value)
SELECT
name,
0 AS created,
1 AS deleted,
CASE
WHEN kine.created THEN id
ELSE create_revision
END AS create_revision,
id AS prev_revision,
lease,
NULL AS value,
value AS old_value
FROM kine WHERE id = (SELECT MAX(id) FROM kine WHERE name = ?)
AND deleted = 0
AND id = ?`, paramCharacter, numbered),

CreateSQL: q(`
INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value)
SELECT
Expand Down Expand Up @@ -514,6 +533,7 @@ func (d *Generic) Create(ctx context.Context, key string, value []byte, ttl int6
}
span.RecordError(err)
}
span.SetAttributes(attribute.Int64("revision", rev))
span.End()
}()
span.SetAttributes(
Expand Down Expand Up @@ -560,6 +580,32 @@ func (d *Generic) Update(ctx context.Context, key string, value []byte, preRev,
return rev, true, err
}

func (d *Generic) Delete(ctx context.Context, key string, revision int64) (rev int64, deleted bool, err error) {
deleteCnt.Add(ctx, 1)
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Delete", otelName))
defer func() {
span.RecordError(err)
span.SetAttributes(attribute.Int64("revision", rev))
span.SetAttributes(attribute.Bool("deleted", deleted))
span.End()
}()
span.SetAttributes(attribute.String("key", key))

result, err := d.execute(ctx, "delete_sql", d.DeleteSQL, key, revision)
if err != nil {
logrus.WithError(err).Error("failed to delete key")
return 0, false, err
}
if insertCount, err := result.RowsAffected(); err != nil {
return 0, false, err
} else if insertCount == 0 {
return 0, false, nil
}

rev, err = result.LastInsertId()
return rev, true, err
}

// Compact compacts the database up to the revision provided in the method's call.
// After the call, any request for a version older than the given revision will return
// a compacted error.
Expand Down Expand Up @@ -698,7 +744,7 @@ func (d *Generic) DeleteRevision(ctx context.Context, revision int64) error {
}()
span.SetAttributes(attribute.Int64("revision", revision))

_, err = d.execute(ctx, "delete_sql", d.DeleteSQL, revision)
_, err = d.execute(ctx, "delete_rev_sql", d.DeleteRevSQL, revision)
return err
}

Expand Down
63 changes: 5 additions & 58 deletions pkg/kine/logstructured/logstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Log interface {
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error)
Create(ctx context.Context, key string, value []byte, lease int64) (int64, error)
Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, updateRet bool, errRet error)
Delete(ctx context.Context, key string, revision int64) (revRet int64, deleted bool, errRet error)
After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error)
Watch(ctx context.Context, prefix string) <-chan []*server.Event
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error)
Expand Down Expand Up @@ -138,69 +139,15 @@ func (l *LogStructured) adjustRevision(ctx context.Context, rev *int64) {
}

func (l *LogStructured) Create(ctx context.Context, key string, value []byte, lease int64) (rev int64, err error) {
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Create", otelName))
defer span.End()
rev, err = l.log.Create(ctx, key, value, lease)
logrus.Debugf("CREATE %s, size=%d, lease=%d => rev=%d, err=%v", key, len(value), lease, rev, err)
return rev, err
}

func (l *LogStructured) Delete(ctx context.Context, key string, revision int64) (revRet int64, kvRet *server.KeyValue, deletedRet bool, errRet error) {
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Delete", otelName))

defer func() {
l.adjustRevision(ctx, &revRet)
logrus.Debugf("DELETE %s, rev=%d => rev=%d, kv=%v, deleted=%v, err=%v", key, revision, revRet, kvRet != nil, deletedRet, errRet)
span.SetAttributes(
attribute.String("key", key),
attribute.Int64("revision", revision),
attribute.Int64("adjusted-revision", revRet),
attribute.Bool("deleted", deletedRet),
attribute.Bool("kv-found", kvRet != nil),
)
span.RecordError(errRet)
span.End()
}()

rev, event, err := l.get(ctx, key, "", 1, 0, true)
if err != nil {
span.RecordError(err)
return 0, nil, false, err
}

if event == nil {
return rev, nil, true, nil
}

if event.Delete {
return rev, event.KV, true, nil
}

if revision != 0 && event.KV.ModRevision != revision {
return rev, event.KV, false, nil
}

deleteEvent := &server.Event{
Delete: true,
KV: event.KV,
PrevKV: event.KV,
}

rev, err = l.log.Append(ctx, deleteEvent)
if err != nil {
// If error on Append we assume it's a UNIQUE constraint error, so we fetch the latest (if we can)
// and return that the delete failed
span.AddEvent("Failed to append delete event")
span.RecordError(err)
latestRev, latestEvent, latestErr := l.get(ctx, key, "", 1, 0, true)
if latestErr != nil || latestEvent == nil {
span.RecordError(latestErr)
span.SetAttributes(attribute.Bool("latest-event-found", latestEvent != nil))
return rev, event.KV, false, nil
}
return latestRev, latestEvent.KV, false, nil
}
return rev, event.KV, true, err
func (l *LogStructured) Delete(ctx context.Context, key string, revision int64) (revRet int64, deleted bool, errRet error) {
rev, del, err := l.log.Delete(ctx, key, revision)
logrus.Debugf("DELETE %s, rev=%d => rev=%d, deleted=%v, err=%v", key, revision, rev, del, err)
return rev, del, err
}

func (l *LogStructured) List(ctx context.Context, prefix, startKey string, limit, revision int64) (revRet int64, kvRet []*server.KeyValue, errRet error) {
Expand Down
25 changes: 15 additions & 10 deletions pkg/kine/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type Dialect interface {
Insert(ctx context.Context, key string, create, delete bool, createRevision, previousRevision int64, ttl int64, value, prevValue []byte) (int64, error)
Create(ctx context.Context, key string, value []byte, lease int64) (int64, error)
Update(ctx context.Context, key string, value []byte, prevRev, lease int64) (int64, bool, error)
Delete(ctx context.Context, key string, revision int64) (int64, bool, error)
DeleteRevision(ctx context.Context, revision int64) error
GetCompactRevision(ctx context.Context) (int64, int64, error)
Compact(ctx context.Context, revision int64) error
Expand Down Expand Up @@ -534,14 +535,6 @@ func (s *SQLLog) Append(ctx context.Context, event *server.Event) (int64, error)
}

func (s *SQLLog) Create(ctx context.Context, key string, value []byte, lease int64) (rev int64, err error) {
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Create", otelName))
defer func() {
span.RecordError(err)
span.SetAttributes(attribute.Int64("revision", rev))
span.End()
}()
span.SetAttributes(attribute.String("key", key))

rev, err = s.d.Create(ctx, key, value, lease)
if err != nil {
return 0, err
Expand All @@ -551,13 +544,25 @@ func (s *SQLLog) Create(ctx context.Context, key string, value []byte, lease int
return rev, nil
}

func (s *SQLLog) Delete(ctx context.Context, key string, revision int64) (rev int64, deleted bool, err error) {
rev, deleted, err = s.d.Delete(ctx, key, revision)
if err != nil {
return 0, false, err
}
if deleted {
s.notifyWatcherPoll(rev)
}
return rev, deleted, nil
}

func (s *SQLLog) Update(ctx context.Context, key string, value []byte, prevRev, lease int64) (rev int64, updated bool, err error) {
rev, updated, err = s.d.Update(ctx, key, value, prevRev, lease)
if err != nil {
return 0, false, err
}

s.notifyWatcherPoll(rev)
if updated {
s.notifyWatcherPoll(rev)
}
return rev, updated, nil
}

Expand Down
61 changes: 28 additions & 33 deletions pkg/kine/server/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,6 @@ import (
)

func isDelete(txn *etcdserverpb.TxnRequest) (int64, string, bool) {
if len(txn.Compare) == 0 &&
len(txn.Failure) == 0 &&
len(txn.Success) == 2 &&
txn.Success[0].GetRequestRange() != nil &&
txn.Success[1].GetRequestDeleteRange() != nil {
rng := txn.Success[1].GetRequestDeleteRange()
return 0, string(rng.Key), true
}
if len(txn.Compare) == 1 &&
txn.Compare[0].Target == etcdserverpb.Compare_MOD &&
txn.Compare[0].Result == etcdserverpb.Compare_EQUAL &&
Expand All @@ -42,41 +34,44 @@ func (l *LimitedServer) delete(ctx context.Context, key string, revision int64)
attribute.Int64("revision", revision),
)

rev, kv, ok, err := l.backend.Delete(ctx, key, revision)
rev, deleted, err := l.backend.Delete(ctx, key, revision)
if err != nil {
return nil, err
}
span.SetAttributes(attribute.Bool("ok", ok))

if !ok {
return &etcdserverpb.TxnResponse{
Header: txnHeader(rev),
Responses: []*etcdserverpb.ResponseOp{
{
Response: &etcdserverpb.ResponseOp_ResponseRange{
ResponseRange: &etcdserverpb.RangeResponse{
Header: txnHeader(rev),
Kvs: toKVs(kv),
},
},
},
},
Succeeded: false,
}, nil
span.SetAttributes(attribute.Bool("deleted", deleted))

resp := &etcdserverpb.TxnResponse{
Header: txnHeader(rev),
Succeeded: deleted,
}

return &etcdserverpb.TxnResponse{
Header: txnHeader(rev),
Responses: []*etcdserverpb.ResponseOp{
if deleted {
resp.Responses = []*etcdserverpb.ResponseOp{
{
Response: &etcdserverpb.ResponseOp_ResponseDeleteRange{
ResponseDeleteRange: &etcdserverpb.DeleteRangeResponse{
Header: txnHeader(rev),
PrevKvs: toKVs(kv),
Header: txnHeader(rev),
},
},
},
},
Succeeded: true,
}, nil
}
} else {
rev, kv, err := l.backend.Get(ctx, key, "", 1, rev)
if err != nil {
return nil, err
}
resp.Responses = []*etcdserverpb.ResponseOp{
{
Response: &etcdserverpb.ResponseOp_ResponseRange{
ResponseRange: &etcdserverpb.RangeResponse{
Header: txnHeader(rev),
Kvs: toKVs(kv),
},
},
},
}

}
return resp, nil
}
2 changes: 1 addition & 1 deletion pkg/kine/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Backend interface {
Wait()
Get(ctx context.Context, key, rangeEnd string, limit, revision int64) (int64, *KeyValue, error)
Create(ctx context.Context, key string, value []byte, lease int64) (int64, error)
Delete(ctx context.Context, key string, revision int64) (int64, *KeyValue, bool, error)
Delete(ctx context.Context, key string, revision int64) (int64, bool, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error)
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error)
Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, bool, error)
Expand Down
12 changes: 10 additions & 2 deletions test/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,16 @@ func TestDelete(t *testing.T) {
// Delete a key that does not exist
t.Run("NonExistentKeys", func(t *testing.T) {
g := NewWithT(t)
deleteKey(ctx, g, kine.client, "missingKey", 1)
key := "missing key"
rev := 0
resp, err := kine.client.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", rev)).
Then(clientv3.OpDelete(key)).
Else(clientv3.OpGet(key)).
Commit()

g.Expect(err).To(BeNil())
g.Expect(resp.Succeeded).To(BeFalse())
})

// Add a key, make sure it exists, then delete it, make sure it got deleted,
Expand Down Expand Up @@ -109,7 +118,6 @@ func assertMissingKey(ctx context.Context, g Gomega, client *clientv3.Client, ke
}

func deleteKey(ctx context.Context, g Gomega, client *clientv3.Client, key string, revision int64) int64 {
// The Get before the Delete is to trick kine to accept the transaction
resp, err := client.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", revision)).
Then(clientv3.OpDelete(key)).
Expand Down
4 changes: 3 additions & 1 deletion test/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,11 @@ func TestGet(t *testing.T) {
t.Run("FailNotFound", func(t *testing.T) {
g := NewWithT(t)
key := "testKeyFailNotFound"
value := "testValue"

rev := createKey(ctx, g, kine.client, key, value)
// Delete key
deleteKey(ctx, g, kine.client, key, 0)
deleteKey(ctx, g, kine.client, key, rev)

// Get key
resp, err := kine.client.Get(ctx, key, clientv3.WithRange(""))
Expand Down

0 comments on commit eefb3f7

Please sign in to comment.