Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOISSUE - Refactor messaging #1141

Merged
merged 28 commits into from
Apr 28, 2020
Merged

NOISSUE - Refactor messaging #1141

merged 28 commits into from
Apr 28, 2020

Conversation

dborovcanin
Copy link
Collaborator

Signed-off-by: Dušan Borovčanin dusan.borovcanin@mainflux.com

What does this do?

This pull request reorganizes message publishers, subscribers, and the Mainflux Message structure.

Which issue(s) does this PR fix/relate to?

There is no such issue.

List any changes that modify/break current functionality

Writers Start method and protocol adapters factory methods signatures are changed.

Have you included tests for your changes?

Yes, tests are updated according to code updates.

Did you document any new/modified functionality?

No external API changed, so there was no need to update docs.

Protocol: protocol,
Channel: chanID,
Subtopic: subtopic,
Payload: payload,
Created: created,
Occured: time.Now().UnixNano(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this revert?

@codecov-io
Copy link

codecov-io commented Apr 26, 2020

Codecov Report

Merging #1141 into master will decrease coverage by 0.01%.
The diff coverage is 52.94%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #1141      +/-   ##
==========================================
- Coverage   75.33%   75.32%   -0.02%     
==========================================
  Files         101      101              
  Lines        6853     6857       +4     
==========================================
+ Hits         5163     5165       +2     
- Misses       1333     1334       +1     
- Partials      357      358       +1     
Impacted Files Coverage Δ
twins/service.go 42.38% <45.45%> (+0.14%) ⬆️
transformers/senml/transformer.go 81.81% <60.00%> (ø)
http/api/transport.go 72.61% <100.00%> (ø)

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 36d00d8...0ca8c60. Read the comment docs.

message.proto Outdated
@@ -12,5 +11,5 @@ message Message {
string publisher = 3;
string protocol = 4;
bytes payload = 5;
google.protobuf.Timestamp created = 6;
int64 occured = 6;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs a comment - that it represents nano

Channel: chanID,
Subtopic: subtopic,
Publisher: publisher,
Protocol: protocol,
Payload: msg.Payload,
Created: created,
Occured: time.Now().UnixNano(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about this Occured... Created is very usual in these occasions, but it is true that payload could have been created before (on device). However, Creted here relates to the Mainflux Message{} entity.

What is the opinion of other @mainflux/maintainers?

nmarcetic
nmarcetic previously approved these changes Apr 27, 2020
@@ -69,19 +70,21 @@ func main() {
log.Fatalf(err.Error())
}

b, err := broker.New(cfg.natsURL)
nc, err := nats.Connect(cfg.natsURL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main question is - do we want this really? That for every new broker we put this connecting logic in every adapter? Why not solving once and for all in a shared package?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It won't change a lot of even if we do that. If we move connection logic to, for example, pubsub/nats method NewPub(..), we'll need to:

  • return an error alongside with the implementation
  • enrich returned interface with Close method, so that we can call defer(ifc.Close())
  • check returned error if err != nil ...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be hard to have a generic full wrapper? I would prefer that then having a partial wrapping for every broker.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we can do that. It won't help too much, but it will mask the underlying broker specifics (nats.Connect).

Copy link
Contributor

@drasko drasko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Getting there... Starts to look really good!

Protocol: protocol,
Channel: chanID,
Subtopic: subtopic,
Payload: payload,
Created: created,
Occurred: occurred,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer Created. Please change.

message.proto Outdated
bytes payload = 5;
// Timestamp the Message occured in the system.
// By the default, occured represents Unix nanoseconds timestamp.
int64 occurred = 6;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will not accept timestamp change in this PR. And it should be created.

@@ -12,5 +12,5 @@ message Message {
string publisher = 3;
string protocol = 4;
bytes payload = 5;
google.protobuf.Timestamp created = 6;
google.protobuf.Timestamp occurred = 6;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

created

cmd/coap/main.go Outdated
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc"
broker "github.com/nats-io/nats.go"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would remove the alias here

return
}

err := ps.Subscribe("channels.>", func(msg messaging.Message) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we keep here SubjectAllChannels?

"github.com/mainflux/mainflux/logger"
pubsub "github.com/mainflux/mainflux/messaging/nats"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use another alias. messaging or messengerNats

cmd/coap/main.go Outdated
"github.com/mainflux/mainflux/coap"
"github.com/mainflux/mainflux/coap/api"
logger "github.com/mainflux/mainflux/logger"
pubsub "github.com/mainflux/mainflux/messaging/nats"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

cmd/http/main.go Outdated
adapter "github.com/mainflux/mainflux/http"
"github.com/mainflux/mainflux/http/api"
"github.com/mainflux/mainflux/logger"
pubsub "github.com/mainflux/mainflux/messaging/nats"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

"github.com/mainflux/mainflux/logger"
pubsub "github.com/mainflux/mainflux/messaging/nats"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

"github.com/mainflux/mainflux/logger"
pubsub "github.com/mainflux/mainflux/messaging/nats"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

cmd/mqtt/main.go Outdated
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging"
pubsub "github.com/mainflux/mainflux/messaging/nats"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed to messaging, but here I need both messaging and messaging/nats. Same for Twins service.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not then calling it nats.NewPublisher - it will be clearer from the code that this is a specific NATS-based implementation of our interface.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH, actually the best would be probably something like messagingNats.NewPublisher, to distinguish from ordinary NATS package and not to be confused with functions from NATS driver.

I can accept pusub alias as well, just a proposition.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed named import by addressing this one: https://github.com/mainflux/mainflux/pull/1141#discussion_r415905298.

"github.com/mainflux/mainflux/logger"
pubsub "github.com/mainflux/mainflux/messaging/nats"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

"github.com/mainflux/mainflux/logger"
pubsub "github.com/mainflux/mainflux/messaging/nats"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging"
pubsub "github.com/mainflux/mainflux/messaging/nats"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@drasko drasko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neatpicking ;)

@@ -225,21 +225,21 @@ func receive(svc coap.Service, msg *gocoap.Message) *gocoap.Message {
return res
}

created, err := ptypes.TimestampProto(time.Now())
occured, err := ptypes.TimestampProto(time.Now())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer calling this var timestamp or just t. Mixing "occured" and "created" is confusing. Here you are just taking a simple timestamp recording, so t would fit nicely.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed back to created. We call it like that in all the other services.

@manuio
Copy link
Contributor

manuio commented Apr 27, 2020

Neatpicking

Nitpicking :)
And yes, but pub := pubsub.NewPublisher it's hard to digest

@drasko
Copy link
Contributor

drasko commented Apr 27, 2020

This branch needs update as well

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Package `broker` is conceptually renamed to package `nats`.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
cmd/mqtt/main.go Outdated
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging"
pubsub "github.com/mainflux/mainflux/messaging/nats"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not then calling it nats.NewPublisher - it will be clearer from the code that this is a specific NATS-based implementation of our interface.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
return
}
if err := h(msg); err != nil {
ps.logger.Warn(fmt.Sprintf("Failed handle Mainflux message: %s", err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Failed to handle

drasko
drasko previously approved these changes Apr 28, 2020
Copy link
Contributor

@drasko drasko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great now. Approved!

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Copy link
Contributor

@manuio manuio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

@drasko drasko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@drasko drasko merged commit c3019ff into absmach:master Apr 28, 2020
@drasko
Copy link
Contributor

drasko commented Apr 28, 2020

Merged! Great PR, thanks a lot @dusanb94!

@dborovcanin dborovcanin deleted the messaging branch April 28, 2020 09:22
manuio pushed a commit that referenced this pull request Oct 12, 2020
* Refactor messaging

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Rename SubscribeHandler to MessageHandler

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove `Auth` event logs

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update message pubsub APi

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix topics handling

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update CoAP adapter

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update Twins service

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update LoRa adapter

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update OPC UA adapter

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove broker package

Package `broker` is conceptually renamed to package `nats`.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update makefile

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Add comment explanation

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix MQTT adapter

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix typo

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Move NATS pub/sub implementation to pubsub pkg

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove an empty line in main methods

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Move messaging-related code to messaging package

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix Twins mocks

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Change Occurred back to Created

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix tranformer test

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix message proto commands

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Replace string literal with constant

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove alias from main method

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Change messaging pubsub alias

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Rename occured to created

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Handle NATS connection in the NATS PubSub

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Rename n to pub/pubSub

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix typos

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants