Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MF-898 - Add bulk connect to CLI and SDK #956

Merged
merged 8 commits into from
Nov 21, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions cli/provision.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,29 @@ var cmdProvision = []cobra.Command{
logJSON(channels)
},
},
cobra.Command{
Use: "connect",
Short: "connect <connections_file> <user_token>",
Long: `Bulk connect things to channels`,
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 2 {
logUsage(cmd.Short)
return
}

connIDs, err := connectionsFromFile(args[0])
if err != nil {
logError(err)
return
}

err = sdk.Connect(connIDs, args[1])
if err != nil {
logError(err)
return
}
},
},
cobra.Command{
Use: "test",
Short: "test",
Expand Down Expand Up @@ -275,3 +298,47 @@ func channelsFromFile(path string) ([]mfxsdk.Channel, error) {

return channels, nil
}

func connectionsFromFile(path string) (mfxsdk.ConnectionIDs, error) {
if _, err := os.Stat(path); os.IsNotExist(err) {
return mfxsdk.ConnectionIDs{}, err
}

file, err := os.OpenFile(path, os.O_RDONLY, os.ModePerm)
if err != nil {
return mfxsdk.ConnectionIDs{}, err
}
defer file.Close()

connections := mfxsdk.ConnectionIDs{}
switch filepath.Ext(path) {
case ".csv":
nwneisen marked this conversation as resolved.
Show resolved Hide resolved
reader := csv.NewReader(file)

for {
l, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
return mfxsdk.ConnectionIDs{}, err
}

if len(l) < 1 {
return mfxsdk.ConnectionIDs{}, errors.New("empty line found in file")
}

connections.ThingIDs = append(connections.ThingIDs, l[0])
connections.ChannelIDs = append(connections.ChannelIDs, l[1])
}
case ".json":
err := json.NewDecoder(file).Decode(&connections)
if err != nil {
return mfxsdk.ConnectionIDs{}, err
}
default:
return mfxsdk.ConnectionIDs{}, err
}

return connections, nil
}
7 changes: 6 additions & 1 deletion cli/things.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,12 @@ var cmdThings = []cobra.Command{
return
}

if err := sdk.ConnectThing(args[0], args[1], args[2]); err != nil {
connIDs := mfxsdk.ConnectionIDs{
[]string{args[0]},
[]string{args[1]},
}

if err := sdk.Connect(connIDs, args[2]); err != nil {
logError(err)
return
}
Expand Down
2 changes: 1 addition & 1 deletion docker/nginx/nginx-key.conf
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ http {
}

# Proxy pass to things service
location ~ ^/(things|channels) {
location ~ ^/(things|channels|connect) {
include snippets/proxy-headers.conf;
add_header Access-Control-Expose-Headers Location;
proxy_pass http://things:${MF_THINGS_HTTP_PORT};
Expand Down
33 changes: 33 additions & 0 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,39 @@ mainflux-cli channels get <channel_id> <user_auth_token>
mainflux-cli things connect <thing_id> <channel_id> <user_auth_token>
```

#### Bulk Connect Things to Channels

```bash
mainflux-cli provision connect <file> <user_auth_token>
```

* `file` - A CSV or JSON file containing thing and channel ids
nwneisen marked this conversation as resolved.
Show resolved Hide resolved
* `user_auth_token` - A valid user auth token for the current system

An example CSV file might be

```csv
<thing_id>,<channel_id>
<thing_id>,<channel_id>
```

in which the first column is thing IDs and the second column is channel IDs. A connection will be created for each thing to each channel. This example would result in 4 connections being created.

A comparable JSON file would be

```json
{
"thing_ids": [
"<thing_id>",
"<thing_id>"
],
"channel_ids": [
"<channel_id>",
"<channel_id>"
]
}
```

#### Disconnect Thing from Channel
```
mainflux-cli things disconnect <thing_id> <channel_id> <user_auth_token>
Expand Down
9 changes: 9 additions & 0 deletions sdk/go/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ type MessagesPage struct {
Messages []mainflux.Message `json:"messages,omitempty"`
}

// ConnectionIDs contains ID lists of things and channels to be connected
type ConnectionIDs struct {
ChannelIDs []string `json:"channel_ids"`
ThingIDs []string `json:"thing_ids"`
}

// SDK contains Mainflux API.
type SDK interface {
// CreateUser registers mainflux user.
Expand Down Expand Up @@ -149,6 +155,9 @@ type SDK interface {
// ConnectThing connects thing to specified channel by id.
ConnectThing(thingID, chanID, token string) error

// Connect bulk connects things to channels specified by id.
Connect(conns ConnectionIDs, token string) error

// DisconnectThing disconnect thing from specified channel by id.
DisconnectThing(thingID, chanID, token string) error

Expand Down
32 changes: 32 additions & 0 deletions sdk/go/things.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
)

const thingsEndpoint = "things"
const connectEndpoint = "connect"

func (sdk mfSDK) CreateThing(thing Thing, token string) (string, error) {
data, err := json.Marshal(thing)
Expand Down Expand Up @@ -310,6 +311,37 @@ func (sdk mfSDK) ConnectThing(thingID, chanID, token string) error {
return nil
}

func (sdk mfSDK) Connect(connIDs ConnectionIDs, token string) error {
data, err := json.Marshal(connIDs)
if err != nil {
return ErrInvalidArgs
}

url := createURL(sdk.baseURL, sdk.thingsPrefix, connectEndpoint)
req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(data))
if err != nil {
return err
}

resp, err := sdk.sendRequest(req, token, string(CTJSON))
if err != nil {
return err
}

if resp.StatusCode != http.StatusOK {
switch resp.StatusCode {
case http.StatusForbidden:
return ErrUnauthorized
case http.StatusNotFound:
return ErrNotFound
default:
return ErrFailedConnection
}
}

return nil
}

func (sdk mfSDK) DisconnectThing(thingID, chanID, token string) error {
endpoint := fmt.Sprintf("%s/%s/%s/%s", channelsEndpoint, chanID, thingsEndpoint, thingID)
url := createURL(sdk.baseURL, sdk.thingsPrefix, endpoint)
Expand Down
107 changes: 107 additions & 0 deletions sdk/go/things_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
token = "token"
otherToken = "other_token"
wrongValue = "wrong_value"
badID = "999"
emptyValue = ""

keyPrefix = "123e4567-e89b-12d3-a456-"
)
Expand Down Expand Up @@ -694,6 +696,111 @@ func TestConnectThing(t *testing.T) {
}
}

func TestConnect(t *testing.T) {
svc := newThingsService(map[string]string{
token: email,
otherToken: otherEmail,
})

ts := newThingsServer(svc)
defer ts.Close()
sdkConf := sdk.Config{
BaseURL: ts.URL,
UsersPrefix: "",
ThingsPrefix: "",
HTTPAdapterPrefix: "",
MsgContentType: contentType,
TLSVerification: false,
}

mainfluxSDK := sdk.NewSDK(sdkConf)
thingID, err := mainfluxSDK.CreateThing(thing, token)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))

chanID1, err := mainfluxSDK.CreateChannel(channel, token)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))

chanID2, err := mainfluxSDK.CreateChannel(channel, otherToken)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))

cases := []struct {
desc string
thingID string
chanID string
token string
err error
}{
{
desc: "connect existing things to existing channels",
thingID: thingID,
chanID: chanID1,
token: token,
err: nil,
},

{
desc: "connect existing things to non-existing channels",
thingID: thingID,
chanID: badID,
token: token,
err: sdk.ErrNotFound,
},
{
desc: "connect non-existing things to existing channels",
thingID: badID,
chanID: chanID1,
token: token,
err: sdk.ErrNotFound,
},
{
desc: "connect existing things to channels with invalid ID",
thingID: thingID,
chanID: emptyValue,
token: token,
err: sdk.ErrFailedConnection,
},
{
desc: "connect things with invalid ID to existing channels",
thingID: emptyValue,
chanID: chanID1,
token: token,
err: sdk.ErrFailedConnection,
},

{
desc: "connect existing things to existing channels with invalid token",
thingID: thingID,
chanID: chanID1,
token: wrongValue,
err: sdk.ErrUnauthorized,
},
{
desc: "connect existing things to existing channels with empty token",
thingID: thingID,
chanID: chanID1,
token: emptyValue,
err: sdk.ErrUnauthorized,
},
{
desc: "connect things from owner to channels of other user",
thingID: thingID,
chanID: chanID2,
token: token,
err: sdk.ErrNotFound,
},
}

for _, tc := range cases {
connIDs := sdk.ConnectionIDs{
[]string{tc.thingID},
[]string{tc.chanID},
}

err := mainfluxSDK.Connect(connIDs, tc.token)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected error %s, got %s", tc.desc, tc.err, err))
}
}

func TestDisconnectThing(t *testing.T) {
svc := newThingsService(map[string]string{
token: email,
Expand Down