Skip to content

Commit

Permalink
NOISSUE - Fix subtopic regex and restrict empty subtopic parts (#659)
Browse files Browse the repository at this point in the history
* Fix subtopic regex and restrict empty subtopic parts

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Update mqtt adapter subtopic processing

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Return old MQTT adapter implementation

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Allow dots and handle empty parts

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Add malformed subtopic error encoding

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Fix MQTT topic validation

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Fix MQTT topic parsing

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Fix subtopic filtering

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>
  • Loading branch information
anovakovic01 authored and drasko committed Mar 19, 2019
1 parent 9bd0deb commit 538f627
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 51 deletions.
60 changes: 45 additions & 15 deletions coap/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@ import (
const protocol = "coap"

var (
errBadRequest = errors.New("bad request")
errBadOption = errors.New("bad option")
auth mainflux.ThingsServiceClient
logger log.Logger
pingPeriod time.Duration
errBadRequest = errors.New("bad request")
errBadOption = errors.New("bad option")
errMalformedSubtopic = errors.New("malformed subtopic")
)

var (
auth mainflux.ThingsServiceClient
logger log.Logger
pingPeriod time.Duration
)

type handler func(conn *net.UDPConn, addr *net.UDPAddr, msg *gocoap.Message) *gocoap.Message
Expand Down Expand Up @@ -69,9 +73,9 @@ func MakeCOAPHandler(svc coap.Service, tc mainflux.ThingsServiceClient, l log.Lo
pingPeriod = pp
r := mux.NewRouter()
r.Handle("/channels/{id}/messages", gocoap.FuncHandler(receive(svc))).Methods(gocoap.POST)
r.Handle("/channels/{id}/messages/{subtopic:.+}", gocoap.FuncHandler(receive(svc))).Methods(gocoap.POST)
r.Handle("/channels/{id}/messages/{subtopic:[^?]*}", gocoap.FuncHandler(receive(svc))).Methods(gocoap.POST)
r.Handle("/channels/{id}/messages", gocoap.FuncHandler(observe(svc, responses)))
r.Handle("/channels/{id}/messages/{subtopic:.+}", gocoap.FuncHandler(observe(svc, responses)))
r.Handle("/channels/{id}/messages/{subtopic:[^?]*}", gocoap.FuncHandler(observe(svc, responses)))
r.NotFoundHandler = gocoap.FuncHandler(notFoundHandler)

return r
Expand Down Expand Up @@ -112,14 +116,30 @@ func authorize(msg *gocoap.Message, res *gocoap.Message, cid string) (string, er
return id.GetValue(), nil
}

func fmtSubtopic(subtopic string) string {
if subtopic != "" {
if strings.HasSuffix(subtopic, "/") {
subtopic = subtopic[:len(subtopic)-1]
func fmtSubtopic(subtopic string) (string, error) {
if subtopic == "" {
return subtopic, nil
}

subtopic = strings.Replace(subtopic, "/", ".", -1)

elems := strings.Split(subtopic, ".")
filteredElems := []string{}
for _, elem := range elems {
if elem == "" {
continue
}

if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
return "", errMalformedSubtopic
}
subtopic = strings.Replace(subtopic, "/", ".", -1)

filteredElems = append(filteredElems, elem)
}
return subtopic

subtopic = strings.Join(filteredElems, ".")

return subtopic, nil
}

func receive(svc coap.Service) handler {
Expand Down Expand Up @@ -151,7 +171,12 @@ func receive(svc coap.Service) handler {
res.Code = gocoap.NotFound
return res
}
subtopic := fmtSubtopic(mux.Var(msg, "subtopic"))

subtopic, err := fmtSubtopic(mux.Var(msg, "subtopic"))
if err != nil {
res.Code = gocoap.BadRequest
return res
}

publisher, err := authorize(msg, res, chanID)
if err != nil {
Expand Down Expand Up @@ -191,7 +216,12 @@ func observe(svc coap.Service, responses chan<- string) handler {
res.Code = gocoap.NotFound
return res
}
subtopic := fmtSubtopic(mux.Var(msg, "subtopic"))

subtopic, err := fmtSubtopic(mux.Var(msg, "subtopic"))
if err != nil {
res.Code = gocoap.BadRequest
return res
}

publisher, err := authorize(msg, res, chanID)
if err != nil {
Expand Down
22 changes: 18 additions & 4 deletions http/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var (

var (
auth mainflux.ThingsServiceClient
channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages((/[^/?]+)*)?(\?.*)?$`)
channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages(/[^?]*)?(\?.*)?$`)
)

// MakeHandler returns a HTTP handler for API endpoints.
Expand Down Expand Up @@ -77,8 +77,22 @@ func parseSubtopic(subtopic string) (string, error) {
return "", errMalformedSubtopic
}
subtopic = strings.Replace(subtopic, "/", ".", -1)
// channelParts[2] contains the subtopic parts starting with char /
subtopic = subtopic[1:]

elems := strings.Split(subtopic, ".")
filteredElems := []string{}
for _, elem := range elems {
if elem == "" {
continue
}

if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
return "", errMalformedSubtopic
}

filteredElems = append(filteredElems, elem)
}

subtopic = strings.Join(filteredElems, ".")
return subtopic, nil
}

Expand Down Expand Up @@ -151,7 +165,7 @@ func encodeResponse(_ context.Context, w http.ResponseWriter, response interface

func encodeError(_ context.Context, err error, w http.ResponseWriter) {
switch err {
case errMalformedData:
case errMalformedData, errMalformedSubtopic:
w.WriteHeader(http.StatusBadRequest)
case things.ErrUnauthorizedAccess:
w.WriteHeader(http.StatusForbidden)
Expand Down
26 changes: 15 additions & 11 deletions mqtt/mqtt.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,20 @@ aedes.authorizePublish = function (client, packet, publish) {
},
// Parse unlimited subtopics
baseLength = 3, // First 3 elements which represents the base part of topic.
elements = packet.topic.split('/').slice(baseLength),
isEmpty = function(value) {
return value !== '';
},
elements = packet.topic.split('/').slice(baseLength).join('.').split('.').filter(isEmpty),
baseTopic = 'channel.' + channelId;
// Remove empty elements
for (var i = 0; i < elements.length; i++) {
if (elements[i] === '') {
elements.pop(i)
}
if (elements[i].length > 1 && (elements[i].includes('*') || elements[i].includes('>'))) {
logger.warn('invalid subtopic');
publish(4);
return;
}
}
var channelTopic = elements.length ? baseTopic + '.' + elements.join('.') : baseTopic,

onAuthorize = function (err, res) {
var rawMsg;
if (!err) {
Expand Down Expand Up @@ -161,9 +165,9 @@ aedes.authorizePublish = function (client, packet, publish) {
aedes.authorizeSubscribe = function (client, packet, subscribe) {
var channel = parseTopic(packet.topic);
if (!channel) {
logger.warn('unknown topic');
subscribe(4, packet); // Bad username or password
return;
logger.warn('unknown topic');
subscribe(4, packet); // Bad username or password
return;
}
var channelId = channel[1],
accessReq = {
Expand Down Expand Up @@ -207,9 +211,9 @@ aedes.on('clientDisconnect', function (client) {
});

aedes.on('clientError', function (client, err) {
logger.warn('client error: client: %s, error: %s', client.id, err.message);
logger.warn('client error: client: %s, error: %s', client.id, err.message);
});

aedes.on('connectionError', function (client, err) {
logger.warn('client error: client: %s, error: %s', client.id, err.message);
});
logger.warn('client error: client: %s, error: %s', client.id, err.message);
});
55 changes: 35 additions & 20 deletions ws/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ const protocol = "ws"

var (
errUnauthorizedAccess = errors.New("missing or invalid credentials provided")
errMalformedData = errors.New("malformed request data")
errMalformedSubtopic = errors.New("malformed subtopic")
)

Expand All @@ -46,7 +45,7 @@ var (
}
auth mainflux.ThingsServiceClient
logger log.Logger
channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages((/[^/?]+)*)?(\?.*)?$`)
channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages(/[^?]*)?(\?.*)?$`)
)

// MakeHandler returns http handler with handshake endpoint.
Expand All @@ -68,10 +67,6 @@ func handshake(svc ws.Service) http.HandlerFunc {
sub, err := authorize(r)
if err != nil {
switch err {
case errMalformedData:
logger.Warn(fmt.Sprintf("Empty channel id or malformed url"))
w.WriteHeader(http.StatusBadRequest)
return
case things.ErrUnauthorizedAccess:
w.WriteHeader(http.StatusForbidden)
return
Expand All @@ -82,6 +77,20 @@ func handshake(svc ws.Service) http.HandlerFunc {
}
}

channelParts := channelPartRegExp.FindStringSubmatch(r.RequestURI)
if len(channelParts) < 2 {
logger.Warn(fmt.Sprintf("Empty channel id or malformed url"))
w.WriteHeader(http.StatusBadRequest)
return
}

sub.subtopic, err = parseSubtopic(channelParts[2])
if err != nil {
logger.Warn(fmt.Sprintf("Empty channel id or malformed url"))
w.WriteHeader(http.StatusBadRequest)
return
}

// Create new ws connection.
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
Expand Down Expand Up @@ -113,9 +122,25 @@ func parseSubtopic(subtopic string) (string, error) {
if err != nil {
return "", errMalformedSubtopic
}

subtopic = strings.Replace(subtopic, "/", ".", -1)
// channelParts[2] contains the subtopic parts starting with char /
subtopic = subtopic[1:]

elems := strings.Split(subtopic, ".")
filteredElems := []string{}
for _, elem := range elems {
if elem == "" {
continue
}

if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
return "", errMalformedSubtopic
}

filteredElems = append(filteredElems, elem)
}

subtopic = strings.Join(filteredElems, ".")

return subtopic, nil
}

Expand All @@ -129,16 +154,7 @@ func authorize(r *http.Request) (subscription, error) {
authKey = authKeys[0]
}

channelParts := channelPartRegExp.FindStringSubmatch(r.RequestURI)
if len(channelParts) < 2 {
return subscription{}, errMalformedData
}

chanID := bone.GetValue(r, "id")
subtopic, err := parseSubtopic(channelParts[2])
if err != nil {
return subscription{}, err
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand All @@ -153,9 +169,8 @@ func authorize(r *http.Request) (subscription, error) {
}

sub := subscription{
pubID: id.GetValue(),
chanID: chanID,
subtopic: subtopic,
pubID: id.GetValue(),
chanID: chanID,
}

return sub, nil
Expand Down
2 changes: 1 addition & 1 deletion ws/api/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestHandshake(t *testing.T) {
{"connect and send message with token as query parameter", id, "", false, token, http.StatusSwitchingProtocols, msg},
{"connect and send message that cannot be published", id, "", true, token, http.StatusSwitchingProtocols, []byte{}},
{"connect and send message to subtopic", id, "subtopic", true, token, http.StatusSwitchingProtocols, msg},
{"connect and send message to subtopic with invalid name", id, "sub//topic", true, token, http.StatusBadRequest, msg},
{"connect and send message to subtopic with invalid name", id, "sub/a*b/topic", true, token, http.StatusBadRequest, msg},
{"connect and send message to nested subtopic", id, "subtopic/nested", true, token, http.StatusSwitchingProtocols, msg},
{"connect and send message to all subtopics", id, ">", true, token, http.StatusSwitchingProtocols, msg},
}
Expand Down

0 comments on commit 538f627

Please sign in to comment.