diff --git a/normalizer/nats/pubsub.go b/normalizer/nats/pubsub.go index bdf317fc34..617c0a4582 100644 --- a/normalizer/nats/pubsub.go +++ b/normalizer/nats/pubsub.go @@ -57,7 +57,7 @@ func (ps pubsub) publish(msg mainflux.RawMessage) error { output := mainflux.OutputSenML normalized, err := ps.svc.Normalize(msg) if err != nil { - switch ct := normalized.ContentType; ct { + switch ct := msg.ContentType; ct { case senML: return err case "": @@ -65,6 +65,11 @@ func (ps pubsub) publish(msg mainflux.RawMessage) error { default: output = fmt.Sprintf("out.%s", ct) } + + if err := ps.nc.Publish(output, msg.GetPayload()); err != nil { + ps.logger.Warn(fmt.Sprintf("Publishing failed: %s", err)) + return err + } } for _, v := range normalized.Messages {