Skip to content

Commit

Permalink
minor
Browse files Browse the repository at this point in the history
  • Loading branch information
shaowenchen committed Dec 19, 2024
1 parent cbe5da0 commit e69c266
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 59 deletions.
112 changes: 67 additions & 45 deletions controllers/eventhooks_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ import (
// EventHooksReconciler reconciles a EventHooks object
type EventHooksReconciler struct {
client.Client
Scheme *runtime.Scheme
mutex sync.RWMutex
eventbusMap map[string]opsevent.EventBus
Scheme *runtime.Scheme
subjectEventBusMapMutex sync.RWMutex
subjectEventBusMap map[string]opsevent.EventBus
objSubjectMapMutex sync.RWMutex
objSubjectMap map[string]string
}

//+kubebuilder:rbac:groups=crd.chenshaowen.com,resources=eventhooks,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -69,62 +71,76 @@ func (r *EventHooksReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}
logger.Info.Println("Reconcile EventHooks: ", req.NamespacedName.String())
go r.create(logger, ctx, obj)
go r.update(logger, ctx, obj)
return ctrl.Result{}, nil
}

func (r *EventHooksReconciler) create(logger *opslog.Logger, ctx context.Context, obj *opsv1.EventHooks) error {
if r.eventbusMap == nil {
r.eventbusMap = make(map[string]opsevent.EventBus)
func (r *EventHooksReconciler) update(logger *opslog.Logger, ctx context.Context, obj *opsv1.EventHooks) error {
r.subjectEventBusMapMutex.Lock()
if obj.Spec.Subject == "" {
return nil
}

// delete old eventbus
if _, ok := r.eventbusMap[obj.Namespace]; ok {
r.delete(logger, ctx, types.NamespacedName{
Namespace: obj.Namespace,
Name: obj.Name,
})
subject := obj.Spec.Subject
if r.subjectEventBusMap == nil {
r.subjectEventBusMap = make(map[string]opsevent.EventBus)
}
if r.objSubjectMap == nil {
r.objSubjectMap = make(map[string]string)
}
if _, ok := r.objSubjectMap[obj.Name]; !ok {
r.objSubjectMap[obj.Name] = subject
}
existingEventHooksList := &opsv1.EventHooksList{}
listOpts := []client.ListOption{
client.InNamespace(obj.Namespace),
client.MatchingFields{".spec.subject": subject},
}
if err := r.List(ctx, existingEventHooksList, listOpts...); err != nil {
return err
}
key := obj.Spec.Subject
client := opsevent.EventBus{}
if _, ok := r.eventbusMap[key]; ok {
client = r.eventbusMap[key]
if _, ok := r.subjectEventBusMap[subject]; ok {
client = r.subjectEventBusMap[subject]
} else {
client.WithEndpoint(os.Getenv("EVENT_ENDPOINT")).WithSubject(obj.Spec.Subject)
}
client.AddConsumerFunc(func(ctx context.Context, event cloudevents.Event) {
eventStrings := opsevent.GetCloudEventReadable(event)
println("subjcet: ", event.Subject())
notification := true
if len(obj.Spec.Keywords) > 0 {
notification = false
for _, keyword := range obj.Spec.Keywords {
if strings.Contains(eventStrings, keyword) {
notification = true
break
client.WithEndpoint(os.Getenv("EVENT_ENDPOINT")).WithSubject(subject)
}
for _, eventhook := range existingEventHooksList.Items {
client.AddConsumerFunc(func(ctx context.Context, event cloudevents.Event) {
eventStrings := opsevent.GetCloudEventReadable(event)
notification := true
if len(eventhook.Spec.Keywords) > 0 {
notification = false
for _, keyword := range obj.Spec.Keywords {
if strings.Contains(eventStrings, keyword) {
notification = true
break
}
}
}
}
if notification {
println("notification: ", eventStrings)
go opseventhook.NotificationMap[obj.Spec.Type].Post(obj.Spec.URL, obj.Spec.Options, eventStrings, obj.Spec.Additional)
}

})
r.mutex.Lock()
r.eventbusMap[key] = client
r.mutex.Unlock()
if notification {
go opseventhook.NotificationMap[obj.Spec.Type].Post(obj.Spec.URL, obj.Spec.Options, eventStrings, obj.Spec.Additional)
}

})
}
r.subjectEventBusMapMutex.Unlock()
client.Subscribe(ctx)
return nil
}

func (r *EventHooksReconciler) delete(logger *opslog.Logger, ctx context.Context, namespacedName types.NamespacedName) error {
r.mutex.Lock()
defer r.mutex.Unlock()

if _, ok := r.eventbusMap[namespacedName.Namespace]; ok {
logger.Debug.Println("canceling EventBus for ", namespacedName.String())
delete(r.eventbusMap, namespacedName.Namespace)
if r.objSubjectMap == nil {
return nil
}
if subject, ok := r.objSubjectMap[namespacedName.String()]; ok {
r.update(logger, ctx, &opsv1.EventHooks{
Spec: opsv1.EventHooksSpec{
Subject: subject,
},
})
r.objSubjectMapMutex.Lock()
delete(r.objSubjectMap, namespacedName.Name)
r.objSubjectMapMutex.Unlock()
}
return nil
}
Expand All @@ -138,6 +154,12 @@ func (r *EventHooksReconciler) SetupWithManager(mgr ctrl.Manager) error {
Kind: opsconstants.EventHooks,
})
}
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &opsv1.EventHooks{}, ".spec.subject", func(rawObj client.Object) []string {
tr := rawObj.(*opsv1.EventHooks)
return []string{tr.Spec.Subject}
}); err != nil {
return err
}
return ctrl.NewControllerManagedBy(mgr).
For(&opsv1.EventHooks{}).
Complete(r)
Expand Down
14 changes: 10 additions & 4 deletions pkg/event/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ func GetClient(endpoint string, subject string) (ceclient.Client, error) {
}

type EventBus struct {
Server string
Subject string
ConsumerFuncs []func(ctx context.Context, event cloudevents.Event)
Server string
Subject string
SubScribeCancel context.CancelFunc
ConsumerFuncs []func(ctx context.Context, event cloudevents.Event)
}

func (bus *EventBus) AddConsumerFunc(fn func(ctx context.Context, event cloudevents.Event)) {
Expand Down Expand Up @@ -88,10 +89,15 @@ func (bus *EventBus) Subscribe(ctx context.Context) error {
return err
}
combineFn := func(ctx context.Context, event cloudevents.Event) {
// println("len: ", len(bus.ConsumerFuncs))
for _, fn := range bus.ConsumerFuncs {
fn(ctx, event)
}
}

if bus.SubScribeCancel != nil {
bus.SubScribeCancel()
}
ctx, cancel := context.WithCancel(ctx)
bus.SubScribeCancel = cancel
return client.StartReceiver(ctx, combineFn)
}
4 changes: 2 additions & 2 deletions taskruns/alert-hosts-disk.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
name: alert-hosts-disk
namespace: ops-system
spec:
crontab: "*/10 * * * *"
crontab: "*/30 * * * *"
taskRef: alert-hosts-disk
variables:
threshold: "80"
threshold: "84"
2 changes: 1 addition & 1 deletion taskruns/alert-hosts-failed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ metadata:
name: alert-hosts-failed
namespace: ops-system
spec:
crontab: "*/10 * * * *"
crontab: "*/30 * * * *"
taskRef: alert-hosts-failed
4 changes: 2 additions & 2 deletions taskruns/alert-hosts-mem.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
name: alert-hosts-mem
namespace: ops-system
spec:
crontab: "*/10 * * * *"
crontab: "*/30 * * * *"
taskRef: alert-hosts-mem
variables:
threshold: "80"
threshold: "84"
5 changes: 3 additions & 2 deletions tasks/alert-evicted-pods.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ spec:
def send(status, message, value):
payload = {
'host': '${HOSTNAME}',
'kind': '${TASKRUN}',
'type': 'TaskRunReport',
'status': status,
'message': message,
'threshold': '${threshold}',
'operator': '>',
'value': value
'value': str(value)
}
headers = {
'Content-Type': 'application/json'
Expand All @@ -53,7 +54,7 @@ spec:
message = f"Evicted pods count {evicted_count} exceeds threshold {threshold}."
send('alerting', message, evicted_count)
else:
message = f"Evicted pods count {evicted_count} is less than threshold {threshold}."
message = f"Evicted pods count {evicted_count} not exceeds threshold {threshold}."
send('normal', message, evicted_count)
if __name__ == "__main__":
Expand Down
8 changes: 5 additions & 3 deletions tasks/clear-disk.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ spec:
content: find /data/run -type f \( -name "*.log" -o -name "*.log.[0-9]*" \) -size +100M -exec sh -c 'for file do echo "cleaned $file" >&2; echo "" > "$file"; done' sh {} + 2>/dev/null || true
- name: clear jfs cache
content: |
find /data*/jfs*/*/raw/chunks -maxdepth 3 -type h -ctime +2 -exec rm -rf {} + 2>/dev/null || true
find /data*/jfs*/*/*/raw/chunks -maxdepth 3 -type h -ctime +2 -exec rm -rf {} + 2>/dev/null || true
find /var/lib/jfs/cache*/raw/chunks -maxdepth 3 -type h -ctime +2 -exec rm -rf {} + 2>/dev/null || true
find /data*/jfs*/*/raw/chunks -maxdepth 3 -type d -ctime +3 -exec rm -rf {} + 2>/dev/null || true
find /data*/jfs*/*/*/raw/chunks -maxdepth 3 -type d -ctime +3 -exec rm -rf {} + 2>/dev/null || true
find /data*/jfs*/*/*/*/raw/chunks -maxdepth 3 -type d -ctime +3 -exec rm -rf {} + 2>/dev/null || true
find /data*/jfs*/*/*/*/*/raw/chunks -maxdepth 3 -type d -ctime +3 -exec rm -rf {} + 2>/dev/null || true
find /var/lib/jfs/cache*/raw/chunks -maxdepth 3 -type d -ctime +3 -exec rm -rf {} + 2>/dev/null || true
- name: after clear
content: |
timeout 5 df -H | grep -vE '^Filesystem|tmpfs|cdrom|loop|udev' | awk '{ print $5 "/" $2 " " $1 }' |grep " "/

0 comments on commit e69c266

Please sign in to comment.