Skip to content

Commit

Permalink
feat: async job deployment (#350)
Browse files Browse the repository at this point in the history
* placeholder: async deployment

* placeholder: job service deploy method

* refactor: move get namespace spec inside deploy method on job service

* feat: get modified job specs by comparing 2 sources

* feat: add resolve dependency and deploy request to deploy in job service and modify deploy observer

* feat: poll job deployment

* feat: job create returning job spec object + check diff based on job spec value

* test: fix test on job_spec handler

* test: fix test and add test cases on deployer

* test: fix and unit test on job service test

* fix: remove unused code and fix lint

* test: add test for bulk create

* test: add test for get by name

* test: add test for bulk delete

* test: add test for deploy

* refactor: logger on deploy

* test: fix job_spec deploy unit tests

* refactor: remove unnecessary code and fix lint

* feat: add poll deployment response

* fix: linter

* feat: hash comparison between jobspecs + fix log observers

* feat: add logs for job creation + modification and add colors

* feat: capability to handle multinamespace with 1 deployID

* feat: soft error on bulk create and bulk delete

* fix: handling deployment resp when partial namespace deploy fail

* fix: wrap deploy error with namespace

* fix: remove time for submitting deployment request

* refactor: observers send log

* refactor: move ignore resource & job deploy inside their functions

* refactor: job deployment responses

* refactor: rename to JobDeleteEvent, JobCreateEvent and JobModifyEvent

* fix: linter & test

* refactor: make bulk delete and create private

* refactor: return obj jobSpec on Create

* refactor: poll job deployment

* fix: job deploys

Co-authored-by: Arinda Arif <arindaarif05@gmail.com>
  • Loading branch information
deryrahman and arinda-arif authored Jun 14, 2022
1 parent 36a5111 commit 609b62d
Show file tree
Hide file tree
Showing 15 changed files with 2,091 additions and 756 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/odpf/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "4cd69522575a01a496bc5babb95d42f38cc8c2cd"
PROTON_COMMIT := "1b939a3a59d80f627baff045c9457e681e5bbfec"

.PHONY: build test test-ci generate-proto unit-test-ci smoke-test integration-test vet coverage clean install lint

Expand Down
110 changes: 32 additions & 78 deletions api/handler/v1beta1/job_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -38,8 +37,12 @@ type JobSpecServiceServer struct {
}

func (sv *JobSpecServiceServer) DeployJobSpecification(stream pb.JobSpecificationService_DeployJobSpecificationServer) error {
startTime := time.Now()
errNamespaces := []string{}
observers := sv.newObserverChain()
observers.Join(&jobDeploymentObserver{
stream: stream,
log: sv.l,
mu: new(sync.Mutex),
})

for {
req, err := stream.Recv()
Expand All @@ -50,78 +53,33 @@ func (sv *JobSpecServiceServer) DeployJobSpecification(stream pb.JobSpecificatio
stream.Send(&pb.DeployJobSpecificationResponse{
Success: false,
Ack: true,
Message: err.Error(),
})
return err // immediate error returned (grpc error level)
}
namespaceSpec, err := sv.namespaceService.Get(stream.Context(), req.GetProjectName(), req.GetNamespaceName())
if err != nil {
stream.Send(&pb.DeployJobSpecificationResponse{
Success: false,
Ack: true,
Message: err.Error(),
})
errNamespaces = append(errNamespaces, req.NamespaceName)
continue
}

jobsToKeep, err := sv.getJobsToKeep(stream.Context(), namespaceSpec, req)
jobSpecs := sv.convertProtoToJobSpec(req.GetJobs())

// Deploying only the modified jobs
deployID, err := sv.jobSvc.Deploy(stream.Context(), req.GetProjectName(), req.GetNamespaceName(), jobSpecs, observers)
if err != nil {
stream.Send(&pb.DeployJobSpecificationResponse{
Success: false,
Ack: true,
Message: err.Error(),
})
errNamespaces = append(errNamespaces, req.NamespaceName)
err = fmt.Errorf("error while deploying namespace %s: %w", req.NamespaceName, err)
observers.Notify(&models.ProgressJobDeploymentRequestCreated{Err: err})
sv.l.Warn(fmt.Sprintf("there's error while deploying namespaces: [%s]", req.NamespaceName))
continue
}

observers := new(progress.ObserverChain)
observers.Join(sv.progressObserver)
observers.Join(&jobSyncObserver{
stream: stream,
log: sv.l,
mu: new(sync.Mutex),
})

// delete specs not sent for deployment from internal repository
if err := sv.jobSvc.KeepOnly(stream.Context(), namespaceSpec, jobsToKeep, observers); err != nil {
stream.Send(&pb.DeployJobSpecificationResponse{
Success: false,
Ack: true,
Message: fmt.Sprintf("failed to delete jobs: \n%s", err.Error()),
})
errNamespaces = append(errNamespaces, req.NamespaceName)
continue
}
if err := sv.jobSvc.Sync(stream.Context(), namespaceSpec, observers); err != nil {
stream.Send(&pb.DeployJobSpecificationResponse{
Success: false,
Ack: true,
Message: fmt.Sprintf("failed to sync jobs: \n%s", err.Error()),
})
errNamespaces = append(errNamespaces, req.NamespaceName)
continue
}
runtimeDeployJobSpecificationCounter.Add(float64(len(req.Jobs)))
stream.Send(&pb.DeployJobSpecificationResponse{
Success: true,
Ack: true,
Message: fmt.Sprintf("jobs with namespace [%s] are deployed successfully", req.NamespaceName),
})
}
sv.l.Info("finished job deployment", "time", time.Since(startTime))
if len(errNamespaces) > 0 {
sv.l.Warn(fmt.Sprintf("there's error while deploying namespaces: [%s]", strings.Join(errNamespaces, ", ")))
return fmt.Errorf("error when deploying: [%s]", strings.Join(errNamespaces, ", "))
sv.l.Info(fmt.Sprintf("deployID %s holds deployment for namespace %s\n", deployID.UUID().String(), req.NamespaceName))
observers.Notify(&models.ProgressJobDeploymentRequestCreated{DeployID: deployID})
}

sv.l.Info("job deployment is successfully submitted")

return nil
}

func (sv *JobSpecServiceServer) getJobsToKeep(ctx context.Context, namespaceSpec models.NamespaceSpec, req *pb.DeployJobSpecificationRequest) ([]models.JobSpec, error) {
jobs := req.GetJobs()
func (sv *JobSpecServiceServer) convertProtoToJobSpec(jobs []*pb.JobSpecification) []models.JobSpec {
if len(jobs) == 0 {
return []models.JobSpec{}, nil
return []models.JobSpec{}
}

var jobsToKeep []models.JobSpec
Expand All @@ -131,20 +89,10 @@ func (sv *JobSpecServiceServer) getJobsToKeep(ctx context.Context, namespaceSpec
sv.l.Error(fmt.Sprintf("%s: cannot adapt job %s", err.Error(), reqJob.GetName()))
continue
}

err = sv.jobSvc.Create(ctx, namespaceSpec, adaptJob)
if err != nil {
sv.l.Error(fmt.Sprintf("%s: failed to save %s", err.Error(), adaptJob.Name))
continue
}
jobsToKeep = append(jobsToKeep, adaptJob)
}

if jobsToKeep == nil {
return nil, errors.New("job spec creation is failed")
}

return jobsToKeep, nil
return jobsToKeep
}

func (sv *JobSpecServiceServer) ListJobSpecification(ctx context.Context, req *pb.ListJobSpecificationRequest) (*pb.ListJobSpecificationResponse, error) {
Expand Down Expand Up @@ -193,8 +141,7 @@ func (sv *JobSpecServiceServer) CheckJobSpecifications(req *pb.CheckJobSpecifica
return mapToGRPCErr(sv.l, err, "unable to get namespace")
}

observers := new(progress.ObserverChain)
observers.Join(sv.progressObserver)
observers := sv.newObserverChain()
observers.Join(&jobCheckObserver{
stream: respStream,
log: sv.l,
Expand Down Expand Up @@ -232,7 +179,7 @@ func (sv *JobSpecServiceServer) CreateJobSpecification(ctx context.Context, req
return nil, status.Errorf(codes.Internal, "spec validation failed\n%s", err.Error())
}

err = sv.jobSvc.Create(ctx, namespaceSpec, jobSpec)
_, err = sv.jobSvc.Create(ctx, namespaceSpec, jobSpec)
if err != nil {
return nil, status.Errorf(codes.Internal, "%s: failed to save job %s", err.Error(), jobSpec.Name)
}
Expand Down Expand Up @@ -290,8 +237,7 @@ func (sv *JobSpecServiceServer) DeleteJobSpecification(ctx context.Context, req
func (sv *JobSpecServiceServer) RefreshJobs(req *pb.RefreshJobsRequest, respStream pb.JobSpecificationService_RefreshJobsServer) error {
startTime := time.Now()

observers := new(progress.ObserverChain)
observers.Join(sv.progressObserver)
observers := sv.newObserverChain()
observers.Join(&jobRefreshObserver{
stream: respStream,
log: sv.l,
Expand Down Expand Up @@ -343,6 +289,14 @@ func (sv *JobSpecServiceServer) GetDeployJobsStatus(ctx context.Context, req *pb
}
}

func (sv *JobSpecServiceServer) newObserverChain() *progress.ObserverChain {
observers := new(progress.ObserverChain)
if sv.progressObserver != nil {
observers.Join(sv.progressObserver)
}
return observers
}

func NewJobSpecServiceServer(l log.Logger, jobService models.JobService, pluginRepo models.PluginRepository,
projectService service.ProjectService, namespaceService service.NamespaceService, progressObserver progress.Observer) *JobSpecServiceServer {
return &JobSpecServiceServer{
Expand Down
Loading

0 comments on commit 609b62d

Please sign in to comment.