From 8698a0661d4ca9d3c03f9923d69651a0145b4ea4 Mon Sep 17 00:00:00 2001 From: nwneisen Date: Mon, 11 Nov 2019 21:07:07 -0700 Subject: [PATCH 01/10] Change connect endpoint to use bulk connections Signed-off-by: nwneisen --- things/api/things/http/endpoint.go | 16 ++++ things/api/things/http/endpoint_test.go | 111 ++++++++++++++++++++++++ things/api/things/http/requests.go | 24 +++++ things/api/things/http/responses.go | 18 +++- things/api/things/http/transport.go | 20 +++++ 5 files changed, 188 insertions(+), 1 deletion(-) diff --git a/things/api/things/http/endpoint.go b/things/api/things/http/endpoint.go index b5ba3cc360..8e320f13d0 100644 --- a/things/api/things/http/endpoint.go +++ b/things/api/things/http/endpoint.go @@ -453,6 +453,22 @@ func connectEndpoint(svc things.Service) endpoint.Endpoint { } } +func createConnectionsEndpoint(svc things.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + cr := request.(createConnectionsReq) + + if err := cr.validate(); err != nil { + return nil, err + } + + if err := svc.Connect(ctx, cr.token, cr.ChanID, cr.ThingIDs...); err != nil { + return nil, err + } + + return createConnectionsRes{}, nil + } +} + func disconnectEndpoint(svc things.Service) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { cr := request.(connectionReq) diff --git a/things/api/things/http/endpoint_test.go b/things/api/things/http/endpoint_test.go index ca60beac3e..e4d63ed52b 100644 --- a/things/api/things/http/endpoint_test.go +++ b/things/api/things/http/endpoint_test.go @@ -1840,6 +1840,117 @@ func TestConnect(t *testing.T) { } } +func TestCreateConnections(t *testing.T) { + otherToken := "other_token" + otherEmail := "other_user@example.com" + svc := newService(map[string]string{ + token: email, + otherToken: otherEmail, + }) + ts := newServer(svc) + defer ts.Close() + + sths, _ := svc.CreateThings(context.Background(), token, thing) + ths := []string{} + for _, th := range sths { + ths = append(ths, th.ID) + } + + schs, _ := svc.CreateChannels(context.Background(), token, channel) + ach := schs[0] + schs, _ = svc.CreateChannels(context.Background(), otherToken, channel) + bch := schs[0] + + cases := []struct { + desc string + chanID string + thingIDs []string + auth string + status int + }{ + { + desc: "connect existing things to existing channel", + chanID: ach.ID, + thingIDs: ths, + auth: token, + status: http.StatusOK, + }, + { + desc: "connect existing things to non-existent channel", + chanID: strconv.FormatUint(wrongID, 10), + thingIDs: ths, + auth: token, + status: http.StatusNotFound, + }, + { + desc: "connect non-existing things to existing channel", + chanID: ach.ID, + thingIDs: []string{strconv.FormatUint(wrongID, 10)}, + auth: token, + status: http.StatusNotFound, + }, + { + desc: "connect existing things to channel with invalid id", + chanID: "invalid", + thingIDs: ths, + auth: token, + status: http.StatusNotFound, + }, + { + desc: "connect things with invalid id to existing channel", + chanID: ach.ID, + thingIDs: []string{"invalid"}, + auth: token, + status: http.StatusNotFound, + }, + { + desc: "connect existing things to existing channel with invalid token", + chanID: ach.ID, + thingIDs: ths, + auth: wrongValue, + status: http.StatusForbidden, + }, + { + desc: "connect existing things to existing channel with empty token", + chanID: ach.ID, + thingIDs: ths, + auth: "", + status: http.StatusForbidden, + }, + { + desc: "connect things from owner to channel of other user", + chanID: bch.ID, + thingIDs: ths, + auth: token, + status: http.StatusNotFound, + }, + } + + for _, tc := range cases { + data := struct { + ChanID string + ThingIDs []string + }{ + tc.chanID, + tc.thingIDs, + } + body := toJSON(data) + + req := testRequest{ + client: ts.Client(), + method: http.MethodPost, + url: fmt.Sprintf("%s/connect", ts.URL), + contentType: contentType, + token: tc.auth, + body: strings.NewReader(body), + } + + res, err := req.make() + assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err)) + assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code %d got %d", tc.desc, tc.status, res.StatusCode)) + } +} + func TestDisconnnect(t *testing.T) { otherToken := "other_token" otherEmail := "other_user@example.com" diff --git a/things/api/things/http/requests.go b/things/api/things/http/requests.go index 14bd88aa15..ac8c724d32 100644 --- a/things/api/things/http/requests.go +++ b/things/api/things/http/requests.go @@ -240,3 +240,27 @@ func (req connectionReq) validate() error { return nil } + +type createConnectionsReq struct { + token string + ChanID string `json:"chanID,omitempty"` + ThingIDs []string `json:"thingIDs,omitempty"` +} + +func (req createConnectionsReq) validate() error { + if req.token == "" { + return things.ErrUnauthorizedAccess + } + + if req.ChanID == "" { + return things.ErrMalformedEntity + } + + for _, thingID := range req.ThingIDs { + if thingID == "" { + return things.ErrMalformedEntity + } + } + + return nil +} diff --git a/things/api/things/http/responses.go b/things/api/things/http/responses.go index 4d6993fe4e..b7b8a6f2eb 100644 --- a/things/api/things/http/responses.go +++ b/things/api/things/http/responses.go @@ -220,13 +220,29 @@ func (res connectionRes) Code() int { } func (res connectionRes) Headers() map[string]string { - return map[string]string{} + return map[string]string{ + "Warning-Deprecated": "This endpoint will be depreciated in 0.11.0. It will be replaced with the bulk endpoint found at /connect.", + } } func (res connectionRes) Empty() bool { return true } +type createConnectionsRes struct{} + +func (res createConnectionsRes) Code() int { + return http.StatusOK +} + +func (res createConnectionsRes) Headers() map[string]string { + return map[string]string{} +} + +func (res createConnectionsRes) Empty() bool { + return true +} + type disconnectionRes struct{} func (res disconnectionRes) Code() int { diff --git a/things/api/things/http/transport.go b/things/api/things/http/transport.go index 3a32291f5b..ac464088e2 100644 --- a/things/api/things/http/transport.go +++ b/things/api/things/http/transport.go @@ -157,6 +157,13 @@ func MakeHandler(tracer opentracing.Tracer, svc things.Service) http.Handler { opts..., )) + r.Post("/connect", kithttp.NewServer( + kitot.TraceServer(tracer, "create_connections")(createConnectionsEndpoint(svc)), + decodeCreateConnections, + encodeResponse, + opts..., + )) + r.Delete("/channels/:chanId/things/:thingId", kithttp.NewServer( kitot.TraceServer(tracer, "disconnect")(disconnectEndpoint(svc)), decodeConnection, @@ -343,6 +350,19 @@ func decodeConnection(_ context.Context, r *http.Request) (interface{}, error) { return req, nil } +func decodeCreateConnections(_ context.Context, r *http.Request) (interface{}, error) { + if !strings.Contains(r.Header.Get("Content-Type"), contentType) { + return nil, errUnsupportedContentType + } + + req := createConnectionsReq{token: r.Header.Get("Authorization")} + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + return nil, err + } + + return req, nil +} + func encodeResponse(_ context.Context, w http.ResponseWriter, response interface{}) error { w.Header().Set("Content-Type", contentType) From 1c213d88239ac25eb362176f12d6809d771b5e8c Mon Sep 17 00:00:00 2001 From: nwneisen Date: Mon, 11 Nov 2019 21:40:31 -0700 Subject: [PATCH 02/10] Add documentation for bulk connect Signed-off-by: nwneisen --- docs/provisioning.md | 12 ++++++++++-- things/api/things/http/endpoint_test.go | 4 ++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/docs/provisioning.md b/docs/provisioning.md index 8980859a7b..958aaeb35d 100644 --- a/docs/provisioning.md +++ b/docs/provisioning.md @@ -245,12 +245,20 @@ Only user, who is the owner of a channel and of the things, can connect the things to the channel (which is equivalent of giving permissions to these things to communicate over given communication group). -To connect thing to the channel you should send following request: +To connect a thing to the channel you should send following request: -``` +> This endpoint will be depreciated in 0.11.0. It will be replaced with the bulk endpoint found at /connect. + +```bash curl -s -S -i --cacert docker/ssl/certs/mainflux-server.crt --insecure -X PUT -H "Authorization: " https://localhost/channels//things/ ``` +To connect multiple things to a channel, you can send the following request: + +```bash +curl -s -S -i --cacert docker/ssl/certs/mainflux-server.crt --insecure -X POST -H "Content-Type: application/json" -H "Authorization: " https://localhost/connect -d '{"chanID":"","thingIDs":["", ""]}' +``` + You can observe which things are connected to specific channel: ``` diff --git a/things/api/things/http/endpoint_test.go b/things/api/things/http/endpoint_test.go index e4d63ed52b..15865e3361 100644 --- a/things/api/things/http/endpoint_test.go +++ b/things/api/things/http/endpoint_test.go @@ -1928,8 +1928,8 @@ func TestCreateConnections(t *testing.T) { for _, tc := range cases { data := struct { - ChanID string - ThingIDs []string + ChanID string `json:"chanID"` + ThingIDs []string `json:"thingIDs"` }{ tc.chanID, tc.thingIDs, From 9fb26a6188396e91d92e850ad0b5e32363ea4e93 Mon Sep 17 00:00:00 2001 From: Nick Neisen Date: Thu, 14 Nov 2019 08:52:42 -0700 Subject: [PATCH 03/10] Update service and postgres for multiple channels Signed-off-by: Nick Neisen --- bootstrap/mocks/things.go | 14 ++++++---- things/api/auth/grpc/endpoint_test.go | 4 +-- things/api/auth/http/endpoint_test.go | 4 +-- things/api/logging.go | 6 ++-- things/api/metrics.go | 4 +-- things/channels.go | 2 +- things/mocks/channels.go | 34 ++++++++++++----------- things/postgres/channels.go | 40 ++++++++++++++------------- things/postgres/channels_test.go | 12 ++++---- things/redis/streams.go | 26 +++++++++-------- things/redis/streams_test.go | 8 +++--- things/service.go | 6 ++-- things/service_test.go | 12 ++++---- things/tracing/channels.go | 4 +-- 14 files changed, 92 insertions(+), 84 deletions(-) diff --git a/bootstrap/mocks/things.go b/bootstrap/mocks/things.go index b5d0058a76..94ac19153c 100644 --- a/bootstrap/mocks/things.go +++ b/bootstrap/mocks/things.go @@ -70,7 +70,7 @@ func (svc *mainfluxThings) ViewThing(_ context.Context, owner, id string) (thing return things.Thing{}, things.ErrNotFound } -func (svc *mainfluxThings) Connect(_ context.Context, owner, chID string, thIDs ...string) error { +func (svc *mainfluxThings) Connect(_ context.Context, owner string, chIDs []string, thIDs []string) error { svc.mu.Lock() defer svc.mu.Unlock() @@ -78,11 +78,13 @@ func (svc *mainfluxThings) Connect(_ context.Context, owner, chID string, thIDs if err != nil { return things.ErrUnauthorizedAccess } - if svc.channels[chID].Owner != userID.Value { - return things.ErrUnauthorizedAccess - } - for _, thID := range thIDs { - svc.connections[chID] = append(svc.connections[chID], thID) + for _, chID := range chIDs { + if svc.channels[chID].Owner != userID.Value { + return things.ErrUnauthorizedAccess + } + for _, thID := range thIDs { + svc.connections[chID] = append(svc.connections[chID], thID) + } } return nil diff --git a/things/api/auth/grpc/endpoint_test.go b/things/api/auth/grpc/endpoint_test.go index b49a0bab58..f27bfc0770 100644 --- a/things/api/auth/grpc/endpoint_test.go +++ b/things/api/auth/grpc/endpoint_test.go @@ -34,7 +34,7 @@ func TestCanAccessByKey(t *testing.T) { cth := sths[0] schs, _ := svc.CreateChannels(context.Background(), token, channel) sch := schs[0] - svc.Connect(context.Background(), token, sch.ID, cth.ID) + svc.Connect(context.Background(), token, []string{sch.ID}, []string{cth.ID}) usersAddr := fmt.Sprintf("localhost:%d", port) conn, _ := grpc.Dial(usersAddr, grpc.WithInsecure()) @@ -90,7 +90,7 @@ func TestCanAccessByID(t *testing.T) { cth := sths[0] schs, _ := svc.CreateChannels(context.Background(), token, channel) sch := schs[0] - svc.Connect(context.Background(), token, sch.ID, cth.ID) + svc.Connect(context.Background(), token, []string{sch.ID}, []string{cth.ID}) usersAddr := fmt.Sprintf("localhost:%d", port) conn, _ := grpc.Dial(usersAddr, grpc.WithInsecure()) diff --git a/things/api/auth/http/endpoint_test.go b/things/api/auth/http/endpoint_test.go index 50a9d6edcc..e7f5ffa952 100644 --- a/things/api/auth/http/endpoint_test.go +++ b/things/api/auth/http/endpoint_test.go @@ -155,7 +155,7 @@ func TestCanAccessByKey(t *testing.T) { require.Nil(t, err, fmt.Sprintf("failed to create channel: %s", err)) sch := schs[0] - err = svc.Connect(context.Background(), token, sch.ID, sth.ID) + err = svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID}) require.Nil(t, err, fmt.Sprintf("failed to connect thing and channel: %s", err)) car := canAccessByKeyReq{ @@ -234,7 +234,7 @@ func TestCanAccessByID(t *testing.T) { require.Nil(t, err, fmt.Sprintf("failed to create channel: %s", err)) sch := schs[0] - err = svc.Connect(context.Background(), token, sch.ID, sth.ID) + err = svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID}) require.Nil(t, err, fmt.Sprintf("failed to connect thing and channel: %s", err)) car := canAccessByIDReq{ diff --git a/things/api/logging.go b/things/api/logging.go index 62e2e00112..51f1c46f9b 100644 --- a/things/api/logging.go +++ b/things/api/logging.go @@ -201,9 +201,9 @@ func (lm *loggingMiddleware) RemoveChannel(ctx context.Context, token, id string return lm.svc.RemoveChannel(ctx, token, id) } -func (lm *loggingMiddleware) Connect(ctx context.Context, token, chID string, thIDs ...string) (err error) { +func (lm *loggingMiddleware) Connect(ctx context.Context, token string, chIDs []string, thIDs []string) (err error) { defer func(begin time.Time) { - message := fmt.Sprintf("Method connect for token %s, channel %s and things %s took %s to complete", token, chID, thIDs, time.Since(begin)) + message := fmt.Sprintf("Method connect for token %s, channel %s and things %s took %s to complete", token, chIDs, thIDs, time.Since(begin)) if err != nil { lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) return @@ -211,7 +211,7 @@ func (lm *loggingMiddleware) Connect(ctx context.Context, token, chID string, th lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) - return lm.svc.Connect(ctx, token, chID, thIDs...) + return lm.svc.Connect(ctx, token, chIDs, thIDs) } func (lm *loggingMiddleware) Disconnect(ctx context.Context, token, chanID, thingID string) (err error) { diff --git a/things/api/metrics.go b/things/api/metrics.go index 58dd1970ad..dc7eb0c55c 100644 --- a/things/api/metrics.go +++ b/things/api/metrics.go @@ -148,13 +148,13 @@ func (ms *metricsMiddleware) RemoveChannel(ctx context.Context, token, id string return ms.svc.RemoveChannel(ctx, token, id) } -func (ms *metricsMiddleware) Connect(ctx context.Context, token, chanID string, thIDs ...string) error { +func (ms *metricsMiddleware) Connect(ctx context.Context, token string, chIDs []string, thIDs []string) error { defer func(begin time.Time) { ms.counter.With("method", "connect").Add(1) ms.latency.With("method", "connect").Observe(time.Since(begin).Seconds()) }(time.Now()) - return ms.svc.Connect(ctx, token, chanID, thIDs...) + return ms.svc.Connect(ctx, token, chIDs, thIDs) } func (ms *metricsMiddleware) Disconnect(ctx context.Context, token, chanID, thingID string) error { diff --git a/things/channels.go b/things/channels.go index 5610ce20e3..3c9879b913 100644 --- a/things/channels.go +++ b/things/channels.go @@ -48,7 +48,7 @@ type ChannelRepository interface { Remove(context.Context, string, string) error // Connect adds things to the channel's list of connected things. - Connect(context.Context, string, string, ...string) error + Connect(context.Context, string, []string, []string) error // Disconnect removes thing from the channel's list of connected // things. diff --git a/things/mocks/channels.go b/things/mocks/channels.go index f82204be60..9c0c6f743b 100644 --- a/things/mocks/channels.go +++ b/things/mocks/channels.go @@ -160,27 +160,29 @@ func (crm *channelRepositoryMock) Remove(_ context.Context, owner, id string) er return nil } -func (crm *channelRepositoryMock) Connect(_ context.Context, owner, chID string, thIDs ...string) error { - ch, err := crm.RetrieveByID(context.Background(), owner, chID) - if err != nil { - return err - } - - for _, thID := range thIDs { - th, err := crm.things.RetrieveByID(context.Background(), owner, thID) +func (crm *channelRepositoryMock) Connect(_ context.Context, owner string, chIDs []string, thIDs []string) error { + for _, chID := range chIDs { + ch, err := crm.RetrieveByID(context.Background(), owner, chID) if err != nil { return err } - crm.tconns <- Connection{ - chanID: chID, - thing: th, - connected: true, - } - if _, ok := crm.cconns[thID]; !ok { - crm.cconns[thID] = make(map[string]things.Channel) + for _, thID := range thIDs { + th, err := crm.things.RetrieveByID(context.Background(), owner, thID) + if err != nil { + return err + } + + crm.tconns <- Connection{ + chanID: chID, + thing: th, + connected: true, + } + if _, ok := crm.cconns[thID]; !ok { + crm.cconns[thID] = make(map[string]things.Channel) + } + crm.cconns[thID][chID] = ch } - crm.cconns[thID][chID] = ch } return nil diff --git a/things/postgres/channels.go b/things/postgres/channels.go index 516b4ccb9e..08f1ccaab1 100644 --- a/things/postgres/channels.go +++ b/things/postgres/channels.go @@ -250,7 +250,7 @@ func (cr channelRepository) Remove(ctx context.Context, owner, id string) error return nil } -func (cr channelRepository) Connect(ctx context.Context, owner, chanID string, thIDs ...string) error { +func (cr channelRepository) Connect(ctx context.Context, owner string, chIDs []string, thIDs []string) error { tx, err := cr.db.BeginTxx(ctx, nil) if err != nil { return err @@ -259,27 +259,29 @@ func (cr channelRepository) Connect(ctx context.Context, owner, chanID string, t q := `INSERT INTO connections (channel_id, channel_owner, thing_id, thing_owner) VALUES (:channel, :owner, :thing, :owner);` - for _, thID := range thIDs { - dbco := dbConnection{ - Channel: chanID, - Thing: thID, - Owner: owner, - } + for _, chID := range chIDs { + for _, thID := range thIDs { + dbco := dbConnection{ + Channel: chID, + Thing: thID, + Owner: owner, + } - _, err := tx.NamedExecContext(ctx, q, dbco) - if err != nil { - tx.Rollback() - pqErr, ok := err.(*pq.Error) - if ok { - switch pqErr.Code.Name() { - case errFK: - return things.ErrNotFound - case errDuplicate: - return things.ErrConflict + _, err := tx.NamedExecContext(ctx, q, dbco) + if err != nil { + tx.Rollback() + pqErr, ok := err.(*pq.Error) + if ok { + switch pqErr.Code.Name() { + case errFK: + return things.ErrNotFound + case errDuplicate: + return things.ErrConflict + } } - } - return err + return err + } } } diff --git a/things/postgres/channels_test.go b/things/postgres/channels_test.go index d80da5a5ad..e2d551a914 100644 --- a/things/postgres/channels_test.go +++ b/things/postgres/channels_test.go @@ -179,7 +179,7 @@ func TestSingleChannelRetrieval(t *testing.T) { schs, _ := chanRepo.Save(context.Background(), ch) ch.ID = schs[0].ID - chanRepo.Connect(context.Background(), email, ch.ID, th.ID) + chanRepo.Connect(context.Background(), email, []string{ch.ID}, []string{th.ID}) nonexistentChanID, err := uuid.New().ID() require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) @@ -351,7 +351,7 @@ func TestMultiChannelRetrievalByThing(t *testing.T) { schs, err := chanRepo.Save(context.Background(), ch) require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) cid := schs[0].ID - err = chanRepo.Connect(context.Background(), email, cid, tid) + err = chanRepo.Connect(context.Background(), email, []string{cid}, []string{tid}) require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) } @@ -516,7 +516,7 @@ func TestConnect(t *testing.T) { } for _, tc := range cases { - err := chanRepo.Connect(context.Background(), tc.owner, tc.chanID, tc.thingID) + err := chanRepo.Connect(context.Background(), tc.owner, []string{tc.chanID}, []string{tc.thingID}) assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) } } @@ -548,7 +548,7 @@ func TestDisconnect(t *testing.T) { Owner: email, }) chanID := schs[0].ID - chanRepo.Connect(context.Background(), email, chanID, thingID) + chanRepo.Connect(context.Background(), email, []string{chanID}, []string{thingID}) nonexistentThingID, err := uuid.New().ID() require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) @@ -632,7 +632,7 @@ func TestHasThing(t *testing.T) { Owner: email, }) chanID := schs[0].ID - chanRepo.Connect(context.Background(), email, chanID, thingID) + chanRepo.Connect(context.Background(), email, []string{chanID}, []string{thingID}) nonexistentChanID, err := uuid.New().ID() require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) @@ -705,7 +705,7 @@ func TestHasThingByID(t *testing.T) { Owner: email, }) chanID := schs[0].ID - chanRepo.Connect(context.Background(), email, chanID, thingID) + chanRepo.Connect(context.Background(), email, []string{chanID}, []string{thingID}) nonexistentChanID, err := uuid.New().ID() require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) diff --git a/things/redis/streams.go b/things/redis/streams.go index e045a957d3..8367867e7f 100644 --- a/things/redis/streams.go +++ b/things/redis/streams.go @@ -186,22 +186,24 @@ func (es eventStore) RemoveChannel(ctx context.Context, token, id string) error return nil } -func (es eventStore) Connect(ctx context.Context, token, chID string, thIDs ...string) error { - if err := es.svc.Connect(ctx, token, chID, thIDs...); err != nil { +func (es eventStore) Connect(ctx context.Context, token string, chIDs []string, thIDs []string) error { + if err := es.svc.Connect(ctx, token, chIDs, thIDs); err != nil { return err } - for _, thID := range thIDs { - event := connectThingEvent{ - chanID: chID, - thingID: thID, + for _, chID := range chIDs { + for _, thID := range thIDs { + event := connectThingEvent{ + chanID: chID, + thingID: thID, + } + record := &redis.XAddArgs{ + Stream: streamID, + MaxLenApprox: streamLen, + Values: event.Encode(), + } + es.client.XAdd(record).Err() } - record := &redis.XAddArgs{ - Stream: streamID, - MaxLenApprox: streamLen, - Values: event.Encode(), - } - es.client.XAdd(record).Err() } return nil diff --git a/things/redis/streams_test.go b/things/redis/streams_test.go index 7e80d8ef2c..0b349e19f6 100644 --- a/things/redis/streams_test.go +++ b/things/redis/streams_test.go @@ -209,7 +209,7 @@ func TestListThingsByChannel(t *testing.T) { schs, err := svc.CreateChannels(context.Background(), token, things.Channel{Name: "a"}) require.Nil(t, err, fmt.Sprintf("unexpected error %s", err)) sch := schs[0] - err = svc.Connect(context.Background(), token, sch.ID, sth.ID) + err = svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID}) require.Nil(t, err, fmt.Sprintf("unexpected error %s", err)) essvc := redis.NewEventStoreMiddleware(svc, redisClient) @@ -445,7 +445,7 @@ func TestListChannelsByThing(t *testing.T) { schs, err := svc.CreateChannels(context.Background(), token, things.Channel{Name: "a"}) require.Nil(t, err, fmt.Sprintf("unexpected error %s", err)) sch := schs[0] - err = svc.Connect(context.Background(), token, sch.ID, sth.ID) + err = svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID}) require.Nil(t, err, fmt.Sprintf("unexpected error %s", err)) essvc := redis.NewEventStoreMiddleware(svc, redisClient) @@ -560,7 +560,7 @@ func TestConnectEvent(t *testing.T) { lastID := "0" for _, tc := range cases { - err := svc.Connect(context.Background(), tc.key, tc.chanID, tc.thingID) + err := svc.Connect(context.Background(), tc.key, []string{tc.chanID}, []string{tc.thingID}) assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) streams := redisClient.XRead(&r.XReadArgs{ @@ -591,7 +591,7 @@ func TestDisconnectEvent(t *testing.T) { schs, err := svc.CreateChannels(context.Background(), token, things.Channel{Name: "a"}) require.Nil(t, err, fmt.Sprintf("unexpected error %s", err)) sch := schs[0] - err = svc.Connect(context.Background(), token, sch.ID, sth.ID) + err = svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID}) require.Nil(t, err, fmt.Sprintf("unexpected error %s", err)) svc = redis.NewEventStoreMiddleware(svc, redisClient) diff --git a/things/service.go b/things/service.go index d607088235..af7669e864 100644 --- a/things/service.go +++ b/things/service.go @@ -85,7 +85,7 @@ type Service interface { RemoveChannel(context.Context, string, string) error // Connect adds things to the channel's list of connected things. - Connect(context.Context, string, string, ...string) error + Connect(context.Context, string, []string, []string) error // Disconnect removes thing from the channel's list of connected // things. @@ -283,13 +283,13 @@ func (ts *thingsService) RemoveChannel(ctx context.Context, token, id string) er return ts.channels.Remove(ctx, res.GetValue(), id) } -func (ts *thingsService) Connect(ctx context.Context, token, chID string, thIDs ...string) error { +func (ts *thingsService) Connect(ctx context.Context, token string, chIDs []string, thIDs []string) error { res, err := ts.users.Identify(ctx, &mainflux.Token{Value: token}) if err != nil { return ErrUnauthorizedAccess } - return ts.channels.Connect(ctx, res.GetValue(), chID, thIDs...) + return ts.channels.Connect(ctx, res.GetValue(), chIDs, thIDs) } func (ts *thingsService) Disconnect(ctx context.Context, token, chanID, thingID string) error { diff --git a/things/service_test.go b/things/service_test.go index d04f05e3af..ce8ed4f998 100644 --- a/things/service_test.go +++ b/things/service_test.go @@ -281,7 +281,7 @@ func TestListThingsByChannel(t *testing.T) { sths, err := svc.CreateThings(context.Background(), token, thing) require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) sth := sths[0] - svc.Connect(context.Background(), token, sch.ID, sth.ID) + svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID}) } // Wait for things and channels to connect @@ -622,7 +622,7 @@ func TestListChannelsByThing(t *testing.T) { schs, err := svc.CreateChannels(context.Background(), token, channel) require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) sch := schs[0] - svc.Connect(context.Background(), token, sch.ID, sth.ID) + svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID}) } // Wait for things and channels to connect. @@ -791,7 +791,7 @@ func TestConnect(t *testing.T) { } for _, tc := range cases { - err := svc.Connect(context.Background(), tc.token, tc.chanID, tc.thingID) + err := svc.Connect(context.Background(), tc.token, []string{tc.chanID}, []string{tc.thingID}) assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) } } @@ -803,7 +803,7 @@ func TestDisconnect(t *testing.T) { sth := sths[0] schs, _ := svc.CreateChannels(context.Background(), token, channel) sch := schs[0] - svc.Connect(context.Background(), token, sch.ID, sth.ID) + svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID}) cases := []struct { desc string @@ -863,7 +863,7 @@ func TestCanAccessByKey(t *testing.T) { sth := sths[0] schs, _ := svc.CreateChannels(context.Background(), token, channel) sch := schs[0] - svc.Connect(context.Background(), token, sch.ID, sth.ID) + svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID}) cases := map[string]struct { token string @@ -900,7 +900,7 @@ func TestCanAccessByID(t *testing.T) { sth := sths[0] schs, _ := svc.CreateChannels(context.Background(), token, channel) sch := schs[0] - svc.Connect(context.Background(), token, sch.ID, sth.ID) + svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID}) cases := map[string]struct { thingID string diff --git a/things/tracing/channels.go b/things/tracing/channels.go index 220ac5f421..7b9effaf6d 100644 --- a/things/tracing/channels.go +++ b/things/tracing/channels.go @@ -91,12 +91,12 @@ func (crm channelRepositoryMiddleware) Remove(ctx context.Context, owner, id str return crm.repo.Remove(ctx, owner, id) } -func (crm channelRepositoryMiddleware) Connect(ctx context.Context, owner, chanID string, thingID ...string) error { +func (crm channelRepositoryMiddleware) Connect(ctx context.Context, owner string, chIDs []string, thIDs []string) error { span := createSpan(ctx, crm.tracer, connectOp) defer span.Finish() ctx = opentracing.ContextWithSpan(ctx, span) - return crm.repo.Connect(ctx, owner, chanID, thingID...) + return crm.repo.Connect(ctx, owner, chIDs, thIDs) } func (crm channelRepositoryMiddleware) Disconnect(ctx context.Context, owner, chanID, thingID string) error { From dda94e512b68a7b1826acf96a345dad19b831ba1 Mon Sep 17 00:00:00 2001 From: Nick Neisen Date: Thu, 14 Nov 2019 09:00:05 -0700 Subject: [PATCH 04/10] Update bulk connect endpoint Signed-off-by: Nick Neisen --- docs/provisioning.md | 2 +- things/api/things/http/endpoint.go | 4 +- things/api/things/http/endpoint_test.go | 118 +++++++++++++----------- things/api/things/http/requests.go | 13 +-- 4 files changed, 73 insertions(+), 64 deletions(-) diff --git a/docs/provisioning.md b/docs/provisioning.md index 958aaeb35d..656e83a562 100644 --- a/docs/provisioning.md +++ b/docs/provisioning.md @@ -256,7 +256,7 @@ curl -s -S -i --cacert docker/ssl/certs/mainflux-server.crt --insecure -X PUT -H To connect multiple things to a channel, you can send the following request: ```bash -curl -s -S -i --cacert docker/ssl/certs/mainflux-server.crt --insecure -X POST -H "Content-Type: application/json" -H "Authorization: " https://localhost/connect -d '{"chanID":"","thingIDs":["", ""]}' +curl -s -S -i --cacert docker/ssl/certs/mainflux-server.crt --insecure -X POST -H "Content-Type: application/json" -H "Authorization: " https://localhost/connect -d '{"channel_ids":["", ""],"thingIDs":["", ""]}' ``` You can observe which things are connected to specific channel: diff --git a/things/api/things/http/endpoint.go b/things/api/things/http/endpoint.go index 8e320f13d0..c5b430a754 100644 --- a/things/api/things/http/endpoint.go +++ b/things/api/things/http/endpoint.go @@ -445,7 +445,7 @@ func connectEndpoint(svc things.Service) endpoint.Endpoint { return nil, err } - if err := svc.Connect(ctx, cr.token, cr.chanID, cr.thingID); err != nil { + if err := svc.Connect(ctx, cr.token, []string{cr.chanID}, []string{cr.thingID}); err != nil { return nil, err } @@ -461,7 +461,7 @@ func createConnectionsEndpoint(svc things.Service) endpoint.Endpoint { return nil, err } - if err := svc.Connect(ctx, cr.token, cr.ChanID, cr.ThingIDs...); err != nil { + if err := svc.Connect(ctx, cr.token, cr.ChannelIDs, cr.ThingIDs); err != nil { return nil, err } diff --git a/things/api/things/http/endpoint_test.go b/things/api/things/http/endpoint_test.go index 15865e3361..6af586419c 100644 --- a/things/api/things/http/endpoint_test.go +++ b/things/api/things/http/endpoint_test.go @@ -790,7 +790,7 @@ func TestListThingsByChannel(t *testing.T) { sths, err := svc.CreateThings(context.Background(), token, thing) require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) sth := sths[0] - err = svc.Connect(context.Background(), token, sch.ID, sth.ID) + err = svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID}) require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) thres := thingRes{ @@ -1313,7 +1313,7 @@ func TestViewChannel(t *testing.T) { sths, _ := svc.CreateThings(context.Background(), token, thing) sth := sths[0] - svc.Connect(context.Background(), token, sch.ID, sth.ID) + svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID}) chres := channelRes{ ID: sch.ID, @@ -1396,7 +1396,7 @@ func TestListChannels(t *testing.T) { sths, err := svc.CreateThings(context.Background(), token, thing) require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) sth := sths[0] - svc.Connect(context.Background(), token, sch.ID, sth.ID) + svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID}) chres := channelRes{ ID: sch.ID, @@ -1551,7 +1551,7 @@ func TestListChannelsByThing(t *testing.T) { schs, err := svc.CreateChannels(context.Background(), token, channel) require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) sch := schs[0] - err = svc.Connect(context.Background(), token, sch.ID, sth.ID) + err = svc.Connect(context.Background(), token, []string{sch.ID}, []string{sth.ID}) require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) chres := channelRes{ @@ -1857,85 +1857,93 @@ func TestCreateConnections(t *testing.T) { } schs, _ := svc.CreateChannels(context.Background(), token, channel) - ach := schs[0] + achs := []string{} + for _, ch := range schs { + achs = append(achs, ch.ID) + } schs, _ = svc.CreateChannels(context.Background(), otherToken, channel) - bch := schs[0] + bchs := []string{} + for _, ch := range schs { + bchs = append(bchs, ch.ID) + } cases := []struct { - desc string - chanID string - thingIDs []string - auth string - status int + desc string + channelIDs []string + thingIDs []string + auth string + status int }{ { - desc: "connect existing things to existing channel", - chanID: ach.ID, - thingIDs: ths, - auth: token, - status: http.StatusOK, + desc: "connect existing things to existing channels", + channelIDs: achs, + thingIDs: ths, + auth: token, + status: http.StatusOK, }, { - desc: "connect existing things to non-existent channel", - chanID: strconv.FormatUint(wrongID, 10), - thingIDs: ths, - auth: token, - status: http.StatusNotFound, + desc: "connect existing things to non-existent channels", + channelIDs: []string{strconv.FormatUint(wrongID, 10)}, + thingIDs: ths, + auth: token, + status: http.StatusNotFound, }, { - desc: "connect non-existing things to existing channel", - chanID: ach.ID, - thingIDs: []string{strconv.FormatUint(wrongID, 10)}, - auth: token, - status: http.StatusNotFound, + desc: "connect non-existing things to existing channels", + channelIDs: achs, + thingIDs: []string{strconv.FormatUint(wrongID, 10)}, + auth: token, + status: http.StatusNotFound, }, { - desc: "connect existing things to channel with invalid id", - chanID: "invalid", - thingIDs: ths, - auth: token, - status: http.StatusNotFound, + desc: "connect existing things to channel with invalid id", + channelIDs: []string{"invalid"}, + thingIDs: ths, + auth: token, + status: http.StatusNotFound, }, { - desc: "connect things with invalid id to existing channel", - chanID: ach.ID, - thingIDs: []string{"invalid"}, - auth: token, - status: http.StatusNotFound, + desc: "connect things with invalid id to existing channels", + channelIDs: achs, + thingIDs: []string{"invalid"}, + auth: token, + status: http.StatusNotFound, }, { - desc: "connect existing things to existing channel with invalid token", - chanID: ach.ID, - thingIDs: ths, - auth: wrongValue, - status: http.StatusForbidden, + desc: "connect existing things to existing channels with invalid token", + channelIDs: achs, + thingIDs: ths, + auth: wrongValue, + status: http.StatusForbidden, }, { - desc: "connect existing things to existing channel with empty token", - chanID: ach.ID, - thingIDs: ths, - auth: "", - status: http.StatusForbidden, + desc: "connect existing things to existing channels with empty token", + channelIDs: achs, + thingIDs: ths, + auth: "", + status: http.StatusForbidden, }, { - desc: "connect things from owner to channel of other user", - chanID: bch.ID, - thingIDs: ths, - auth: token, - status: http.StatusNotFound, + desc: "connect things from owner to channels of other user", + channelIDs: bchs, + thingIDs: ths, + auth: token, + status: http.StatusNotFound, }, } for _, tc := range cases { data := struct { - ChanID string `json:"chanID"` - ThingIDs []string `json:"thingIDs"` + ChannelIDs []string `json:"channel_ids"` + ThingIDs []string `json:"thing_ids"` }{ - tc.chanID, + tc.channelIDs, tc.thingIDs, } body := toJSON(data) + fmt.Println(body) + req := testRequest{ client: ts.Client(), method: http.MethodPost, @@ -1965,7 +1973,7 @@ func TestDisconnnect(t *testing.T) { ath := sths[0] schs, _ := svc.CreateChannels(context.Background(), token, channel) ach := schs[0] - svc.Connect(context.Background(), token, ach.ID, ath.ID) + svc.Connect(context.Background(), token, []string{ach.ID}, []string{ath.ID}) schs, _ = svc.CreateChannels(context.Background(), otherToken, channel) bch := schs[0] diff --git a/things/api/things/http/requests.go b/things/api/things/http/requests.go index ac8c724d32..39e0c8f229 100644 --- a/things/api/things/http/requests.go +++ b/things/api/things/http/requests.go @@ -242,9 +242,9 @@ func (req connectionReq) validate() error { } type createConnectionsReq struct { - token string - ChanID string `json:"chanID,omitempty"` - ThingIDs []string `json:"thingIDs,omitempty"` + token string + ChannelIDs []string `json:"channel_ids,omitempty"` + ThingIDs []string `json:"thing_ids,omitempty"` } func (req createConnectionsReq) validate() error { @@ -252,10 +252,11 @@ func (req createConnectionsReq) validate() error { return things.ErrUnauthorizedAccess } - if req.ChanID == "" { - return things.ErrMalformedEntity + for _, chID := range req.ChannelIDs { + if chID == "" { + return things.ErrMalformedEntity + } } - for _, thingID := range req.ThingIDs { if thingID == "" { return things.ErrMalformedEntity From 1a4d79351cc79f2efe028524c8da1eccbd89bde4 Mon Sep 17 00:00:00 2001 From: Nick Neisen Date: Thu, 14 Nov 2019 10:23:17 -0700 Subject: [PATCH 05/10] Fix missed test Signed-off-by: Nick Neisen --- things/postgres/things_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/things/postgres/things_test.go b/things/postgres/things_test.go index a352d22f09..cbc4b0b984 100644 --- a/things/postgres/things_test.go +++ b/things/postgres/things_test.go @@ -542,7 +542,7 @@ func TestMultiThingRetrievalByChannel(t *testing.T) { sths, err := thingRepo.Save(context.Background(), th) require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) tid := sths[0].ID - err = channelRepo.Connect(context.Background(), email, cid, tid) + err = channelRepo.Connect(context.Background(), email, []string{cid}, []string{tid}) require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) } From 85e8e0252360f68043ff901e9ecb36ff6ab4da9e Mon Sep 17 00:00:00 2001 From: Nick Neisen Date: Thu, 14 Nov 2019 10:49:11 -0700 Subject: [PATCH 06/10] Improve test code coverage Signed-off-by: Nick Neisen --- things/api/things/http/endpoint_test.go | 137 ++++++++++++++++-------- 1 file changed, 90 insertions(+), 47 deletions(-) diff --git a/things/api/things/http/endpoint_test.go b/things/api/things/http/endpoint_test.go index 6af586419c..7bad42f732 100644 --- a/things/api/things/http/endpoint_test.go +++ b/things/api/things/http/endpoint_test.go @@ -1868,67 +1868,108 @@ func TestCreateConnections(t *testing.T) { } cases := []struct { - desc string - channelIDs []string - thingIDs []string - auth string - status int + desc string + channelIDs []string + thingIDs []string + auth string + contentType string + body string + status int }{ { - desc: "connect existing things to existing channels", - channelIDs: achs, - thingIDs: ths, - auth: token, - status: http.StatusOK, + desc: "connect existing things to existing channels", + channelIDs: achs, + thingIDs: ths, + auth: token, + contentType: contentType, + status: http.StatusOK, + }, + { + desc: "connect existing things to non-existent channels", + channelIDs: []string{strconv.FormatUint(wrongID, 10)}, + thingIDs: ths, + auth: token, + contentType: contentType, + status: http.StatusNotFound, + }, + { + desc: "connect non-existing things to existing channels", + channelIDs: achs, + thingIDs: []string{strconv.FormatUint(wrongID, 10)}, + auth: token, + contentType: contentType, + status: http.StatusNotFound, + }, + { + desc: "connect existing things to channel with invalid id", + channelIDs: []string{"invalid"}, + thingIDs: ths, + auth: token, + contentType: contentType, + status: http.StatusNotFound, + }, + { + desc: "connect things with invalid id to existing channels", + channelIDs: achs, + thingIDs: []string{"invalid"}, + auth: token, + contentType: contentType, + status: http.StatusNotFound, }, { - desc: "connect existing things to non-existent channels", - channelIDs: []string{strconv.FormatUint(wrongID, 10)}, - thingIDs: ths, - auth: token, - status: http.StatusNotFound, + desc: "connect existing things to empty channel ids", + channelIDs: []string{""}, + thingIDs: ths, + auth: token, + contentType: contentType, + status: http.StatusBadRequest, }, { - desc: "connect non-existing things to existing channels", - channelIDs: achs, - thingIDs: []string{strconv.FormatUint(wrongID, 10)}, - auth: token, - status: http.StatusNotFound, + desc: "connect empty things id to existing channels", + channelIDs: achs, + thingIDs: []string{""}, + auth: token, + contentType: contentType, + status: http.StatusBadRequest, }, { - desc: "connect existing things to channel with invalid id", - channelIDs: []string{"invalid"}, - thingIDs: ths, - auth: token, - status: http.StatusNotFound, + desc: "connect existing things to existing channels with invalid token", + channelIDs: achs, + thingIDs: ths, + auth: wrongValue, + contentType: contentType, + status: http.StatusForbidden, }, { - desc: "connect things with invalid id to existing channels", - channelIDs: achs, - thingIDs: []string{"invalid"}, - auth: token, - status: http.StatusNotFound, + desc: "connect existing things to existing channels with empty token", + channelIDs: achs, + thingIDs: ths, + auth: "", + contentType: contentType, + status: http.StatusForbidden, }, { - desc: "connect existing things to existing channels with invalid token", - channelIDs: achs, - thingIDs: ths, - auth: wrongValue, - status: http.StatusForbidden, + desc: "connect things from owner to channels of other user", + channelIDs: bchs, + thingIDs: ths, + auth: token, + contentType: contentType, + status: http.StatusNotFound, }, { - desc: "connect existing things to existing channels with empty token", - channelIDs: achs, - thingIDs: ths, - auth: "", - status: http.StatusForbidden, + desc: "invalid content type", + channelIDs: bchs, + thingIDs: ths, + auth: token, + contentType: "invalid", + status: http.StatusUnsupportedMediaType, }, { - desc: "connect things from owner to channels of other user", - channelIDs: bchs, - thingIDs: ths, - auth: token, - status: http.StatusNotFound, + desc: "invalid JSON", + auth: token, + contentType: contentType, + status: http.StatusBadRequest, + body: "{", }, } @@ -1942,13 +1983,15 @@ func TestCreateConnections(t *testing.T) { } body := toJSON(data) - fmt.Println(body) + if tc.body != "" { + body = tc.body + } req := testRequest{ client: ts.Client(), method: http.MethodPost, url: fmt.Sprintf("%s/connect", ts.URL), - contentType: contentType, + contentType: tc.contentType, token: tc.auth, body: strings.NewReader(body), } From 7bdf41e53eb4dcfcd2f1d02be6fe440f917b070b Mon Sep 17 00:00:00 2001 From: Nick Neisen Date: Thu, 14 Nov 2019 11:11:32 -0700 Subject: [PATCH 07/10] Change methods to use shared data type Signed-off-by: Nick Neisen --- bootstrap/mocks/things.go | 2 +- things/api/logging.go | 2 +- things/api/metrics.go | 2 +- things/mocks/channels.go | 2 +- things/postgres/channels.go | 2 +- things/redis/streams.go | 2 +- things/service.go | 2 +- things/tracing/channels.go | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/bootstrap/mocks/things.go b/bootstrap/mocks/things.go index 94ac19153c..cdd198b06c 100644 --- a/bootstrap/mocks/things.go +++ b/bootstrap/mocks/things.go @@ -70,7 +70,7 @@ func (svc *mainfluxThings) ViewThing(_ context.Context, owner, id string) (thing return things.Thing{}, things.ErrNotFound } -func (svc *mainfluxThings) Connect(_ context.Context, owner string, chIDs []string, thIDs []string) error { +func (svc *mainfluxThings) Connect(_ context.Context, owner string, chIDs, thIDs []string) error { svc.mu.Lock() defer svc.mu.Unlock() diff --git a/things/api/logging.go b/things/api/logging.go index 51f1c46f9b..0002d52e65 100644 --- a/things/api/logging.go +++ b/things/api/logging.go @@ -201,7 +201,7 @@ func (lm *loggingMiddleware) RemoveChannel(ctx context.Context, token, id string return lm.svc.RemoveChannel(ctx, token, id) } -func (lm *loggingMiddleware) Connect(ctx context.Context, token string, chIDs []string, thIDs []string) (err error) { +func (lm *loggingMiddleware) Connect(ctx context.Context, token string, chIDs, thIDs []string) (err error) { defer func(begin time.Time) { message := fmt.Sprintf("Method connect for token %s, channel %s and things %s took %s to complete", token, chIDs, thIDs, time.Since(begin)) if err != nil { diff --git a/things/api/metrics.go b/things/api/metrics.go index dc7eb0c55c..426b13dcf4 100644 --- a/things/api/metrics.go +++ b/things/api/metrics.go @@ -148,7 +148,7 @@ func (ms *metricsMiddleware) RemoveChannel(ctx context.Context, token, id string return ms.svc.RemoveChannel(ctx, token, id) } -func (ms *metricsMiddleware) Connect(ctx context.Context, token string, chIDs []string, thIDs []string) error { +func (ms *metricsMiddleware) Connect(ctx context.Context, token string, chIDs, thIDs []string) error { defer func(begin time.Time) { ms.counter.With("method", "connect").Add(1) ms.latency.With("method", "connect").Observe(time.Since(begin).Seconds()) diff --git a/things/mocks/channels.go b/things/mocks/channels.go index 9c0c6f743b..79b9daf499 100644 --- a/things/mocks/channels.go +++ b/things/mocks/channels.go @@ -160,7 +160,7 @@ func (crm *channelRepositoryMock) Remove(_ context.Context, owner, id string) er return nil } -func (crm *channelRepositoryMock) Connect(_ context.Context, owner string, chIDs []string, thIDs []string) error { +func (crm *channelRepositoryMock) Connect(_ context.Context, owner string, chIDs, thIDs []string) error { for _, chID := range chIDs { ch, err := crm.RetrieveByID(context.Background(), owner, chID) if err != nil { diff --git a/things/postgres/channels.go b/things/postgres/channels.go index 46dc507f6a..34001c10a3 100644 --- a/things/postgres/channels.go +++ b/things/postgres/channels.go @@ -244,7 +244,7 @@ func (cr channelRepository) Remove(ctx context.Context, owner, id string) error return nil } -func (cr channelRepository) Connect(ctx context.Context, owner string, chIDs []string, thIDs []string) error { +func (cr channelRepository) Connect(ctx context.Context, owner string, chIDs, thIDs []string) error { tx, err := cr.db.BeginTxx(ctx, nil) if err != nil { return err diff --git a/things/redis/streams.go b/things/redis/streams.go index 8367867e7f..ab71ef5beb 100644 --- a/things/redis/streams.go +++ b/things/redis/streams.go @@ -186,7 +186,7 @@ func (es eventStore) RemoveChannel(ctx context.Context, token, id string) error return nil } -func (es eventStore) Connect(ctx context.Context, token string, chIDs []string, thIDs []string) error { +func (es eventStore) Connect(ctx context.Context, token string, chIDs, thIDs []string) error { if err := es.svc.Connect(ctx, token, chIDs, thIDs); err != nil { return err } diff --git a/things/service.go b/things/service.go index af7669e864..d6328e3b38 100644 --- a/things/service.go +++ b/things/service.go @@ -283,7 +283,7 @@ func (ts *thingsService) RemoveChannel(ctx context.Context, token, id string) er return ts.channels.Remove(ctx, res.GetValue(), id) } -func (ts *thingsService) Connect(ctx context.Context, token string, chIDs []string, thIDs []string) error { +func (ts *thingsService) Connect(ctx context.Context, token string, chIDs, thIDs []string) error { res, err := ts.users.Identify(ctx, &mainflux.Token{Value: token}) if err != nil { return ErrUnauthorizedAccess diff --git a/things/tracing/channels.go b/things/tracing/channels.go index 7b9effaf6d..f8f8f74783 100644 --- a/things/tracing/channels.go +++ b/things/tracing/channels.go @@ -91,7 +91,7 @@ func (crm channelRepositoryMiddleware) Remove(ctx context.Context, owner, id str return crm.repo.Remove(ctx, owner, id) } -func (crm channelRepositoryMiddleware) Connect(ctx context.Context, owner string, chIDs []string, thIDs []string) error { +func (crm channelRepositoryMiddleware) Connect(ctx context.Context, owner string, chIDs, thIDs []string) error { span := createSpan(ctx, crm.tracer, connectOp) defer span.Finish() ctx = opentracing.ContextWithSpan(ctx, span) From 3d3e2ae1e87ed9b86a36de4d2b5d88fffc36e007 Mon Sep 17 00:00:00 2001 From: Nick Neisen Date: Thu, 14 Nov 2019 12:54:54 -0700 Subject: [PATCH 08/10] Fix provisioning example Signed-off-by: Nick Neisen --- docs/provisioning.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/provisioning.md b/docs/provisioning.md index 656e83a562..f4c23b40df 100644 --- a/docs/provisioning.md +++ b/docs/provisioning.md @@ -256,7 +256,7 @@ curl -s -S -i --cacert docker/ssl/certs/mainflux-server.crt --insecure -X PUT -H To connect multiple things to a channel, you can send the following request: ```bash -curl -s -S -i --cacert docker/ssl/certs/mainflux-server.crt --insecure -X POST -H "Content-Type: application/json" -H "Authorization: " https://localhost/connect -d '{"channel_ids":["", ""],"thingIDs":["", ""]}' +curl -s -S -i --cacert docker/ssl/certs/mainflux-server.crt --insecure -X POST -H "Content-Type: application/json" -H "Authorization: " https://localhost/connect -d '{"channel_ids":["", ""],"thing_ids":["", ""]}' ``` You can observe which things are connected to specific channel: From 5b5dd43b3e721872c69ff1c57f9976dcdf27b90f Mon Sep 17 00:00:00 2001 From: nwneisen Date: Fri, 15 Nov 2019 10:27:02 -0700 Subject: [PATCH 09/10] Change depreciation messages to specify v1.0.0 Signed-off-by: nwneisen --- things/api/things/http/responses.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/things/api/things/http/responses.go b/things/api/things/http/responses.go index b7b8a6f2eb..0b9f613895 100644 --- a/things/api/things/http/responses.go +++ b/things/api/things/http/responses.go @@ -56,7 +56,7 @@ func (res thingRes) Headers() map[string]string { if res.created { return map[string]string{ "Location": fmt.Sprintf("/things/%s", res.ID), - "Warning-Deprecated": "This endpoint will be depreciated in 0.11.0. It will be replaced with the bulk endpoint currently found at /things/bulk.", + "Warning-Deprecated": "This endpoint will be depreciated in v1.0.0. It will be replaced with the bulk endpoint currently found at /things/bulk.", } } @@ -144,7 +144,7 @@ func (res channelRes) Headers() map[string]string { if res.created { return map[string]string{ "Location": fmt.Sprintf("/channels/%s", res.ID), - "Warning-Deprecated": "This endpoint will be depreciated in 0.11.0. It will be replaced with the bulk endpoint currently found at /channels/bulk.", + "Warning-Deprecated": "This endpoint will be depreciated in v1.0.0. It will be replaced with the bulk endpoint currently found at /channels/bulk.", } } @@ -221,7 +221,7 @@ func (res connectionRes) Code() int { func (res connectionRes) Headers() map[string]string { return map[string]string{ - "Warning-Deprecated": "This endpoint will be depreciated in 0.11.0. It will be replaced with the bulk endpoint found at /connect.", + "Warning-Deprecated": "This endpoint will be depreciated in v1.0.0. It will be replaced with the bulk endpoint found at /connect.", } } From 4ac942d57261625485cbdd110eb142ab7536a3ec Mon Sep 17 00:00:00 2001 From: nwneisen Date: Fri, 15 Nov 2019 10:33:46 -0700 Subject: [PATCH 10/10] Switch singluar log to plural Signed-off-by: nwneisen --- things/api/logging.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/things/api/logging.go b/things/api/logging.go index 0002d52e65..71e2ed634f 100644 --- a/things/api/logging.go +++ b/things/api/logging.go @@ -203,7 +203,7 @@ func (lm *loggingMiddleware) RemoveChannel(ctx context.Context, token, id string func (lm *loggingMiddleware) Connect(ctx context.Context, token string, chIDs, thIDs []string) (err error) { defer func(begin time.Time) { - message := fmt.Sprintf("Method connect for token %s, channel %s and things %s took %s to complete", token, chIDs, thIDs, time.Since(begin)) + message := fmt.Sprintf("Method connect for token %s, channels %s and things %s took %s to complete", token, chIDs, thIDs, time.Since(begin)) if err != nil { lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) return