diff --git a/docs/cli/connectctl_connectors.md b/docs/cli/connectctl_connectors.md index 7813d68..f59f5bd 100644 --- a/docs/cli/connectctl_connectors.md +++ b/docs/cli/connectctl_connectors.md @@ -28,6 +28,7 @@ None, all options are at the subcommand level ### SEE ALSO * [connectctl](connectctl.md) - connectctl: work with Kafka Connect easily +* [connectctl connectors status](connectctl_connectors_status.md) - Connectors status * [connectctl connectors add](connectctl_connectors_add.md) - Add connectors * [connectctl connectors remove](connectctl_connectors_remove.md) - Remove connectors * [connectctl connectors list](connectctl_connectors_list.md) - List connectors diff --git a/docs/cli/connectctl_connectors_status.md b/docs/cli/connectctl_connectors_status.md new file mode 100644 index 0000000..755a38b --- /dev/null +++ b/docs/cli/connectctl_connectors_status.md @@ -0,0 +1,37 @@ +## connectctl connectors status + +Status of connectors + +### Synopsis + + +Display status of selected connectors. +If some tasks or connectors are failing, command will exit with code 1. + + +``` +connectctl connectors status [flags] +``` + +### Options + +``` + -h, --help help for add + -c, --clusterURL the url of the kafka connect cluster + -n, --connectors the names of the connectors. Multiple connector names + can be specified either by comma separating conn1,conn2 + or by repeating the flag --n conn1 --n conn2. If no name is + supplied status of ALL connectors will be displayed. + -o, --output specify the output format (valid options: json, table) (default "json") + -q, --quiet disable output logging +``` +### Options inherited from parent commands + +``` + -l, --loglevel loglevel Specify the loglevel for the program (default info) + --logfile Specify a file to output logs to +``` + +### SEE ALSO + +* [connectctl connectors](connectctl_connectors.md) - Manage connectors \ No newline at end of file diff --git a/docs/cli/connectctl_version.md b/docs/cli/connectctl_version.md index 890c769..00b965f 100644 --- a/docs/cli/connectctl_version.md +++ b/docs/cli/connectctl_version.md @@ -10,6 +10,12 @@ Prints version information of connectctl connectctl version ``` +### Options + +``` + -h, --help help for version + -c, --clusterURL the url of the kafka connect cluster +``` ### SEE ALSO * [connectctl](connectctl.md) - connectctl: work with Kafka Connect easily \ No newline at end of file diff --git a/internal/ctl/connectors/connectors.go b/internal/ctl/connectors/connectors.go index f9796d2..f45b07d 100644 --- a/internal/ctl/connectors/connectors.go +++ b/internal/ctl/connectors/connectors.go @@ -23,6 +23,7 @@ func Command() *cobra.Command { connectorsCmd.AddCommand(removeConnectorCmd()) connectorsCmd.AddCommand(pauseConnectorsCmd()) connectorsCmd.AddCommand(resumeConnectorsCmd()) + connectorsCmd.AddCommand(connectorsStatusCmd()) return connectorsCmd } diff --git a/internal/ctl/connectors/status.go b/internal/ctl/connectors/status.go new file mode 100644 index 0000000..aad79f2 --- /dev/null +++ b/internal/ctl/connectors/status.go @@ -0,0 +1,149 @@ +package connectors + +import ( + "encoding/json" + "fmt" + "os" + + "github.com/90poe/connectctl/internal/ctl" + "github.com/90poe/connectctl/internal/version" + "github.com/90poe/connectctl/pkg/client/connect" + "github.com/90poe/connectctl/pkg/manager" + "github.com/jedib0t/go-pretty/table" + "github.com/pkg/errors" + + "github.com/spf13/cobra" +) + +type connectorsStatusCmdParams struct { + ClusterURL string + Connectors []string + Output string + Quiet bool +} + +func connectorsStatusCmd() *cobra.Command { + params := &connectorsStatusCmdParams{} + + statusCmd := &cobra.Command{ + Use: "status", + Short: "Get status for connectors in a cluster", + Long: "", + RunE: func(cmd *cobra.Command, _ []string) error { + return doConnectorsStatus(cmd, params) + }, + } + + ctl.AddCommonConnectorsFlags(statusCmd, ¶ms.ClusterURL) + ctl.AddConnectorNamesFlags(statusCmd, ¶ms.Connectors) + ctl.AddOutputFlags(statusCmd, ¶ms.Output) + ctl.AddQuietFlag(statusCmd, ¶ms.Quiet) + + return statusCmd +} + +func doConnectorsStatus(_ *cobra.Command, params *connectorsStatusCmdParams) error { + config := &manager.Config{ + ClusterURL: params.ClusterURL, + Version: version.Version, + } + + userAgent := fmt.Sprintf("90poe.io/connectctl/%s", version.Version) + + client, err := connect.NewClient(params.ClusterURL, connect.WithUserAgent(userAgent)) + if err != nil { + return errors.Wrap(err, "error creating connect client") + } + + mngr, err := manager.NewConnectorsManager(client, config) + if err != nil { + return errors.Wrap(err, "error creating connectors manager") + } + + statusList, err := mngr.Status(params.Connectors) + if err != nil { + return errors.Wrap(err, "error getting connectors status") + } + + if !params.Quiet { + switch params.Output { + case "json": + if err = printAsJSON(statusList); err != nil { + return errors.Wrap(err, "error printing connectors status as JSON") + } + + case "table": + printAsTable(statusList) + + default: + return fmt.Errorf("invalid output format specified: %s", params.Output) + } + } + + failingConnectors, failingTasks := countFailing(statusList) + + if failingConnectors != 0 || failingTasks != 0 { + return fmt.Errorf("%d connectors are failng, %d tasks are failing", failingConnectors, failingTasks) + } + + return nil +} + +func countFailing(statusList []*connect.ConnectorStatus) (int, int) { + connectorCount := 0 + taskCount := 0 + + for _, status := range statusList { + if status.Connector.State == "FAILED" { + connectorCount++ + } + + taskCount += countFailingTasks(&status.Tasks) + } + + return connectorCount, taskCount +} + +func countFailingTasks(tasks *[]connect.TaskState) int { + count := 0 + + for _, task := range *tasks { + if task.State == "FAILED" { + count++ + } + } + + return count +} + +func printAsJSON(statusList []*connect.ConnectorStatus) error { + b, err := json.MarshalIndent(statusList, "", " ") + if err != nil { + return err + } + + os.Stdout.Write(b) + return nil +} + +func printAsTable(statusList []*connect.ConnectorStatus) { + t := table.NewWriter() + t.SetOutputMirror(os.Stdout) + t.AppendHeader(table.Row{"Name", "State", "WorkerId", "Tasks"}) + + for _, status := range statusList { + tasks := "" + for _, task := range status.Tasks { + tasks += fmt.Sprintf("%d(%s): %s\n", task.ID, task.WorkerID, task.State) + } + + t.AppendRow(table.Row{ + status.Name, + status.Connector.State, + status.Connector.WorkerID, + tasks, + }) + } + + t.Render() +} diff --git a/internal/ctl/connectors/status_test.go b/internal/ctl/connectors/status_test.go new file mode 100644 index 0000000..970b2c7 --- /dev/null +++ b/internal/ctl/connectors/status_test.go @@ -0,0 +1,45 @@ +package connectors + +import ( + "testing" + + "github.com/90poe/connectctl/pkg/client/connect" + "github.com/stretchr/testify/require" +) + +func TestCountFailing(t *testing.T) { + statusList := []*connect.ConnectorStatus{ + { + Connector: connect.ConnectorState{ + State: "RUNNING", + }, + Tasks: []connect.TaskState{ + { + State: "RUNNING", + }, + { + State: "FAILED", + }, + }, + }, + { + Connector: connect.ConnectorState{ + State: "FAILED", + }, + Tasks: []connect.TaskState{ + { + State: "FAILED", + }, + { + State: "FAILED", + }, + }, + }, + } + + connectorsFailing, tasksFailing := countFailing(statusList) + + require.Equal(t, 1, connectorsFailing) + require.Equal(t, 3, tasksFailing) + +} diff --git a/internal/ctl/flags.go b/internal/ctl/flags.go index d9bf9ac..5f687be 100644 --- a/internal/ctl/flags.go +++ b/internal/ctl/flags.go @@ -10,7 +10,7 @@ import ( ) func AddClusterFlag(cmd *cobra.Command, required bool, clusterURL *string) { - description := "the URL of the connect cluster to manage" + description := "the URL of the connect cluster" if required { description = requiredDescription(&description) @@ -27,6 +27,10 @@ func AddOutputFlags(cmd *cobra.Command, output *string) { BindStringVarP(cmd.Flags(), output, "json", "output", "o", "specify the output format (valid options: json, table)") } +func AddQuietFlag(cmd *cobra.Command, quiet *bool) { + BindBoolVarP(cmd.Flags(), quiet, false, "quiet", "q", "disable output logging") +} + func AddDefinitionFilesFlags(cmd *cobra.Command, files *[]string, directory *string, env *string) { BindStringArrayVarP(cmd.Flags(), files, []string{}, "files", "f", "the connector definitions files (Required if --directory or --env-var not specified)") BindStringVarP(cmd.Flags(), directory, "", "directory", "d", "the directory containing the connector definitions files (Required if --file or --env-vars not specified)") @@ -34,7 +38,7 @@ func AddDefinitionFilesFlags(cmd *cobra.Command, files *[]string, directory *str } func AddConnectorNamesFlags(cmd *cobra.Command, names *[]string) { - BindStringArrayVarP(cmd.Flags(), names, []string{}, "connectors", "n", "The connect names to restart (if not specified all connectors will be restarted)") + BindStringArrayVarP(cmd.Flags(), names, []string{}, "connectors", "n", "The connect names to perform action on (if not specified action will be performed on all connectors)") } func BindDurationVarP(f *pflag.FlagSet, p *time.Duration, value time.Duration, long, short, description string) { @@ -55,6 +59,12 @@ func BindBoolVar(f *pflag.FlagSet, p *bool, value bool, long, description string viper.SetDefault(long, value) } +func BindBoolVarP(f *pflag.FlagSet, p *bool, value bool, long, short, description string) { + f.BoolVarP(p, long, short, value, description) + _ = viper.BindPFlag(long, f.Lookup(long)) + viper.SetDefault(long, value) +} + func BindStringVarP(f *pflag.FlagSet, p *string, value, long, short, description string) { f.StringVarP(p, long, short, value, description) _ = viper.BindPFlag(long, f.Lookup(long)) diff --git a/pkg/manager/status.go b/pkg/manager/status.go new file mode 100644 index 0000000..d795338 --- /dev/null +++ b/pkg/manager/status.go @@ -0,0 +1,39 @@ +package manager + +import ( + "github.com/90poe/connectctl/pkg/client/connect" + "github.com/pkg/errors" +) + +// Status - gets status of specified (or all) connectors +func (c *ConnectorManager) Status(connectors []string) ([]*connect.ConnectorStatus, error) { + if len(connectors) == 0 { + return c.allConnectorsStatus() + } + + return c.specifiedConnectorsStatus(connectors) +} + +func (c *ConnectorManager) allConnectorsStatus() ([]*connect.ConnectorStatus, error) { + existing, _, err := c.client.ListConnectors() + if err != nil { + return nil, errors.Wrap(err, "error listing connectors") + } + + return c.specifiedConnectorsStatus(existing) +} + +func (c *ConnectorManager) specifiedConnectorsStatus(connectors []string) ([]*connect.ConnectorStatus, error) { + statusList := make([]*connect.ConnectorStatus, len(connectors)) + + for idx, connectorName := range connectors { + status, _, err := c.client.GetConnectorStatus(connectorName) + if err != nil { + return nil, errors.Wrapf(err, "error getting connector status for %s", connectorName) + } + + statusList[idx] = status + } + + return statusList, nil +}