From 458747775c63a9627c7e21843eeb63f724422afb Mon Sep 17 00:00:00 2001 From: Manuel Imperiale Date: Sat, 14 Dec 2019 18:47:16 +0100 Subject: [PATCH 1/3] NOISSUE - Use opcua server timestamp in opcua-adapter messages Signed-off-by: Manuel Imperiale --- cmd/opcua/main.go | 19 +++++++++------- opcua/api/logging.go | 13 ----------- opcua/api/metrics.go | 9 -------- opcua/gopcua/pubsub.go | 49 ++++++++++++++++++++++++++---------------- opcua/message.go | 1 + opcua/service.go | 23 +++++--------------- 6 files changed, 47 insertions(+), 67 deletions(-) diff --git a/cmd/opcua/main.go b/cmd/opcua/main.go index ee35c032d0..f50c3a2bc2 100644 --- a/cmd/opcua/main.go +++ b/cmd/opcua/main.go @@ -108,9 +108,9 @@ func main() { publisher := pub.NewMessagePublisher(natsConn) ctx := context.Background() - pubsub := gopcua.NewPubSub(ctx, publisher, thingRM, chanRM, connRM, logger) + sub := gopcua.NewSubscriber(ctx, publisher, thingRM, chanRM, connRM, logger) - svc := opcua.New(pubsub, thingRM, chanRM, connRM, cfg.opcuaConfig, logger) + svc := opcua.New(sub, thingRM, chanRM, connRM, cfg.opcuaConfig, logger) svc = api.LoggingMiddleware(svc, logger) svc = api.MetricsMiddleware( svc, @@ -128,7 +128,7 @@ func main() { }, []string{"method"}), ) - //go subscribeToNodesFromFile(svc, cfg.nodesConfig, cfg.opcuaConfig, logger) + go subscribeToNodesFromFile(sub, cfg.nodesConfig, cfg.opcuaConfig, logger) go subscribeToThingsES(svc, esConn, cfg.esConsumerName, logger) errs := make(chan error, 2) @@ -193,7 +193,7 @@ func connectToRedis(redisURL, redisPass, redisDB string, logger logger.Logger) * }) } -func subscribeToNodesFromFile(svc opcua.Service, nodes string, cfg opcua.Config, logger logger.Logger) { +func subscribeToNodesFromFile(pubsub opcua.Subscriber, nodes string, cfg opcua.Config, logger logger.Logger) { if _, err := os.Stat(nodes); os.IsNotExist(err) { logger.Warn(fmt.Sprintf("Config file not found: %s", err)) return @@ -218,16 +218,19 @@ func subscribeToNodesFromFile(svc opcua.Service, nodes string, cfg opcua.Config, } if len(l) < columns { - logger.Warn(fmt.Sprintf("Empty or incomplete line found in file")) + logger.Warn("Empty or incomplete line found in file") return } cfg.ServerURI = l[0] cfg.NodeID = l[1] + go subscribe(pubsub, cfg, logger) + } +} - if err := svc.Subscribe(cfg); err != nil { - logger.Warn(fmt.Sprintf("OPC-UA Subscription failed: %s", err)) - } +func subscribe(sub opcua.Subscriber, cfg opcua.Config, logger logger.Logger) { + if err := sub.Subscribe(cfg); err != nil { + logger.Warn(fmt.Sprintf("Subscription failed: %s", err)) } } diff --git a/opcua/api/logging.go b/opcua/api/logging.go index eef2ad5334..13685f2aaf 100644 --- a/opcua/api/logging.go +++ b/opcua/api/logging.go @@ -129,16 +129,3 @@ func (lm loggingMiddleware) DisconnectThing(mfxChanID, mfxThingID string) (err e return lm.svc.DisconnectThing(mfxChanID, mfxThingID) } - -func (lm loggingMiddleware) Subscribe(cfg opcua.Config) (err error) { - defer func(begin time.Time) { - message := fmt.Sprintf("subscribe to server %s and node_id %s, took %s to complete", cfg.ServerURI, cfg.NodeID, time.Since(begin)) - if err != nil { - lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) - return - } - lm.logger.Info(fmt.Sprintf("%s without errors.", message)) - }(time.Now()) - - return lm.svc.Subscribe(cfg) -} diff --git a/opcua/api/metrics.go b/opcua/api/metrics.go index 37b7a60026..d7c5682460 100644 --- a/opcua/api/metrics.go +++ b/opcua/api/metrics.go @@ -98,12 +98,3 @@ func (mm *metricsMiddleware) DisconnectThing(mfxChanID, mfxThingID string) error return mm.svc.DisconnectThing(mfxChanID, mfxThingID) } - -func (mm *metricsMiddleware) Subscribe(cfg opcua.Config) error { - defer func(begin time.Time) { - mm.counter.With("method", "subscribe").Add(1) - mm.latency.With("method", "subscribe").Observe(time.Since(begin).Seconds()) - }(time.Now()) - - return mm.svc.Subscribe(cfg) -} diff --git a/opcua/gopcua/pubsub.go b/opcua/gopcua/pubsub.go index b565b4a801..b1c3f5801f 100644 --- a/opcua/gopcua/pubsub.go +++ b/opcua/gopcua/pubsub.go @@ -16,15 +16,22 @@ import ( "github.com/mainflux/mainflux/opcua" ) +const protocol = "opcua" +const token = "" + var ( - errFailedConn = errors.New("Failed to connect") - errFailedRead = errors.New("Failed to read") - errFailedSub = errors.New("Failed to subscribe") - errFailedFindEndpoint = errors.New("Failed to find suitable endpoint") - errFailedFetchEndpoint = errors.New("Failed to fetch OPC-UA server endpoints") - errFailedParseNodeID = errors.New("Failed to parse NodeID") - errFailedCreateReq = errors.New("Failed to create request") - errResponseStatus = errors.New("Response status not OK") + errNotFoundServerURI = errors.New("route map not found for this Server URI") + errNotFoundNodeID = errors.New("route map not found for this Node ID") + errNotFoundConn = errors.New("connection not found") + + errFailedConn = errors.New("failed to connect") + errFailedRead = errors.New("failed to read") + errFailedSub = errors.New("failed to subscribe") + errFailedFindEndpoint = errors.New("failed to find suitable endpoint") + errFailedFetchEndpoint = errors.New("failed to fetch OPC-UA server endpoints") + errFailedParseNodeID = errors.New("failed to parse NodeID") + errFailedCreateReq = errors.New("failed to create request") + errResponseStatus = errors.New("response status not OK") ) var _ opcua.Subscriber = (*client)(nil) @@ -38,8 +45,8 @@ type client struct { logger logger.Logger } -// NewPubSub returns new OPC-UA client instance. -func NewPubSub(ctx context.Context, pub mainflux.MessagePublisher, thingsRM, channelsRM, connectRM opcua.RouteMapRepository, log logger.Logger) opcua.Subscriber { +// NewSubscriber returns new OPC-UA client instance. +func NewSubscriber(ctx context.Context, pub mainflux.MessagePublisher, thingsRM, channelsRM, connectRM opcua.RouteMapRepository, log logger.Logger) opcua.Subscriber { return client{ ctx: ctx, publisher: pub, @@ -117,6 +124,7 @@ func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro go sub.Run(c.ctx) + c.logger.Info(fmt.Sprintf("subscribe to server %s and node_id %s", cfg.ServerURI, cfg.NodeID)) for { select { case <-c.ctx.Done(): @@ -134,6 +142,7 @@ func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro ServerURI: cfg.ServerURI, NodeID: cfg.NodeID, Type: item.Value.Value.Type().String(), + Time: item.Value.SourceTimestamp.Unix(), } switch item.Value.Value.Type() { @@ -151,7 +160,9 @@ func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro msg.Data = 0 } - c.Publish(c.ctx, "", msg) + if err := c.publish(token, msg); err != nil { + c.logger.Warn(fmt.Sprintf("failed to publish: %s", err)) + } } default: @@ -161,38 +172,38 @@ func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro } } -// Publish forwards messages from OPC-UA MQTT broker to Mainflux NATS broker -func (c client) Publish(ctx context.Context, token string, m opcua.Message) error { +// Publish forwards messages from the OPC-UA Server to Mainflux NATS broker +func (c client) publish(token string, m opcua.Message) error { // Get route-map of the OPC-UA ServerURI chanID, err := c.channelsRM.Get(m.ServerURI) if err != nil { - return opcua.ErrNotFoundServerURI + return errNotFoundServerURI } // Get route-map of the OPC-UA NodeID thingID, err := c.thingsRM.Get(m.NodeID) if err != nil { - return opcua.ErrNotFoundNodeID + return errNotFoundNodeID } // Check connection between ServerURI and NodeID cKey := fmt.Sprintf("%s:%s", chanID, thingID) if _, err := c.connectRM.Get(cKey); err != nil { - return opcua.ErrNotFoundConn + return errNotFoundConn } // Publish on Mainflux NATS broker - SenML := fmt.Sprintf(`[{"n":"%s","v":%v}]`, m.Type, m.Data) + SenML := fmt.Sprintf(`[{"n":"%s", "t": %d, "v":%v}]`, m.Type, m.Time, m.Data) payload := []byte(SenML) msg := mainflux.Message{ Publisher: thingID, - Protocol: "opcua", + Protocol: protocol, ContentType: "Content-Type", Channel: chanID, Payload: payload, } - if err := c.publisher.Publish(ctx, token, msg); err != nil { + if err := c.publisher.Publish(c.ctx, token, msg); err != nil { return err } diff --git a/opcua/message.go b/opcua/message.go index 5ff58396d6..3e8d0047ea 100644 --- a/opcua/message.go +++ b/opcua/message.go @@ -8,5 +8,6 @@ type Message struct { ServerURI string NodeID string Type string + Time int64 Data interface{} } diff --git a/opcua/service.go b/opcua/service.go index 1f0559e8df..93785a67ac 100644 --- a/opcua/service.go +++ b/opcua/service.go @@ -6,21 +6,11 @@ package opcua import ( "fmt" - "github.com/mainflux/mainflux/errors" "github.com/mainflux/mainflux/logger" ) const protocol = "opcua" -var ( - // ErrNotFoundServerURI indicates missing ServerURI route-map - ErrNotFoundServerURI = errors.New("route map not found for this Server URI") - // ErrNotFoundNodeID indicates missing NodeID route-map - ErrNotFoundNodeID = errors.New("route map not found for this Node ID") - // ErrNotFoundConn indicates missing connection - ErrNotFoundConn = errors.New("connection not found") -) - // Service specifies an API that must be fullfiled by the domain service // implementation, and all of its decorators (e.g. logging & metrics). type Service interface { @@ -47,9 +37,6 @@ type Service interface { // DisconnectThing removes thing and channel connection route-map DisconnectThing(string, string) error - - // Subscribe subscribes to a given OPC-UA server - Subscribe(Config) error } // Config OPC-UA Server @@ -122,7 +109,7 @@ func (as *adapterService) ConnectThing(mfxChanID, mfxThingID string) error { as.cfg.NodeID = nodeID as.cfg.ServerURI = serverURI - go as.subscriber.Subscribe(as.cfg) + go as.subscribe(as.cfg) c := fmt.Sprintf("%s:%s", mfxChanID, mfxThingID) return as.connectRM.Save(c, c) @@ -133,8 +120,8 @@ func (as *adapterService) DisconnectThing(mfxChanID, mfxThingID string) error { return as.connectRM.Remove(c) } -// Subscribe subscribes to the OPC-UA Server. -func (as *adapterService) Subscribe(cfg Config) error { - go as.subscriber.Subscribe(cfg) - return nil +func (as *adapterService) subscribe(cfg Config) { + if err := as.subscriber.Subscribe(cfg); err != nil { + as.logger.Warn(fmt.Sprintf("subscription failed: %s", err)) + } } From c146d592552e305e22101ff7f9cf8e85d5f0b8d6 Mon Sep 17 00:00:00 2001 From: Manuel Imperiale Date: Sat, 14 Dec 2019 18:49:04 +0100 Subject: [PATCH 2/3] Rename pubsub -> subscribe Signed-off-by: Manuel Imperiale --- opcua/gopcua/{pubsub.go => subscribe.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename opcua/gopcua/{pubsub.go => subscribe.go} (100%) diff --git a/opcua/gopcua/pubsub.go b/opcua/gopcua/subscribe.go similarity index 100% rename from opcua/gopcua/pubsub.go rename to opcua/gopcua/subscribe.go From 4912abca987302a0a6aaa188226407c795b10655 Mon Sep 17 00:00:00 2001 From: Manuel Imperiale Date: Sat, 14 Dec 2019 19:09:09 +0100 Subject: [PATCH 3/3] Typo Signed-off-by: Manuel Imperiale --- cmd/opcua/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/opcua/main.go b/cmd/opcua/main.go index f50c3a2bc2..63a4c9b27a 100644 --- a/cmd/opcua/main.go +++ b/cmd/opcua/main.go @@ -193,7 +193,7 @@ func connectToRedis(redisURL, redisPass, redisDB string, logger logger.Logger) * }) } -func subscribeToNodesFromFile(pubsub opcua.Subscriber, nodes string, cfg opcua.Config, logger logger.Logger) { +func subscribeToNodesFromFile(sub opcua.Subscriber, nodes string, cfg opcua.Config, logger logger.Logger) { if _, err := os.Stat(nodes); os.IsNotExist(err) { logger.Warn(fmt.Sprintf("Config file not found: %s", err)) return @@ -224,7 +224,7 @@ func subscribeToNodesFromFile(pubsub opcua.Subscriber, nodes string, cfg opcua.C cfg.ServerURI = l[0] cfg.NodeID = l[1] - go subscribe(pubsub, cfg, logger) + go subscribe(sub, cfg, logger) } }