Skip to content

Commit

Permalink
flux plugin: add integration test for low memory env and check repo checksum before indexing (#3736)
Browse files Browse the repository at this point in the history

* step 1: add integration test scenario for low maxmemory settings in redis

* step 2: add checksum handling. All tests passing

* Michael's feedback
  • Loading branch information
gfichtenholt authored Nov 9, 2021
1 parent 34b09ec commit 77a7c7e
Show file tree
Hide file tree
Showing 12 changed files with 552 additions and 141 deletions.
8 changes: 7 additions & 1 deletion chart/kubeapps/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1829,9 +1829,12 @@ redis:
## allkeys-lru: evict keys by trying to remove the less recently used (LRU) keys first, in order
## to make space for the new data added
- "--maxmemory-policy allkeys-lru"
## ref https://stackoverflow.com/questions/22815364/flushall-and-flushdb-commands-on-redis-return-unk-command
## Redis official Helm chart by default disables FLUSHDB and FLUSHALL commands
disableCommands: []
replica:
## @param redis.replica.replicaCount Number of Redis™ replicas to deploy
replicaCount: 0
replicaCount: 1
## @param master.extraFlags Array with additional command line flags for Redis™ replicas
extraFlags:
## The maxmemory configuration directive is used in order to configure Redis to use a specified
Expand All @@ -1842,3 +1845,6 @@ redis:
## allkeys-lru: evict keys by trying to remove the less recently used (LRU) keys first, in order
## to make space for the new data added
- "--maxmemory-policy allkeys-lru"
## ref https://stackoverflow.com/questions/22815364/flushall-and-flushdb-commands-on-redis-return-unk-command
## Redis official Helm chart by default disables FLUSHDB and FLUSHALL commands
disableCommands: []
140 changes: 101 additions & 39 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ type NamespacedResourceWatcherCache struct {
eventProcessedWaitGroup *sync.WaitGroup
}

type cacheValueSetter func(string, map[string]interface{}) (interface{}, bool, error)
type cacheValueGetter func(string, interface{}) (interface{}, error)
type cacheValueAdder func(string, map[string]interface{}) (interface{}, bool, error)
type cacheValueModifier func(string, map[string]interface{}, interface{}) (interface{}, bool, error)
type cacheValueDeleter func(string, map[string]interface{}) (bool, error)

// TODO (gfichtenholt) rename this to just Config when caching is separated out into core server
Expand All @@ -65,8 +66,8 @@ type cacheConfig struct {
// in the cache for a given k8s object (passed in as a untyped/unstructured map)
// the list of types actually supported be redis you can find in
// https://github.com/go-redis/redis/blob/v8.10.0/internal/proto/writer.go#L61
onAdd cacheValueSetter
onModify cacheValueSetter
onAdd cacheValueAdder
onModify cacheValueModifier
// the semantics of 'onGet' hook is to convert or "reverse engineer" what was previously
// stored in the cache (via onAdd/onModify hooks) to an object that the plug-in understands
// and wishes to be returned as part of response to fetchCachedObjects() call
Expand Down Expand Up @@ -123,13 +124,13 @@ func newCacheWithRedisClient(config cacheConfig, redisCli *redis.Client, waitGro
if pong, err := redisCli.Ping(redisCli.Context()).Result(); err != nil {
return nil, err
} else {
log.Infof("Redis [PING] -> [%s]", pong)
log.Infof("Redis [PING]: %s", pong)
}

if maxmemory, err := redisCli.ConfigGet(redisCli.Context(), "maxmemory").Result(); err != nil {
return nil, err
} else if len(maxmemory) > 1 {
log.Infof("Redis config maxmemory: %v", maxmemory[1])
log.Infof("Redis [CONFIG GET maxmemory]: %v", maxmemory[1])
}

c := NamespacedResourceWatcherCache{
Expand Down Expand Up @@ -240,8 +241,14 @@ func (c NamespacedResourceWatcherCache) Watch(options metav1.ListOptions) (watch
// when it comes to consistency at any given point, as long as EVENTUALLY consistent
// state is reached, which will be the case
func (c NamespacedResourceWatcherCache) resync() (string, error) {
ctx := context.Background()
// clear the entire cache in one call
if result, err := c.redisCli.FlushDB(c.redisCli.Context()).Result(); err != nil {
return "", err
} else {
log.Infof("Redis [FLUSHDB]: %s", result)
}

ctx := context.Background()
dynamicClient, _, err := c.config.clientGetter(ctx)
if err != nil {
return "", status.Errorf(codes.FailedPrecondition, "unable to get client due to: %v", err)
Expand Down Expand Up @@ -271,12 +278,6 @@ func (c NamespacedResourceWatcherCache) resync() (string, error) {
return "", status.Errorf(codes.Internal, "List() call response does not contain resource version")
}

// clear the entire cache in one call
c.redisCli.FlushDB(c.redisCli.Context()).Err()
if err != nil {
return "", err
}

// re-populate the cache with current state from k8s
c.populateWith(listItems.Items)
return rv, nil
Expand Down Expand Up @@ -342,36 +343,42 @@ func (c NamespacedResourceWatcherCache) onAddOrModify(add bool, unstructuredObj
return nil
}

var oldValue []byte
if !add {
if oldValue, err = c.redisCli.Get(c.redisCli.Context(), key).Bytes(); err != nil {
log.Errorf("Failed to get value for object with key [%s] in cache due to: %v", key, err)
return err
}
}

var newValue interface{}
var setVal bool
var funcName string
var addOrModify cacheValueSetter
if add {
if oldValue == nil {
funcName = "onAdd"
addOrModify = c.config.onAdd
newValue, setVal, err = c.config.onAdd(key, unstructuredObj)
} else {
funcName = "onModify"
addOrModify = c.config.onModify
newValue, setVal, err = c.config.onModify(key, unstructuredObj, oldValue)
}
value, setVal, err := addOrModify(key, unstructuredObj)

if err != nil {
log.Errorf("Invocation of [%s] for object %s\nfailed due to: %v", funcName, prettyPrintMap(unstructuredObj), err)
// clear that key so cache doesn't contain any stale info for this object
c.redisCli.Del(c.redisCli.Context(), key)
return err
}

if setVal {
} else if setVal {
// Zero expiration means the key has no expiration time.
// However, cache entries may be evicted by redis in order to make room for new ones,
// if redis is limited by maxmemory constraint
err = c.redisCli.Set(c.redisCli.Context(), key, value, 0).Err()
result, err := c.redisCli.Set(c.redisCli.Context(), key, newValue, 0).Result()
if err != nil {
log.Errorf("Failed to set value for object with key [%s] in cache due to: %v", key, err)
return err
} else {
if log.V(4).Enabled() {
usedMemory, totalMemory := c.memoryStats()
log.V(4).Infof("Set value for key [%s] in cache. Redis memory usage: [%s/%s]", key, usedMemory, totalMemory)
}
// debugging an intermittent issue
usedMemory, totalMemory := c.memoryStats()
log.Infof("Redis [SET %s]: %s. Redis [INFO memory]: [%s/%s]", key, result, usedMemory, totalMemory)
}
}
return nil
Expand All @@ -398,42 +405,47 @@ func (c NamespacedResourceWatcherCache) onDelete(unstructuredObj map[string]inte
}

if delete {
err = c.redisCli.Del(c.redisCli.Context(), key).Err()
keysremoved, err := c.redisCli.Del(c.redisCli.Context(), key).Result()
if err != nil {
log.Errorf("Failed to delete value for object [%s] from cache due to: %v", key, err)
return err
} else {
// debugging an intermittent failure
log.Infof("Redis [DEL %s]: %d", key, keysremoved)
}
}
return nil
}

// fetchForOne is effectively a cache GET operation: it reads the raw bytes
// stored under key in redis and converts them back to a plug-in level object
// via the configured onGet hook.
// Returns (nil, nil) when the key is absent (never written, or evicted by
// redis under maxmemory pressure), the decoded value on success, or a non-nil
// error if the redis call or the onGet conversion fails.
func (c NamespacedResourceWatcherCache) fetchForOne(key string) (interface{}, error) {
	log.V(4).Infof("+fetchForOne(%s)", key)
	// read back from cache: should be either:
	//  - what we previously wrote OR
	//  - Redis.Nil if the key does not exist or has been evicted due to memory pressure/TTL expiry
	//
	// TODO (gfichtenholt) See if there might be a cleaner way than to have onGet() take []byte as
	// a 2nd argument. In theory, I would have liked to pass in an interface{}, just like onAdd/onModify.
	// The limitation here is caused by the fact that redis go client does not offer a
	// generic Get() method that would work with interface{}. Instead, all results are returned as
	// strings which can be converted to desired types as needed, e.g.
	// redisCli.Get(ctx, key).Bytes() first gets the string and then converts it to bytes.
	bytes, err := c.redisCli.Get(c.redisCli.Context(), key).Bytes()
	// debugging an intermittent issue
	if err == redis.Nil {
		// this is normal if the key does not exist
		log.V(4).Infof("Redis [GET %s]: Nil", key)
		return nil, nil
	} else if err != nil {
		log.Errorf("Failed to get value for key [%s] from cache due to: %v", key, err)
		return nil, err
	}
	log.V(4).Infof("Redis [GET %s]: %d bytes read", key, len(bytes))

	val, err := c.config.onGet(key, bytes)
	if err != nil {
		log.Errorf("Invocation of 'onGet' for object with key [%s]\nfailed due to: %v", key, err)
		return nil, err
	}
	return val, nil
}

Expand All @@ -445,14 +457,64 @@ func (c NamespacedResourceWatcherCache) fetchForOne(key string) (interface{}, er
// be relied upon to be the "source-of-truth". So I removed it for now as I found it
// of no use

// parallelize the process of value retrieval because fetchForOne() calls
// c.config.onGet() which will de-code the data from bytes into expected struct, which
// may be computationally expensive and thus benefit from multiple threads of execution
func (c NamespacedResourceWatcherCache) fetchForMultiple(keys []string) (map[string]interface{}, error) {
response := make(map[string]interface{})
for _, key := range keys {
result, err := c.fetchForOne(key)
if err != nil {
return nil, err

// max number of concurrent workers retrieving cache values at the same time
const maxWorkers = 10

type getValueJob struct {
key string
}
type getValueJobResult struct {
key string
value interface{}
err error
}

var wg sync.WaitGroup
numWorkers := int(math.Min(float64(len(keys)), float64(maxWorkers)))
requestChan := make(chan getValueJob, numWorkers)
responseChan := make(chan getValueJobResult, numWorkers)

// Process only at most maxWorkers at a time
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
// The following loop will only terminate when the request channel is
// closed (and there are no more items)
for job := range requestChan {
result, err := c.fetchForOne(job.key)
responseChan <- getValueJobResult{job.key, result, err}
}
wg.Done()
}()
}

go func() {
wg.Wait()
close(responseChan)
}()

go func() {
for _, key := range keys {
requestChan <- getValueJob{key}
}
close(requestChan)
}()

// Start receiving results
// The following loop will only terminate when the response channel is closed, i.e.
// after the all the requests have been processed
for resp := range responseChan {
if resp.err == nil {
response[resp.key] = resp.value
} else {
response[key] = result
// TODO (gfichtenholt) this returns first error, see if we can return all of them
return nil, resp.err
}
}
return response, nil
Expand Down Expand Up @@ -506,9 +568,9 @@ func (c NamespacedResourceWatcherCache) populateWith(items []unstructured.Unstru
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
// The following loop will only terminate when the request channel is
// closed (and there are no more items)
for job := range requestChan {
// The following loop will only terminate when the request channel is
// closed (and there are no more items)
c.onAddOrModify(true, job.item)
}
wg.Done()
Expand Down
11 changes: 6 additions & 5 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/chart.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main
import (
"context"
"fmt"
"reflect"
"strings"

"github.com/Masterminds/semver"
Expand Down Expand Up @@ -188,18 +189,18 @@ func (s *Server) getChart(ctx context.Context, repo types.NamespacedName, chartN
return nil, status.Errorf(codes.FailedPrecondition, "server cache has not been properly initialized")
}

charts, err := s.cache.fetchForOne(s.cache.keyForNamespacedName(repo))
entry, err := s.cache.fetchForOne(s.cache.keyForNamespacedName(repo))
if err != nil {
return nil, err
}

if charts != nil {
if typedCharts, ok := charts.([]models.Chart); !ok {
if entry != nil {
if typedEntry, ok := entry.(repoCacheEntry); !ok {
return nil, status.Errorf(
codes.Internal,
"unexpected value fetched from cache: %v", charts)
"unexpected value fetched from cache: type: [%s], value: [%v]", reflect.TypeOf(entry), entry)
} else {
for _, chart := range typedCharts {
for _, chart := range typedEntry.Charts {
if chart.Name == chartName {
return &chart, nil // found it
}
Expand Down
38 changes: 20 additions & 18 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/chart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,24 +463,7 @@ func TestGetAvailablePackageVersions(t *testing.T) {
AvailablePackageRef: availableRef("bitnami/redis", "kubeapps"),
},
expectedStatusCode: codes.OK,
expectedResponse: &corev1.GetAvailablePackageVersionsResponse{
PackageAppVersions: []*corev1.PackageAppVersion{
{PkgVersion: "14.6.1", AppVersion: "6.2.4"},
{PkgVersion: "14.6.0", AppVersion: "6.2.4"},
{PkgVersion: "14.5.0", AppVersion: "6.2.4"},
{PkgVersion: "14.4.0", AppVersion: "6.2.4"},
{PkgVersion: "13.0.1", AppVersion: "6.2.1"},
{PkgVersion: "13.0.0", AppVersion: "6.2.1"},
{PkgVersion: "12.10.1", AppVersion: "6.0.12"},
{PkgVersion: "12.10.0", AppVersion: "6.0.12"},
{PkgVersion: "12.9.2", AppVersion: "6.0.12"},
{PkgVersion: "12.9.1", AppVersion: "6.0.12"},
{PkgVersion: "12.9.0", AppVersion: "6.0.12"},
{PkgVersion: "12.8.3", AppVersion: "6.0.12"},
{PkgVersion: "12.8.2", AppVersion: "6.0.12"},
{PkgVersion: "12.8.1", AppVersion: "6.0.12"},
},
},
expectedResponse: expected_versions_redis,
},
{
name: "it returns error for non-existent chart",
Expand Down Expand Up @@ -706,3 +689,22 @@ var expected_detail_redis_2 = &corev1.AvailablePackageDetail{
},
},
}

// expected_versions_redis is the expected GetAvailablePackageVersions response
// for the bitnami/redis chart, shared by the test cases in
// TestGetAvailablePackageVersions (see the `expectedResponse` field above).
// Versions are listed in descending order of chart (Pkg) version.
var expected_versions_redis = &corev1.GetAvailablePackageVersionsResponse{
PackageAppVersions: []*corev1.PackageAppVersion{
{PkgVersion: "14.6.1", AppVersion: "6.2.4"},
{PkgVersion: "14.6.0", AppVersion: "6.2.4"},
{PkgVersion: "14.5.0", AppVersion: "6.2.4"},
{PkgVersion: "14.4.0", AppVersion: "6.2.4"},
{PkgVersion: "13.0.1", AppVersion: "6.2.1"},
{PkgVersion: "13.0.0", AppVersion: "6.2.1"},
{PkgVersion: "12.10.1", AppVersion: "6.0.12"},
{PkgVersion: "12.10.0", AppVersion: "6.0.12"},
{PkgVersion: "12.9.2", AppVersion: "6.0.12"},
{PkgVersion: "12.9.1", AppVersion: "6.0.12"},
{PkgVersion: "12.9.0", AppVersion: "6.0.12"},
{PkgVersion: "12.8.3", AppVersion: "6.0.12"},
{PkgVersion: "12.8.2", AppVersion: "6.0.12"},
{PkgVersion: "12.8.1", AppVersion: "6.0.12"},
},
}
Loading

0 comments on commit 77a7c7e

Please sign in to comment.