Skip to content

Commit

Permalink
debug
Browse files Browse the repository at this point in the history
  • Loading branch information
shaowenchen committed Dec 19, 2024
1 parent c4becda commit 963935c
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 18 deletions.
27 changes: 14 additions & 13 deletions controllers/eventhooks_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type EventHooksReconciler struct {
client.Client
Scheme *runtime.Scheme
mutex sync.RWMutex
eventbusMap map[string]context.CancelFunc
eventbusMap map[string]opsevent.EventBus
}

//+kubebuilder:rbac:groups=crd.chenshaowen.com,resources=eventhooks,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -75,7 +75,7 @@ func (r *EventHooksReconciler) Reconcile(ctx context.Context, req ctrl.Request)

func (r *EventHooksReconciler) create(logger *opslog.Logger, ctx context.Context, obj *opsv1.EventHooks) error {
if r.eventbusMap == nil {
r.eventbusMap = make(map[string]context.CancelFunc)
r.eventbusMap = make(map[string]opsevent.EventBus)
}

// delete old eventbus
Expand All @@ -85,15 +85,14 @@ func (r *EventHooksReconciler) create(logger *opslog.Logger, ctx context.Context
Name: obj.Name,
})
}

ctx, cancel := context.WithCancel(context.Background())
r.mutex.Lock()
r.eventbusMap[obj.Namespace] = cancel
r.mutex.Unlock()

client := &opsevent.EventBus{}
println("sub subject: ", obj.Spec.Subject)
client.WithEndpoint(os.Getenv("EVENT_ENDPOINT")).WithSubject(obj.Spec.Subject).AddConsumerFunc(func(ctx context.Context, event cloudevents.Event) {
key := obj.Spec.Subject
client := opsevent.EventBus{}
if _, ok := r.eventbusMap[key]; ok {
client = r.eventbusMap[key]
} else {
client.WithEndpoint(os.Getenv("EVENT_ENDPOINT")).WithSubject(obj.Spec.Subject)
}
client.AddConsumerFunc(func(ctx context.Context, event cloudevents.Event) {
eventStrings := opsevent.GetCloudEventReadable(event)
notification := true
if len(obj.Spec.Keywords) > 0 {
Expand All @@ -110,6 +109,9 @@ func (r *EventHooksReconciler) create(logger *opslog.Logger, ctx context.Context
}

})
r.mutex.Lock()
r.eventbusMap[key] = client
r.mutex.Unlock()
client.Subscribe(ctx)
return nil
}
Expand All @@ -118,9 +120,8 @@ func (r *EventHooksReconciler) delete(logger *opslog.Logger, ctx context.Context
r.mutex.Lock()
defer r.mutex.Unlock()

if cancel, ok := r.eventbusMap[namespacedName.Namespace]; ok {
if _, ok := r.eventbusMap[namespacedName.Namespace]; ok {
logger.Debug.Println("canceling EventBus for ", namespacedName.String())
cancel()
delete(r.eventbusMap, namespacedName.Namespace)
}
return nil
Expand Down
15 changes: 11 additions & 4 deletions pkg/event/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,25 @@ import (
"context"
"errors"
"strings"
"sync"

cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
ceclient "github.com/cloudevents/sdk-go/v2/client"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
)

var CacheClient = make(map[string]ceclient.Client)
var MutexClient = sync.RWMutex{}

func GetClient(endpoint string, subject string) (ceclient.Client, error) {
natsOptions := []nats.Option{
nats.Name(uuid.New().String()),
MutexClient.Lock()
defer MutexClient.Unlock()
key := endpoint + subject
if client, ok := CacheClient[key]; ok {
return client, nil
}
natsOptions := []nats.Option{}
p, err := cenats.NewProtocol(endpoint, subject, subject, natsOptions)
if err != nil {
return nil, err
Expand All @@ -25,7 +32,7 @@ func GetClient(endpoint string, subject string) (ceclient.Client, error) {
if err != nil {
return nil, err
}

CacheClient[key] = c
return c, nil
}

Expand Down
1 change: 0 additions & 1 deletion pkg/eventhook/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func (xiezuo *XiezuoPost) Post(url string, options map[string]string, data strin
}
_, err = client.Do(req)
if err != nil {
println(err.Error())
return err
}
return nil
Expand Down

0 comments on commit 963935c

Please sign in to comment.