Merge pull request #92 from simonswine/20211203_r2g
WIP K8s ring (aka r2g)
dimitarvdimitrov authored Dec 6, 2021
2 parents c026bbd + 24d6802 commit 1c8311e
Showing 9 changed files with 955 additions and 8 deletions.
7 changes: 4 additions & 3 deletions go.mod
@@ -34,10 +34,11 @@ require (
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6
google.golang.org/grpc v1.38.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.21.7
k8s.io/apimachinery v0.21.7
k8s.io/client-go v0.21.7
)

replace k8s.io/client-go v12.0.0+incompatible => k8s.io/client-go v0.21.4
replace k8s.io/client-go v12.0.0+incompatible => k8s.io/client-go v0.21.7

// Replace memberlist with our fork which includes some fixes that haven't been
// merged upstream yet.
replace github.com/hashicorp/memberlist v0.2.3 => github.com/grafana/memberlist v0.2.5-0.20211201083710-c7bc8e9df94b
259 changes: 254 additions & 5 deletions go.sum

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions kv/client.go
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/kv/etcd"
"github.com/grafana/dskit/kv/kubernetes"
"github.com/grafana/dskit/kv/memberlist"
)

@@ -142,6 +143,9 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co
})
client = inmemoryStore

case "kubernetes":
client, err = kubernetes.NewClient(&kubernetes.Config{}, codec, logger, reg)

case "memberlist":
kv, err := cfg.MemberlistKV()
if err != nil {
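For reference, a minimal sketch of wiring the new backend up directly, mirroring the createClient case above. The go-kit no-op logger, the fresh Prometheus registry, and the zero-valued kubernetes.Config are illustrative assumptions; the codec is whatever the caller (typically the ring) would normally pass in.

package main

import (
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/dskit/kv/codec"
	"github.com/grafana/dskit/kv/kubernetes"
)

func main() {
	var ringCodec codec.Codec // in practice the caller's codec, e.g. the ring codec; elided here
	logger := log.NewNopLogger()
	reg := prometheus.NewRegistry()

	// Mirrors the call in createClient above. Config is zero-valued in this PR, so namespace and
	// ConfigMap name defaults presumably come from elsewhere in the package (not shown in this diff).
	client, err := kubernetes.NewClient(&kubernetes.Config{}, ringCodec, logger, reg)
	if err != nil {
		panic(err)
	}
	_ = client // used like any other kv client, e.g. via CAS and WatchKey
}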
107 changes: 107 additions & 0 deletions kv/kubernetes/controller.go
@@ -0,0 +1,107 @@
package kubernetes

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

func (c *Client) startController() error {
	// create the configmap watcher, restricted to the single ConfigMap backing this client
	configMapWatcher := cache.NewFilteredListWatchFromClient(c.clientset.CoreV1().RESTClient(), "configmaps", c.namespace, func(options *metav1.ListOptions) {
		options.FieldSelector = "metadata.name=" + c.name
	})

	// create the workqueue
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	indexer, informer := cache.NewIndexerInformer(configMapWatcher, &v1.ConfigMap{}, 0, cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				queue.Add(key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
			// key function.
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
	}, cache.Indexers{})

	c.queue = queue
	c.indexer = indexer
	c.informer = informer

	go c.runController()

	return nil
}

func (c *Client) runController() {
	defer runtime.HandleCrash()

	// Let the workers stop when we are done
	defer c.queue.ShutDown()
	c.logger.Log("msg", "starting configmap controller")

	go c.informer.Run(c.stopCh)

	// Wait for all involved caches to be synced before processing items from the queue is started
	if !cache.WaitForCacheSync(c.stopCh, c.informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}

	go wait.Until(func() {
		for c.process() {
		}
	}, time.Second, c.stopCh)

	<-c.stopCh
	c.logger.Log("msg", "stopping configmap controller")
}

func (c *Client) process() bool {
	// Wait until there is a new item in the working queue
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	// Tell the queue that we are done with processing this key. This unblocks the key for other
	// workers and allows safe parallel processing, because two items with the same key are never
	// processed in parallel.
	defer c.queue.Done(key)

	obj, exists, err := c.indexer.GetByKey(key.(string))
	if err != nil {
		c.logger.Log("msg", "fetching object from store failed", "key", key.(string), "err", err)
		return false
	}

	if !exists {
		c.logger.Log("msg", "configmap does not exist anymore", "key", key.(string))
	} else {
		// Note that the UID would also have to be checked to detect a ConfigMap that was
		// deleted and recreated with the same name.
		c.logger.Log("msg", "sync/add/update for configmap", "name", obj.(*v1.ConfigMap).GetName())
	}
	return true
}
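The controller above relies on several fields of Client that are defined elsewhere in this PR and not shown in the diff. Purely for orientation, a hedged sketch of what those calls imply; field types are inferred from the client-go APIs used above and may differ from the actual definition.

package kubernetes

import (
	"github.com/go-kit/log"

	k8s "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// Client fields as implied by the controller above (hypothetical sketch; the real
// struct and its constructor live in other files of this PR).
type Client struct {
	clientset k8s.Interface // used for the filtered list/watch on the backing ConfigMap
	namespace string        // namespace of the backing ConfigMap
	name      string        // name of the backing ConfigMap
	logger    log.Logger

	queue    workqueue.RateLimitingInterface // work items keyed by namespace/name
	indexer  cache.Indexer                   // local cache maintained by the informer
	informer cache.Controller                // runs the list/watch loop
	stopCh   chan struct{}                   // closed to stop the controller
}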
70 changes: 70 additions & 0 deletions kv/kubernetes/jsonpatch.go
@@ -0,0 +1,70 @@
package kubernetes

import (
	"encoding/base64"
	"encoding/json"
)

type operation struct {
	Op   string `json:"op"`
	Path string `json:"path"`
	// Value should be a base64-encoded value or nil.
	Value *string `json:"value"`
}

// preparePatch prepares a JSON patch (RFC 6902) to update a configmap key. If oldHash is empty or nil,
// this is equivalent to adding a key.
func preparePatch(key string, oldHash, newVal, newHash []byte) ([]byte, error) {
	hashKey := "/binaryData/" + convertKeyToStoreHash(key)

	b64 := func(b []byte) *string {
		str := base64.StdEncoding.EncodeToString(b)
		return &str
	}

	// testing with nil is equivalent to testing that the key doesn't exist
	var expectedHash *string
	if len(oldHash) > 0 {
		expectedHash = b64(oldHash)
	}

	testHashOp := operation{
		Op:    "test",
		Path:  hashKey,
		Value: expectedHash,
	}

	setHashOp := operation{
		Op:    "replace",
		Path:  hashKey,
		Value: b64(newHash),
	}

	setDataOp := operation{
		Op:    "replace",
		Path:  "/binaryData/" + convertKeyToStore(key),
		Value: b64(newVal),
	}

	patch := []operation{testHashOp, setHashOp, setDataOp}

	return json.Marshal(patch)
}

func prepareDeletePatch(key string) ([]byte, error) {
	removeHashOp := operation{
		Op:    "remove",
		Path:  "/binaryData/" + convertKeyToStoreHash(key),
		Value: nil,
	}

	removeObjectOp := operation{
		Op:    "remove",
		Path:  "/binaryData/" + convertKeyToStore(key),
		Value: nil,
	}

	patch := []operation{removeHashOp, removeObjectOp}

	return json.Marshal(patch)
}
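The patches built above are presumably applied to the backing ConfigMap by the rest of the package, which this diff does not show. As a hedged illustration of how that could look with client-go (casUpdate is a hypothetical helper, not part of this PR):

package kubernetes

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

// casUpdate shows how a preparePatch result could be submitted as a compare-and-swap.
// The actual update path in this PR may differ.
func (c *Client) casUpdate(ctx context.Context, key string, oldHash, newVal, newHash []byte) error {
	patch, err := preparePatch(key, oldHash, newVal, newHash)
	if err != nil {
		return err
	}
	// types.JSONPatchType tells the API server to interpret the body as an RFC 6902 patch.
	// The leading "test" operation makes the server reject the patch if the stored hash has
	// changed in the meantime, which is what gives the update its compare-and-swap semantics.
	_, err = c.clientset.CoreV1().ConfigMaps(c.namespace).Patch(ctx, c.name, types.JSONPatchType, patch, metav1.PatchOptions{})
	return err
}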
