Skip to content

Commit

Permalink
refactor: add upload all command for resources (#713)
Browse files Browse the repository at this point in the history
* refactor: add upload all command for resources

* fix: fix lint issues
  • Loading branch information
sbchaos authored Jan 9, 2023
1 parent a6ff29b commit 4d71411
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 28 deletions.
4 changes: 2 additions & 2 deletions client/cmd/job/replace_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewReplaceAllCommand() *cobra.Command {
Short: "Replace all current optimus project to server",
Long: heredoc.Doc(`Apply local changes to destination server which includes creating/updating/deleting
jobs`),
Example: "optimus job replace-all [--ignore-resources|--ignore-jobs]",
Example: "optimus job replace-all [--verbose]",
Annotations: map[string]string{
"group:core": "true",
},
Expand Down Expand Up @@ -97,7 +97,7 @@ func (r *replaceAllCommand) replaceAll(selectedNamespaces []*config.Namespace) e
}

func (r *replaceAllCommand) replaceAllJobs(conn *connectivity.Connectivity, selectedNamespaces []*config.Namespace) error {
namespaceNames := []string{}
var namespaceNames []string
for _, namespace := range selectedNamespaces {
namespaceNames = append(namespaceNames, namespace.Name)
}
Expand Down
15 changes: 13 additions & 2 deletions client/cmd/resource/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ type createCommand struct {
clientConfig *config.ClientConfig

namespaceSurvey *survey.NamespaceSurvey
configFilePath string
}

// NewCreateCommand initializes resource create command
func NewCreateCommand(clientConfig *config.ClientConfig) *cobra.Command {
func NewCreateCommand() *cobra.Command {
l := logger.NewClientLogger()
create := &createCommand{
clientConfig: clientConfig,
logger: l,
namespaceSurvey: survey.NewNamespaceSurvey(l),
}
Expand All @@ -36,10 +36,21 @@ func NewCreateCommand(clientConfig *config.ClientConfig) *cobra.Command {
Short: "Create a new resource",
Example: "optimus resource create",
RunE: create.RunE,
PreRunE: create.PreRunE,
}
cmd.Flags().StringVarP(&create.configFilePath, "config", "c", create.configFilePath, "File path for client configuration")
return cmd
}

func (c *createCommand) PreRunE(_ *cobra.Command, _ []string) error {
var err error
c.clientConfig, err = config.LoadClientConfig(c.configFilePath)
if err != nil {
return err
}
return nil
}

func (c createCommand) RunE(_ *cobra.Command, _ []string) error {
selectedNamespace, err := c.namespaceSurvey.AskToSelectNamespace(c.clientConfig)
if err != nil {
Expand Down
26 changes: 2 additions & 24 deletions client/cmd/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,19 @@ package resource

import (
"github.com/spf13/cobra"

"github.com/odpf/optimus/config"
)

type resourceCommand struct {
configFilePath string
clientConfig *config.ClientConfig
}

// NewResourceCommand initializes command for resource
func NewResourceCommand() *cobra.Command {
resource := &resourceCommand{
clientConfig: &config.ClientConfig{},
}

cmd := &cobra.Command{
Use: "resource",
Short: "Interact with data resource",
Annotations: map[string]string{
"group:core": "true",
},
PersistentPreRunE: resource.PersistentPreRunE,
}
cmd.PersistentFlags().StringVarP(&resource.configFilePath, "config", "c", resource.configFilePath, "File path for client configuration")

cmd.AddCommand(NewCreateCommand(resource.clientConfig))
cmd.AddCommand(NewCreateCommand())
cmd.AddCommand(NewUploadAllCommand())
return cmd
}

func (r *resourceCommand) PersistentPreRunE(_ *cobra.Command, _ []string) error {
// TODO: find a way to load the config in one place
c, err := config.LoadClientConfig(r.configFilePath)
if err != nil {
return err
}
*r.clientConfig = *c
return nil
}
216 changes: 216 additions & 0 deletions client/cmd/resource/upload_all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package resource

import (
"context"
"errors"
"fmt"
"io"
"strings"
"time"

"github.com/MakeNowJust/heredoc"
"github.com/odpf/salt/log"
"github.com/spf13/afero"
"github.com/spf13/cobra"

"github.com/odpf/optimus/client/cmd/internal/connectivity"
"github.com/odpf/optimus/client/cmd/internal/logger"
"github.com/odpf/optimus/client/local/specio"
"github.com/odpf/optimus/config"
pb "github.com/odpf/optimus/protos/odpf/optimus/core/v1beta1"
)

const (
uploadAllTimeout = time.Minute * 30
)

type uploadAllCommand struct {
logger log.Logger
clientConfig *config.ClientConfig

selectedNamespaceNames []string
verbose bool
configFilePath string
}

// NewUploadAllCommand initializes command for uploading all resources
func NewUploadAllCommand() *cobra.Command {
uploadAll := &uploadAllCommand{
logger: logger.NewClientLogger(),
}

cmd := &cobra.Command{
Use: "upload-all",
Short: "Upload all current optimus resources to server",
Long: heredoc.Doc(`Apply local changes to destination server which includes creating/updating resources`),
Example: "optimus resource upload-all [--verbose]",
Annotations: map[string]string{
"group:core": "true",
},
RunE: uploadAll.RunE,
PreRunE: uploadAll.PreRunE,
}
cmd.Flags().StringVarP(&uploadAll.configFilePath, "config", "c", uploadAll.configFilePath, "File path for client configuration")
cmd.Flags().StringSliceVarP(&uploadAll.selectedNamespaceNames, "namespace-names", "N", nil, "Selected namespaces of optimus project")
cmd.Flags().BoolVarP(&uploadAll.verbose, "verbose", "v", false, "Print details related to upload-all stages")
return cmd
}

func (u *uploadAllCommand) PreRunE(_ *cobra.Command, _ []string) error {
var err error
u.clientConfig, err = config.LoadClientConfig(u.configFilePath)
if err != nil {
return err
}
return nil
}

func (u *uploadAllCommand) RunE(_ *cobra.Command, _ []string) error {
u.logger.Info("> Validating namespaces")
selectedNamespaces, err := u.clientConfig.GetSelectedNamespaces(u.selectedNamespaceNames...)
if err != nil {
return err
}
if len(selectedNamespaces) == 0 {
selectedNamespaces = u.clientConfig.Namespaces
}
u.logger.Info("namespace validation finished!\n")

return u.uploadAll(selectedNamespaces)
}

func (u *uploadAllCommand) uploadAll(selectedNamespaces []*config.Namespace) error {
conn, err := connectivity.NewConnectivity(u.clientConfig.Host, uploadAllTimeout)
if err != nil {
return err
}
defer conn.Close()

if err := u.uploadAllResources(conn, selectedNamespaces); err != nil {
return err
}
u.logger.Info("finished uploading resource specifications to server!\n")

return nil
}

func (u *uploadAllCommand) uploadAllResources(conn *connectivity.Connectivity, selectedNamespaces []*config.Namespace) error {
var namespaceNames []string
for _, namespace := range selectedNamespaces {
namespaceNames = append(namespaceNames, namespace.Name)
}

u.logger.Info("> Uploading all resources for namespaces [%s]", strings.Join(namespaceNames, ","))

stream, err := u.getResourceStreamClient(conn)
if err != nil {
return err
}

var totalSpecsCount int
for _, namespace := range selectedNamespaces {
progressFn := func(totalCount int) {
totalSpecsCount += totalCount
}
if err := u.sendNamespaceResourceRequest(stream, namespace, progressFn); err != nil {
return err
}
}

if err := stream.CloseSend(); err != nil {
return err
}

if totalSpecsCount == 0 {
u.logger.Warn("no resource specs are found from all the namespaces")
return nil
}

return u.processResourceDeploymentResponse(stream)
}

func (u *uploadAllCommand) sendNamespaceResourceRequest(stream pb.ResourceService_DeployResourceSpecificationClient,
namespace *config.Namespace, progressFn func(totalCount int),
) error {
datastoreSpecFs := CreateDataStoreSpecFs(namespace)
for storeName, repoFS := range datastoreSpecFs {
u.logger.Info("> Deploying %s resources for namespace [%s]", storeName, namespace.Name)
request, err := u.getResourceDeploymentRequest(namespace.Name, storeName, repoFS)
if err != nil {
return fmt.Errorf("error getting resource specs for namespace [%s]: %w", namespace.Name, err)
}

if err = stream.Send(request); err != nil {
return fmt.Errorf("resource upload for namespace [%s] failed: %w", namespace.Name, err)
}
progressFn(len(request.GetResources()))
}
return nil
}

func (u *uploadAllCommand) getResourceDeploymentRequest(namespaceName, storeName string,
repoFS afero.Fs) (*pb.DeployResourceSpecificationRequest, error) {
resourceSpecReadWriter, err := specio.NewResourceSpecReadWriter(repoFS)
if err != nil {
return nil, err
}

resourceSpecs, err := resourceSpecReadWriter.ReadAll(".")
if err != nil {
return nil, err
}

resourceSpecsProto := make([]*pb.ResourceSpecification, len(resourceSpecs))
for i, resourceSpec := range resourceSpecs {
resourceSpecProto, err := resourceSpec.ToProto()
if err != nil {
return nil, err
}
resourceSpecsProto[i] = resourceSpecProto
}

return &pb.DeployResourceSpecificationRequest{
Resources: resourceSpecsProto,
ProjectName: u.clientConfig.Project.Name,
DatastoreName: storeName,
NamespaceName: namespaceName,
}, nil
}

func (u *uploadAllCommand) getResourceStreamClient(conn *connectivity.Connectivity) (pb.ResourceService_DeployResourceSpecificationClient, error) {
client := pb.NewResourceServiceClient(conn.GetConnection())
// TODO: create a new api for upload-all and remove deploy
stream, err := client.DeployResourceSpecification(conn.GetContext())
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
u.logger.Error("Deployment of resources took too long, timing out")
}
return nil, fmt.Errorf("deployement failed: %w", err)
}
return stream, nil
}

func (u *uploadAllCommand) processResourceDeploymentResponse(stream pb.ResourceService_DeployResourceSpecificationClient) error {
u.logger.Info("> Receiving responses:")

for {
resp, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return err
}

if logStatus := resp.GetLogStatus(); logStatus != nil {
if u.verbose {
logger.PrintLogStatusVerbose(u.logger, logStatus)
} else {
logger.PrintLogStatus(u.logger, logStatus)
}
continue
}
}

return nil
}

0 comments on commit 4d71411

Please sign in to comment.