Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: plugins validate cmd #83

Merged
merged 5 commits into from
Apr 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/cli/connectctl_plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ None, all options are at the subcommand level

* [connectctl](connectctl.md) - connectctl: work with Kafka Connect easily
* [connectctl plugins list](connectctl_plugins_list.md) - List connector plugins
* [connectctl plugins validate](connectctl_plugins_validate.md) - Validates connector config
33 changes: 33 additions & 0 deletions docs/cli/connectctl_plugins_validate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
## connectctl plugins validate

Validates connector config

### Synopsis

Validate the provided configuration values against the configuration definition. This API performs per config validation, outputs suggested values and error messages during validation.
It exits with code 1 if config is invalid.


```
connectctl plugins validate [flags]
```

### Options

```
-c, --cluster string the URL of the connect cluster (required)
-h, --help help for validate
-i, --input string Input data in json format (required)
-o, --output string specify the output format (valid options: json, table) (default "json")
-q, --quiet disable output logging
```
### Options inherited from parent commands

```
-l, --loglevel loglevel Specify the loglevel for the program (default info)
--logfile Specify a file to output logs to
```

### SEE ALSO

* [connectctl plugins](connectctl_plugins.md) - Manage plugins
10 changes: 10 additions & 0 deletions internal/ctl/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ func AddQuietFlag(cmd *cobra.Command, quiet *bool) {
BindBoolVarP(cmd.Flags(), quiet, false, "quiet", "q", "disable output logging")
}

func AddInputFlag(cmd *cobra.Command, required bool, input *string) {
description := "Input data in json format"

if required {
description = requiredDescription(&description)
}

BindStringVarP(cmd.Flags(), input, "", "input", "i", description)
}

func AddDefinitionFilesFlags(cmd *cobra.Command, files *[]string, directory *string, env *string) {
BindStringArrayVarP(cmd.Flags(), files, []string{}, "files", "f", "the connector definitions files (Required if --directory or --env-var not specified)")
BindStringVarP(cmd.Flags(), directory, "", "directory", "d", "the directory containing the connector definitions files (Required if --file or --env-vars not specified)")
Expand Down
27 changes: 27 additions & 0 deletions internal/ctl/output.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package ctl

import (
"encoding/json"
"os"

"github.com/jedib0t/go-pretty/table"
)

func PrintAsJSON(data interface{}) error {
b, err := json.MarshalIndent(data, "", " ")
if err != nil {
return err
}

os.Stdout.Write(b)
return nil
}

func PrintAsTable(handler func(table.Writer)) {
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)

handler(t)

t.Render()
}
1 change: 1 addition & 0 deletions internal/ctl/plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func Command() *cobra.Command {

// Add subcommands
pluginsCmd.AddCommand(listPluginsCmd())
pluginsCmd.AddCommand(validatePluginsCmd())

return pluginsCmd
}
117 changes: 117 additions & 0 deletions internal/ctl/plugins/validate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package plugins

import (
"encoding/json"
"fmt"
"strings"

"github.com/90poe/connectctl/internal/ctl"
"github.com/90poe/connectctl/internal/version"
"github.com/90poe/connectctl/pkg/client/connect"
"github.com/90poe/connectctl/pkg/manager"
"github.com/jedib0t/go-pretty/table"
"github.com/pkg/errors"

"github.com/spf13/cobra"
)

type validatePluginsCmdParams struct {
ClusterURL string
Input string
Output string
Quiet bool
}

func validatePluginsCmd() *cobra.Command {
params := &validatePluginsCmdParams{}

validateCmd := &cobra.Command{
Use: "validate",
Short: "Validates plugin config",
Long: "",
RunE: func(cmd *cobra.Command, _ []string) error {
return doValidatePlugins(cmd, params)
},
}

ctl.AddClusterFlag(validateCmd, true, &params.ClusterURL)
ctl.AddInputFlag(validateCmd, true, &params.Input)
ctl.AddOutputFlags(validateCmd, &params.Output)
ctl.AddQuietFlag(validateCmd, &params.Quiet)

return validateCmd
}

func doValidatePlugins(_ *cobra.Command, params *validatePluginsCmdParams) error {
var inputConfig connect.ConnectorConfig
if err := json.Unmarshal([]byte(params.Input), &inputConfig); err != nil {
return errors.Wrap(err, "error parsing input connector config")
}

config := &manager.Config{
ClusterURL: params.ClusterURL,
Version: version.Version,
}

userAgent := fmt.Sprintf("90poe.io/connectctl/%s", version.Version)

client, err := connect.NewClient(params.ClusterURL, connect.WithUserAgent(userAgent))
if err != nil {
return errors.Wrap(err, "error creating connect client")
}

mngr, err := manager.NewConnectorsManager(client, config)
if err != nil {
return errors.Wrap(err, "error creating connectors manager")
}

validation, err := mngr.ValidatePlugins(inputConfig)
if err != nil {
return err
}

if !params.Quiet {
switch params.Output {
case "json":
if err = ctl.PrintAsJSON(validation); err != nil {
return errors.Wrap(err, "error printing validation results as JSON")
}

case "table":
printAsTable(validation)

default:
return fmt.Errorf("invalid output format specified: %s", params.Output)
}
}

if validation.ErrorCount > 0 {
return fmt.Errorf("detected %d errors in the configuation", validation.ErrorCount)
}

return nil
}

func printAsTable(validation *connect.ConfigValidation) {
ctl.PrintAsTable(func(t table.Writer) {
t.Style().Options.SeparateRows = true
t.AppendHeader(table.Row{"Name", "Spec", "Value", "Errors"})

for _, info := range validation.Configs {
spec := fmt.Sprintf(
"default: %s\nrequired: %v",
ctl.StrPtrToStr(info.Definition.DefaultValue),
info.Definition.Required,
)

errors := strings.Join(info.Value.Errors, "\n")

t.AppendRow(table.Row{
info.Definition.Name,
spec,
ctl.StrPtrToStr(info.Value.Value),
errors,
})
}
})
}
9 changes: 9 additions & 0 deletions internal/ctl/transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package ctl

func StrPtrToStr(str *string) string {
if str == nil {
return "null"
}

return *str
}
58 changes: 57 additions & 1 deletion pkg/client/connect/plugins.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package connect

import "net/http"
import (
"errors"
"fmt"
"net/http"
"strings"
)

// This is new and not from the original author

Expand All @@ -11,6 +16,40 @@ type Plugin struct {
Version string `json:"version"`
}

type FieldDefinition struct {
Name string `json:"name"`
Type string `json:"type"`
Required bool `json:"required"`
DefaultValue *string `json:"default_value"`
Importance string `json:"importance"`
Documentation string `json:"documentation"`
Group string `json:"group"`
Width string `json:"width"`
DisplayName string `json:"display_name"`
Dependents []map[string]interface{} `json:"dependents"` //unknown type
Order int `json:"order"`
}

type FieldValue struct {
Name string `json:"name"`
Value *string `json:"value"`
RecommendedValues []*string `json:"recommended_values"`
Errors []string `json:"errors"`
Visible bool `json:"visible"`
}

type FieldValidation struct {
Definition FieldDefinition `json:"definition"`
Value FieldValue `json:"value"`
}

type ConfigValidation struct {
Name string `json:"name"`
ErrorCount int `json:"error_count"`
Groups []string `json:"groups"`
Configs []FieldValidation `json:"configs"`
}

// ListPlugins retrieves a list of the installed plugins.
// Note that the API only checks for connectors on the worker
// that handles the request, which means it is possible to see
Expand All @@ -24,3 +63,20 @@ func (c *Client) ListPlugins() ([]*Plugin, *http.Response, error) {
response, err := c.get(path, &names)
return names, response, err
}

// ValidatePlugins validates the provided configuration values against the configuration definition.
// See: https://docs.confluent.io/current/connect/references/restapi.html#put--connector-plugins-(string-name)-config-validate
func (c *Client) ValidatePlugins(config ConnectorConfig) (*ConfigValidation, *http.Response, error) {
connectorClass, ok := config["connector.class"]
if !ok {
return nil, nil, errors.New("missing required key in config: 'connector.class'")
}

tuple := strings.Split(connectorClass, ".")
path := fmt.Sprintf("connector-plugins/%s/config/validate", tuple[len(tuple)-1])

var validation ConfigValidation
response, err := c.doRequest("PUT", path, config, &validation)

return &validation, response, err
}
1 change: 1 addition & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type client interface {
ListConnectors() ([]string, *http.Response, error)
GetConnector(name string) (*connect.Connector, *http.Response, error)
ListPlugins() ([]*connect.Plugin, *http.Response, error)
ValidatePlugins(config connect.ConnectorConfig) (*connect.ConfigValidation, *http.Response, error)
GetConnectorStatus(name string) (*connect.ConnectorStatus, *http.Response, error)
DeleteConnector(name string) (*http.Response, error)
RestartConnectorTask(name string, taskID int) (*http.Response, error)
Expand Down
Loading