Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MF-898 - Add bulk connections endpoint #948

Merged
merged 11 commits into from
Nov 15, 2019
14 changes: 8 additions & 6 deletions bootstrap/mocks/things.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,21 @@ func (svc *mainfluxThings) ViewThing(_ context.Context, owner, id string) (thing
return things.Thing{}, things.ErrNotFound
}

func (svc *mainfluxThings) Connect(_ context.Context, owner, chID string, thIDs ...string) error {
func (svc *mainfluxThings) Connect(_ context.Context, owner string, chIDs, thIDs []string) error {
svc.mu.Lock()
defer svc.mu.Unlock()

userID, err := svc.users.Identify(context.Background(), &mainflux.Token{Value: owner})
if err != nil {
return things.ErrUnauthorizedAccess
}
if svc.channels[chID].Owner != userID.Value {
return things.ErrUnauthorizedAccess
}
for _, thID := range thIDs {
svc.connections[chID] = append(svc.connections[chID], thID)
for _, chID := range chIDs {
if svc.channels[chID].Owner != userID.Value {
return things.ErrUnauthorizedAccess
}
for _, thID := range thIDs {
svc.connections[chID] = append(svc.connections[chID], thID)
}
}

return nil
Expand Down
12 changes: 10 additions & 2 deletions docs/provisioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,20 @@ Only user, who is the owner of a channel and of the things, can connect the
things to the channel (which is equivalent of giving permissions to these things
to communicate over given communication group).

To connect thing to the channel you should send following request:
To connect a thing to the channel you should send following request:

```
> This endpoint will be depreciated in 0.11.0. It will be replaced with the bulk endpoint found at /connect.

```bash
curl -s -S -i --cacert docker/ssl/certs/mainflux-server.crt --insecure -X PUT -H "Authorization: <user_auth_token>" https://localhost/channels/<channel_id>/things/<thing_id>
```

To connect multiple things to a channel, you can send the following request:

```bash
curl -s -S -i --cacert docker/ssl/certs/mainflux-server.crt --insecure -X POST -H "Content-Type: application/json" -H "Authorization: <user_auth_token>" https://localhost/connect -d '{"channel_ids":["<channel_id>", "<channel_id>"],"thingIDs":["<thing_id>", "<thing_id>"]}'
nwneisen marked this conversation as resolved.
Show resolved Hide resolved
nwneisen marked this conversation as resolved.
Show resolved Hide resolved
```

You can observe which things are connected to specific channel:

```
Expand Down
4 changes: 2 additions & 2 deletions things/api/auth/grpc/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestCanAccessByKey(t *testing.T) {
cth := sths[0]
schs, _ := svc.CreateChannels(context.Background(), token, channel)
sch := schs[0]
svc.Connect(context.Background(), token, sch.ID, cth.ID)
svc.Connect(context.Background(), token, []string{sch.ID}, []string{cth.ID})

usersAddr := fmt.Sprintf("localhost:%d", port)
conn, _ := grpc.Dial(usersAddr, grpc.WithInsecure())
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestCanAccessByID(t *testing.T) {
cth := sths[0]
schs, _ := svc.CreateChannels(context.Background(), token, channel)
sch := schs[0]
svc.Connect(context.Background(), token, sch.ID, cth.ID)
svc.Connect(context.Background(), token, []string{sch.ID}, []string{cth.ID})

usersAddr := fmt.Sprintf("localhost:%d", port)
conn, _ := grpc.Dial(usersAddr, grpc.WithInsecure())
Expand Down
4 changes: 2 additions & 2 deletions things/api/auth/http/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestCanAccessByKey(t *testing.T) {
require.Nil(t, err, fmt.Sprintf("failed to create channel: %s", err))
sch := schs[0]

err = svc.Connect(context.Background(), token, sch.ID, sth.ID)
err = svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID})
require.Nil(t, err, fmt.Sprintf("failed to connect thing and channel: %s", err))

car := canAccessByKeyReq{
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestCanAccessByID(t *testing.T) {
require.Nil(t, err, fmt.Sprintf("failed to create channel: %s", err))
sch := schs[0]

err = svc.Connect(context.Background(), token, sch.ID, sth.ID)
err = svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID})
require.Nil(t, err, fmt.Sprintf("failed to connect thing and channel: %s", err))

car := canAccessByIDReq{
Expand Down
6 changes: 3 additions & 3 deletions things/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,17 +201,17 @@ func (lm *loggingMiddleware) RemoveChannel(ctx context.Context, token, id string
return lm.svc.RemoveChannel(ctx, token, id)
}

func (lm *loggingMiddleware) Connect(ctx context.Context, token, chID string, thIDs ...string) (err error) {
func (lm *loggingMiddleware) Connect(ctx context.Context, token string, chIDs, thIDs []string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method connect for token %s, channel %s and things %s took %s to complete", token, chID, thIDs, time.Since(begin))
message := fmt.Sprintf("Method connect for token %s, channel %s and things %s took %s to complete", token, chIDs, thIDs, time.Since(begin))
nwneisen marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.Connect(ctx, token, chID, thIDs...)
return lm.svc.Connect(ctx, token, chIDs, thIDs)
}

func (lm *loggingMiddleware) Disconnect(ctx context.Context, token, chanID, thingID string) (err error) {
Expand Down
4 changes: 2 additions & 2 deletions things/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,13 @@ func (ms *metricsMiddleware) RemoveChannel(ctx context.Context, token, id string
return ms.svc.RemoveChannel(ctx, token, id)
}

func (ms *metricsMiddleware) Connect(ctx context.Context, token, chanID string, thIDs ...string) error {
func (ms *metricsMiddleware) Connect(ctx context.Context, token string, chIDs, thIDs []string) error {
defer func(begin time.Time) {
ms.counter.With("method", "connect").Add(1)
ms.latency.With("method", "connect").Observe(time.Since(begin).Seconds())
}(time.Now())

return ms.svc.Connect(ctx, token, chanID, thIDs...)
return ms.svc.Connect(ctx, token, chIDs, thIDs)
}

func (ms *metricsMiddleware) Disconnect(ctx context.Context, token, chanID, thingID string) error {
Expand Down
18 changes: 17 additions & 1 deletion things/api/things/http/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,14 +445,30 @@ func connectEndpoint(svc things.Service) endpoint.Endpoint {
return nil, err
}

if err := svc.Connect(ctx, cr.token, cr.chanID, cr.thingID); err != nil {
if err := svc.Connect(ctx, cr.token, []string{cr.chanID}, []string{cr.thingID}); err != nil {
return nil, err
}

return connectionRes{}, nil
}
}

func createConnectionsEndpoint(svc things.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
cr := request.(createConnectionsReq)

if err := cr.validate(); err != nil {
return nil, err
}

if err := svc.Connect(ctx, cr.token, cr.ChannelIDs, cr.ThingIDs); err != nil {
return nil, err
}

return createConnectionsRes{}, nil
}
}

func disconnectEndpoint(svc things.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
cr := request.(connectionReq)
Expand Down
172 changes: 167 additions & 5 deletions things/api/things/http/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ func TestListThingsByChannel(t *testing.T) {
sths, err := svc.CreateThings(context.Background(), token, thing)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
sth := sths[0]
err = svc.Connect(context.Background(), token, sch.ID, sth.ID)
err = svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID})
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))

thres := thingRes{
Expand Down Expand Up @@ -1313,7 +1313,7 @@ func TestViewChannel(t *testing.T) {

sths, _ := svc.CreateThings(context.Background(), token, thing)
sth := sths[0]
svc.Connect(context.Background(), token, sch.ID, sth.ID)
svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID})

chres := channelRes{
ID: sch.ID,
Expand Down Expand Up @@ -1396,7 +1396,7 @@ func TestListChannels(t *testing.T) {
sths, err := svc.CreateThings(context.Background(), token, thing)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
sth := sths[0]
svc.Connect(context.Background(), token, sch.ID, sth.ID)
svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID})

chres := channelRes{
ID: sch.ID,
Expand Down Expand Up @@ -1551,7 +1551,7 @@ func TestListChannelsByThing(t *testing.T) {
schs, err := svc.CreateChannels(context.Background(), token, channel)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
sch := schs[0]
err = svc.Connect(context.Background(), token, sch.ID, sth.ID)
err = svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID})
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))

chres := channelRes{
Expand Down Expand Up @@ -1840,6 +1840,168 @@ func TestConnect(t *testing.T) {
}
}

func TestCreateConnections(t *testing.T) {
otherToken := "other_token"
otherEmail := "other_user@example.com"
svc := newService(map[string]string{
token: email,
otherToken: otherEmail,
})
ts := newServer(svc)
defer ts.Close()

sths, _ := svc.CreateThings(context.Background(), token, thing)
ths := []string{}
for _, th := range sths {
ths = append(ths, th.ID)
}

schs, _ := svc.CreateChannels(context.Background(), token, channel)
achs := []string{}
for _, ch := range schs {
achs = append(achs, ch.ID)
}
schs, _ = svc.CreateChannels(context.Background(), otherToken, channel)
bchs := []string{}
for _, ch := range schs {
bchs = append(bchs, ch.ID)
}

cases := []struct {
desc string
channelIDs []string
thingIDs []string
auth string
contentType string
body string
status int
}{
{
desc: "connect existing things to existing channels",
channelIDs: achs,
thingIDs: ths,
auth: token,
contentType: contentType,
status: http.StatusOK,
},
{
desc: "connect existing things to non-existent channels",
channelIDs: []string{strconv.FormatUint(wrongID, 10)},
thingIDs: ths,
auth: token,
contentType: contentType,
status: http.StatusNotFound,
},
{
desc: "connect non-existing things to existing channels",
channelIDs: achs,
thingIDs: []string{strconv.FormatUint(wrongID, 10)},
auth: token,
contentType: contentType,
status: http.StatusNotFound,
},
{
desc: "connect existing things to channel with invalid id",
channelIDs: []string{"invalid"},
thingIDs: ths,
auth: token,
contentType: contentType,
status: http.StatusNotFound,
},
{
desc: "connect things with invalid id to existing channels",
channelIDs: achs,
thingIDs: []string{"invalid"},
auth: token,
contentType: contentType,
status: http.StatusNotFound,
},
{
desc: "connect existing things to empty channel ids",
channelIDs: []string{""},
thingIDs: ths,
auth: token,
contentType: contentType,
status: http.StatusBadRequest,
},
{
desc: "connect empty things id to existing channels",
channelIDs: achs,
thingIDs: []string{""},
auth: token,
contentType: contentType,
status: http.StatusBadRequest,
},
{
desc: "connect existing things to existing channels with invalid token",
channelIDs: achs,
thingIDs: ths,
auth: wrongValue,
contentType: contentType,
status: http.StatusForbidden,
},
{
desc: "connect existing things to existing channels with empty token",
channelIDs: achs,
thingIDs: ths,
auth: "",
contentType: contentType,
status: http.StatusForbidden,
},
{
desc: "connect things from owner to channels of other user",
channelIDs: bchs,
thingIDs: ths,
auth: token,
contentType: contentType,
status: http.StatusNotFound,
},
{
desc: "invalid content type",
channelIDs: bchs,
thingIDs: ths,
auth: token,
contentType: "invalid",
status: http.StatusUnsupportedMediaType,
},
{
desc: "invalid JSON",
auth: token,
contentType: contentType,
status: http.StatusBadRequest,
body: "{",
},
}

for _, tc := range cases {
data := struct {
ChannelIDs []string `json:"channel_ids"`
ThingIDs []string `json:"thing_ids"`
}{
tc.channelIDs,
tc.thingIDs,
}
body := toJSON(data)

if tc.body != "" {
body = tc.body
}

req := testRequest{
client: ts.Client(),
method: http.MethodPost,
url: fmt.Sprintf("%s/connect", ts.URL),
contentType: tc.contentType,
token: tc.auth,
body: strings.NewReader(body),
}

res, err := req.make()
assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err))
assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code %d got %d", tc.desc, tc.status, res.StatusCode))
}
}

func TestDisconnnect(t *testing.T) {
otherToken := "other_token"
otherEmail := "other_user@example.com"
Expand All @@ -1854,7 +2016,7 @@ func TestDisconnnect(t *testing.T) {
ath := sths[0]
schs, _ := svc.CreateChannels(context.Background(), token, channel)
ach := schs[0]
svc.Connect(context.Background(), token, ach.ID, ath.ID)
svc.Connect(context.Background(), token, []string{ach.ID}, []string{ath.ID})
schs, _ = svc.CreateChannels(context.Background(), otherToken, channel)
bch := schs[0]

Expand Down
25 changes: 25 additions & 0 deletions things/api/things/http/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,28 @@ func (req connectionReq) validate() error {

return nil
}

type createConnectionsReq struct {
token string
ChannelIDs []string `json:"channel_ids,omitempty"`
ThingIDs []string `json:"thing_ids,omitempty"`
nwneisen marked this conversation as resolved.
Show resolved Hide resolved
}

func (req createConnectionsReq) validate() error {
if req.token == "" {
return things.ErrUnauthorizedAccess
}

for _, chID := range req.ChannelIDs {
if chID == "" {
return things.ErrMalformedEntity
}
}
for _, thingID := range req.ThingIDs {
if thingID == "" {
return things.ErrMalformedEntity
}
}

return nil
}
Loading