Skip to content

Commit

Permalink
feat: yaml export for job and resource (#717)
Browse files Browse the repository at this point in the history
* refactor: clean up create command for export command

* feat: enhance GetJobSpecifications to support namespace name filter

* feat: add resource export sub-command

* feat: enhance GetJobSpecifications to support include deleted jobs filter

* Revert "feat: enhance GetJobSpecifications to support include deleted jobs filter"

This reverts commit 337c53c.

* refactor: remove include deleted filter from GetJobSpecifications request

* feat: add job export sub-command

* chore: fix lint issues

* fix: nil writer issue in job export and add successful case log

* fix: convert job alert config type to the proper value

* refactor: add logging activity on resource export

Co-authored-by: Arinda Arif <arindaarif05@gmail.com>

* fix: read config automatically if not being set

Co-authored-by: Arinda Arif <arindaarif05@gmail.com>

* refactor: improve logs for job export

* test: add job_spec test in client side

* refactor: improve logging on job and resource export

Co-authored-by: Arinda Arif <arindaarif05@gmail.com>
  • Loading branch information
irainia and arinda-arif authored Jan 16, 2023
1 parent f32d70c commit 3e6f584
Show file tree
Hide file tree
Showing 15 changed files with 1,093 additions and 196 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/odpf/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "90b5d53e3e58e017032d12275597b93f53263add"
PROTON_COMMIT := "e75e288ec2a42ecd3358b2a12ddcd8bea50a4a07"

.PHONY: build test test-ci generate-proto unit-test-ci integration-test vet coverage clean install lint

Expand Down
1 change: 1 addition & 0 deletions api/handler/v1beta1/job_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ func (sv *JobSpecServiceServer) GetJobSpecifications(ctx context.Context, req *p
ProjectName: req.GetProjectName(),
JobName: req.GetJobName(),
ResourceDestination: req.GetResourceDestination(),
NamespaceName: req.GetNamespaceName(),
}
jobSpecs, err := sv.jobSvc.GetByFilter(ctx, jobSpecFilter)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions api/handler/v1beta1/job_spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ func (s *JobSpecServiceServerTestSuite) TestGetJobSpecification_Fail_JobServiceG
}

func (s *JobSpecServiceServerTestSuite) TestGetJobSpecifications_Success() {
req := &pb.GetJobSpecificationsRequest{JobName: "job-1"}
jobSpecFilter := models.JobSpecFilter{JobName: req.GetJobName()}
req := &pb.GetJobSpecificationsRequest{JobName: "job-1", NamespaceName: "namespace-1"}
jobSpecFilter := models.JobSpecFilter{JobName: req.GetJobName(), NamespaceName: req.GetNamespaceName()}

execUnit1 := new(mock.YamlMod)
execUnit1.On("PluginInfo").Return(&models.PluginInfoResponse{Name: "task"}, nil)
Expand Down
316 changes: 316 additions & 0 deletions client/cmd/job/export.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,316 @@
package job

import (
"errors"
"fmt"
"path"
"strings"
"time"

"github.com/odpf/salt/log"
"github.com/spf13/afero"
"github.com/spf13/cobra"

"github.com/odpf/optimus/client/cmd/internal/connectivity"
"github.com/odpf/optimus/client/cmd/internal/logger"
"github.com/odpf/optimus/client/local"
"github.com/odpf/optimus/client/local/model"
"github.com/odpf/optimus/client/local/specio"
"github.com/odpf/optimus/config"
pb "github.com/odpf/optimus/protos/odpf/optimus/core/v1beta1"
)

const (
fetchTenantTimeout = time.Minute
fetchJobTimeout = time.Minute * 15
)

type exportCommand struct {
logger log.Logger
writer local.SpecWriter[*model.JobSpec]

configFilePath string
outputDirPath string
host string

projectName string
namespaceName string
jobName string
}

// NewExportCommand initializes command for exporting job specification to yaml file
func NewExportCommand() *cobra.Command {
export := &exportCommand{
logger: logger.NewClientLogger(),
}

cmd := &cobra.Command{
Use: "export",
Short: "Export job specifications to YAML files",
Example: "optimus job export",
RunE: export.RunE,
PreRunE: export.PreRunE,
}

cmd.Flags().StringVarP(&export.configFilePath, "config", "c", export.configFilePath, "File path for client configuration")
cmd.Flags().StringVar(&export.outputDirPath, "dir", "", "Output directory path")
cmd.Flags().StringVar(&export.host, "host", "", "Host of the server source (will override value from config)")

cmd.Flags().StringVarP(&export.projectName, "project-name", "p", "", "Project name target")
cmd.Flags().StringVarP(&export.namespaceName, "namespace-name", "n", "", "Namespace name target within the selected project name")
cmd.Flags().StringVarP(&export.jobName, "job-name", "r", "", "Job name target")

cmd.MarkFlagRequired("dir")
return cmd
}

func (e *exportCommand) PreRunE(_ *cobra.Command, _ []string) error {
readWriter, err := specio.NewJobSpecReadWriter(afero.NewOsFs())
if err != nil {
e.logger.Error(err.Error())
}
e.writer = readWriter

if e.host != "" {
return nil
}

if e.configFilePath != "" {
e.logger.Info("Loading client config from %s", e.configFilePath)
}
cfg, err := config.LoadClientConfig(e.configFilePath)
if err != nil {
e.logger.Warn("error is encountered when reading config file: %s", err)
} else {
e.host = cfg.Host
}
return err
}

func (e *exportCommand) RunE(_ *cobra.Command, _ []string) error {
e.logger.Info("Validating input")
if err := e.validate(); err != nil {
return err
}

var success bool
if e.projectName != "" && e.namespaceName != "" && e.jobName != "" {
e.logger.Info("Downloading job [%s] from project [%s] namespace [%s]", e.jobName, e.projectName, e.namespaceName)
success = e.downloadSpecificJob(e.projectName, e.namespaceName, e.jobName)
} else if e.projectName != "" && e.namespaceName != "" {
e.logger.Info("Downloading all jobs within project [%s] namespace [%s]", e.projectName, e.namespaceName)
success = e.downloadByProjectNameAndNamespaceName(e.projectName, e.namespaceName)
} else if e.projectName != "" {
e.logger.Info("Downloading all jobs within project [%s]", e.projectName)
success = e.downloadByProjectName(e.projectName)
} else {
e.logger.Info("Downloading all jobs")
success = e.downloadAll()
}

if !success {
e.logger.Error("Download process failed")
return errors.New("encountered one or more errors during download jobs")
}
e.logger.Info("Download process success")
return nil
}

func (e *exportCommand) downloadAll() bool {
e.logger.Info("Fetching all project names")
projectNames, err := e.fetchProjectNames()
if err != nil {
e.logger.Error("error is encountered when fetching project names: %s", err)
return false
}
if len(projectNames) == 0 {
e.logger.Warn("no project is found from the specified host")
return true
}

success := true
for _, pName := range projectNames {
if !e.downloadByProjectName(pName) {
success = false
}
}
return success
}

func (e *exportCommand) downloadByProjectName(projectName string) bool {
e.logger.Info("Fetching all jobs for project [%s]", projectName)
namespaceJobs, err := e.fetchNamespaceJobsByProjectName(projectName)
if err != nil {
e.logger.Error("error is encountered when fetching job specs for project [%s]: %s", projectName, err)
return false
}

success := true
for namespaceName, jobSpecs := range namespaceJobs {
if len(jobSpecs) == 0 {
e.logger.Warn("No jobs found for project [%s] namespace [%s]", projectName, namespaceName)
continue
}
if err := e.writeJobs(projectName, namespaceName, jobSpecs); err != nil {
e.logger.Error(err.Error())
success = false
}
}
return success
}

func (e *exportCommand) downloadByProjectNameAndNamespaceName(projectName, namespaceName string) bool {
e.logger.Info("Fetching all jobs for project [%s] namespace [%s]", projectName, namespaceName)
jobs, err := e.fetchJobsByProjectAndNamespaceName(projectName, namespaceName)
if err != nil {
e.logger.Error("error is encountered when fetching job specs for project [%s]: %s", projectName, err)
return false
}
if len(jobs) == 0 {
e.logger.Warn("No jobs found for project [%s] namespace [%s]", projectName, namespaceName)
return true
}
if err := e.writeJobs(projectName, namespaceName, jobs); err != nil {
e.logger.Error(err.Error())
return false
}
return true
}

func (e *exportCommand) downloadSpecificJob(projectName, namespaceName, jobName string) bool {
e.logger.Info("Fetching job [%s] from project [%s] namespace [%s]", jobName, projectName, namespaceName)
job, err := e.fetchSpecificJob(projectName, namespaceName, jobName)
if err != nil {
e.logger.Error("error is encountered when fetching job specs for project [%s]: %s", projectName, err)
return false
}

if err := e.writeJobs(projectName, namespaceName, []*model.JobSpec{job}); err != nil {
e.logger.Error(err.Error())
return false
}
return true
}

func (e *exportCommand) writeJobs(projectName, namespaceName string, jobs []*model.JobSpec) error {
e.logger.Info("Writing %d jobs for project [%s] namespace [%s]", len(jobs), projectName, namespaceName)

var errMsgs []string
for _, spec := range jobs {
dirPath := path.Join(e.outputDirPath, projectName, namespaceName, "jobs", spec.Name)

e.logger.Info("Writing job to [%s]", dirPath)
if err := e.writer.Write(dirPath, spec); err != nil {
errMsgs = append(errMsgs, err.Error())
}
}
if len(errMsgs) > 0 {
return fmt.Errorf("encountered one or more errors when writing jobs:\n%s", strings.Join(errMsgs, "\n"))
}
return nil
}

func (e *exportCommand) fetchNamespaceJobsByProjectName(projectName string) (map[string][]*model.JobSpec, error) {
conn, err := connectivity.NewConnectivity(e.host, fetchJobTimeout)
if err != nil {
return nil, err
}
defer conn.Close()

jobSpecificationServiceClient := pb.NewJobSpecificationServiceClient(conn.GetConnection())

response, err := jobSpecificationServiceClient.GetJobSpecifications(conn.GetContext(), &pb.GetJobSpecificationsRequest{
ProjectName: projectName,
})
if err != nil {
return nil, err
}

namespaceJobsMap := make(map[string][]*model.JobSpec)
for _, jobProto := range response.JobSpecificationResponses {
namespaceJobsMap[jobProto.GetNamespaceName()] = append(namespaceJobsMap[jobProto.GetNamespaceName()], model.ToJobSpec(jobProto.Job))
}
return namespaceJobsMap, nil
}

func (e *exportCommand) fetchJobsByProjectAndNamespaceName(projectName, namespaceName string) ([]*model.JobSpec, error) {
conn, err := connectivity.NewConnectivity(e.host, fetchJobTimeout)
if err != nil {
return nil, err
}
defer conn.Close()

jobSpecificationServiceClient := pb.NewJobSpecificationServiceClient(conn.GetConnection())

response, err := jobSpecificationServiceClient.GetJobSpecifications(conn.GetContext(), &pb.GetJobSpecificationsRequest{
ProjectName: projectName,
NamespaceName: namespaceName,
})
if err != nil {
return nil, err
}

jobs := make([]*model.JobSpec, len(response.JobSpecificationResponses))
for i, jobProto := range response.JobSpecificationResponses {
jobs[i] = model.ToJobSpec(jobProto.Job)
}
return jobs, nil
}

func (e *exportCommand) fetchSpecificJob(projectName, namespaceName, jobName string) (*model.JobSpec, error) {
conn, err := connectivity.NewConnectivity(e.host, fetchJobTimeout)
if err != nil {
return nil, err
}
defer conn.Close()

jobSpecificationServiceClient := pb.NewJobSpecificationServiceClient(conn.GetConnection())

response, err := jobSpecificationServiceClient.GetJobSpecifications(conn.GetContext(), &pb.GetJobSpecificationsRequest{
ProjectName: projectName,
NamespaceName: namespaceName,
JobName: jobName,
})
if err != nil {
return nil, err
}

if len(response.JobSpecificationResponses) == 0 {
return nil, errors.New("job is not found")
}
return model.ToJobSpec(response.JobSpecificationResponses[0].Job), nil
}

func (e *exportCommand) fetchProjectNames() ([]string, error) {
conn, err := connectivity.NewConnectivity(e.host, fetchTenantTimeout)
if err != nil {
return nil, err
}
defer conn.Close()

projectServiceClient := pb.NewProjectServiceClient(conn.GetConnection())

response, err := projectServiceClient.ListProjects(conn.GetContext(), &pb.ListProjectsRequest{})
if err != nil {
return nil, err
}

output := make([]string, len(response.Projects))
for i, p := range response.Projects {
output[i] = p.GetName()
}
return output, nil
}

func (e *exportCommand) validate() error {
if e.host == "" {
return errors.New("host is not specified in both config file and flag argument")
}
if e.namespaceName != "" && e.projectName == "" {
return errors.New("project name has to be specified since namespace name is specified")
}
if e.jobName != "" && (e.projectName == "" || e.namespaceName == "") {
return errors.New("project name and namespace name have to be specified since job name is specified")
}
return nil
}
1 change: 1 addition & 0 deletions client/cmd/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func NewJobCommand() *cobra.Command {
NewValidateCommand(),
NewJobRunInputCommand(),
NewInspectCommand(),
NewExportCommand(),
)
return cmd
}
Expand Down
16 changes: 11 additions & 5 deletions client/cmd/resource/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,16 @@ import (
)

type createCommand struct {
logger log.Logger
clientConfig *config.ClientConfig
logger log.Logger
configFilePath string

namespaceSurvey *survey.NamespaceSurvey
}

// NewCreateCommand initializes resource create command
func NewCreateCommand(clientConfig *config.ClientConfig) *cobra.Command {
func NewCreateCommand() *cobra.Command {
l := logger.NewClientLogger()
create := &createCommand{
clientConfig: clientConfig,
logger: l,
namespaceSurvey: survey.NewNamespaceSurvey(l),
}
Expand All @@ -37,11 +36,18 @@ func NewCreateCommand(clientConfig *config.ClientConfig) *cobra.Command {
Example: "optimus resource create",
RunE: create.RunE,
}
cmd.Flags().StringVarP(&create.configFilePath, "config", "c", create.configFilePath, "File path for client configuration")
cmd.MarkFlagRequired("config")
return cmd
}

func (c createCommand) RunE(_ *cobra.Command, _ []string) error {
selectedNamespace, err := c.namespaceSurvey.AskToSelectNamespace(c.clientConfig)
cfg, err := config.LoadClientConfig(c.configFilePath)
if err != nil {
return err
}

selectedNamespace, err := c.namespaceSurvey.AskToSelectNamespace(cfg)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 3e6f584

Please sign in to comment.