diff --git a/core/job/job.go b/core/job/job.go index b284a493e1..c29cc5142e 100644 --- a/core/job/job.go +++ b/core/job/job.go @@ -1,6 +1,7 @@ package job import ( + "fmt" "strings" "github.com/odpf/optimus/core/tenant" @@ -44,6 +45,9 @@ func (j Job) FullName() string { func (j Job) GetJobWithUnresolvedUpstream() (*WithUpstream, error) { unresolvedStaticUpstreams, err := j.getStaticUpstreamsToResolve() + if err != nil { + err = errors.InvalidArgument(EntityJob, fmt.Sprintf("failed to get static upstreams to resolve for job %s", j.GetName())) + } unresolvedInferredUpstreams := j.getInferredUpstreamsToResolve() allUpstreams := unresolvedStaticUpstreams allUpstreams = append(allUpstreams, unresolvedInferredUpstreams...) @@ -225,10 +229,10 @@ func (w WithUpstreams) GetSubjectJobNames() []Name { return names } -func (w WithUpstreams) MergeWithResolvedUpstreams(resolvedUpstreamMap map[Name][]*Upstream) []*WithUpstream { +func (w WithUpstreams) MergeWithResolvedUpstreams(resolvedUpstreamsBySubjectJobMap map[Name][]*Upstream) []*WithUpstream { var jobsWithMergedUpstream []*WithUpstream for _, jobWithUnresolvedUpstream := range w { - resolvedUpstreams := resolvedUpstreamMap[jobWithUnresolvedUpstream.Name()] + resolvedUpstreams := resolvedUpstreamsBySubjectJobMap[jobWithUnresolvedUpstream.Name()] resolvedUpstreamMapByFullName := Upstreams(resolvedUpstreams).ToFullNameAndUpstreamMap() resolvedUpstreamMapByDestination := Upstreams(resolvedUpstreams).ToResourceDestinationAndUpstreamMap() @@ -244,7 +248,8 @@ func (w WithUpstreams) MergeWithResolvedUpstreams(resolvedUpstreamMap map[Name][ } mergedUpstream = append(mergedUpstream, unresolvedUpstream) } - jobsWithMergedUpstream = append(jobsWithMergedUpstream, NewWithUpstream(jobWithUnresolvedUpstream.Job(), mergedUpstream)) + distinctMergedUpstream := Upstreams(mergedUpstream).Deduplicate() + jobsWithMergedUpstream = append(jobsWithMergedUpstream, NewWithUpstream(jobWithUnresolvedUpstream.Job(), distinctMergedUpstream)) } return jobsWithMergedUpstream } @@ -371,6 +376,44 @@ func (u Upstreams) ToResourceDestinationAndUpstreamMap() map[string]*Upstream { return resourceDestinationUpstreamMap } +func (u Upstreams) Deduplicate() []*Upstream { + resolvedUpstreamMap := make(map[string]*Upstream) + unresolvedStaticUpstreamMap := make(map[string]*Upstream) + unresolvedInferredUpstreamMap := make(map[string]*Upstream) + + for _, upstream := range u { + if upstream.state == UpstreamStateUnresolved && upstream._type == UpstreamTypeStatic { + unresolvedStaticUpstreamMap[upstream.FullName()] = upstream + continue + } + + if upstream.state == UpstreamStateUnresolved && upstream._type == UpstreamTypeInferred { + unresolvedInferredUpstreamMap[upstream.resource.String()] = upstream + continue + } + + if upstreamInMap, ok := resolvedUpstreamMap[upstream.FullName()]; ok { + // keep static upstreams in the map if exists + if upstreamInMap._type == UpstreamTypeStatic { + continue + } + } + resolvedUpstreamMap[upstream.FullName()] = upstream + } + + return mapsToUpstreams(resolvedUpstreamMap, unresolvedInferredUpstreamMap, unresolvedStaticUpstreamMap) +} + +func mapsToUpstreams(upstreamsMaps ...map[string]*Upstream) []*Upstream { + var result []*Upstream + for _, upstreamsMap := range upstreamsMaps { + for _, upstream := range upstreamsMap { + result = append(result, upstream) + } + } + return result +} + type FullName string func FullNameFrom(projectName tenant.ProjectName, jobName Name) FullName { diff --git a/core/job/job_test.go b/core/job/job_test.go index c3af24d74e..b17d354840 100644 --- a/core/job/job_test.go +++ b/core/job/job_test.go @@ -194,6 +194,78 @@ func TestEntityJob(t *testing.T) { }) }) + t.Run("Deduplicate", func(t *testing.T) { + t.Run("should return upstreams with static being prioritized if duplication is found", func(t *testing.T) { + upstreamResolved1Inferred := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeInferred, "", false) + upstreamResolved1Static := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeStatic, "", false) + upstreamResolved2 := job.NewUpstreamResolved("job-b", "host-sample", "project.dataset.sample-b", sampleTenant, job.UpstreamTypeInferred, "", false) + upstreamUnresolved1 := job.NewUpstreamUnresolvedStatic("job-c", sampleTenant.ProjectName()) + upstreamUnresolved2 := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d") + upstreamUnresolved3 := job.NewUpstreamUnresolvedStatic("job-c", sampleTenant.ProjectName()) + upstreamUnresolved4 := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d") + + expected := []*job.Upstream{ + upstreamResolved1Static, + upstreamResolved2, + upstreamUnresolved1, + upstreamUnresolved2, + } + + upstreams := job.Upstreams([]*job.Upstream{ + upstreamResolved1Inferred, + upstreamResolved1Static, + upstreamResolved2, + upstreamUnresolved1, + upstreamUnresolved2, + upstreamUnresolved3, + upstreamUnresolved4, + }) + result := upstreams.Deduplicate() + + assert.ElementsMatch(t, expected, result) + }) + t.Run("should successfully return distinct upstreams when only resolved upstream is present", func(t *testing.T) { + upstreamResolved1Inferred := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeInferred, "", false) + upstreamResolved1Static := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeStatic, "", false) + upstreamResolved2 := job.NewUpstreamResolved("job-b", "host-sample", "project.dataset.sample-b", sampleTenant, job.UpstreamTypeInferred, "", false) + + expected := []*job.Upstream{ + upstreamResolved1Static, + upstreamResolved2, + } + + upstreams := job.Upstreams([]*job.Upstream{ + upstreamResolved1Inferred, + upstreamResolved1Static, + upstreamResolved2, + }) + result := upstreams.Deduplicate() + + assert.ElementsMatch(t, expected, result) + }) + t.Run("should successfully return distinct upstreams when only unresolved upstream is present", func(t *testing.T) { + upstreamUnresolved1 := job.NewUpstreamUnresolvedStatic("job-c", sampleTenant.ProjectName()) + upstreamUnresolved2 := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d") + upstreamUnresolved3 := job.NewUpstreamUnresolvedStatic("job-c", sampleTenant.ProjectName()) + upstreamUnresolved4 := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d") + + expected := []*job.Upstream{ + upstreamUnresolved1, + upstreamUnresolved2, + } + + upstreams := job.Upstreams([]*job.Upstream{ + upstreamUnresolved1, + upstreamUnresolved2, + upstreamUnresolved3, + upstreamUnresolved4, + }) + result := upstreams.Deduplicate() + + assert.ElementsMatch(t, expected, result) + }) + }) + t.Run("FullNameFrom", func(t *testing.T) { t.Run("should return the job full name given project and job name", func(t *testing.T) { fullName := job.FullNameFrom(project.Name(), specA.Name()) @@ -273,31 +345,36 @@ func TestEntityJob(t *testing.T) { }) }) t.Run("MergeWithResolvedUpstreams", func(t *testing.T) { - upstreamCUnresolved := job.NewUpstreamUnresolvedStatic("job-C", project.Name()) - upstreamDUnresolved := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d") + upstreamCUnresolved := job.NewUpstreamUnresolvedInferred("project.dataset.sample-c") + upstreamDUnresolvedInferred := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d") + upstreamDUnresolvedStatic := job.NewUpstreamUnresolvedStatic("job-D", project.Name()) upstreamEUnresolved := job.NewUpstreamUnresolvedStatic("job-E", project.Name()) upstreamFUnresolved := job.NewUpstreamUnresolvedInferred("project.dataset.sample-f") - upstreamCResolved := job.NewUpstreamResolved("job-C", "host-sample", "project.dataset.sample-c", sampleTenant, job.UpstreamTypeStatic, "bq2bq", false) - upstreamDResolved := job.NewUpstreamResolved("job-D", "host-sample", "project.dataset.sample-d", sampleTenant, job.UpstreamTypeInferred, "bq2bq", false) + upstreamCResolved := job.NewUpstreamResolved("job-C", "host-sample", "project.dataset.sample-c", sampleTenant, job.UpstreamTypeInferred, "bq2bq", false) + upstreamDResolvedStatic := job.NewUpstreamResolved("job-D", "host-sample", "project.dataset.sample-d", sampleTenant, job.UpstreamTypeStatic, "bq2bq", false) + upstreamDResolvedInferred := job.NewUpstreamResolved("job-D", "host-sample", "project.dataset.sample-d", sampleTenant, job.UpstreamTypeInferred, "bq2bq", false) resolvedUpstreamMap := map[job.Name][]*job.Upstream{ - "job-A": {upstreamCResolved, upstreamDResolved}, - "job-B": {upstreamDResolved}, + "job-A": {upstreamCResolved, upstreamDResolvedInferred}, + "job-B": {upstreamDResolvedStatic}, } expected := []*job.WithUpstream{ - job.NewWithUpstream(jobA, []*job.Upstream{upstreamCResolved, upstreamDResolved, upstreamEUnresolved}), - job.NewWithUpstream(jobB, []*job.Upstream{upstreamDResolved, upstreamEUnresolved, upstreamFUnresolved}), + job.NewWithUpstream(jobA, []*job.Upstream{upstreamCResolved, upstreamDResolvedInferred, upstreamEUnresolved}), + job.NewWithUpstream(jobB, []*job.Upstream{upstreamDResolvedStatic, upstreamEUnresolved, upstreamFUnresolved}), } jobsWithUnresolvedUpstream := []*job.WithUpstream{ - job.NewWithUpstream(jobA, []*job.Upstream{upstreamCUnresolved, upstreamDUnresolved, upstreamEUnresolved}), - job.NewWithUpstream(jobB, []*job.Upstream{upstreamDUnresolved, upstreamEUnresolved, upstreamFUnresolved}), + job.NewWithUpstream(jobA, []*job.Upstream{upstreamCUnresolved, upstreamDUnresolvedInferred, upstreamEUnresolved}), + job.NewWithUpstream(jobB, []*job.Upstream{upstreamDUnresolvedStatic, upstreamDUnresolvedInferred, upstreamEUnresolved, upstreamFUnresolved}), } result := job.WithUpstreams(jobsWithUnresolvedUpstream).MergeWithResolvedUpstreams(resolvedUpstreamMap) - assert.EqualValues(t, expected, result) + assert.Equal(t, expected[0].Job(), result[0].Job()) + assert.Equal(t, expected[1].Job(), result[1].Job()) + assert.ElementsMatch(t, expected[0].Upstreams(), result[0].Upstreams()) + assert.ElementsMatch(t, expected[1].Upstreams(), result[1].Upstreams()) }) }) @@ -333,7 +410,7 @@ func TestEntityJob(t *testing.T) { jobA := job.NewJob(sampleTenant, specA, jobADestination, jobASources) jobWithUpstream, err := jobA.GetJobWithUnresolvedUpstream() - assert.ErrorContains(t, err, "invalid argument for entity job: name is empty") + assert.ErrorContains(t, err, "invalid argument for entity job: failed to get static upstreams to resolve") assert.Len(t, jobWithUpstream.Upstreams(), 1) }) t.Run("should contains error when get static upstream failed because of project name empty", func(t *testing.T) { @@ -343,7 +420,7 @@ func TestEntityJob(t *testing.T) { jobA := job.NewJob(sampleTenant, specA, jobADestination, jobASources) jobWithUpstream, err := jobA.GetJobWithUnresolvedUpstream() - assert.ErrorContains(t, err, "invalid argument for entity project: project name is empty") + assert.ErrorContains(t, err, "invalid argument for entity job: failed to get static upstreams to resolve") assert.Len(t, jobWithUpstream.Upstreams(), 1) }) t.Run("should get unresolved upstream", func(t *testing.T) { diff --git a/core/job/resolver/external_upstream_resolver.go b/core/job/resolver/external_upstream_resolver.go index 0355b6a4bd..ac7f8376d7 100644 --- a/core/job/resolver/external_upstream_resolver.go +++ b/core/job/resolver/external_upstream_resolver.go @@ -42,9 +42,10 @@ type ResourceManager interface { } func (e *extUpstreamResolver) Resolve(ctx context.Context, jobWithUpstream *job.WithUpstream, lw writer.LogWriter) (*job.WithUpstream, error) { - me := errors.NewMultiError(fmt.Sprintf("external upstream resolution errors for job %s", jobWithUpstream.Name().String())) + me := errors.NewMultiError(fmt.Sprintf("[%s] external upstream resolution errors for job %s", jobWithUpstream.Job().Tenant().NamespaceName().String(), jobWithUpstream.Name().String())) mergedUpstreams := jobWithUpstream.GetResolvedUpstreams() unresolvedUpstreams := jobWithUpstream.GetUnresolvedUpstreams() + resolvedExternally := false for _, unresolvedUpstream := range unresolvedUpstreams { externalUpstream, err := e.fetchOptimusUpstreams(ctx, unresolvedUpstream) if err != nil || len(externalUpstream) == 0 { @@ -53,11 +54,13 @@ func (e *extUpstreamResolver) Resolve(ctx context.Context, jobWithUpstream *job. continue } mergedUpstreams = append(mergedUpstreams, externalUpstream...) + resolvedExternally = true } if len(me.Errors) > 0 { lw.Write(writer.LogLevelError, errors.MultiToError(me).Error()) - } else { - lw.Write(writer.LogLevelDebug, fmt.Sprintf("resolved job %s upstream from external", jobWithUpstream.Name().String())) + } + if resolvedExternally { + lw.Write(writer.LogLevelDebug, fmt.Sprintf("[%s] resolved job %s upstream from external", jobWithUpstream.Job().Tenant().NamespaceName().String(), jobWithUpstream.Name().String())) } return job.NewWithUpstream(jobWithUpstream.Job(), mergedUpstreams), errors.MultiToError(me) } diff --git a/core/job/resolver/internal_upstream_resolver.go b/core/job/resolver/internal_upstream_resolver.go index 427f0a11dc..23210de7b3 100644 --- a/core/job/resolver/internal_upstream_resolver.go +++ b/core/job/resolver/internal_upstream_resolver.go @@ -31,8 +31,9 @@ func (i internalUpstreamResolver) Resolve(ctx context.Context, jobWithUnresolved } internalUpstream := mergeUpstreams(internalUpstreamInferred, internalUpstreamStatic) - fullNameUpstreamMap := job.Upstreams(internalUpstream).ToFullNameAndUpstreamMap() - resourceDestinationUpstreamMap := job.Upstreams(internalUpstream).ToResourceDestinationAndUpstreamMap() + distinctInternalUpstream := job.Upstreams(internalUpstream).Deduplicate() + fullNameUpstreamMap := job.Upstreams(distinctInternalUpstream).ToFullNameAndUpstreamMap() + resourceDestinationUpstreamMap := job.Upstreams(distinctInternalUpstream).ToResourceDestinationAndUpstreamMap() var upstreamResults []*job.Upstream for _, unresolvedUpstream := range jobWithUnresolvedUpstream.Upstreams() { @@ -47,7 +48,8 @@ func (i internalUpstreamResolver) Resolve(ctx context.Context, jobWithUnresolved upstreamResults = append(upstreamResults, unresolvedUpstream) } - return job.NewWithUpstream(jobWithUnresolvedUpstream.Job(), upstreamResults), errors.MultiToError(me) + distinctUpstreams := job.Upstreams(upstreamResults).Deduplicate() + return job.NewWithUpstream(jobWithUnresolvedUpstream.Job(), distinctUpstreams), errors.MultiToError(me) } func (i internalUpstreamResolver) BulkResolve(ctx context.Context, projectName tenant.ProjectName, jobsWithUnresolvedUpstream []*job.WithUpstream) ([]*job.WithUpstream, error) { diff --git a/core/job/resolver/internal_upstream_resolver_test.go b/core/job/resolver/internal_upstream_resolver_test.go index eec056f5a7..77850ca762 100644 --- a/core/job/resolver/internal_upstream_resolver_test.go +++ b/core/job/resolver/internal_upstream_resolver_test.go @@ -61,7 +61,33 @@ func TestInternalUpstreamResolver(t *testing.T) { internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo) result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream) assert.NoError(t, err) - assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) + assert.Equal(t, expectedJobWithUpstream.Job(), result.Job()) + assert.ElementsMatch(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) + }) + t.Run("resolves inferred and static upstream internally and prioritize static upstream when duplication found", func(t *testing.T) { + jobRepo := new(JobRepository) + logWriter := new(mockWriter) + defer logWriter.AssertExpectations(t) + + specD, _ := job.NewSpecBuilder(jobVersion, "job-D", "sample-owner", jobSchedule, jobWindow, jobTask).WithSpecUpstream(upstreamSpec).Build() + jobDDestination := job.ResourceURN("resource-D") + jobDSources := []job.ResourceURN{"resource-C"} + jobD := job.NewJob(sampleTenant, specD, jobDDestination, jobDSources) + + unresolvedUpstreamCInferred := job.NewUpstreamUnresolvedInferred("resource-C") + unresolvedUpstreamCStatic := job.NewUpstreamUnresolvedStatic("job-C", sampleTenant.ProjectName()) + internalUpstreamCStatic := job.NewUpstreamResolved("job-C", "", "resource-C", sampleTenant, "static", taskName, false) + + jobRepo.On("GetAllByResourceDestination", ctx, jobDSources[0]).Return([]*job.Job{jobC}, nil) + jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil) + + jobWithUnresolvedUpstream := job.NewWithUpstream(jobD, []*job.Upstream{unresolvedUpstreamCStatic, unresolvedUpstreamCInferred}) + expectedJobWithUpstream := job.NewWithUpstream(jobD, []*job.Upstream{internalUpstreamCStatic}) + + internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo) + result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream) + assert.NoError(t, err) + assert.EqualValues(t, expectedJobWithUpstream, result) }) t.Run("resolves inferred upstream internally", func(t *testing.T) { jobRepo := new(JobRepository) @@ -80,7 +106,7 @@ func TestInternalUpstreamResolver(t *testing.T) { internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo) result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream) assert.NoError(t, err) - assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) + assert.ElementsMatch(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) }) t.Run("resolves static upstream internally", func(t *testing.T) { jobRepo := new(JobRepository) @@ -99,7 +125,7 @@ func TestInternalUpstreamResolver(t *testing.T) { internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo) result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream) assert.NoError(t, err) - assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) + assert.ElementsMatch(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) }) t.Run("should not stop the process but keep appending error when unable to resolve inferred upstream", func(t *testing.T) { jobRepo := new(JobRepository) @@ -119,7 +145,7 @@ func TestInternalUpstreamResolver(t *testing.T) { internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo) result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream) assert.ErrorContains(t, err, "internal error") - assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) + assert.ElementsMatch(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) }) t.Run("should not stop the process but keep appending error when unable to resolve static upstream", func(t *testing.T) { jobRepo := new(JobRepository) @@ -144,7 +170,7 @@ func TestInternalUpstreamResolver(t *testing.T) { internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo) result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream) assert.ErrorContains(t, err, "not found") - assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) + assert.ElementsMatch(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) }) t.Run("should not stop the process but keep appending error when static upstream name is invalid", func(t *testing.T) { jobRepo := new(JobRepository) @@ -171,7 +197,7 @@ func TestInternalUpstreamResolver(t *testing.T) { internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo) result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream) assert.ErrorContains(t, err, "name is empty") - assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) + assert.ElementsMatch(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) }) }) t.Run("BulkResolve", func(t *testing.T) { @@ -206,7 +232,10 @@ func TestInternalUpstreamResolver(t *testing.T) { internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo) result, err := internalUpstreamResolver.BulkResolve(ctx, sampleTenant.ProjectName(), jobsWithUnresolvedUpstream) assert.NoError(t, err) - assert.EqualValues(t, expectedJobsWithUpstream, result) + assert.Equal(t, expectedJobsWithUpstream[0].Job(), result[0].Job()) + assert.Equal(t, expectedJobsWithUpstream[1].Job(), result[1].Job()) + assert.ElementsMatch(t, expectedJobsWithUpstream[0].Upstreams(), result[0].Upstreams()) + assert.ElementsMatch(t, expectedJobsWithUpstream[1].Upstreams(), result[1].Upstreams()) }) t.Run("returns error if unable to resolve upstream internally", func(t *testing.T) { jobRepo := new(JobRepository) diff --git a/core/job/resolver/upstream_resolver.go b/core/job/resolver/upstream_resolver.go index 74d324cc65..f861f797ab 100644 --- a/core/job/resolver/upstream_resolver.go +++ b/core/job/resolver/upstream_resolver.go @@ -47,6 +47,8 @@ func (u UpstreamResolver) BulkResolve(ctx context.Context, projectName tenant.Pr jobsWithUnresolvedUpstream, err := job.Jobs(jobs).GetJobsWithUnresolvedUpstreams() if err != nil { + errorMsg := fmt.Sprintf("[%s] %s", jobs[0].Tenant().NamespaceName().String(), err.Error()) + logWriter.Write(writer.LogLevelError, errorMsg) me.Append(err) } @@ -54,9 +56,9 @@ func (u UpstreamResolver) BulkResolve(ctx context.Context, projectName tenant.Pr if err != nil { errorMsg := fmt.Sprintf("unable to resolve upstream: %s", err.Error()) logWriter.Write(writer.LogLevelError, errorMsg) - return nil, errors.NewError(errors.ErrInternalError, job.EntityJob, errorMsg) + me.Append(errors.NewError(errors.ErrInternalError, job.EntityJob, errorMsg)) + return nil, errors.MultiToError(me) } - me.Append(err) jobsWithResolvedExternalUpstreams, err := u.externalUpstreamResolver.BulkResolve(ctx, jobsWithResolvedInternalUpstreams, logWriter) me.Append(err) diff --git a/core/job/resolver/upstream_resolver_test.go b/core/job/resolver/upstream_resolver_test.go index eb245613ec..ac40cf33fc 100644 --- a/core/job/resolver/upstream_resolver_test.go +++ b/core/job/resolver/upstream_resolver_test.go @@ -313,7 +313,7 @@ func TestUpstreamResolver(t *testing.T) { upstreamResolver := resolver.NewUpstreamResolver(jobRepo, externalUpstreamResolver, internalUpstreamResolver) result, err := upstreamResolver.Resolve(ctx, jobA, logWriter) - assert.ErrorContains(t, err, "name is empty") + assert.ErrorContains(t, err, "failed to get static upstreams to resolve") assert.EqualValues(t, expectedUpstream, result) }) t.Run("should not break process but still return error if failed to resolve some static upstream internally", func(t *testing.T) { diff --git a/core/job/service/job_service.go b/core/job/service/job_service.go index 6c61b35621..1b1d13680c 100644 --- a/core/job/service/job_service.go +++ b/core/job/service/job_service.go @@ -457,7 +457,7 @@ func (j JobService) bulkDelete(ctx context.Context, jobTenant tenant.Tenant, toD if len(downstreamList) > 0 { downstreamFullNames := job.DownstreamList(downstreamList).GetDownstreamFullNames() errorMsg := fmt.Sprintf("deleting job %s failed. job is being used by %s", spec.Name().String(), downstreamFullNames.String()) - logWriter.Write(writer.LogLevelError, fmt.Sprintf("[%s] %s", jobTenant.NamespaceName().String(), spec.Name().String())) + logWriter.Write(writer.LogLevelError, fmt.Sprintf("[%s] %s", jobTenant.NamespaceName().String(), errorMsg)) me.Append(errors.NewError(errors.ErrFailedPrecond, job.EntityJob, errorMsg)) continue } diff --git a/core/job/spec.go b/core/job/spec.go index 8c24f13f9e..0a8f1b7a05 100644 --- a/core/job/spec.go +++ b/core/job/spec.go @@ -470,7 +470,7 @@ type Hook struct { func NewHook(name string, config Config) (*Hook, error) { if name == "" { - return nil, errors.InvalidArgument(EntityJob, "name is empty") + return nil, errors.InvalidArgument(EntityJob, "hook name is empty") } return &Hook{name: name, config: config}, nil } diff --git a/internal/store/postgres/job/job_repository.go b/internal/store/postgres/job/job_repository.go index 9170a20bc3..c313e0ab56 100644 --- a/internal/store/postgres/job/job_repository.go +++ b/internal/store/postgres/job/job_repository.go @@ -324,7 +324,7 @@ func (JobRepository) toUpstreams(storeUpstreams []JobWithUpstream) ([]*job.Upstr if storeUpstream.UpstreamProjectName.Valid && storeUpstream.UpstreamNamespaceName.Valid { upstreamTenant, err = tenant.NewTenant(storeUpstream.UpstreamProjectName.String, storeUpstream.UpstreamNamespaceName.String) if err != nil { - me.Append(err) + me.Append(errors.Wrap(job.EntityJob, fmt.Sprintf("failed to get tenant for upstream %s of job %s", storeUpstream.UpstreamJobName.String, storeUpstream.JobName), err)) continue } } @@ -333,7 +333,7 @@ func (JobRepository) toUpstreams(storeUpstreams []JobWithUpstream) ([]*job.Upstr if storeUpstream.UpstreamTaskName.Valid { taskName, err = job.TaskNameFrom(storeUpstream.UpstreamTaskName.String) if err != nil { - me.Append(err) + me.Append(errors.Wrap(job.EntityJob, fmt.Sprintf("failed to get task for upstream %s of job %s", storeUpstream.UpstreamJobName.String, storeUpstream.JobName), err)) continue } } @@ -359,7 +359,7 @@ func (JobRepository) toUpstreams(storeUpstreams []JobWithUpstream) ([]*job.Upstr if err := me.ToErr(); err != nil { return nil, err } - return upstreams, nil + return job.Upstreams(upstreams).Deduplicate(), nil } func (j JobRepository) GetByJobName(ctx context.Context, projectName tenant.ProjectName, jobName job.Name) (*job.Job, error) { diff --git a/internal/store/postgres/job/job_repository_test.go b/internal/store/postgres/job/job_repository_test.go index a9466c424a..afb7c082e4 100644 --- a/internal/store/postgres/job/job_repository_test.go +++ b/internal/store/postgres/job/job_repository_test.go @@ -556,7 +556,7 @@ func TestPostgresJobRepository(t *testing.T) { upstreams, err := jobRepo.ResolveUpstreams(ctx, proj.Name(), []job.Name{jobSpecA.Name()}) assert.NoError(t, err) - assert.EqualValues(t, expectedUpstreams, upstreams[jobSpecA.Name()]) + assert.ElementsMatch(t, expectedUpstreams, upstreams[jobSpecA.Name()]) }) t.Run("returns job with external project static and inferred upstreams", func(t *testing.T) { db := dbSetup() @@ -606,7 +606,39 @@ func TestPostgresJobRepository(t *testing.T) { upstreams, err := jobRepo.ResolveUpstreams(ctx, proj.Name(), []job.Name{jobSpecA.Name()}) assert.NoError(t, err) - assert.EqualValues(t, expectedUpstreams, upstreams[jobSpecA.Name()]) + assert.ElementsMatch(t, expectedUpstreams, upstreams[jobSpecA.Name()]) + }) + t.Run("returns job with static upstream if found duplicated upstream from static and inferred", func(t *testing.T) { + db := dbSetup() + + tenantDetails, err := tenant.NewTenantDetails(proj, namespace) + assert.NoError(t, err) + + upstreamName := job.SpecUpstreamNameFrom("test-proj/sample-job-B") + jobAUpstream, _ := job.NewSpecUpstreamBuilder().WithUpstreamNames([]job.SpecUpstreamName{upstreamName}).Build() + jobSpecA, _ := job.NewSpecBuilder(jobVersion, "sample-job-A", jobOwner, jobSchedule, jobWindow, jobTask). + WithDescription(jobDescription). + WithSpecUpstream(jobAUpstream). + Build() + jobA := job.NewJob(sampleTenant, jobSpecA, "dev.resource.sample_a", []job.ResourceURN{"dev.resource.sample_b"}) + + jobSpecB, err := job.NewSpecBuilder(jobVersion, "sample-job-B", jobOwner, jobSchedule, jobWindow, jobTask).WithDescription(jobDescription).Build() + assert.NoError(t, err) + jobB := job.NewJob(sampleTenant, jobSpecB, "dev.resource.sample_b", nil) + + jobRepo := postgres.NewJobRepository(db) + _, err = jobRepo.Add(ctx, []*job.Job{jobA, jobB}) + assert.NoError(t, err) + + upstreamB := job.NewUpstreamResolved(jobSpecB.Name(), "", jobB.Destination(), tenantDetails.ToTenant(), "static", taskName, false) + + expectedUpstreams := []*job.Upstream{ + upstreamB, + } + + upstreams, err := jobRepo.ResolveUpstreams(ctx, proj.Name(), []job.Name{jobSpecA.Name()}) + assert.NoError(t, err) + assert.ElementsMatch(t, expectedUpstreams, upstreams[jobSpecA.Name()]) }) })