Skip to content

Commit

Permalink
fix eventbus overwrite func
Browse files Browse the repository at this point in the history
  • Loading branch information
shaowenchen committed Dec 19, 2024
1 parent 6bf801c commit 6e630cf
Showing 1 changed file with 13 additions and 63 deletions.
76 changes: 13 additions & 63 deletions pkg/event/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,73 +4,29 @@ import (
"context"
"errors"
"strings"
"sync"

cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
ceclient "github.com/cloudevents/sdk-go/v2/client"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
)

type GlobalEventBusClients struct {
Mutex sync.RWMutex
Clients map[string]*ProducerConsumerClient
}

type ProducerConsumerClient struct {
Producer *cloudevents.Client
Consumer *cloudevents.Client
ConsumerFuncs []func(ctx context.Context, event cloudevents.Event)
}

func (globalClient *GlobalEventBusClients) GetClient(endpoint string, subject string) (*ProducerConsumerClient, error) {
key := endpoint + subject

globalClient.Mutex.RLock()
clientP, ok := globalClient.Clients[key]
globalClient.Mutex.RUnlock()

if ok {
return clientP, nil
func GetClient(endpoint string, subject string) (ceclient.Client, error) {
natsOptions := []nats.Option{
nats.Name(uuid.New().String()),
}

globalClient.Mutex.Lock()
defer globalClient.Mutex.Unlock()

clientP, ok = globalClient.Clients[key]
if ok {
return clientP, nil
}

// build producer
producerP, err := cenats.NewSender(endpoint, subject, cenats.NatsOptions())
if err != nil {
return nil, err
}
producerClient, err := cloudevents.NewClient(producerP)
p, err := cenats.NewProtocol(endpoint, subject, subject, natsOptions)
if err != nil {
return nil, err
}

// build consumer
consumerP, err := cenats.NewConsumer(endpoint, subject, cenats.NatsOptions())
if err != nil {
return nil, err
}
consumerClient, err := cloudevents.NewClient(consumerP)
c, err := ceclient.New(p)
if err != nil {
return nil, err
}

// update cache with the new client
globalClient.Clients[key] = &ProducerConsumerClient{
Producer: &producerClient,
Consumer: &consumerClient,
}

return globalClient.Clients[key], nil
}

var CurrentEventBusClient = &GlobalEventBusClients{
Clients: make(map[string]*ProducerConsumerClient),
return c, nil
}

type EventBus struct {
Expand Down Expand Up @@ -103,28 +59,22 @@ func (bus *EventBus) Publish(ctx context.Context, data interface{}) error {
if err != nil {
return err
}
client, err := CurrentEventBusClient.GetClient(bus.Server, bus.Subject)
client, err := GetClient(bus.Server, bus.Subject)
if err != nil {
return err
}
result := (*client.Producer).Send(ctx, event)
result := client.Send(ctx, event)
if cloudevents.IsUndelivered(result) {
return errors.New("failed to publish")
}
return nil
}

func (bus *EventBus) Subscribe(ctx context.Context, fn func(ctx context.Context, event cloudevents.Event)) error {
client, err := CurrentEventBusClient.GetClient(bus.Server, bus.Subject)
client, err := GetClient(bus.Server, bus.Subject)
if err != nil {
return err
}
client.ConsumerFuncs = append(client.ConsumerFuncs, fn)
receiverFn := func(event cloudevents.Event) {
for _, consumerFunc := range client.ConsumerFuncs {
consumerFunc(ctx, event)
}
}

return (*client.Consumer).StartReceiver(ctx, receiverFn)
return client.StartReceiver(ctx, fn)
}

0 comments on commit 6e630cf

Please sign in to comment.