Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement: add configmap to configure user agents for specify response cache. #466

Merged
merged 1 commit into from
Sep 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
trace++

klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
cacheMgr, err := cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager)
cacheMgr, err := cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)
if err != nil {
return fmt.Errorf("could not new cache manager, %v", err)
}
Expand Down
7 changes: 7 additions & 0 deletions config/setup/yurthub-cfg.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: yurt-hub-cfg
namespace: kube-system
data:
cache_agents: ""
163 changes: 93 additions & 70 deletions pkg/yurthub/cachemanager/cache_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,94 +18,65 @@ package cachemanager

import (
"strings"

"github.com/openyurtio/openyurt/pkg/projectinfo"

"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/sets"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"

"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

var (
defaultCacheAgents = []string{
"kubelet",
"kube-proxy",
"flanneld",
"coredns",
projectinfo.GetAgentName(),
projectinfo.GetHubName(),
}
cacheAgentsKey = "_internal/cache-manager/cache-agent.conf"
sepForAgent = ","
sepForAgent = ","
)

func (cm *cacheManager) initCacheAgents() error {
agents := make([]string, 0)
b, err := cm.storage.GetRaw(cacheAgentsKey)
if err == nil && len(b) != 0 {
localAgents := strings.Split(string(b), sepForAgent)
if len(localAgents) < len(defaultCacheAgents) {
err = cm.storage.Delete(cacheAgentsKey)
if err != nil {
klog.Errorf("failed to delete agents cache, %v", err)
return err
}
} else {
agents = append(agents, localAgents...)
for _, agent := range localAgents {
cm.cacheAgents[agent] = false
}
}
}
for _, agent := range defaultCacheAgents {
if cm.cacheAgents == nil {
cm.cacheAgents = make(map[string]bool)
}

if _, ok := cm.cacheAgents[agent]; !ok {
agents = append(agents, agent)
}
cm.cacheAgents[agent] = true
if cm.sharedFactory == nil {
return nil
}
configmapInformer := cm.sharedFactory.InformerFor(&corev1.ConfigMap{}, newConfigmapInformer)
configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cm.addConfigmap,
UpdateFunc: cm.updateConfigmap,
})

klog.Infof("reset cache agents to %v", agents)
return cm.storage.UpdateRaw(cacheAgentsKey, []byte(strings.Join(agents, sepForAgent)))
klog.Infof("init cache agents to %v", cm.cacheAgents)
return nil
}

// UpdateCacheAgents update cache agents
func (cm *cacheManager) UpdateCacheAgents(agents []string) error {
if len(agents) == 0 {
klog.Infof("no cache agent is set for update")
return nil
}
func (cm *cacheManager) UpdateCacheAgents(cache_agents, action string) sets.String {
userAgents := strings.TrimSpace(cache_agents)
agents := strings.Split(userAgents, sepForAgent)
newAgents := sets.NewString(agents...)

hasUpdated := false
updatedAgents := append(defaultCacheAgents, agents...)
cm.Lock()
defer cm.Unlock()
if len(updatedAgents) != len(cm.cacheAgents) {
hasUpdated = true
} else {
for _, agent := range agents {
if _, ok := cm.cacheAgents[agent]; !ok {
hasUpdated = true
break
}
}
oldAgents := cm.cacheAgents.Delete(util.DefaultCacheAgents...)

if oldAgents.Equal(newAgents) {
// add default cache agents
cm.cacheAgents = cm.cacheAgents.Insert(util.DefaultCacheAgents...)
return sets.String{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you add the DefaultCacheAgents back before returning sets.String{} here and for constructing new cache agents below?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you add the DefaultCacheAgents back before returning sets.String{} here and for constructing new cache agents below?

fixed, DefaultCacheAgents have been added.

}

if hasUpdated {
for k, v := range cm.cacheAgents {
if !v {
// not default agent
delete(cm.cacheAgents, k)
}
}
// get deleted agents
deletedAgents := oldAgents.Difference(newAgents)

for _, agent := range agents {
cm.cacheAgents[agent] = false
}
return cm.storage.UpdateRaw(cacheAgentsKey, []byte(strings.Join(updatedAgents, sepForAgent)))
}
return nil
// construct new cache agents
cm.cacheAgents = cm.cacheAgents.Delete(deletedAgents.List()...)
cm.cacheAgents = cm.cacheAgents.Insert(agents...)
cm.cacheAgents = cm.cacheAgents.Insert(util.DefaultCacheAgents...)
klog.Infof("current cache agents after %s are: %v", action, cm.cacheAgents)

// return deleted agents
return deletedAgents
}

// ListCacheAgents get all of cache agents
Expand All @@ -118,3 +89,55 @@ func (cm *cacheManager) ListCacheAgents() []string {
}
return agents
}

func newConfigmapInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
selector := fields.Set{"metadata.name": util.YurthubConfigMapName}.String()
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = selector
}

return coreinformers.NewFilteredConfigMapInformer(cs, util.YurtHubNamespace, resyncPeriod, nil, tweakListOptions)
}

func (cm *cacheManager) addConfigmap(obj interface{}) {
cfg, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}

deletedAgents := cm.UpdateCacheAgents(cfg.Data[util.CacheUserAgentsKey], "add")
cm.deleteAgentCache(deletedAgents)
}

func (cm *cacheManager) updateConfigmap(oldObj, newObj interface{}) {
oldCfg, ok := oldObj.(*corev1.ConfigMap)
if !ok {
return
}

newCfg, ok := newObj.(*corev1.ConfigMap)
if !ok {
return
}

if oldCfg.Data[util.CacheUserAgentsKey] == newCfg.Data[util.CacheUserAgentsKey] {
return
}

deletedAgents := cm.UpdateCacheAgents(newCfg.Data[util.CacheUserAgentsKey], "update")
cm.deleteAgentCache(deletedAgents)
}

func (cm *cacheManager) deleteAgentCache(deletedAgents sets.String) {
// delete cache data for deleted agents
if deletedAgents.Len() > 0 {
keys := deletedAgents.List()
for i := range keys {
if err := cm.storage.DeleteCollection(keys[i]); err != nil {
klog.Errorf("failed to cleanup cache for deleted agent(%s), %v", keys[i], err)
} else {
klog.Infof("cleanup cache for agent(%s) successfully", keys[i])
}
}
}
}
145 changes: 32 additions & 113 deletions pkg/yurthub/cachemanager/cache_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,137 +17,56 @@ limitations under the License.
package cachemanager

import (
"strings"
"testing"

"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
"github.com/openyurtio/openyurt/pkg/yurthub/util"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/request"
)

func TestInitCacheAgents(t *testing.T) {
dStorage, err := disk.NewDiskStorage(rootDir)
if err != nil {
t.Errorf("failed to create disk storage, %v", err)
}
s := NewStorageWrapper(dStorage)
m, _ := NewCacheManager(s, nil, nil)

// default cache agents in fake store
b, err := s.GetRaw(cacheAgentsKey)
if err != nil {
t.Fatalf("failed to get agents, %v", err)
}

gotAgents := strings.Split(string(b), sepForAgent)
if ok := compareAgents(gotAgents, defaultCacheAgents); !ok {
t.Errorf("Got agents: %v, expect agents: %v", gotAgents, defaultCacheAgents)
}

if !compareAgents(gotAgents, m.ListCacheAgents()) {
t.Errorf("Got agents: %v, cache agents map: %v", gotAgents, m.ListCacheAgents())
}

// add agents for next init cache
_ = m.UpdateCacheAgents([]string{"agent1"})

_, _ = NewCacheManager(s, nil, nil)

b2, err := s.GetRaw(cacheAgentsKey)
if err != nil {
t.Fatalf("failed to get agents, %v", err)
}

expectedAgents := append(defaultCacheAgents, "agent1")
gotAgents2 := strings.Split(string(b2), sepForAgent)
if ok := compareAgents(gotAgents2, expectedAgents); !ok {
t.Errorf("Got agents: %v, expect agents: %v", gotAgents2, expectedAgents)
}

if !compareAgents(gotAgents2, m.ListCacheAgents()) {
t.Errorf("Got agents: %v, cache agents map: %v", gotAgents2, m.ListCacheAgents())
}

err = s.Delete(cacheAgentsKey)
if err != nil {
t.Errorf("failed to delete cache agents key, %v", err)
}
}

func TestUpdateCacheAgents(t *testing.T) {
dStorage, err := disk.NewDiskStorage(rootDir)
if err != nil {
t.Errorf("failed to create disk storage, %v", err)
}
s := NewStorageWrapper(dStorage)
m, _ := NewCacheManager(s, nil, nil)

testcases := map[string]struct {
desc string
addAgents []string
expectAgents []string
desc string
initAgents []string
cacheAgents string
resultAgents sets.String
deletedAgents sets.String
}{
"add one agent": {addAgents: []string{"agent1"}, expectAgents: append(defaultCacheAgents, "agent1")},
"update with two agents": {addAgents: []string{"agent2", "agent3"}, expectAgents: append(defaultCacheAgents, "agent2", "agent3")},
"update with more two agents": {addAgents: []string{"agent4", "agent5"}, expectAgents: append(defaultCacheAgents, "agent4", "agent5")},
"two new agents updated": {
initAgents: []string{},
cacheAgents: "agent1,agent2",
resultAgents: sets.NewString(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...),
deletedAgents: sets.String{},
},
"two new agents updated but an old agent deleted": {
initAgents: []string{"agent1", "agent2"},
cacheAgents: "agent2,agent3",
resultAgents: sets.NewString(append([]string{"agent2", "agent3"}, util.DefaultCacheAgents...)...),
deletedAgents: sets.NewString("agent1"),
},
"no agents updated ": {
initAgents: []string{"agent1", "agent2"},
cacheAgents: "agent1,agent2",
resultAgents: sets.NewString(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...),
deletedAgents: sets.String{},
},
}
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {

// add agents
err := m.UpdateCacheAgents(tt.addAgents)
if err != nil {
t.Fatalf("failed to add cache agents, %v", err)
}

b, err := s.GetRaw(cacheAgentsKey)
if err != nil {
t.Fatalf("failed to get agents, %v", err)
m := &cacheManager{
cacheAgents: sets.NewString(tt.initAgents...),
}

gotAgents := strings.Split(string(b), sepForAgent)
if ok := compareAgents(gotAgents, tt.expectAgents); !ok {
t.Errorf("Got agents: %v, expect agents: %v", gotAgents, tt.expectAgents)
}
// add agents
deletedAgents := m.UpdateCacheAgents(tt.cacheAgents, "")

if !compareAgents(gotAgents, m.ListCacheAgents()) {
t.Errorf("Got agents: %v, cache agents map: %v", gotAgents, m.ListCacheAgents())
if !deletedAgents.Equal(tt.deletedAgents) {
t.Errorf("Got deleted agents: %v, expect agents: %v", deletedAgents, tt.deletedAgents)
}

err = s.Delete(cacheAgentsKey)
if err != nil {
t.Errorf("failed to delete cache agents key, %v", err)
if !m.cacheAgents.Equal(tt.resultAgents) {
t.Errorf("Got cache agents: %v, expect agents: %v", m.cacheAgents, tt.resultAgents)
}
})
}
}

func compareAgents(gotAgents []string, expectedAgents []string) bool {
if len(gotAgents) != len(expectedAgents) {
return false
}

for _, agent := range gotAgents {
notFound := true
for i := range expectedAgents {
if expectedAgents[i] == agent {
notFound = false
break
}
}

if notFound {
return false
}
}

return true
}

func newTestRequestInfoResolver() *request.RequestInfoFactory {
return &request.RequestInfoFactory{
APIPrefixes: sets.NewString("api", "apis"),
GrouplessAPIPrefixes: sets.NewString("api"),
}
}
Loading