Skip to content

Commit

Permalink
feat: add more fields in K8s meta server
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Nov 20, 2024
1 parent 257aad1 commit 97a5460
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 97 deletions.
31 changes: 30 additions & 1 deletion pkg/helper/k8smeta/k8s_meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,13 @@ func (m *k8sMetaCache) List() []*ObjectWrapper {
return m.metaStore.List()
}

func (m *k8sMetaCache) Filter(filterFunc func(*ObjectWrapper) bool, limit int) []*ObjectWrapper {
return m.metaStore.Filter(filterFunc, limit)
}

func (m *k8sMetaCache) RegisterSendFunc(key string, sendFunc SendFunc, interval int) {
m.metaStore.RegisterSendFunc(key, sendFunc, interval)
logger.Debug(context.Background(), "register send func", m.resourceType)
}

func (m *k8sMetaCache) UnRegisterSendFunc(key string) {
Expand Down Expand Up @@ -186,6 +191,8 @@ func getIdxRules(resourceType string) []IdxFunc {
return []IdxFunc{generateNodeKey}
case POD:
return []IdxFunc{generateCommonKey, generatePodIPKey, generateContainerIDKey, generateHostIPKey}
case SERVICE:
return []IdxFunc{generateCommonKey, generateServiceIPKey}
default:
return []IdxFunc{generateCommonKey}
}
Expand Down Expand Up @@ -274,7 +281,7 @@ func generateContainerIDKey(obj interface{}) ([]string, error) {
}
result := make([]string, len(pod.Status.ContainerStatuses))
for i, containerStatus := range pod.Status.ContainerStatuses {
result[i] = containerStatus.ContainerID
result[i] = truncateContainerID(containerStatus.ContainerID)
}
return result, nil
}
Expand All @@ -286,3 +293,25 @@ func generateHostIPKey(obj interface{}) ([]string, error) {
}
return []string{pod.Status.HostIP}, nil
}

func generateServiceIPKey(obj interface{}) ([]string, error) {
svc, ok := obj.(*v1.Service)
if !ok {
return []string{}, fmt.Errorf("object is not a service")
}
results := make([]string, 0)
for _, ip := range svc.Spec.ClusterIPs {
if ip != "" {
results = append(results, ip)
}
}
for _, ip := range svc.Spec.ExternalIPs {
if ip != "" {
results = append(results, ip)
}
}
if svc.Spec.LoadBalancerIP != "" {
results = append(results, svc.Spec.LoadBalancerIP)
}
return results, nil
}
9 changes: 7 additions & 2 deletions pkg/helper/k8smeta/k8s_meta_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,17 @@ const (
)

type PodMetadata struct {
PodName string `json:"podName"`
StartTime int64 `json:"startTime"`
Namespace string `json:"namespace"`
WorkloadName string `json:"workloadName"`
WorkloadKind string `json:"workloadKind"`
ServiceName string `json:"serviceName"`
Labels map[string]string `json:"labels"`
Envs map[string]string `json:"envs"`
Images map[string]string `json:"images"`
IsDeleted bool `json:"-"`

ServiceName string `json:"serviceName,omitempty"`
ContainerIDs []string `json:"containerIDs,omitempty"`
PodIP string `json:"podIP,omitempty"`
IsDeleted bool `json:"-"`
}
85 changes: 57 additions & 28 deletions pkg/helper/k8smeta/k8s_meta_deferred_deletion_meta_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ type DeferredDeletionMetaStore struct {
lock sync.RWMutex

// timer
gracePeriod int64
sendFuncs sync.Map
gracePeriod int64
registerLock sync.RWMutex
sendFuncs map[string]*SendFuncWithStopCh
}

type TimerEvent struct {
Expand All @@ -49,7 +50,7 @@ func NewDeferredDeletionMetaStore(eventCh chan *K8sMetaEvent, stopCh <-chan stru
Index: make(map[string][]string),

gracePeriod: gracePeriod,
sendFuncs: sync.Map{},
sendFuncs: make(map[string]*SendFuncWithStopCh),
}
return m
}
Expand All @@ -65,7 +66,7 @@ func (m *DeferredDeletionMetaStore) Get(key []string) map[string][]*ObjectWrappe
for _, k := range key {
realKeys, ok := m.Index[k]
if !ok {
return nil
continue
}
for _, realKey := range realKeys {
result[k] = append(result[k], m.Items[realKey])
Expand All @@ -84,12 +85,33 @@ func (m *DeferredDeletionMetaStore) List() []*ObjectWrapper {
return result
}

func (m *DeferredDeletionMetaStore) Filter(filterFunc func(*ObjectWrapper) bool, limit int) []*ObjectWrapper {
m.lock.RLock()
defer m.lock.RUnlock()
result := make([]*ObjectWrapper, 0)
for _, item := range m.Items {
if filterFunc != nil {
if filterFunc(item) {
result = append(result, item)
}
} else {
result = append(result, item)
}
if limit > 0 && len(result) >= limit {
break
}
}
return result
}

func (m *DeferredDeletionMetaStore) RegisterSendFunc(key string, f SendFunc, interval int) {
sendFuncWithStopCh := &SendFuncWithStopCh{
SendFunc: f,
StopCh: make(chan struct{}),
}
m.sendFuncs.Store(key, sendFuncWithStopCh)
m.registerLock.Lock()
m.sendFuncs[key] = sendFuncWithStopCh
m.registerLock.Unlock()
go func() {
defer panicRecover()
event := &K8sMetaEvent{
Expand All @@ -111,7 +133,6 @@ func (m *DeferredDeletionMetaStore) RegisterSendFunc(key string, f SendFunc, int

m.eventCh <- event
ticker := time.NewTicker(time.Duration(interval) * time.Second)

for {
select {
case <-ticker.C:
Expand All @@ -124,9 +145,12 @@ func (m *DeferredDeletionMetaStore) RegisterSendFunc(key string, f SendFunc, int
}

func (m *DeferredDeletionMetaStore) UnRegisterSendFunc(key string) {
if stopCh, ok := m.sendFuncs.LoadAndDelete(key); ok {
close(stopCh.(*SendFuncWithStopCh).StopCh)
m.registerLock.Lock()
if stopCh, ok := m.sendFuncs[key]; ok {
close(stopCh.StopCh)
}
delete(m.sendFuncs, key)
m.registerLock.Unlock()
}

// realtime events (add, update, delete) and timer events are handled sequentially
Expand All @@ -150,10 +174,11 @@ func (m *DeferredDeletionMetaStore) handleEvent() {
logger.Error(context.Background(), "unknown event type", event.EventType)
}
case <-m.stopCh:
m.sendFuncs.Range(func(key, value interface{}) bool {
close(value.(*SendFuncWithStopCh).StopCh)
return true
})
m.registerLock.Lock()
for _, f := range m.sendFuncs {
close(f.StopCh)
}
m.registerLock.Unlock()
return
}
}
Expand All @@ -175,10 +200,11 @@ func (m *DeferredDeletionMetaStore) handleAddEvent(event *K8sMetaEvent) {
m.Index[idxKey] = append(m.Index[idxKey], key)
}
m.lock.Unlock()
m.sendFuncs.Range(func(key, value interface{}) bool {
value.(*SendFuncWithStopCh).SendFunc([]*K8sMetaEvent{event})
return true
})
m.registerLock.RLock()
for _, f := range m.sendFuncs {
f.SendFunc([]*K8sMetaEvent{event})
}
m.registerLock.RUnlock()
}

func (m *DeferredDeletionMetaStore) handleUpdateEvent(event *K8sMetaEvent) {
Expand All @@ -200,10 +226,11 @@ func (m *DeferredDeletionMetaStore) handleUpdateEvent(event *K8sMetaEvent) {
m.Index[idxKey] = append(m.Index[idxKey], key)
}
m.lock.Unlock()
m.sendFuncs.Range(func(key, value interface{}) bool {
value.(*SendFuncWithStopCh).SendFunc([]*K8sMetaEvent{event})
return true
})
m.registerLock.RLock()
for _, f := range m.sendFuncs {
f.SendFunc([]*K8sMetaEvent{event})
}
m.registerLock.RUnlock()
}

func (m *DeferredDeletionMetaStore) handleDeleteEvent(event *K8sMetaEvent) {
Expand All @@ -218,10 +245,11 @@ func (m *DeferredDeletionMetaStore) handleDeleteEvent(event *K8sMetaEvent) {
event.Object.FirstObservedTime = obj.FirstObservedTime
}
m.lock.Unlock()
m.sendFuncs.Range(func(key, value interface{}) bool {
value.(*SendFuncWithStopCh).SendFunc([]*K8sMetaEvent{event})
return true
})
m.registerLock.RLock()
for _, f := range m.sendFuncs {
f.SendFunc([]*K8sMetaEvent{event})
}
m.registerLock.RUnlock()
go func() {
// wait and add a deferred delete event
time.Sleep(time.Duration(m.gracePeriod) * time.Second)
Expand Down Expand Up @@ -278,11 +306,11 @@ func (m *DeferredDeletionMetaStore) handleDeferredDeleteEvent(event *K8sMetaEven

func (m *DeferredDeletionMetaStore) handleTimerEvent(event *K8sMetaEvent) {
timerEvent := event.Object.Raw.(*TimerEvent)
if f, ok := m.sendFuncs.Load(timerEvent.ConfigName); ok {
sendFuncWithStopCh := f.(*SendFuncWithStopCh)
m.registerLock.RLock()
defer m.registerLock.RUnlock()
if f, ok := m.sendFuncs[timerEvent.ConfigName]; ok {
allItems := make([]*K8sMetaEvent, 0)
m.lock.RLock()
defer m.lock.RUnlock()
for _, obj := range m.Items {
if !obj.Deleted {
obj.LastObservedTime = time.Now().Unix()
Expand All @@ -292,7 +320,8 @@ func (m *DeferredDeletionMetaStore) handleTimerEvent(event *K8sMetaEvent) {
})
}
}
sendFuncWithStopCh.SendFunc(allItems)
m.lock.RUnlock()
f.SendFunc(allItems)
}
}

Expand Down
Loading

0 comments on commit 97a5460

Please sign in to comment.