-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add event-driven messaging support
- Loading branch information
Showing
7 changed files
with
366 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
package messenger | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/JulesMike/spoty/config" | ||
"github.com/JulesMike/spoty/health" | ||
"github.com/JulesMike/spoty/logger" | ||
"github.com/ThreeDotsLabs/watermill" | ||
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp" | ||
"github.com/ThreeDotsLabs/watermill/message" | ||
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel" | ||
) | ||
|
||
// Publisher is a wrapper for amqp.Publisher. | ||
type Publisher struct { | ||
*amqp.Publisher | ||
*gochannel.GoChannel | ||
|
||
cfg *config.Config | ||
logger *logger.Logger | ||
health *health.Checks | ||
} | ||
|
||
// NewPublisher returns a new publisher. | ||
func NewPublisher(cfg *config.Config, logger *logger.Logger, health *health.Checks) (*Publisher, error) { | ||
p := Publisher{ | ||
cfg: cfg, | ||
logger: logger, | ||
health: health, | ||
} | ||
|
||
amqpConfig := amqp.NewDurablePubSubConfig( | ||
cfg.AMQPURI, | ||
nil, | ||
) | ||
|
||
amqpConfig.Publish.ChannelPoolSize = 2 | ||
|
||
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLoggerWithOut(logger.Writer(), !cfg.Prod, false)) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to create publisher: %w", err) | ||
} | ||
|
||
p.Publisher = publisher | ||
p.health.RegisterChecks(p.Check()) | ||
|
||
return &p, nil | ||
} | ||
|
||
// NewTestPublisher returns a new publisher for testing purposes. | ||
func NewTestPublisher(logger *logger.Logger) *Publisher { | ||
return &Publisher{ | ||
logger: logger, | ||
GoChannel: gochannel.NewGoChannel( | ||
gochannel.Config{}, | ||
watermill.NewStdLoggerWithOut(logger.Writer(), true, false), | ||
), | ||
} | ||
} | ||
|
||
// Publish is a wrapper for the MessagePublishr.Publish. | ||
func (p *Publisher) Publish(topic string, message ...*message.Message) error { | ||
return p.MessagePublisher().Publish(topic, message...) | ||
} | ||
|
||
// MessagePublisher returns the message publisher. | ||
func (p *Publisher) MessagePublisher() message.Publisher { | ||
if p.Publisher != nil { | ||
return p.Publisher | ||
} | ||
|
||
return p.GoChannel | ||
} | ||
|
||
// Check is used to perform healthcheck. | ||
func (p *Publisher) Check() health.Check { | ||
//nolint:revive | ||
return health.Check{ | ||
Name: "messenger.publisher", | ||
RefreshPeriod: 10 * time.Second, | ||
InitialDelay: 10 * time.Second, | ||
Timeout: 5 * time.Second, | ||
Check: func(_ context.Context) error { | ||
if !p.IsConnected() { | ||
return errors.New("publisher is not connected") | ||
} | ||
|
||
return nil | ||
}, | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
package messenger | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"log" | ||
"time" | ||
|
||
"github.com/JulesMike/spoty/config" | ||
"github.com/JulesMike/spoty/health" | ||
"github.com/JulesMike/spoty/logger" | ||
"github.com/ThreeDotsLabs/watermill" | ||
"github.com/ThreeDotsLabs/watermill/message" | ||
"github.com/ThreeDotsLabs/watermill/message/router/middleware" | ||
"go.uber.org/fx" | ||
) | ||
|
||
const ( | ||
retries = 3 | ||
initialInterval = time.Millisecond * 100 | ||
) | ||
|
||
// Module exported to initialise a new Publisher, Subscriber and Router. | ||
var Module = fx.Options( | ||
fx.Provide(NewPublisher, NewSubscriber, NewRouter), | ||
) | ||
|
||
// Router is a wrapper for a message router. | ||
type Router struct { | ||
*message.Router | ||
|
||
publisher *Publisher | ||
subscriber *Subscriber | ||
|
||
cfg *config.Config | ||
logger *logger.Logger | ||
health *health.Checks | ||
} | ||
|
||
// NewRouter returns a new router. | ||
func NewRouter( | ||
cfg *config.Config, | ||
publisher *Publisher, | ||
subscriber *Subscriber, | ||
logger *logger.Logger, | ||
health *health.Checks, | ||
) (*Router, error) { | ||
r := Router{ | ||
publisher: publisher, | ||
subscriber: subscriber, | ||
cfg: cfg, | ||
logger: logger, | ||
health: health, | ||
} | ||
|
||
wlog := watermill.NewStdLoggerWithOut(log.Writer(), !cfg.Prod, false) | ||
|
||
router, err := message.NewRouter(message.RouterConfig{}, wlog) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to create message router: %w", err) | ||
} | ||
|
||
router.AddMiddleware( | ||
middleware.CorrelationID, | ||
|
||
middleware.Retry{ | ||
MaxRetries: retries, | ||
InitialInterval: initialInterval, | ||
Logger: wlog, | ||
}.Middleware, | ||
|
||
middleware.Recoverer, | ||
) | ||
|
||
r.Router = router | ||
r.health.RegisterChecks(r.Check()) | ||
|
||
return &r, nil | ||
} | ||
|
||
// AddHandler is a wrapper around message.Router.AddHandler. | ||
func (r *Router) AddHandler( | ||
handlerName, | ||
subscribeTopic, | ||
publishTopic string, | ||
handlerFunc message.HandlerFunc, | ||
) *message.Handler { | ||
return r.Router.AddHandler( | ||
handlerName, | ||
subscribeTopic, | ||
r.subscriber.MessageSubscriber(), | ||
publishTopic, | ||
r.publisher.MessagePublisher(), | ||
handlerFunc, | ||
) | ||
} | ||
|
||
// AddNoPublisherHandler is a wrapper around message.Router.AddNoPublisherHandler. | ||
func (r *Router) AddNoPublisherHandler( | ||
handlerName, | ||
subscribeTopic string, | ||
handlerFunc message.NoPublishHandlerFunc, | ||
) *message.Handler { | ||
return r.Router.AddNoPublisherHandler( | ||
handlerName, | ||
subscribeTopic, | ||
r.subscriber.MessageSubscriber(), | ||
handlerFunc, | ||
) | ||
} | ||
|
||
// Publisher returns the publisher for the router. | ||
func (r *Router) Publisher() *Publisher { | ||
return r.publisher | ||
} | ||
|
||
// Subscriber returns the subscriber for the router. | ||
func (r *Router) Subscriber() *Subscriber { | ||
return r.subscriber | ||
} | ||
|
||
// Check is used to perform healthcheck. | ||
func (r *Router) Check() health.Check { | ||
//nolint:revive | ||
return health.Check{ | ||
Name: "messenger.router", | ||
RefreshPeriod: 10 * time.Second, | ||
InitialDelay: 10 * time.Second, | ||
Timeout: 5 * time.Second, | ||
Check: func(_ context.Context) error { | ||
if !r.publisher.IsConnected() { | ||
return errors.New("publisher is not connected") | ||
} else if !r.subscriber.IsConnected() { | ||
return errors.New("subscriber is not connected") | ||
} | ||
|
||
return nil | ||
}, | ||
} | ||
} |
Oops, something went wrong.