-
Notifications
You must be signed in to change notification settings - Fork 669
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NOISSUE - Create broker package for NATS #1080
Conversation
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Codecov Report
@@ Coverage Diff @@
## master #1080 +/- ##
==========================================
- Coverage 77.21% 77.10% -0.12%
==========================================
Files 95 95
Lines 6921 6760 -161
==========================================
- Hits 5344 5212 -132
+ Misses 1238 1211 -27
+ Partials 339 337 -2
Continue to review full report at Codecov.
|
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
@@ -36,7 +36,7 @@ type Observer struct { | |||
// NewObserver instantiates a new Observer. | |||
func NewObserver() *Observer { | |||
return &Observer{ | |||
Messages: make(chan mainflux.Message), | |||
Messages: make(chan broker.Message), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we expired
and msgID
should be in its own structure with its own mutex, and Observer can embed those structures.
Also wouldnt be better that Observer is interface?
} | ||
|
||
go func() { | ||
<-o.Cancel |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I may be wrong but seems that wait group fits here, us if you can, it is faster.
// Service specifies coap service API. | ||
type Service interface { | ||
// Publish Messssage | ||
Publish(context.Context, string, broker.Message) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this fits nats.Broker, right? why not use embedded interface
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because it doesn't fit broker.Nats interface
cmd/http/main.go
Outdated
@@ -87,10 +80,16 @@ func main() { | |||
thingsTracer, thingsCloser := initJaeger("things", cfg.jaegerURL, logger) | |||
defer thingsCloser.Close() | |||
|
|||
pubsub, err := broker.New(cfg.natsURL) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it would be better to say nats.New(cfg.natsURL)
you are making new nats conn from nats package (e.g isolated impl of nats broker) or tomorrow kafka.New(cfg.kafka)
from broker/kafka
package. Just an idea
Also I would prefer messenger
or msg
or whatever that sounds better then pubsub.Publish()
for example
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree for pubsub
, I was never convinced. I simply replaced it by b, err := broker.New(cfg.natsURL)
.
I also propose that we think about to use an interface for all brokers or different ones per broker.
In my last commit I replaced broker/nats/pubsub.go
by broker/nats.go
. There is no nats package anymore. Eventually I could replace broker.New
by broker.NewNats
|
||
// Transformer specifies API form Message transformer. | ||
type Transformer interface { | ||
// Transform Mainflux message to any other format. | ||
Transform(mainflux.Message) (interface{}, error) | ||
Transform(broker.Message) (interface{}, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Much better than mainflux.Message
IMHO
broker/broker.go
Outdated
// Nats specifies a NATS message API. | ||
type Nats interface { | ||
// Publish publishes message to the msessage broker. | ||
Publish(context.Context, string, Message) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you pls put named params, it's easier to understand what are params. Now I see string string
and dunno what it means.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that's what we should do for all the interfaces, starting from here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not against but in most of our services we are not doing this. Let me open an issue to fix it everywhere
broker/broker.go
Outdated
const SubjectAllChannels = "channel.>" | ||
|
||
// Nats specifies a NATS message API. | ||
type Nats interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You think its better to have a separate interface for each broker that we use like nats
kafka
etc...? Not one Interface lets say Messenger
that needs to be satisfied by broker imply for example nats
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My first idea was to have a single interface. But it's not possible unless you create a new service for this because NATS interface will never match with Kafka one (it's doable for publish but not for subscribe). For now if we want to use this as package it's better to have different interfaces .
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
broker/broker.go
Outdated
// Nats specifies a NATS message API. | ||
type Nats interface { | ||
// Publish publishes message to the msessage broker. | ||
Publish(context.Context, string, Message) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that's what we should do for all the interfaces, starting from here.
broker/broker.go
Outdated
const SubjectAllChannels = "channel.>" | ||
|
||
// Nats specifies a NATS message API. | ||
type Nats interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree.
@@ -1,14 +1,15 @@ | |||
// Code generated by protoc-gen-gogo. DO NOT EDIT. | |||
// source: message.proto | |||
|
|||
package mainflux | |||
package broker |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't change this manually, but rather fix Makefile and use it to generate this code. I don't mind keeping broker.go
and messages.{proto, pb.go}
in the project root.
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
broker/nats.go
Outdated
ps := fmt.Sprintf("%s.%s", prefix, subject) | ||
sub, err := b.conn.Subscribe(ps, f) | ||
if err != nil { | ||
return nil, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are not wrapping errors? Not sure if it's supposed to be wrapped everywhere or just in some places?
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
* NOISSUEE - Create broker package for NATS Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Create funcs to return NATS connection Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * mv os.exit to main Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix Reviews Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix tests and typos Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix CI Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix reviews Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Unify Publisher and Subscriber interfaces Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Rename Nats interface Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * typo Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Mv message.pb.go, messsage.proto and topics.go to broker directory Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix go.mod Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Use mainflux broker for writers and twins services Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix go.mod Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix twins tests Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix make proto Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix message.proto Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix golangcibot Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * regenerate message.pb.go Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix comment Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix comment Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix make proto Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Add NATS errors Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
Signed-off-by: Manuel Imperiale manuel.imperiale@gmail.com