From 3c06d44aed4f54479c1423d13e923813388bafb9 Mon Sep 17 00:00:00 2001 From: Jules Michael Date: Thu, 31 Mar 2022 15:43:58 +0400 Subject: [PATCH] feat: add event-driven messaging support --- cmd/serve.go | 2 + config/config.go | 1 + go.mod | 12 +++- go.sum | 26 ++++++-- messenger/publisher.go | 95 +++++++++++++++++++++++++++ messenger/router.go | 141 ++++++++++++++++++++++++++++++++++++++++ messenger/subscriber.go | 96 +++++++++++++++++++++++++++ 7 files changed, 366 insertions(+), 7 deletions(-) create mode 100644 messenger/publisher.go create mode 100644 messenger/router.go create mode 100644 messenger/subscriber.go diff --git a/cmd/serve.go b/cmd/serve.go index bda1fa9..c6e5437 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -9,6 +9,7 @@ import ( "github.com/JulesMike/spoty/health" "github.com/JulesMike/spoty/http" "github.com/JulesMike/spoty/logger" + "github.com/JulesMike/spoty/messenger" "github.com/JulesMike/spoty/spoty" "github.com/JulesMike/spoty/tracer" "github.com/spf13/cobra" @@ -27,6 +28,7 @@ var serveCmd = &cobra.Command{ tracer.Module, cache.Module, health.Module, + messenger.Module, http.Module, spoty.Module, fx.Invoke(serve), diff --git a/config/config.go b/config/config.go index b16cb48..a2faf76 100644 --- a/config/config.go +++ b/config/config.go @@ -23,6 +23,7 @@ type Config struct { CacheMaxKeys int64 `envconfig:"CACHE_MAX_KEYS" default:"64"` CacheMaxCost int64 `envconfig:"CACHE_MAX_COST" default:"1000000"` JaegerEndpoint string `envconfig:"JAEGER_ENDPOINT" default:"http://localhost:14268/api/traces"` + AMQPURI string `envconfig:"AMQP_URI" default:"amqp://guest:guest@localhost:5672"` } // New processes and returns a new application Config. diff --git a/go.mod b/go.mod index c9ab00f..87cf6c0 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,15 @@ module github.com/JulesMike/spoty go 1.18 require ( + github.com/ThreeDotsLabs/watermill v1.2.0-rc.7 + github.com/ThreeDotsLabs/watermill-amqp/v2 v2.0.4 github.com/alexliesenfeld/health v0.6.0 github.com/cenkalti/dominantcolor v0.0.0-20171020061837-df772e8dd39e github.com/dgraph-io/ristretto v0.1.0 github.com/gin-contrib/zap v0.0.2 github.com/gin-gonic/gin v1.7.7 github.com/go-resty/resty/v2 v2.7.0 - github.com/google/uuid v1.2.0 + github.com/google/uuid v1.3.0 github.com/iancoleman/strcase v0.2.0 github.com/imdario/mergo v0.3.12 github.com/json-iterator/go v1.1.10 @@ -33,6 +35,7 @@ require ( github.com/KyleBanks/depth v1.2.1 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect + github.com/cenkalti/backoff/v3 v3.2.2 // indirect github.com/cespare/xxhash/v2 v2.1.1 // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/gin-contrib/sse v0.1.0 // indirect @@ -46,16 +49,21 @@ require ( github.com/go-playground/universal-translator v0.17.0 // indirect github.com/go-playground/validator/v10 v10.4.1 // indirect github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect - github.com/golang/protobuf v1.5.1 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/leodido/go-urn v1.2.1 // indirect + github.com/lithammer/shortuuid/v3 v3.0.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-isatty v0.0.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 // indirect + github.com/oklog/ulid v1.3.1 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/rabbitmq/amqp091-go v1.2.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/ugorji/go/codec v1.2.4 // indirect github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.11 // indirect diff --git a/go.sum b/go.sum index 68f7525..26470b9 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,10 @@ github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tN github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/ThreeDotsLabs/watermill v1.2.0-rc.7 h1:c7rlzfhUFPtNo2WiJuIhrdwAfZcuAfbvB29uF+I0z7c= +github.com/ThreeDotsLabs/watermill v1.2.0-rc.7/go.mod h1:QLZSaklpSZ/7yv288LL2DFOgCEi86VYEmQvzmaMlHoA= +github.com/ThreeDotsLabs/watermill-amqp/v2 v2.0.4 h1:DlsSpT+1HiIG3eUI3ysF2g6EF/FdsgzvqztwgTcpP/0= +github.com/ThreeDotsLabs/watermill-amqp/v2 v2.0.4/go.mod h1:DKOBUoMVtPMV8jBEbRP3NL6TgnOMyRvVza3W297cdqU= github.com/agiledragon/gomonkey/v2 v2.3.1 h1:k+UnUY0EMNYUFUAQVETGY9uUTxjMdnUkP0ARyJS1zzs= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -13,6 +17,8 @@ github.com/alexliesenfeld/health v0.6.0 h1:HRBTCgybNSe4lqGEk7nU82c3bjwh9W+3b46W6 github.com/alexliesenfeld/health v0.6.0/go.mod h1:N4NDIeQtlWumG+6z1ne1v62eQxktz5ylEgGgH9emdMw= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= +github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= +github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/cenkalti/dominantcolor v0.0.0-20171020061837-df772e8dd39e h1:5KS7rBZBuj4CrssHHg40+e15Myj2aA+V3tg7vfZBvE8= github.com/cenkalti/dominantcolor v0.0.0-20171020061837-df772e8dd39e/go.mod h1:Bu/hFErJeI1/nnRDAnOiGytVqsnIgUd2s0b5ZKDPYws= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= @@ -77,14 +83,19 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.1 h1:jAbXjIeW2ZSW2AwFxlGTDoc2CjI2XujLkV3ArsZFCvc= -github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHLwW0= github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= @@ -108,6 +119,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= +github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= +github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/magefile/mage v1.13.0 h1:XtLJl8bcCM7EFoO8FyH8XK3t7G5hQAeK+i4tq+veT9M= github.com/magefile/mage v1.13.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= @@ -131,12 +144,16 @@ github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/otiai10/copy v1.7.0 h1:hVoPiN+t+7d2nzzwMiDHPSOogsWAStewq3TwU05+clE= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.2.0 h1:1pHBxAsQh54R9eX/xo679fUEAfv3loMqi0pvRFOj2nk= +github.com/rabbitmq/amqp091-go v1.2.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/spf13/cobra v1.4.0 h1:y+wJpx64xcgO1V+RcnwW0LEHxTKRi2ZDPSBjWnrg88Q= github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g= @@ -222,8 +239,6 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM= -golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d h1:20cMwl2fHAzkJMEA+8J4JgqBQcQGzbisXo31MIeenXI= -golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211029224645-99673261e6eb h1:pirldcYWx7rx7kE5r+9WsOXPXK0+WH5+uZ7uPmJ44uM= golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d h1:TzXSXBo42m9gQenoE3b9BGiEpg5IG2JkU5FkPIawgtw= @@ -241,6 +256,7 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210420072515-93ed5bcd2bfe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/messenger/publisher.go b/messenger/publisher.go new file mode 100644 index 0000000..648b2be --- /dev/null +++ b/messenger/publisher.go @@ -0,0 +1,95 @@ +package messenger + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/JulesMike/spoty/config" + "github.com/JulesMike/spoty/health" + "github.com/JulesMike/spoty/logger" + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" +) + +// Publisher is a wrapper for amqp.Publisher. +type Publisher struct { + *amqp.Publisher + *gochannel.GoChannel + + cfg *config.Config + logger *logger.Logger + health *health.Checks +} + +// NewPublisher returns a new publisher. +func NewPublisher(cfg *config.Config, logger *logger.Logger, health *health.Checks) (*Publisher, error) { + p := Publisher{ + cfg: cfg, + logger: logger, + health: health, + } + + amqpConfig := amqp.NewDurablePubSubConfig( + cfg.AMQPURI, + nil, + ) + + amqpConfig.Publish.ChannelPoolSize = 2 + + publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLoggerWithOut(logger.Writer(), !cfg.Prod, false)) + if err != nil { + return nil, fmt.Errorf("failed to create publisher: %w", err) + } + + p.Publisher = publisher + p.health.RegisterChecks(p.Check()) + + return &p, nil +} + +// NewTestPublisher returns a new publisher for testing purposes. +func NewTestPublisher(logger *logger.Logger) *Publisher { + return &Publisher{ + logger: logger, + GoChannel: gochannel.NewGoChannel( + gochannel.Config{}, + watermill.NewStdLoggerWithOut(logger.Writer(), true, false), + ), + } +} + +// Publish is a wrapper for the MessagePublishr.Publish. +func (p *Publisher) Publish(topic string, message ...*message.Message) error { + return p.MessagePublisher().Publish(topic, message...) +} + +// MessagePublisher returns the message publisher. +func (p *Publisher) MessagePublisher() message.Publisher { + if p.Publisher != nil { + return p.Publisher + } + + return p.GoChannel +} + +// Check is used to perform healthcheck. +func (p *Publisher) Check() health.Check { + //nolint:revive + return health.Check{ + Name: "messenger.publisher", + RefreshPeriod: 10 * time.Second, + InitialDelay: 10 * time.Second, + Timeout: 5 * time.Second, + Check: func(_ context.Context) error { + if !p.IsConnected() { + return errors.New("publisher is not connected") + } + + return nil + }, + } +} diff --git a/messenger/router.go b/messenger/router.go new file mode 100644 index 0000000..d513b1a --- /dev/null +++ b/messenger/router.go @@ -0,0 +1,141 @@ +package messenger + +import ( + "context" + "errors" + "fmt" + "log" + "time" + + "github.com/JulesMike/spoty/config" + "github.com/JulesMike/spoty/health" + "github.com/JulesMike/spoty/logger" + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/message/router/middleware" + "go.uber.org/fx" +) + +const ( + retries = 3 + initialInterval = time.Millisecond * 100 +) + +// Module exported to initialise a new Publisher, Subscriber and Router. +var Module = fx.Options( + fx.Provide(NewPublisher, NewSubscriber, NewRouter), +) + +// Router is a wrapper for a message router. +type Router struct { + *message.Router + + publisher *Publisher + subscriber *Subscriber + + cfg *config.Config + logger *logger.Logger + health *health.Checks +} + +// NewRouter returns a new router. +func NewRouter( + cfg *config.Config, + publisher *Publisher, + subscriber *Subscriber, + logger *logger.Logger, + health *health.Checks, +) (*Router, error) { + r := Router{ + publisher: publisher, + subscriber: subscriber, + cfg: cfg, + logger: logger, + health: health, + } + + wlog := watermill.NewStdLoggerWithOut(log.Writer(), !cfg.Prod, false) + + router, err := message.NewRouter(message.RouterConfig{}, wlog) + if err != nil { + return nil, fmt.Errorf("failed to create message router: %w", err) + } + + router.AddMiddleware( + middleware.CorrelationID, + + middleware.Retry{ + MaxRetries: retries, + InitialInterval: initialInterval, + Logger: wlog, + }.Middleware, + + middleware.Recoverer, + ) + + r.Router = router + r.health.RegisterChecks(r.Check()) + + return &r, nil +} + +// AddHandler is a wrapper around message.Router.AddHandler. +func (r *Router) AddHandler( + handlerName, + subscribeTopic, + publishTopic string, + handlerFunc message.HandlerFunc, +) *message.Handler { + return r.Router.AddHandler( + handlerName, + subscribeTopic, + r.subscriber.MessageSubscriber(), + publishTopic, + r.publisher.MessagePublisher(), + handlerFunc, + ) +} + +// AddNoPublisherHandler is a wrapper around message.Router.AddNoPublisherHandler. +func (r *Router) AddNoPublisherHandler( + handlerName, + subscribeTopic string, + handlerFunc message.NoPublishHandlerFunc, +) *message.Handler { + return r.Router.AddNoPublisherHandler( + handlerName, + subscribeTopic, + r.subscriber.MessageSubscriber(), + handlerFunc, + ) +} + +// Publisher returns the publisher for the router. +func (r *Router) Publisher() *Publisher { + return r.publisher +} + +// Subscriber returns the subscriber for the router. +func (r *Router) Subscriber() *Subscriber { + return r.subscriber +} + +// Check is used to perform healthcheck. +func (r *Router) Check() health.Check { + //nolint:revive + return health.Check{ + Name: "messenger.router", + RefreshPeriod: 10 * time.Second, + InitialDelay: 10 * time.Second, + Timeout: 5 * time.Second, + Check: func(_ context.Context) error { + if !r.publisher.IsConnected() { + return errors.New("publisher is not connected") + } else if !r.subscriber.IsConnected() { + return errors.New("subscriber is not connected") + } + + return nil + }, + } +} diff --git a/messenger/subscriber.go b/messenger/subscriber.go new file mode 100644 index 0000000..ad4d651 --- /dev/null +++ b/messenger/subscriber.go @@ -0,0 +1,96 @@ +package messenger + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "github.com/JulesMike/spoty/config" + "github.com/JulesMike/spoty/health" + "github.com/JulesMike/spoty/logger" + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" +) + +// Subscriber is a wrapper for amqp.Subscriber. +type Subscriber struct { + *amqp.Subscriber + *gochannel.GoChannel + + cfg *config.Config + logger *logger.Logger + health *health.Checks +} + +// NewSubscriber returns a new subscriber. +func NewSubscriber(cfg *config.Config, logger *logger.Logger, health *health.Checks) (*Subscriber, error) { + s := Subscriber{ + cfg: cfg, + logger: logger, + health: health, + } + + amqpConfig := amqp.NewDurablePubSubConfig( + cfg.AMQPURI, + func(topic string) string { + return strings.ToUpper(cfg.ServiceName) + "_" + topic + }, + ) + + subscriber, err := amqp.NewSubscriber(amqpConfig, watermill.NewStdLoggerWithOut(logger.Writer(), !cfg.Prod, false)) + if err != nil { + return nil, fmt.Errorf("failed to create subscriber: %w", err) + } + + s.Subscriber = subscriber + s.health.RegisterChecks(s.Check()) + + return &s, nil +} + +// NewTestSubscriber returns a new subscriber for testing purposes. +func NewTestSubscriber(logger *logger.Logger) *Subscriber { + return &Subscriber{ + logger: logger, + GoChannel: gochannel.NewGoChannel( + gochannel.Config{}, + watermill.NewStdLoggerWithOut(logger.Writer(), true, false), + ), + } +} + +// Subscribe is a wrapper for the MessageSubscriber.Subscribe. +func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) { + return s.MessageSubscriber().Subscribe(ctx, topic) +} + +// MessageSubscriber returns the message subscriber. +func (s *Subscriber) MessageSubscriber() message.Subscriber { + if s.Subscriber != nil { + return s.Subscriber + } + + return s.GoChannel +} + +// Check is used to perform healthcheck. +func (s *Subscriber) Check() health.Check { + //nolint:revive + return health.Check{ + Name: "messenger.subscriber", + RefreshPeriod: 10 * time.Second, + InitialDelay: 10 * time.Second, + Timeout: 5 * time.Second, + Check: func(_ context.Context) error { + if !s.IsConnected() { + return errors.New("subscriber is not connected") + } + + return nil + }, + } +}