Skip to content

Commit

Permalink
Merge pull request #1 from dimitarvdimitrov/20211203_r2g
Browse files Browse the repository at this point in the history
Add k8s kv CAS implementation
  • Loading branch information
simonswine authored Dec 3, 2021
2 parents ac3712a + 5c38440 commit 279fde7
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 10 deletions.
2 changes: 1 addition & 1 deletion kv/kubernetes/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (c *Client) process() bool {
} else {
// Note that you also have to check the uid if you have a local controlled resource, which
// is dependent on the actual instance, to detect that a Pod was recreated with the same name
fmt.Printf("Sync/Add/Update for configMap %s\n", obj.(*v1.Pod).GetName())
fmt.Printf("Sync/Add/Update for configMap %s\n", obj.(*v1.ConfigMap).GetName())
}
return true

Expand Down
70 changes: 70 additions & 0 deletions kv/kubernetes/jsonpatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package kubernetes

import (
"encoding/base64"
"encoding/json"
)

type operation struct {
Op string `json:"op"`
Path string `json:"path"`
// Value should be a base64-encoded value or nil.
Value *string `json:"value"`
}

// preparePatch prepares a JSON patch (RFC 6902) to update a configmap key. If oldHash is empty or nil
// this is equivalent to adding a key.
func preparePatch(key string, oldHash, newVal, newHash []byte) ([]byte, error) {
hashKey := "/binaryData/" + convertKeyToStoreHash(key)

b64 := func(b []byte) *string {
str := base64.StdEncoding.EncodeToString(b)
return &str
}

// testing with nil is equivalent to testing that the key doesn't exist
var expectedHash *string
if len(oldHash) > 0 {
expectedHash = b64(oldHash)
}

testHashOp := operation{
Op: "test",
Path: hashKey,
Value: expectedHash,
}

setHashOp := operation{
Op: "replace",
Path: hashKey,
Value: b64(newHash),
}

setDataOp := operation{
Op: "replace",
Path: "/binaryData/" + convertKeyToStore(key),
Value: b64(newVal),
}

patch := []operation{testHashOp, setHashOp, setDataOp}

return json.Marshal(patch)
}

func prepareDeletePatch(key string) ([]byte, error) {
removeHashOp := operation{
Op: "remove",
Path: "/binaryData/" + convertKeyToStoreHash(key),
Value: nil,
}

removeObjectOp := operation{
Op: "remove",
Path: "/binaryData/" + convertKeyToStore(key),
Value: nil,
}

patch := []operation{removeHashOp, removeObjectOp}

return json.Marshal(patch)
}
65 changes: 58 additions & 7 deletions kv/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"context"
"crypto/sha1"
"encoding/base64"
"fmt"
"os"
Expand All @@ -14,6 +15,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -106,6 +108,8 @@ func NewClient(cfg *Config, cod codec.Codec, logger log.Logger, registerer prome
ObjectMeta: metav1.ObjectMeta{
Name: client.name,
},
// We want non-empty .data and .binaryData; otherwise CAS will fail because it cannot find the parent key
BinaryData: map[string][]byte{convertKeyToStore("_"): []byte("_")},
}
client.configMap, err = clientset.CoreV1().ConfigMaps(client.namespace).Create(context.Background(), client.configMap, metav1.CreateOptions{})
if err != nil {
Expand All @@ -125,6 +129,10 @@ func convertKeyToStore(in string) string {
return base64.RawURLEncoding.EncodeToString([]byte(in))
}

func convertKeyToStoreHash(in string) string {
return "__hash_" + base64.RawURLEncoding.EncodeToString([]byte(in))
}

func convertKeyFromStore(in string) (string, error) {
body, err := base64.RawURLEncoding.DecodeString(in)
if err != nil {
Expand All @@ -148,6 +156,9 @@ func (c *Client) List(ctx context.Context, prefix string) ([]string, error) {
c.logger.Log(fmt.Sprintf("unable to decode key '%s'", keyStore))
continue
}
if key == "_" {
continue
}
if strings.HasPrefix(key, prefix) {
keys = append(keys, key)
}
Expand All @@ -163,17 +174,46 @@ func (c *Client) Get(ctx context.Context, key string) (interface{}, error) {
cm := c.configMap
c.configMapMtx.RUnlock()

if key == "_" {
return nil, nil
}

value, ok := cm.BinaryData[convertKeyToStore(key)]
if !ok {
return nil, nil
}
return value, nil

return c.codec.Decode(value)
}

// Delete a specific key. Deletions are best-effort and no error will
// be returned if the key does not exist.
func (c *Client) Delete(ctx context.Context, key string) error {
return fmt.Errorf("unimplemented")
c.configMapMtx.RLock()
cm := c.configMap
c.configMapMtx.RUnlock()

_, ok := cm.BinaryData[convertKeyToStore(key)]
if !ok {
// Object is already deleted or never existed
return nil
}

patch, err := prepareDeletePatch(key)
if err != nil {
return err
}

updatedCM, err := c.client.CoreV1().ConfigMaps(c.namespace).Patch(ctx, c.name, types.JSONPatchType, patch, metav1.PatchOptions{})
if err != nil {
return err
}

c.configMapMtx.Lock()
c.configMap = updatedCM
c.configMapMtx.Unlock()

return nil
}

// CAS stands for Compare-And-Swap. Will call provided callback f with the
Expand Down Expand Up @@ -217,13 +257,15 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
return err
}

newCM := cm.DeepCopy()
if newCM.BinaryData == nil {
newCM.BinaryData = make(map[string][]byte)
oldEncodedHash := cm.BinaryData[convertKeyToStoreHash(key)]
newHash := hash(encoded)

patch, err := preparePatch(key, oldEncodedHash, encoded, newHash)
if err != nil {
return err
}
newCM.BinaryData[convertKeyToStore(key)] = encoded

updatedCM, err := c.client.CoreV1().ConfigMaps(c.namespace).Update(ctx, newCM, metav1.UpdateOptions{})
updatedCM, err := c.client.CoreV1().ConfigMaps(c.namespace).Patch(ctx, c.name, types.JSONPatchType, patch, metav1.PatchOptions{})
if err != nil {
return err
}
Expand All @@ -235,6 +277,15 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
return nil
}

func hash(b []byte) []byte {
hasher := sha1.New()
_, err := hasher.Write(b)
if err != nil {
panic(err)
}
return hasher.Sum(nil)
}

// WatchKey calls f whenever the value stored under key changes.
func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool) {
panic("implement me")
Expand Down
47 changes: 45 additions & 2 deletions kv/kubernetes/kubernetes_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func Test_Integration(t *testing.T) {

keys, err := c.List(context.Background(), "")
require.NoError(t, err)
assert.ElementsMatch(t, []string{}, keys)
assert.Empty(t, keys)

value, err := c.Get(context.Background(), "not-exists")
require.NoError(t, err)
Expand All @@ -82,11 +82,54 @@ func Test_Integration(t *testing.T) {
return
}))

require.NoError(t, c.CAS(context.Background(), "/test", func(old interface{}) (out interface{}, retry bool, err error) {
assert.Equal(t, "test", old)
out = nil
retry = false
return
}))

keys, err = c.List(context.TODO(), "/test")
require.NoError(t, err)
assert.ElementsMatch(t, []string{"/test"}, keys)

value, err = c.Get(context.TODO(), "/test")
require.NoError(t, err)
assert.NotNil(t, value)
assert.Equal(t, "test", value)
}

func Test_Delete(t *testing.T) {
t.Run("happy flow", func(t *testing.T) {
c := newClient(t)

require.NoError(t, c.CAS(context.Background(), "/test", func(_ interface{}) (out interface{}, retry bool, err error) {
out = "test"
retry = false
return
}))

require.NoError(t, c.Delete(context.Background(), "/test"))

keys, err := c.List(context.TODO(), "/test")
require.NoError(t, err)
assert.Empty(t, keys)

value, err := c.Get(context.TODO(), "/test")
assert.NoError(t, err)
assert.Nil(t, value)
})

t.Run("deleting non-existent key also works", func(t *testing.T) {
c := newClient(t)

require.NoError(t, c.Delete(context.Background(), "/test"))

keys, err := c.List(context.TODO(), "/test")
require.NoError(t, err)
assert.Empty(t, keys)

value, err := c.Get(context.TODO(), "/test")
assert.NoError(t, err)
assert.Nil(t, value)
})
}

0 comments on commit 279fde7

Please sign in to comment.