Skip to content

Commit

Permalink
MF-898 - Add bulk connect to CLI and SDK (#956)
Browse files Browse the repository at this point in the history
* Add bulk connect to CLI and SDK

Signed-off-by: Nick Neisen <nwneisen@gmail.com>

* Add test for bulk connect

Signed-off-by: Nick Neisen <nwneisen@gmail.com>

* Update docs

Signed-off-by: Nick Neisen <nwneisen@gmail.com>

* Add file example

Signed-off-by: Nick Neisen <nwneisen@gmail.com>

* Add JSON example to CLI docs

Signed-off-by: Nick Neisen <nwneisen@gmail.com>

* Change some value datatypes back

Signed-off-by: Nick Neisen <nwneisen@gmail.com>

* Use constants for file extensions

Signed-off-by: Nick Neisen <nwneisen@gmail.com>
  • Loading branch information
nwneisen authored and dborovcanin committed Nov 21, 2019
1 parent 0ab6277 commit 76908d7
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 4 deletions.
71 changes: 69 additions & 2 deletions cli/provision.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,29 @@ var cmdProvision = []cobra.Command{
logJSON(channels)
},
},
cobra.Command{
Use: "connect",
Short: "connect <connections_file> <user_token>",
Long: `Bulk connect things to channels`,
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 2 {
logUsage(cmd.Short)
return
}

connIDs, err := connectionsFromFile(args[0])
if err != nil {
logError(err)
return
}

err = sdk.Connect(connIDs, args[1])
if err != nil {
logError(err)
return
}
},
},
cobra.Command{
Use: "test",
Short: "test",
Expand Down Expand Up @@ -242,7 +265,7 @@ func channelsFromFile(path string) ([]mfxsdk.Channel, error) {

channels := []mfxsdk.Channel{}
switch filepath.Ext(path) {
case ".csv":
case csvExt:
reader := csv.NewReader(file)

for {
Expand All @@ -264,7 +287,7 @@ func channelsFromFile(path string) ([]mfxsdk.Channel, error) {

channels = append(channels, channel)
}
case ".json":
case jsonExt:
err := json.NewDecoder(file).Decode(&channels)
if err != nil {
return []mfxsdk.Channel{}, err
Expand All @@ -275,3 +298,47 @@ func channelsFromFile(path string) ([]mfxsdk.Channel, error) {

return channels, nil
}

func connectionsFromFile(path string) (mfxsdk.ConnectionIDs, error) {
if _, err := os.Stat(path); os.IsNotExist(err) {
return mfxsdk.ConnectionIDs{}, err
}

file, err := os.OpenFile(path, os.O_RDONLY, os.ModePerm)
if err != nil {
return mfxsdk.ConnectionIDs{}, err
}
defer file.Close()

connections := mfxsdk.ConnectionIDs{}
switch filepath.Ext(path) {
case csvExt:
reader := csv.NewReader(file)

for {
l, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
return mfxsdk.ConnectionIDs{}, err
}

if len(l) < 1 {
return mfxsdk.ConnectionIDs{}, errors.New("empty line found in file")
}

connections.ThingIDs = append(connections.ThingIDs, l[0])
connections.ChannelIDs = append(connections.ChannelIDs, l[1])
}
case jsonExt:
err := json.NewDecoder(file).Decode(&connections)
if err != nil {
return mfxsdk.ConnectionIDs{}, err
}
default:
return mfxsdk.ConnectionIDs{}, err
}

return connections, nil
}
7 changes: 6 additions & 1 deletion cli/things.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,12 @@ var cmdThings = []cobra.Command{
return
}

if err := sdk.ConnectThing(args[0], args[1], args[2]); err != nil {
connIDs := mfxsdk.ConnectionIDs{
[]string{args[0]},
[]string{args[1]},
}

if err := sdk.Connect(connIDs, args[2]); err != nil {
logError(err)
return
}
Expand Down
2 changes: 1 addition & 1 deletion docker/nginx/nginx-key.conf
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ http {
}

# Proxy pass to things service
location ~ ^/(things|channels) {
location ~ ^/(things|channels|connect) {
include snippets/proxy-headers.conf;
add_header Access-Control-Expose-Headers Location;
proxy_pass http://things:${MF_THINGS_HTTP_PORT};
Expand Down
33 changes: 33 additions & 0 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,39 @@ mainflux-cli channels get <channel_id> <user_auth_token>
mainflux-cli things connect <thing_id> <channel_id> <user_auth_token>
```

#### Bulk Connect Things to Channels

```bash
mainflux-cli provision connect <file> <user_auth_token>
```

* `file` - A CSV or JSON file containing thing and channel ids
* `user_auth_token` - A valid user auth token for the current system

An example CSV file might be

```csv
<thing_id>,<channel_id>
<thing_id>,<channel_id>
```

in which the first column is thing IDs and the second column is channel IDs. A connection will be created for each thing to each channel. This example would result in 4 connections being created.

A comparable JSON file would be

```json
{
"thing_ids": [
"<thing_id>",
"<thing_id>"
],
"channel_ids": [
"<channel_id>",
"<channel_id>"
]
}
```

#### Disconnect Thing from Channel
```
mainflux-cli things disconnect <thing_id> <channel_id> <user_auth_token>
Expand Down
9 changes: 9 additions & 0 deletions sdk/go/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ type MessagesPage struct {
Messages []mainflux.Message `json:"messages,omitempty"`
}

// ConnectionIDs contains ID lists of things and channels to be connected
type ConnectionIDs struct {
ChannelIDs []string `json:"channel_ids"`
ThingIDs []string `json:"thing_ids"`
}

// SDK contains Mainflux API.
type SDK interface {
// CreateUser registers mainflux user.
Expand Down Expand Up @@ -149,6 +155,9 @@ type SDK interface {
// ConnectThing connects thing to specified channel by id.
ConnectThing(thingID, chanID, token string) error

// Connect bulk connects things to channels specified by id.
Connect(conns ConnectionIDs, token string) error

// DisconnectThing disconnect thing from specified channel by id.
DisconnectThing(thingID, chanID, token string) error

Expand Down
32 changes: 32 additions & 0 deletions sdk/go/things.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
)

const thingsEndpoint = "things"
const connectEndpoint = "connect"

func (sdk mfSDK) CreateThing(thing Thing, token string) (string, error) {
data, err := json.Marshal(thing)
Expand Down Expand Up @@ -310,6 +311,37 @@ func (sdk mfSDK) ConnectThing(thingID, chanID, token string) error {
return nil
}

func (sdk mfSDK) Connect(connIDs ConnectionIDs, token string) error {
data, err := json.Marshal(connIDs)
if err != nil {
return ErrInvalidArgs
}

url := createURL(sdk.baseURL, sdk.thingsPrefix, connectEndpoint)
req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(data))
if err != nil {
return err
}

resp, err := sdk.sendRequest(req, token, string(CTJSON))
if err != nil {
return err
}

if resp.StatusCode != http.StatusOK {
switch resp.StatusCode {
case http.StatusForbidden:
return ErrUnauthorized
case http.StatusNotFound:
return ErrNotFound
default:
return ErrFailedConnection
}
}

return nil
}

func (sdk mfSDK) DisconnectThing(thingID, chanID, token string) error {
endpoint := fmt.Sprintf("%s/%s/%s/%s", channelsEndpoint, chanID, thingsEndpoint, thingID)
url := createURL(sdk.baseURL, sdk.thingsPrefix, endpoint)
Expand Down
107 changes: 107 additions & 0 deletions sdk/go/things_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
token = "token"
otherToken = "other_token"
wrongValue = "wrong_value"
badID = "999"
emptyValue = ""

keyPrefix = "123e4567-e89b-12d3-a456-"
)
Expand Down Expand Up @@ -694,6 +696,111 @@ func TestConnectThing(t *testing.T) {
}
}

func TestConnect(t *testing.T) {
svc := newThingsService(map[string]string{
token: email,
otherToken: otherEmail,
})

ts := newThingsServer(svc)
defer ts.Close()
sdkConf := sdk.Config{
BaseURL: ts.URL,
UsersPrefix: "",
ThingsPrefix: "",
HTTPAdapterPrefix: "",
MsgContentType: contentType,
TLSVerification: false,
}

mainfluxSDK := sdk.NewSDK(sdkConf)
thingID, err := mainfluxSDK.CreateThing(thing, token)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))

chanID1, err := mainfluxSDK.CreateChannel(channel, token)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))

chanID2, err := mainfluxSDK.CreateChannel(channel, otherToken)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))

cases := []struct {
desc string
thingID string
chanID string
token string
err error
}{
{
desc: "connect existing things to existing channels",
thingID: thingID,
chanID: chanID1,
token: token,
err: nil,
},

{
desc: "connect existing things to non-existing channels",
thingID: thingID,
chanID: badID,
token: token,
err: sdk.ErrNotFound,
},
{
desc: "connect non-existing things to existing channels",
thingID: badID,
chanID: chanID1,
token: token,
err: sdk.ErrNotFound,
},
{
desc: "connect existing things to channels with invalid ID",
thingID: thingID,
chanID: emptyValue,
token: token,
err: sdk.ErrFailedConnection,
},
{
desc: "connect things with invalid ID to existing channels",
thingID: emptyValue,
chanID: chanID1,
token: token,
err: sdk.ErrFailedConnection,
},

{
desc: "connect existing things to existing channels with invalid token",
thingID: thingID,
chanID: chanID1,
token: wrongValue,
err: sdk.ErrUnauthorized,
},
{
desc: "connect existing things to existing channels with empty token",
thingID: thingID,
chanID: chanID1,
token: emptyValue,
err: sdk.ErrUnauthorized,
},
{
desc: "connect things from owner to channels of other user",
thingID: thingID,
chanID: chanID2,
token: token,
err: sdk.ErrNotFound,
},
}

for _, tc := range cases {
connIDs := sdk.ConnectionIDs{
[]string{tc.thingID},
[]string{tc.chanID},
}

err := mainfluxSDK.Connect(connIDs, tc.token)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected error %s, got %s", tc.desc, tc.err, err))
}
}

func TestDisconnectThing(t *testing.T) {
svc := newThingsService(map[string]string{
token: email,
Expand Down

0 comments on commit 76908d7

Please sign in to comment.