Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOISSUE - Use opcua server timestamp in opcua-adapter messages #980

Merged
merged 3 commits into from
Dec 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions cmd/opcua/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ func main() {
publisher := pub.NewMessagePublisher(natsConn)

ctx := context.Background()
pubsub := gopcua.NewPubSub(ctx, publisher, thingRM, chanRM, connRM, logger)
sub := gopcua.NewSubscriber(ctx, publisher, thingRM, chanRM, connRM, logger)

svc := opcua.New(pubsub, thingRM, chanRM, connRM, cfg.opcuaConfig, logger)
svc := opcua.New(sub, thingRM, chanRM, connRM, cfg.opcuaConfig, logger)
svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
svc,
Expand All @@ -128,7 +128,7 @@ func main() {
}, []string{"method"}),
)

//go subscribeToNodesFromFile(svc, cfg.nodesConfig, cfg.opcuaConfig, logger)
go subscribeToNodesFromFile(sub, cfg.nodesConfig, cfg.opcuaConfig, logger)
go subscribeToThingsES(svc, esConn, cfg.esConsumerName, logger)

errs := make(chan error, 2)
Expand Down Expand Up @@ -193,7 +193,7 @@ func connectToRedis(redisURL, redisPass, redisDB string, logger logger.Logger) *
})
}

func subscribeToNodesFromFile(svc opcua.Service, nodes string, cfg opcua.Config, logger logger.Logger) {
func subscribeToNodesFromFile(sub opcua.Subscriber, nodes string, cfg opcua.Config, logger logger.Logger) {
if _, err := os.Stat(nodes); os.IsNotExist(err) {
logger.Warn(fmt.Sprintf("Config file not found: %s", err))
return
Expand All @@ -218,16 +218,19 @@ func subscribeToNodesFromFile(svc opcua.Service, nodes string, cfg opcua.Config,
}

if len(l) < columns {
logger.Warn(fmt.Sprintf("Empty or incomplete line found in file"))
logger.Warn("Empty or incomplete line found in file")
return
}

cfg.ServerURI = l[0]
cfg.NodeID = l[1]
go subscribe(sub, cfg, logger)
}
}

if err := svc.Subscribe(cfg); err != nil {
logger.Warn(fmt.Sprintf("OPC-UA Subscription failed: %s", err))
}
func subscribe(sub opcua.Subscriber, cfg opcua.Config, logger logger.Logger) {
if err := sub.Subscribe(cfg); err != nil {
logger.Warn(fmt.Sprintf("Subscription failed: %s", err))
}
}

Expand Down
13 changes: 0 additions & 13 deletions opcua/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,3 @@ func (lm loggingMiddleware) DisconnectThing(mfxChanID, mfxThingID string) (err e

return lm.svc.DisconnectThing(mfxChanID, mfxThingID)
}

func (lm loggingMiddleware) Subscribe(cfg opcua.Config) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("subscribe to server %s and node_id %s, took %s to complete", cfg.ServerURI, cfg.NodeID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.Subscribe(cfg)
}
9 changes: 0 additions & 9 deletions opcua/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,3 @@ func (mm *metricsMiddleware) DisconnectThing(mfxChanID, mfxThingID string) error

return mm.svc.DisconnectThing(mfxChanID, mfxThingID)
}

func (mm *metricsMiddleware) Subscribe(cfg opcua.Config) error {
defer func(begin time.Time) {
mm.counter.With("method", "subscribe").Add(1)
mm.latency.With("method", "subscribe").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.Subscribe(cfg)
}
49 changes: 30 additions & 19 deletions opcua/gopcua/pubsub.go → opcua/gopcua/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,22 @@ import (
"github.com/mainflux/mainflux/opcua"
)

const protocol = "opcua"
const token = ""

var (
errFailedConn = errors.New("Failed to connect")
errFailedRead = errors.New("Failed to read")
errFailedSub = errors.New("Failed to subscribe")
errFailedFindEndpoint = errors.New("Failed to find suitable endpoint")
errFailedFetchEndpoint = errors.New("Failed to fetch OPC-UA server endpoints")
errFailedParseNodeID = errors.New("Failed to parse NodeID")
errFailedCreateReq = errors.New("Failed to create request")
errResponseStatus = errors.New("Response status not OK")
errNotFoundServerURI = errors.New("route map not found for this Server URI")
errNotFoundNodeID = errors.New("route map not found for this Node ID")
errNotFoundConn = errors.New("connection not found")

errFailedConn = errors.New("failed to connect")
errFailedRead = errors.New("failed to read")
errFailedSub = errors.New("failed to subscribe")
errFailedFindEndpoint = errors.New("failed to find suitable endpoint")
errFailedFetchEndpoint = errors.New("failed to fetch OPC-UA server endpoints")
errFailedParseNodeID = errors.New("failed to parse NodeID")
errFailedCreateReq = errors.New("failed to create request")
errResponseStatus = errors.New("response status not OK")
)

var _ opcua.Subscriber = (*client)(nil)
Expand All @@ -38,8 +45,8 @@ type client struct {
logger logger.Logger
}

// NewPubSub returns new OPC-UA client instance.
func NewPubSub(ctx context.Context, pub mainflux.MessagePublisher, thingsRM, channelsRM, connectRM opcua.RouteMapRepository, log logger.Logger) opcua.Subscriber {
// NewSubscriber returns new OPC-UA client instance.
func NewSubscriber(ctx context.Context, pub mainflux.MessagePublisher, thingsRM, channelsRM, connectRM opcua.RouteMapRepository, log logger.Logger) opcua.Subscriber {
return client{
ctx: ctx,
publisher: pub,
Expand Down Expand Up @@ -117,6 +124,7 @@ func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro

go sub.Run(c.ctx)

c.logger.Info(fmt.Sprintf("subscribe to server %s and node_id %s", cfg.ServerURI, cfg.NodeID))
for {
select {
case <-c.ctx.Done():
Expand All @@ -134,6 +142,7 @@ func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro
ServerURI: cfg.ServerURI,
NodeID: cfg.NodeID,
Type: item.Value.Value.Type().String(),
Time: item.Value.SourceTimestamp.Unix(),
}

switch item.Value.Value.Type() {
Expand All @@ -151,7 +160,9 @@ func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro
msg.Data = 0
}

c.Publish(c.ctx, "", msg)
if err := c.publish(token, msg); err != nil {
c.logger.Warn(fmt.Sprintf("failed to publish: %s", err))
}
}

default:
Expand All @@ -161,38 +172,38 @@ func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro
}
}

// Publish forwards messages from OPC-UA MQTT broker to Mainflux NATS broker
func (c client) Publish(ctx context.Context, token string, m opcua.Message) error {
// Publish forwards messages from the OPC-UA Server to Mainflux NATS broker
func (c client) publish(token string, m opcua.Message) error {
// Get route-map of the OPC-UA ServerURI
chanID, err := c.channelsRM.Get(m.ServerURI)
if err != nil {
return opcua.ErrNotFoundServerURI
return errNotFoundServerURI
nmarcetic marked this conversation as resolved.
Show resolved Hide resolved
}

// Get route-map of the OPC-UA NodeID
thingID, err := c.thingsRM.Get(m.NodeID)
if err != nil {
return opcua.ErrNotFoundNodeID
return errNotFoundNodeID
}

// Check connection between ServerURI and NodeID
cKey := fmt.Sprintf("%s:%s", chanID, thingID)
if _, err := c.connectRM.Get(cKey); err != nil {
return opcua.ErrNotFoundConn
return errNotFoundConn
}

// Publish on Mainflux NATS broker
SenML := fmt.Sprintf(`[{"n":"%s","v":%v}]`, m.Type, m.Data)
SenML := fmt.Sprintf(`[{"n":"%s", "t": %d, "v":%v}]`, m.Type, m.Time, m.Data)
payload := []byte(SenML)
msg := mainflux.Message{
Publisher: thingID,
Protocol: "opcua",
Protocol: protocol,
ContentType: "Content-Type",
Channel: chanID,
Payload: payload,
}

if err := c.publisher.Publish(ctx, token, msg); err != nil {
if err := c.publisher.Publish(c.ctx, token, msg); err != nil {
return err
}

Expand Down
1 change: 1 addition & 0 deletions opcua/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ type Message struct {
ServerURI string
NodeID string
Type string
Time int64
Data interface{}
}
23 changes: 5 additions & 18 deletions opcua/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,11 @@ package opcua
import (
"fmt"

"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux/logger"
)

const protocol = "opcua"

var (
// ErrNotFoundServerURI indicates missing ServerURI route-map
ErrNotFoundServerURI = errors.New("route map not found for this Server URI")
// ErrNotFoundNodeID indicates missing NodeID route-map
ErrNotFoundNodeID = errors.New("route map not found for this Node ID")
// ErrNotFoundConn indicates missing connection
ErrNotFoundConn = errors.New("connection not found")
)

// Service specifies an API that must be fullfiled by the domain service
// implementation, and all of its decorators (e.g. logging & metrics).
type Service interface {
Expand All @@ -47,9 +37,6 @@ type Service interface {

// DisconnectThing removes thing and channel connection route-map
DisconnectThing(string, string) error

// Subscribe subscribes to a given OPC-UA server
Subscribe(Config) error
}

// Config OPC-UA Server
Expand Down Expand Up @@ -122,7 +109,7 @@ func (as *adapterService) ConnectThing(mfxChanID, mfxThingID string) error {

as.cfg.NodeID = nodeID
as.cfg.ServerURI = serverURI
go as.subscriber.Subscribe(as.cfg)
go as.subscribe(as.cfg)

c := fmt.Sprintf("%s:%s", mfxChanID, mfxThingID)
return as.connectRM.Save(c, c)
Expand All @@ -133,8 +120,8 @@ func (as *adapterService) DisconnectThing(mfxChanID, mfxThingID string) error {
return as.connectRM.Remove(c)
}

// Subscribe subscribes to the OPC-UA Server.
func (as *adapterService) Subscribe(cfg Config) error {
go as.subscriber.Subscribe(cfg)
return nil
func (as *adapterService) subscribe(cfg Config) {
if err := as.subscriber.Subscribe(cfg); err != nil {
as.logger.Warn(fmt.Sprintf("subscription failed: %s", err))
}
}