From 79fddca0fcfbd09b39bee88f52b2a201b735aa12 Mon Sep 17 00:00:00 2001 From: Arinda Arif Date: Mon, 9 Jan 2023 16:22:17 +0700 Subject: [PATCH 01/12] fix: misleading job replace-all command example --- client/cmd/job/replace_all.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/cmd/job/replace_all.go b/client/cmd/job/replace_all.go index d2e2cd8e92..778b854b99 100644 --- a/client/cmd/job/replace_all.go +++ b/client/cmd/job/replace_all.go @@ -45,7 +45,7 @@ func NewReplaceAllCommand() *cobra.Command { Short: "Replace all current optimus project to server", Long: heredoc.Doc(`Apply local changes to destination server which includes creating/updating/deleting jobs`), - Example: "optimus job replace-all [--ignore-resources|--ignore-jobs]", + Example: "optimus job replace-all", Annotations: map[string]string{ "group:core": "true", }, From b151272561a8f93059141f7e717c33635bed20b5 Mon Sep 17 00:00:00 2001 From: Arinda Arif Date: Mon, 9 Jan 2023 16:26:14 +0700 Subject: [PATCH 02/12] fix: add namespace prefix in external upstream resolver message events --- core/job/resolver/external_upstream_resolver.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/job/resolver/external_upstream_resolver.go b/core/job/resolver/external_upstream_resolver.go index 0355b6a4bd..90f202e7a4 100644 --- a/core/job/resolver/external_upstream_resolver.go +++ b/core/job/resolver/external_upstream_resolver.go @@ -42,7 +42,7 @@ type ResourceManager interface { } func (e *extUpstreamResolver) Resolve(ctx context.Context, jobWithUpstream *job.WithUpstream, lw writer.LogWriter) (*job.WithUpstream, error) { - me := errors.NewMultiError(fmt.Sprintf("external upstream resolution errors for job %s", jobWithUpstream.Name().String())) + me := errors.NewMultiError(fmt.Sprintf("[%s] external upstream resolution errors for job %s", jobWithUpstream.Job().Tenant().NamespaceName().String(), jobWithUpstream.Name().String())) mergedUpstreams := jobWithUpstream.GetResolvedUpstreams() unresolvedUpstreams := jobWithUpstream.GetUnresolvedUpstreams() for _, unresolvedUpstream := range unresolvedUpstreams { @@ -57,7 +57,7 @@ func (e *extUpstreamResolver) Resolve(ctx context.Context, jobWithUpstream *job. if len(me.Errors) > 0 { lw.Write(writer.LogLevelError, errors.MultiToError(me).Error()) } else { - lw.Write(writer.LogLevelDebug, fmt.Sprintf("resolved job %s upstream from external", jobWithUpstream.Name().String())) + lw.Write(writer.LogLevelDebug, fmt.Sprintf("[%s] resolved job %s upstream from external", jobWithUpstream.Job().Tenant().NamespaceName().String(), jobWithUpstream.Name().String())) } return job.NewWithUpstream(jobWithUpstream.Job(), mergedUpstreams), errors.MultiToError(me) } From 4541c88a0043f6e4eab0d7f077be4e7e48e54a6f Mon Sep 17 00:00:00 2001 From: Arinda Arif Date: Fri, 13 Jan 2023 15:53:57 +0700 Subject: [PATCH 03/12] refactor: improve logging on job bounded context --- core/job/resolver/upstream_resolver.go | 12 ++++++++++-- core/job/service/job_service.go | 2 +- core/job/spec.go | 2 +- internal/store/postgres/job/job_repository.go | 4 ++-- 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/core/job/resolver/upstream_resolver.go b/core/job/resolver/upstream_resolver.go index 0c2485d6c3..faf6b13b8c 100644 --- a/core/job/resolver/upstream_resolver.go +++ b/core/job/resolver/upstream_resolver.go @@ -48,7 +48,11 @@ func (u UpstreamResolver) BulkResolve(ctx context.Context, projectName tenant.Pr var jobsWithUnresolvedUpstream []*job.WithUpstream for _, subjectJob := range jobs { jobWithUnresolvedUpstream, err := u.getJobWithUnresolvedUpstream(subjectJob) - me.Append(err) + if err != nil { + errorMsg := fmt.Sprintf("[%s] %s", subjectJob.Tenant().NamespaceName().String(), err.Error()) + logWriter.Write(writer.LogLevelError, errorMsg) + me.Append(err) + } jobsWithUnresolvedUpstream = append(jobsWithUnresolvedUpstream, jobWithUnresolvedUpstream) } @@ -56,7 +60,8 @@ func (u UpstreamResolver) BulkResolve(ctx context.Context, projectName tenant.Pr if err != nil { errorMsg := fmt.Sprintf("unable to resolve upstream: %s", err.Error()) logWriter.Write(writer.LogLevelError, errorMsg) - return nil, errors.NewError(errors.ErrInternalError, job.EntityJob, errorMsg) + me.Append(errors.NewError(errors.ErrInternalError, job.EntityJob, errorMsg)) + return nil, errors.MultiToError(me) } jobsWithResolvedExternalUpstreams, err := u.externalUpstreamResolver.BulkResolve(ctx, jobsWithResolvedInternalUpstreams, logWriter) @@ -84,6 +89,9 @@ func (u UpstreamResolver) Resolve(ctx context.Context, subjectJob *job.Job, logW func (u UpstreamResolver) getJobWithUnresolvedUpstream(subjectJob *job.Job) (*job.WithUpstream, error) { unresolvedStaticUpstreams, err := u.getStaticUpstreamsToResolve(subjectJob.StaticUpstreamNames(), subjectJob.ProjectName()) + if err != nil { + err = errors.InvalidArgument(job.EntityJob, fmt.Sprintf("failed to get static upstreams to resolve for job %s", subjectJob.GetName())) + } unresolvedInferredUpstreams := u.getInferredUpstreamsToResolve(subjectJob.Sources()) diff --git a/core/job/service/job_service.go b/core/job/service/job_service.go index 60dd198ccd..3dbe662e92 100644 --- a/core/job/service/job_service.go +++ b/core/job/service/job_service.go @@ -427,7 +427,7 @@ func (j JobService) bulkDelete(ctx context.Context, jobTenant tenant.Tenant, toD if len(downstreamList) > 0 { downstreamFullNames := job.DownstreamList(downstreamList).GetDownstreamFullNames() errorMsg := fmt.Sprintf("deleting job %s failed. job is being used by %s", spec.Name().String(), downstreamFullNames.String()) - logWriter.Write(writer.LogLevelError, fmt.Sprintf("[%s] %s", jobTenant.NamespaceName().String(), spec.Name().String())) + logWriter.Write(writer.LogLevelError, fmt.Sprintf("[%s] %s", jobTenant.NamespaceName().String(), errorMsg)) me.Append(errors.NewError(errors.ErrFailedPrecond, job.EntityJob, errorMsg)) continue } diff --git a/core/job/spec.go b/core/job/spec.go index 8c24f13f9e..0a8f1b7a05 100644 --- a/core/job/spec.go +++ b/core/job/spec.go @@ -470,7 +470,7 @@ type Hook struct { func NewHook(name string, config Config) (*Hook, error) { if name == "" { - return nil, errors.InvalidArgument(EntityJob, "name is empty") + return nil, errors.InvalidArgument(EntityJob, "hook name is empty") } return &Hook{name: name, config: config}, nil } diff --git a/internal/store/postgres/job/job_repository.go b/internal/store/postgres/job/job_repository.go index 9170a20bc3..3d6e5fad25 100644 --- a/internal/store/postgres/job/job_repository.go +++ b/internal/store/postgres/job/job_repository.go @@ -324,7 +324,7 @@ func (JobRepository) toUpstreams(storeUpstreams []JobWithUpstream) ([]*job.Upstr if storeUpstream.UpstreamProjectName.Valid && storeUpstream.UpstreamNamespaceName.Valid { upstreamTenant, err = tenant.NewTenant(storeUpstream.UpstreamProjectName.String, storeUpstream.UpstreamNamespaceName.String) if err != nil { - me.Append(err) + me.Append(errors.Wrap(job.EntityJob, fmt.Sprintf("failed to get tenant for upstream %s of job %s", storeUpstream.UpstreamJobName.String, storeUpstream.JobName), err)) continue } } @@ -333,7 +333,7 @@ func (JobRepository) toUpstreams(storeUpstreams []JobWithUpstream) ([]*job.Upstr if storeUpstream.UpstreamTaskName.Valid { taskName, err = job.TaskNameFrom(storeUpstream.UpstreamTaskName.String) if err != nil { - me.Append(err) + me.Append(errors.Wrap(job.EntityJob, fmt.Sprintf("failed to get task for upstream %s of job %s", storeUpstream.UpstreamJobName.String, storeUpstream.JobName), err)) continue } } From 55520afe89ef47504e641cd0c2ba92429d1906c2 Mon Sep 17 00:00:00 2001 From: Arinda Arif Date: Fri, 13 Jan 2023 15:57:23 +0700 Subject: [PATCH 04/12] fix: failing job bc test case --- core/job/resolver/upstream_resolver_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/job/resolver/upstream_resolver_test.go b/core/job/resolver/upstream_resolver_test.go index 86bf53adfe..8447aa5a7c 100644 --- a/core/job/resolver/upstream_resolver_test.go +++ b/core/job/resolver/upstream_resolver_test.go @@ -313,7 +313,7 @@ func TestUpstreamResolver(t *testing.T) { upstreamResolver := resolver.NewUpstreamResolver(jobRepo, externalUpstreamResolver, internalUpstreamResolver) result, err := upstreamResolver.Resolve(ctx, jobA, logWriter) - assert.ErrorContains(t, err, "name is empty") + assert.ErrorContains(t, err, "failed to get static upstreams to resolve") assert.EqualValues(t, expectedUpstream, result) }) t.Run("should not break process but still return error if failed to resolve some static upstream internally", func(t *testing.T) { From 4f62b151918fd302b7877058260de2658b7b687a Mon Sep 17 00:00:00 2001 From: Arinda Arif Date: Fri, 13 Jan 2023 16:14:45 +0700 Subject: [PATCH 05/12] fix: ambiguous log on external resolver --- core/job/resolver/external_upstream_resolver.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/job/resolver/external_upstream_resolver.go b/core/job/resolver/external_upstream_resolver.go index 90f202e7a4..ac7f8376d7 100644 --- a/core/job/resolver/external_upstream_resolver.go +++ b/core/job/resolver/external_upstream_resolver.go @@ -45,6 +45,7 @@ func (e *extUpstreamResolver) Resolve(ctx context.Context, jobWithUpstream *job. me := errors.NewMultiError(fmt.Sprintf("[%s] external upstream resolution errors for job %s", jobWithUpstream.Job().Tenant().NamespaceName().String(), jobWithUpstream.Name().String())) mergedUpstreams := jobWithUpstream.GetResolvedUpstreams() unresolvedUpstreams := jobWithUpstream.GetUnresolvedUpstreams() + resolvedExternally := false for _, unresolvedUpstream := range unresolvedUpstreams { externalUpstream, err := e.fetchOptimusUpstreams(ctx, unresolvedUpstream) if err != nil || len(externalUpstream) == 0 { @@ -53,10 +54,12 @@ func (e *extUpstreamResolver) Resolve(ctx context.Context, jobWithUpstream *job. continue } mergedUpstreams = append(mergedUpstreams, externalUpstream...) + resolvedExternally = true } if len(me.Errors) > 0 { lw.Write(writer.LogLevelError, errors.MultiToError(me).Error()) - } else { + } + if resolvedExternally { lw.Write(writer.LogLevelDebug, fmt.Sprintf("[%s] resolved job %s upstream from external", jobWithUpstream.Job().Tenant().NamespaceName().String(), jobWithUpstream.Name().String())) } return job.NewWithUpstream(jobWithUpstream.Job(), mergedUpstreams), errors.MultiToError(me) From 5a460ef1f87c5515c6b1fcd289517fb7811dde02 Mon Sep 17 00:00:00 2001 From: Arinda Arif Date: Mon, 16 Jan 2023 10:53:23 +0700 Subject: [PATCH 06/12] fix: duplicate inferred and static upstream issue --- core/job/job.go | 27 ++++++++++++++-- core/job/job_test.go | 22 +++++++------ internal/store/postgres/job/job_repository.go | 2 +- .../store/postgres/job/job_repository_test.go | 32 +++++++++++++++++++ 4 files changed, 69 insertions(+), 14 deletions(-) diff --git a/core/job/job.go b/core/job/job.go index 6364b8fb56..1a309ac76a 100644 --- a/core/job/job.go +++ b/core/job/job.go @@ -164,10 +164,10 @@ func (w WithUpstreams) GetSubjectJobNames() []Name { return names } -func (w WithUpstreams) MergeWithResolvedUpstreams(resolvedUpstreamMap map[Name][]*Upstream) []*WithUpstream { +func (w WithUpstreams) MergeWithResolvedUpstreams(resolvedUpstreamsBySubjectJobMap map[Name][]*Upstream) []*WithUpstream { var jobsWithMergedUpstream []*WithUpstream for _, jobWithUnresolvedUpstream := range w { - resolvedUpstreams := resolvedUpstreamMap[jobWithUnresolvedUpstream.Name()] + resolvedUpstreams := resolvedUpstreamsBySubjectJobMap[jobWithUnresolvedUpstream.Name()] resolvedUpstreamMapByFullName := Upstreams(resolvedUpstreams).ToFullNameAndUpstreamMap() resolvedUpstreamMapByDestination := Upstreams(resolvedUpstreams).ToResourceDestinationAndUpstreamMap() @@ -183,7 +183,8 @@ func (w WithUpstreams) MergeWithResolvedUpstreams(resolvedUpstreamMap map[Name][ } mergedUpstream = append(mergedUpstream, unresolvedUpstream) } - jobsWithMergedUpstream = append(jobsWithMergedUpstream, NewWithUpstream(jobWithUnresolvedUpstream.Job(), mergedUpstream)) + distinctMergedUpstream := Upstreams(mergedUpstream).Deduplicate() + jobsWithMergedUpstream = append(jobsWithMergedUpstream, NewWithUpstream(jobWithUnresolvedUpstream.Job(), distinctMergedUpstream)) } return jobsWithMergedUpstream } @@ -310,6 +311,26 @@ func (u Upstreams) ToResourceDestinationAndUpstreamMap() map[string]*Upstream { return resourceDestinationUpstreamMap } +func (u Upstreams) Deduplicate() []*Upstream { + distinctUpstreamsMap := make(map[string]*Upstream) + for _, upstream := range u { + if upstreamInMap, ok := distinctUpstreamsMap[upstream.FullName()]; ok { + // prioritize static upstreams + if upstreamInMap._type == UpstreamTypeStatic { + continue + } + } + distinctUpstreamsMap[upstream.FullName()] = upstream + } + + var distinctUpstreams []*Upstream + for _, upstream := range distinctUpstreamsMap { + distinctUpstreams = append(distinctUpstreams, upstream) + } + + return distinctUpstreams +} + type FullName string func FullNameFrom(projectName tenant.ProjectName, jobName Name) FullName { diff --git a/core/job/job_test.go b/core/job/job_test.go index 1e15a11443..30aee72cc7 100644 --- a/core/job/job_test.go +++ b/core/job/job_test.go @@ -273,27 +273,29 @@ func TestEntityJob(t *testing.T) { }) }) t.Run("MergeWithResolvedUpstreams", func(t *testing.T) { - upstreamCUnresolved := job.NewUpstreamUnresolvedStatic("job-C", project.Name()) - upstreamDUnresolved := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d") + upstreamCUnresolved := job.NewUpstreamUnresolvedInferred("project.dataset.sample-c") + upstreamDUnresolvedInferred := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d") + upstreamDUnresolvedStatic := job.NewUpstreamUnresolvedStatic("job-D", project.Name()) upstreamEUnresolved := job.NewUpstreamUnresolvedStatic("job-E", project.Name()) upstreamFUnresolved := job.NewUpstreamUnresolvedInferred("project.dataset.sample-f") - upstreamCResolved := job.NewUpstreamResolved("job-C", "host-sample", "project.dataset.sample-c", sampleTenant, job.UpstreamTypeStatic, "bq2bq", false) - upstreamDResolved := job.NewUpstreamResolved("job-D", "host-sample", "project.dataset.sample-d", sampleTenant, job.UpstreamTypeInferred, "bq2bq", false) + upstreamCResolved := job.NewUpstreamResolved("job-C", "host-sample", "project.dataset.sample-c", sampleTenant, job.UpstreamTypeInferred, "bq2bq", false) + upstreamDResolvedStatic := job.NewUpstreamResolved("job-D", "host-sample", "project.dataset.sample-d", sampleTenant, job.UpstreamTypeStatic, "bq2bq", false) + upstreamDResolvedInferred := job.NewUpstreamResolved("job-D", "host-sample", "project.dataset.sample-d", sampleTenant, job.UpstreamTypeInferred, "bq2bq", false) resolvedUpstreamMap := map[job.Name][]*job.Upstream{ - "job-A": {upstreamCResolved, upstreamDResolved}, - "job-B": {upstreamDResolved}, + "job-A": {upstreamCResolved, upstreamDResolvedInferred}, + "job-B": {upstreamDResolvedStatic}, } expected := []*job.WithUpstream{ - job.NewWithUpstream(jobA, []*job.Upstream{upstreamCResolved, upstreamDResolved, upstreamEUnresolved}), - job.NewWithUpstream(jobB, []*job.Upstream{upstreamDResolved, upstreamEUnresolved, upstreamFUnresolved}), + job.NewWithUpstream(jobA, []*job.Upstream{upstreamCResolved, upstreamDResolvedInferred, upstreamEUnresolved}), + job.NewWithUpstream(jobB, []*job.Upstream{upstreamDResolvedStatic, upstreamEUnresolved, upstreamFUnresolved}), } jobsWithUnresolvedUpstream := []*job.WithUpstream{ - job.NewWithUpstream(jobA, []*job.Upstream{upstreamCUnresolved, upstreamDUnresolved, upstreamEUnresolved}), - job.NewWithUpstream(jobB, []*job.Upstream{upstreamDUnresolved, upstreamEUnresolved, upstreamFUnresolved}), + job.NewWithUpstream(jobA, []*job.Upstream{upstreamCUnresolved, upstreamDUnresolvedInferred, upstreamEUnresolved}), + job.NewWithUpstream(jobB, []*job.Upstream{upstreamDUnresolvedStatic, upstreamDUnresolvedInferred, upstreamEUnresolved, upstreamFUnresolved}), } result := job.WithUpstreams(jobsWithUnresolvedUpstream).MergeWithResolvedUpstreams(resolvedUpstreamMap) diff --git a/internal/store/postgres/job/job_repository.go b/internal/store/postgres/job/job_repository.go index 3d6e5fad25..c313e0ab56 100644 --- a/internal/store/postgres/job/job_repository.go +++ b/internal/store/postgres/job/job_repository.go @@ -359,7 +359,7 @@ func (JobRepository) toUpstreams(storeUpstreams []JobWithUpstream) ([]*job.Upstr if err := me.ToErr(); err != nil { return nil, err } - return upstreams, nil + return job.Upstreams(upstreams).Deduplicate(), nil } func (j JobRepository) GetByJobName(ctx context.Context, projectName tenant.ProjectName, jobName job.Name) (*job.Job, error) { diff --git a/internal/store/postgres/job/job_repository_test.go b/internal/store/postgres/job/job_repository_test.go index a9466c424a..85894e60f3 100644 --- a/internal/store/postgres/job/job_repository_test.go +++ b/internal/store/postgres/job/job_repository_test.go @@ -604,6 +604,38 @@ func TestPostgresJobRepository(t *testing.T) { upstreamE, } + upstreams, err := jobRepo.ResolveUpstreams(ctx, proj.Name(), []job.Name{jobSpecA.Name()}) + assert.NoError(t, err) + assert.EqualValues(t, expectedUpstreams, upstreams[jobSpecA.Name()]) + }) + t.Run("returns job with static upstream if found duplicated upstream from static and inferred", func(t *testing.T) { + db := dbSetup() + + tenantDetails, err := tenant.NewTenantDetails(proj, namespace) + assert.NoError(t, err) + + upstreamName := job.SpecUpstreamNameFrom("test-proj/sample-job-B") + jobAUpstream, _ := job.NewSpecUpstreamBuilder().WithUpstreamNames([]job.SpecUpstreamName{upstreamName}).Build() + jobSpecA, _ := job.NewSpecBuilder(jobVersion, "sample-job-A", jobOwner, jobSchedule, jobWindow, jobTask). + WithDescription(jobDescription). + WithSpecUpstream(jobAUpstream). + Build() + jobA := job.NewJob(sampleTenant, jobSpecA, "dev.resource.sample_a", []job.ResourceURN{"dev.resource.sample_b"}) + + jobSpecB, err := job.NewSpecBuilder(jobVersion, "sample-job-B", jobOwner, jobSchedule, jobWindow, jobTask).WithDescription(jobDescription).Build() + assert.NoError(t, err) + jobB := job.NewJob(sampleTenant, jobSpecB, "dev.resource.sample_b", nil) + + jobRepo := postgres.NewJobRepository(db) + _, err = jobRepo.Add(ctx, []*job.Job{jobA, jobB}) + assert.NoError(t, err) + + upstreamB := job.NewUpstreamResolved(jobSpecB.Name(), "", jobB.Destination(), tenantDetails.ToTenant(), "static", taskName, false) + + expectedUpstreams := []*job.Upstream{ + upstreamB, + } + upstreams, err := jobRepo.ResolveUpstreams(ctx, proj.Name(), []job.Name{jobSpecA.Name()}) assert.NoError(t, err) assert.EqualValues(t, expectedUpstreams, upstreams[jobSpecA.Name()]) From 248968dc54bde56b092cbb1dc6b8ae3c5354d47c Mon Sep 17 00:00:00 2001 From: Arinda Arif Date: Mon, 16 Jan 2023 12:23:58 +0700 Subject: [PATCH 07/12] fix: failing tests due to order of upstream slice not match --- core/job/job_test.go | 5 ++++- core/job/resolver/internal_upstream_resolver_test.go | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/core/job/job_test.go b/core/job/job_test.go index 30aee72cc7..dfdebeaa75 100644 --- a/core/job/job_test.go +++ b/core/job/job_test.go @@ -299,7 +299,10 @@ func TestEntityJob(t *testing.T) { } result := job.WithUpstreams(jobsWithUnresolvedUpstream).MergeWithResolvedUpstreams(resolvedUpstreamMap) - assert.EqualValues(t, expected, result) + assert.Equal(t, expected[0].Job(), result[0].Job()) + assert.Equal(t, expected[1].Job(), result[1].Job()) + assert.ElementsMatch(t, expected[0].Upstreams(), result[0].Upstreams()) + assert.ElementsMatch(t, expected[1].Upstreams(), result[1].Upstreams()) }) }) diff --git a/core/job/resolver/internal_upstream_resolver_test.go b/core/job/resolver/internal_upstream_resolver_test.go index eec056f5a7..5c9d57b81f 100644 --- a/core/job/resolver/internal_upstream_resolver_test.go +++ b/core/job/resolver/internal_upstream_resolver_test.go @@ -206,7 +206,10 @@ func TestInternalUpstreamResolver(t *testing.T) { internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo) result, err := internalUpstreamResolver.BulkResolve(ctx, sampleTenant.ProjectName(), jobsWithUnresolvedUpstream) assert.NoError(t, err) - assert.EqualValues(t, expectedJobsWithUpstream, result) + assert.Equal(t, expectedJobsWithUpstream[0].Job(), result[0].Job()) + assert.Equal(t, expectedJobsWithUpstream[1].Job(), result[1].Job()) + assert.ElementsMatch(t, expectedJobsWithUpstream[0].Upstreams(), result[0].Upstreams()) + assert.ElementsMatch(t, expectedJobsWithUpstream[1].Upstreams(), result[1].Upstreams()) }) t.Run("returns error if unable to resolve upstream internally", func(t *testing.T) { jobRepo := new(JobRepository) From 1fb575a35be4683ccf1c2f9af2163120eb1a07a9 Mon Sep 17 00:00:00 2001 From: Arinda Arif Date: Mon, 16 Jan 2023 12:29:27 +0700 Subject: [PATCH 08/12] test: add deduplicate test case --- core/job/job_test.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/core/job/job_test.go b/core/job/job_test.go index dfdebeaa75..5afb0a0dd2 100644 --- a/core/job/job_test.go +++ b/core/job/job_test.go @@ -194,6 +194,24 @@ func TestEntityJob(t *testing.T) { }) }) + t.Run("Deduplicate", func(t *testing.T) { + t.Run("should return upstreams with static being prioritized if duplication is found", func(t *testing.T) { + upstreamResolved1Inferred := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeInferred, "", false) + upstreamResolved1Static := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeStatic, "", false) + upstreamResolved2 := job.NewUpstreamResolved("job-b", "host-sample", "project.dataset.sample-b", sampleTenant, job.UpstreamTypeInferred, "", false) + + expectedMap := []*job.Upstream{ + upstreamResolved1Static, + upstreamResolved2, + } + + upstreams := job.Upstreams([]*job.Upstream{upstreamResolved1Inferred, upstreamResolved1Static, upstreamResolved2}) + resultMap := upstreams.Deduplicate() + + assert.EqualValues(t, expectedMap, resultMap) + }) + }) + t.Run("FullNameFrom", func(t *testing.T) { t.Run("should return the job full name given project and job name", func(t *testing.T) { fullName := job.FullNameFrom(project.Name(), specA.Name()) From 15fb292e37fc226b9323341b49301b8b06a49c4f Mon Sep 17 00:00:00 2001 From: Arinda Arif Date: Mon, 16 Jan 2023 17:25:33 +0700 Subject: [PATCH 09/12] refactor: avoid doing multiple upstream deduplication --- core/job/job.go | 27 ++++++++++-- core/job/job_test.go | 44 +++++++++++++++++-- internal/store/postgres/job/job_repository.go | 2 +- .../store/postgres/job/job_repository_test.go | 32 -------------- 4 files changed, 64 insertions(+), 41 deletions(-) diff --git a/core/job/job.go b/core/job/job.go index 1a309ac76a..3ec28968d9 100644 --- a/core/job/job.go +++ b/core/job/job.go @@ -295,6 +295,12 @@ func (u Upstreams) ToFullNameAndUpstreamMap() map[string]*Upstream { fullNameUpstreamMap := make(map[string]*Upstream) for _, upstream := range u { fullName := upstream.ProjectName().String() + "/" + upstream.name.String() + // keep static upstreams in the map if exists + if upstreamInMap, ok := fullNameUpstreamMap[fullName]; ok { + if upstreamInMap._type == UpstreamTypeStatic { + continue + } + } fullNameUpstreamMap[fullName] = upstream } return fullNameUpstreamMap @@ -306,16 +312,30 @@ func (u Upstreams) ToResourceDestinationAndUpstreamMap() map[string]*Upstream { if upstream.resource == "" { continue } + // keep static upstreams in the map if exists + if upstreamInMap, ok := resourceDestinationUpstreamMap[upstream.resource.String()]; ok { + if upstreamInMap._type == UpstreamTypeStatic { + continue + } + } resourceDestinationUpstreamMap[upstream.resource.String()] = upstream } return resourceDestinationUpstreamMap } func (u Upstreams) Deduplicate() []*Upstream { + var upstreamsResult []*Upstream distinctUpstreamsMap := make(map[string]*Upstream) + for _, upstream := range u { + // add unresolved upstreams straight to the result list + if upstream.state == UpstreamStateUnresolved { + upstreamsResult = append(upstreamsResult, upstream) + continue + } + + // keep static upstreams in the map if exists if upstreamInMap, ok := distinctUpstreamsMap[upstream.FullName()]; ok { - // prioritize static upstreams if upstreamInMap._type == UpstreamTypeStatic { continue } @@ -323,12 +343,11 @@ func (u Upstreams) Deduplicate() []*Upstream { distinctUpstreamsMap[upstream.FullName()] = upstream } - var distinctUpstreams []*Upstream for _, upstream := range distinctUpstreamsMap { - distinctUpstreams = append(distinctUpstreams, upstream) + upstreamsResult = append(upstreamsResult, upstream) } - return distinctUpstreams + return upstreamsResult } type FullName string diff --git a/core/job/job_test.go b/core/job/job_test.go index 5afb0a0dd2..91c2baf07b 100644 --- a/core/job/job_test.go +++ b/core/job/job_test.go @@ -160,6 +160,22 @@ func TestEntityJob(t *testing.T) { upstreams := job.Upstreams([]*job.Upstream{upstreamResolved1, upstreamResolved2}) resultMap := upstreams.ToFullNameAndUpstreamMap() + assert.EqualValues(t, expectedMap, resultMap) + }) + t.Run("should return a map with static upstream being prioritized when duplication is found", func(t *testing.T) { + upstreamResolved1 := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeStatic, "", false) + upstreamResolved2 := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeInferred, "", false) + upstreamResolved3 := job.NewUpstreamResolved("job-b", "host-sample", "project.dataset.sample-b", sampleTenant, job.UpstreamTypeInferred, "", false) + upstreamResolved4 := job.NewUpstreamResolved("job-b", "host-sample", "project.dataset.sample-b", sampleTenant, job.UpstreamTypeStatic, "", false) + + expectedMap := map[string]*job.Upstream{ + "test-proj/job-a": upstreamResolved1, + "test-proj/job-b": upstreamResolved4, + } + + upstreams := job.Upstreams([]*job.Upstream{upstreamResolved1, upstreamResolved2, upstreamResolved3, upstreamResolved4}) + resultMap := upstreams.ToFullNameAndUpstreamMap() + assert.EqualValues(t, expectedMap, resultMap) }) }) @@ -190,6 +206,22 @@ func TestEntityJob(t *testing.T) { upstreams := job.Upstreams([]*job.Upstream{upstreamResolved1, upstreamResolved2}) resultMap := upstreams.ToResourceDestinationAndUpstreamMap() + assert.EqualValues(t, expectedMap, resultMap) + }) + t.Run("should return a map with static upstream being prioritized when duplication is found", func(t *testing.T) { + upstreamResolved1 := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeStatic, "", false) + upstreamResolved2 := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeInferred, "", false) + upstreamResolved3 := job.NewUpstreamResolved("job-b", "host-sample", "project.dataset.sample-b", sampleTenant, job.UpstreamTypeInferred, "", false) + upstreamResolved4 := job.NewUpstreamResolved("job-b", "host-sample", "project.dataset.sample-b", sampleTenant, job.UpstreamTypeStatic, "", false) + + expectedMap := map[string]*job.Upstream{ + "project.dataset.sample-a": upstreamResolved1, + "project.dataset.sample-b": upstreamResolved4, + } + + upstreams := job.Upstreams([]*job.Upstream{upstreamResolved1, upstreamResolved2, upstreamResolved3, upstreamResolved4}) + resultMap := upstreams.ToResourceDestinationAndUpstreamMap() + assert.EqualValues(t, expectedMap, resultMap) }) }) @@ -199,16 +231,20 @@ func TestEntityJob(t *testing.T) { upstreamResolved1Inferred := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeInferred, "", false) upstreamResolved1Static := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeStatic, "", false) upstreamResolved2 := job.NewUpstreamResolved("job-b", "host-sample", "project.dataset.sample-b", sampleTenant, job.UpstreamTypeInferred, "", false) + upstreamUnresolved1 := job.NewUpstreamUnresolvedStatic("job-c", sampleTenant.ProjectName()) + upstreamUnresolved2 := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d") - expectedMap := []*job.Upstream{ + expected := []*job.Upstream{ upstreamResolved1Static, upstreamResolved2, + upstreamUnresolved1, + upstreamUnresolved2, } - upstreams := job.Upstreams([]*job.Upstream{upstreamResolved1Inferred, upstreamResolved1Static, upstreamResolved2}) - resultMap := upstreams.Deduplicate() + upstreams := job.Upstreams([]*job.Upstream{upstreamResolved1Inferred, upstreamResolved1Static, upstreamResolved2, upstreamUnresolved1, upstreamUnresolved2}) + result := upstreams.Deduplicate() - assert.EqualValues(t, expectedMap, resultMap) + assert.ElementsMatch(t, expected, result) }) }) diff --git a/internal/store/postgres/job/job_repository.go b/internal/store/postgres/job/job_repository.go index c313e0ab56..3d6e5fad25 100644 --- a/internal/store/postgres/job/job_repository.go +++ b/internal/store/postgres/job/job_repository.go @@ -359,7 +359,7 @@ func (JobRepository) toUpstreams(storeUpstreams []JobWithUpstream) ([]*job.Upstr if err := me.ToErr(); err != nil { return nil, err } - return job.Upstreams(upstreams).Deduplicate(), nil + return upstreams, nil } func (j JobRepository) GetByJobName(ctx context.Context, projectName tenant.ProjectName, jobName job.Name) (*job.Job, error) { diff --git a/internal/store/postgres/job/job_repository_test.go b/internal/store/postgres/job/job_repository_test.go index 85894e60f3..a9466c424a 100644 --- a/internal/store/postgres/job/job_repository_test.go +++ b/internal/store/postgres/job/job_repository_test.go @@ -604,38 +604,6 @@ func TestPostgresJobRepository(t *testing.T) { upstreamE, } - upstreams, err := jobRepo.ResolveUpstreams(ctx, proj.Name(), []job.Name{jobSpecA.Name()}) - assert.NoError(t, err) - assert.EqualValues(t, expectedUpstreams, upstreams[jobSpecA.Name()]) - }) - t.Run("returns job with static upstream if found duplicated upstream from static and inferred", func(t *testing.T) { - db := dbSetup() - - tenantDetails, err := tenant.NewTenantDetails(proj, namespace) - assert.NoError(t, err) - - upstreamName := job.SpecUpstreamNameFrom("test-proj/sample-job-B") - jobAUpstream, _ := job.NewSpecUpstreamBuilder().WithUpstreamNames([]job.SpecUpstreamName{upstreamName}).Build() - jobSpecA, _ := job.NewSpecBuilder(jobVersion, "sample-job-A", jobOwner, jobSchedule, jobWindow, jobTask). - WithDescription(jobDescription). - WithSpecUpstream(jobAUpstream). - Build() - jobA := job.NewJob(sampleTenant, jobSpecA, "dev.resource.sample_a", []job.ResourceURN{"dev.resource.sample_b"}) - - jobSpecB, err := job.NewSpecBuilder(jobVersion, "sample-job-B", jobOwner, jobSchedule, jobWindow, jobTask).WithDescription(jobDescription).Build() - assert.NoError(t, err) - jobB := job.NewJob(sampleTenant, jobSpecB, "dev.resource.sample_b", nil) - - jobRepo := postgres.NewJobRepository(db) - _, err = jobRepo.Add(ctx, []*job.Job{jobA, jobB}) - assert.NoError(t, err) - - upstreamB := job.NewUpstreamResolved(jobSpecB.Name(), "", jobB.Destination(), tenantDetails.ToTenant(), "static", taskName, false) - - expectedUpstreams := []*job.Upstream{ - upstreamB, - } - upstreams, err := jobRepo.ResolveUpstreams(ctx, proj.Name(), []job.Name{jobSpecA.Name()}) assert.NoError(t, err) assert.EqualValues(t, expectedUpstreams, upstreams[jobSpecA.Name()]) From 64cbf1e1ccd1af1c2c86cfd2f150aa382f8dee8b Mon Sep 17 00:00:00 2001 From: Arinda Arif Date: Mon, 16 Jan 2023 17:27:36 +0700 Subject: [PATCH 10/12] fix: duplicate upstream issue on job inspect --- .../resolver/internal_upstream_resolver.go | 3 +- .../internal_upstream_resolver_test.go | 38 ++++++++++++++++--- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/core/job/resolver/internal_upstream_resolver.go b/core/job/resolver/internal_upstream_resolver.go index 427f0a11dc..ad7ac58382 100644 --- a/core/job/resolver/internal_upstream_resolver.go +++ b/core/job/resolver/internal_upstream_resolver.go @@ -47,7 +47,8 @@ func (i internalUpstreamResolver) Resolve(ctx context.Context, jobWithUnresolved upstreamResults = append(upstreamResults, unresolvedUpstream) } - return job.NewWithUpstream(jobWithUnresolvedUpstream.Job(), upstreamResults), errors.MultiToError(me) + distinctUpstreams := job.Upstreams(upstreamResults).Deduplicate() + return job.NewWithUpstream(jobWithUnresolvedUpstream.Job(), distinctUpstreams), errors.MultiToError(me) } func (i internalUpstreamResolver) BulkResolve(ctx context.Context, projectName tenant.ProjectName, jobsWithUnresolvedUpstream []*job.WithUpstream) ([]*job.WithUpstream, error) { diff --git a/core/job/resolver/internal_upstream_resolver_test.go b/core/job/resolver/internal_upstream_resolver_test.go index 5c9d57b81f..2122062455 100644 --- a/core/job/resolver/internal_upstream_resolver_test.go +++ b/core/job/resolver/internal_upstream_resolver_test.go @@ -61,7 +61,33 @@ func TestInternalUpstreamResolver(t *testing.T) { internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo) result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream) assert.NoError(t, err) - assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) + assert.Equal(t, expectedJobWithUpstream.Job(), result.Job()) + assert.ElementsMatch(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) + }) + t.Run("resolves inferred and static upstream internally and prioritize static upstream when duplication found", func(t *testing.T) { + jobRepo := new(JobRepository) + logWriter := new(mockWriter) + defer logWriter.AssertExpectations(t) + + specD, _ := job.NewSpecBuilder(jobVersion, "job-D", "sample-owner", jobSchedule, jobWindow, jobTask).WithSpecUpstream(upstreamSpec).Build() + jobDDestination := job.ResourceURN("resource-D") + jobDSources := []job.ResourceURN{"resource-C"} + jobD := job.NewJob(sampleTenant, specD, jobDDestination, jobDSources) + + unresolvedUpstreamCInferred := job.NewUpstreamUnresolvedInferred("resource-C") + unresolvedUpstreamCStatic := job.NewUpstreamUnresolvedStatic("job-C", sampleTenant.ProjectName()) + internalUpstreamCStatic := job.NewUpstreamResolved("job-C", "", "resource-C", sampleTenant, "static", taskName, false) + + jobRepo.On("GetAllByResourceDestination", ctx, jobDSources[0]).Return([]*job.Job{jobC}, nil) + jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil) + + jobWithUnresolvedUpstream := job.NewWithUpstream(jobD, []*job.Upstream{unresolvedUpstreamCInferred, unresolvedUpstreamCStatic}) + expectedJobWithUpstream := job.NewWithUpstream(jobD, []*job.Upstream{internalUpstreamCStatic}) + + internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo) + result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream) + assert.NoError(t, err) + assert.EqualValues(t, expectedJobWithUpstream, result) }) t.Run("resolves inferred upstream internally", func(t *testing.T) { jobRepo := new(JobRepository) @@ -80,7 +106,7 @@ func TestInternalUpstreamResolver(t *testing.T) { internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo) result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream) assert.NoError(t, err) - assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) + assert.ElementsMatch(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) }) t.Run("resolves static upstream internally", func(t *testing.T) { jobRepo := new(JobRepository) @@ -99,7 +125,7 @@ func TestInternalUpstreamResolver(t *testing.T) { internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo) result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream) assert.NoError(t, err) - assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) + assert.ElementsMatch(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) }) t.Run("should not stop the process but keep appending error when unable to resolve inferred upstream", func(t *testing.T) { jobRepo := new(JobRepository) @@ -119,7 +145,7 @@ func TestInternalUpstreamResolver(t *testing.T) { internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo) result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream) assert.ErrorContains(t, err, "internal error") - assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) + assert.ElementsMatch(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) }) t.Run("should not stop the process but keep appending error when unable to resolve static upstream", func(t *testing.T) { jobRepo := new(JobRepository) @@ -144,7 +170,7 @@ func TestInternalUpstreamResolver(t *testing.T) { internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo) result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream) assert.ErrorContains(t, err, "not found") - assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) + assert.ElementsMatch(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) }) t.Run("should not stop the process but keep appending error when static upstream name is invalid", func(t *testing.T) { jobRepo := new(JobRepository) @@ -171,7 +197,7 @@ func TestInternalUpstreamResolver(t *testing.T) { internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo) result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream) assert.ErrorContains(t, err, "name is empty") - assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) + assert.ElementsMatch(t, expectedJobWithUpstream.Upstreams(), result.Upstreams()) }) }) t.Run("BulkResolve", func(t *testing.T) { From 424cbc36e774a1c2d721c5da293eb856841d6047 Mon Sep 17 00:00:00 2001 From: Arinda Arif Date: Tue, 17 Jan 2023 07:25:53 +0700 Subject: [PATCH 11/12] refactor: start deduplicate from root and support unresolved upstream deduplication --- core/job/job.go | 47 +++++------ core/job/job_test.go | 84 +++++++++++-------- .../resolver/internal_upstream_resolver.go | 5 +- .../internal_upstream_resolver_test.go | 2 +- internal/store/postgres/job/job_repository.go | 2 +- .../store/postgres/job/job_repository_test.go | 36 +++++++- 6 files changed, 113 insertions(+), 63 deletions(-) diff --git a/core/job/job.go b/core/job/job.go index 3ec28968d9..3274b08af1 100644 --- a/core/job/job.go +++ b/core/job/job.go @@ -295,12 +295,6 @@ func (u Upstreams) ToFullNameAndUpstreamMap() map[string]*Upstream { fullNameUpstreamMap := make(map[string]*Upstream) for _, upstream := range u { fullName := upstream.ProjectName().String() + "/" + upstream.name.String() - // keep static upstreams in the map if exists - if upstreamInMap, ok := fullNameUpstreamMap[fullName]; ok { - if upstreamInMap._type == UpstreamTypeStatic { - continue - } - } fullNameUpstreamMap[fullName] = upstream } return fullNameUpstreamMap @@ -312,42 +306,47 @@ func (u Upstreams) ToResourceDestinationAndUpstreamMap() map[string]*Upstream { if upstream.resource == "" { continue } - // keep static upstreams in the map if exists - if upstreamInMap, ok := resourceDestinationUpstreamMap[upstream.resource.String()]; ok { - if upstreamInMap._type == UpstreamTypeStatic { - continue - } - } resourceDestinationUpstreamMap[upstream.resource.String()] = upstream } return resourceDestinationUpstreamMap } func (u Upstreams) Deduplicate() []*Upstream { - var upstreamsResult []*Upstream - distinctUpstreamsMap := make(map[string]*Upstream) + resolvedUpstreamMap := make(map[string]*Upstream) + unresolvedStaticUpstreamMap := make(map[string]*Upstream) + unresolvedInferredUpstreamMap := make(map[string]*Upstream) for _, upstream := range u { - // add unresolved upstreams straight to the result list - if upstream.state == UpstreamStateUnresolved { - upstreamsResult = append(upstreamsResult, upstream) + if upstream.state == UpstreamStateUnresolved && upstream._type == UpstreamTypeStatic { + unresolvedStaticUpstreamMap[upstream.FullName()] = upstream + continue + } + + if upstream.state == UpstreamStateUnresolved && upstream._type == UpstreamTypeInferred { + unresolvedInferredUpstreamMap[upstream.resource.String()] = upstream continue } - // keep static upstreams in the map if exists - if upstreamInMap, ok := distinctUpstreamsMap[upstream.FullName()]; ok { + if upstreamInMap, ok := resolvedUpstreamMap[upstream.FullName()]; ok { + // keep static upstreams in the map if exists if upstreamInMap._type == UpstreamTypeStatic { continue } } - distinctUpstreamsMap[upstream.FullName()] = upstream + resolvedUpstreamMap[upstream.FullName()] = upstream } - for _, upstream := range distinctUpstreamsMap { - upstreamsResult = append(upstreamsResult, upstream) - } + return mapsToUpstreams(resolvedUpstreamMap, unresolvedInferredUpstreamMap, unresolvedStaticUpstreamMap) +} - return upstreamsResult +func mapsToUpstreams(upstreamsMaps ...map[string]*Upstream) []*Upstream { + var result []*Upstream + for _, upstreamsMap := range upstreamsMaps { + for _, upstream := range upstreamsMap { + result = append(result, upstream) + } + } + return result } type FullName string diff --git a/core/job/job_test.go b/core/job/job_test.go index 91c2baf07b..7d7e01817b 100644 --- a/core/job/job_test.go +++ b/core/job/job_test.go @@ -160,22 +160,6 @@ func TestEntityJob(t *testing.T) { upstreams := job.Upstreams([]*job.Upstream{upstreamResolved1, upstreamResolved2}) resultMap := upstreams.ToFullNameAndUpstreamMap() - assert.EqualValues(t, expectedMap, resultMap) - }) - t.Run("should return a map with static upstream being prioritized when duplication is found", func(t *testing.T) { - upstreamResolved1 := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeStatic, "", false) - upstreamResolved2 := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeInferred, "", false) - upstreamResolved3 := job.NewUpstreamResolved("job-b", "host-sample", "project.dataset.sample-b", sampleTenant, job.UpstreamTypeInferred, "", false) - upstreamResolved4 := job.NewUpstreamResolved("job-b", "host-sample", "project.dataset.sample-b", sampleTenant, job.UpstreamTypeStatic, "", false) - - expectedMap := map[string]*job.Upstream{ - "test-proj/job-a": upstreamResolved1, - "test-proj/job-b": upstreamResolved4, - } - - upstreams := job.Upstreams([]*job.Upstream{upstreamResolved1, upstreamResolved2, upstreamResolved3, upstreamResolved4}) - resultMap := upstreams.ToFullNameAndUpstreamMap() - assert.EqualValues(t, expectedMap, resultMap) }) }) @@ -206,22 +190,6 @@ func TestEntityJob(t *testing.T) { upstreams := job.Upstreams([]*job.Upstream{upstreamResolved1, upstreamResolved2}) resultMap := upstreams.ToResourceDestinationAndUpstreamMap() - assert.EqualValues(t, expectedMap, resultMap) - }) - t.Run("should return a map with static upstream being prioritized when duplication is found", func(t *testing.T) { - upstreamResolved1 := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeStatic, "", false) - upstreamResolved2 := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeInferred, "", false) - upstreamResolved3 := job.NewUpstreamResolved("job-b", "host-sample", "project.dataset.sample-b", sampleTenant, job.UpstreamTypeInferred, "", false) - upstreamResolved4 := job.NewUpstreamResolved("job-b", "host-sample", "project.dataset.sample-b", sampleTenant, job.UpstreamTypeStatic, "", false) - - expectedMap := map[string]*job.Upstream{ - "project.dataset.sample-a": upstreamResolved1, - "project.dataset.sample-b": upstreamResolved4, - } - - upstreams := job.Upstreams([]*job.Upstream{upstreamResolved1, upstreamResolved2, upstreamResolved3, upstreamResolved4}) - resultMap := upstreams.ToResourceDestinationAndUpstreamMap() - assert.EqualValues(t, expectedMap, resultMap) }) }) @@ -233,15 +201,65 @@ func TestEntityJob(t *testing.T) { upstreamResolved2 := job.NewUpstreamResolved("job-b", "host-sample", "project.dataset.sample-b", sampleTenant, job.UpstreamTypeInferred, "", false) upstreamUnresolved1 := job.NewUpstreamUnresolvedStatic("job-c", sampleTenant.ProjectName()) upstreamUnresolved2 := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d") + upstreamUnresolved3 := job.NewUpstreamUnresolvedStatic("job-c", sampleTenant.ProjectName()) + upstreamUnresolved4 := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d") + + expected := []*job.Upstream{ + upstreamResolved1Static, + upstreamResolved2, + upstreamUnresolved1, + upstreamUnresolved2, + } + + upstreams := job.Upstreams([]*job.Upstream{ + upstreamResolved1Inferred, + upstreamResolved1Static, + upstreamResolved2, + upstreamUnresolved1, + upstreamUnresolved2, + upstreamUnresolved3, + upstreamUnresolved4, + }) + result := upstreams.Deduplicate() + + assert.ElementsMatch(t, expected, result) + }) + t.Run("should successfully return distinct upstreams when only resolved upstream is present", func(t *testing.T) { + upstreamResolved1Inferred := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeInferred, "", false) + upstreamResolved1Static := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeStatic, "", false) + upstreamResolved2 := job.NewUpstreamResolved("job-b", "host-sample", "project.dataset.sample-b", sampleTenant, job.UpstreamTypeInferred, "", false) expected := []*job.Upstream{ upstreamResolved1Static, upstreamResolved2, + } + + upstreams := job.Upstreams([]*job.Upstream{ + upstreamResolved1Inferred, + upstreamResolved1Static, + upstreamResolved2, + }) + result := upstreams.Deduplicate() + + assert.ElementsMatch(t, expected, result) + }) + t.Run("should successfully return distinct upstreams when only unresolved upstream is present", func(t *testing.T) { + upstreamUnresolved1 := job.NewUpstreamUnresolvedStatic("job-c", sampleTenant.ProjectName()) + upstreamUnresolved2 := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d") + upstreamUnresolved3 := job.NewUpstreamUnresolvedStatic("job-c", sampleTenant.ProjectName()) + upstreamUnresolved4 := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d") + + expected := []*job.Upstream{ upstreamUnresolved1, upstreamUnresolved2, } - upstreams := job.Upstreams([]*job.Upstream{upstreamResolved1Inferred, upstreamResolved1Static, upstreamResolved2, upstreamUnresolved1, upstreamUnresolved2}) + upstreams := job.Upstreams([]*job.Upstream{ + upstreamUnresolved1, + upstreamUnresolved2, + upstreamUnresolved3, + upstreamUnresolved4, + }) result := upstreams.Deduplicate() assert.ElementsMatch(t, expected, result) diff --git a/core/job/resolver/internal_upstream_resolver.go b/core/job/resolver/internal_upstream_resolver.go index ad7ac58382..23210de7b3 100644 --- a/core/job/resolver/internal_upstream_resolver.go +++ b/core/job/resolver/internal_upstream_resolver.go @@ -31,8 +31,9 @@ func (i internalUpstreamResolver) Resolve(ctx context.Context, jobWithUnresolved } internalUpstream := mergeUpstreams(internalUpstreamInferred, internalUpstreamStatic) - fullNameUpstreamMap := job.Upstreams(internalUpstream).ToFullNameAndUpstreamMap() - resourceDestinationUpstreamMap := job.Upstreams(internalUpstream).ToResourceDestinationAndUpstreamMap() + distinctInternalUpstream := job.Upstreams(internalUpstream).Deduplicate() + fullNameUpstreamMap := job.Upstreams(distinctInternalUpstream).ToFullNameAndUpstreamMap() + resourceDestinationUpstreamMap := job.Upstreams(distinctInternalUpstream).ToResourceDestinationAndUpstreamMap() var upstreamResults []*job.Upstream for _, unresolvedUpstream := range jobWithUnresolvedUpstream.Upstreams() { diff --git a/core/job/resolver/internal_upstream_resolver_test.go b/core/job/resolver/internal_upstream_resolver_test.go index 2122062455..77850ca762 100644 --- a/core/job/resolver/internal_upstream_resolver_test.go +++ b/core/job/resolver/internal_upstream_resolver_test.go @@ -81,7 +81,7 @@ func TestInternalUpstreamResolver(t *testing.T) { jobRepo.On("GetAllByResourceDestination", ctx, jobDSources[0]).Return([]*job.Job{jobC}, nil) jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil) - jobWithUnresolvedUpstream := job.NewWithUpstream(jobD, []*job.Upstream{unresolvedUpstreamCInferred, unresolvedUpstreamCStatic}) + jobWithUnresolvedUpstream := job.NewWithUpstream(jobD, []*job.Upstream{unresolvedUpstreamCStatic, unresolvedUpstreamCInferred}) expectedJobWithUpstream := job.NewWithUpstream(jobD, []*job.Upstream{internalUpstreamCStatic}) internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo) diff --git a/internal/store/postgres/job/job_repository.go b/internal/store/postgres/job/job_repository.go index 3d6e5fad25..c313e0ab56 100644 --- a/internal/store/postgres/job/job_repository.go +++ b/internal/store/postgres/job/job_repository.go @@ -359,7 +359,7 @@ func (JobRepository) toUpstreams(storeUpstreams []JobWithUpstream) ([]*job.Upstr if err := me.ToErr(); err != nil { return nil, err } - return upstreams, nil + return job.Upstreams(upstreams).Deduplicate(), nil } func (j JobRepository) GetByJobName(ctx context.Context, projectName tenant.ProjectName, jobName job.Name) (*job.Job, error) { diff --git a/internal/store/postgres/job/job_repository_test.go b/internal/store/postgres/job/job_repository_test.go index a9466c424a..afb7c082e4 100644 --- a/internal/store/postgres/job/job_repository_test.go +++ b/internal/store/postgres/job/job_repository_test.go @@ -556,7 +556,7 @@ func TestPostgresJobRepository(t *testing.T) { upstreams, err := jobRepo.ResolveUpstreams(ctx, proj.Name(), []job.Name{jobSpecA.Name()}) assert.NoError(t, err) - assert.EqualValues(t, expectedUpstreams, upstreams[jobSpecA.Name()]) + assert.ElementsMatch(t, expectedUpstreams, upstreams[jobSpecA.Name()]) }) t.Run("returns job with external project static and inferred upstreams", func(t *testing.T) { db := dbSetup() @@ -606,7 +606,39 @@ func TestPostgresJobRepository(t *testing.T) { upstreams, err := jobRepo.ResolveUpstreams(ctx, proj.Name(), []job.Name{jobSpecA.Name()}) assert.NoError(t, err) - assert.EqualValues(t, expectedUpstreams, upstreams[jobSpecA.Name()]) + assert.ElementsMatch(t, expectedUpstreams, upstreams[jobSpecA.Name()]) + }) + t.Run("returns job with static upstream if found duplicated upstream from static and inferred", func(t *testing.T) { + db := dbSetup() + + tenantDetails, err := tenant.NewTenantDetails(proj, namespace) + assert.NoError(t, err) + + upstreamName := job.SpecUpstreamNameFrom("test-proj/sample-job-B") + jobAUpstream, _ := job.NewSpecUpstreamBuilder().WithUpstreamNames([]job.SpecUpstreamName{upstreamName}).Build() + jobSpecA, _ := job.NewSpecBuilder(jobVersion, "sample-job-A", jobOwner, jobSchedule, jobWindow, jobTask). + WithDescription(jobDescription). + WithSpecUpstream(jobAUpstream). + Build() + jobA := job.NewJob(sampleTenant, jobSpecA, "dev.resource.sample_a", []job.ResourceURN{"dev.resource.sample_b"}) + + jobSpecB, err := job.NewSpecBuilder(jobVersion, "sample-job-B", jobOwner, jobSchedule, jobWindow, jobTask).WithDescription(jobDescription).Build() + assert.NoError(t, err) + jobB := job.NewJob(sampleTenant, jobSpecB, "dev.resource.sample_b", nil) + + jobRepo := postgres.NewJobRepository(db) + _, err = jobRepo.Add(ctx, []*job.Job{jobA, jobB}) + assert.NoError(t, err) + + upstreamB := job.NewUpstreamResolved(jobSpecB.Name(), "", jobB.Destination(), tenantDetails.ToTenant(), "static", taskName, false) + + expectedUpstreams := []*job.Upstream{ + upstreamB, + } + + upstreams, err := jobRepo.ResolveUpstreams(ctx, proj.Name(), []job.Name{jobSpecA.Name()}) + assert.NoError(t, err) + assert.ElementsMatch(t, expectedUpstreams, upstreams[jobSpecA.Name()]) }) }) From 0c94e4a6f2db829018de744d729ac75f44217027 Mon Sep 17 00:00:00 2001 From: Arinda Arif Date: Tue, 17 Jan 2023 11:50:02 +0700 Subject: [PATCH 12/12] fix: simplify error when unable to get static upstreams to resolve --- core/job/job.go | 2 +- core/job/job_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/job/job.go b/core/job/job.go index 9de1072a5b..c29cc5142e 100644 --- a/core/job/job.go +++ b/core/job/job.go @@ -46,7 +46,7 @@ func (j Job) FullName() string { func (j Job) GetJobWithUnresolvedUpstream() (*WithUpstream, error) { unresolvedStaticUpstreams, err := j.getStaticUpstreamsToResolve() if err != nil { - err = errors.InvalidArgument(EntityJob, fmt.Sprintf("failed to get static upstreams to resolve for job %s: %s", j.GetName(), err.Error())) + err = errors.InvalidArgument(EntityJob, fmt.Sprintf("failed to get static upstreams to resolve for job %s", j.GetName())) } unresolvedInferredUpstreams := j.getInferredUpstreamsToResolve() allUpstreams := unresolvedStaticUpstreams diff --git a/core/job/job_test.go b/core/job/job_test.go index bc52bdb32c..b17d354840 100644 --- a/core/job/job_test.go +++ b/core/job/job_test.go @@ -420,7 +420,7 @@ func TestEntityJob(t *testing.T) { jobA := job.NewJob(sampleTenant, specA, jobADestination, jobASources) jobWithUpstream, err := jobA.GetJobWithUnresolvedUpstream() - assert.ErrorContains(t, err, "invalid argument for entity project: project name is empty") + assert.ErrorContains(t, err, "invalid argument for entity job: failed to get static upstreams to resolve") assert.Len(t, jobWithUpstream.Upstreams(), 1) }) t.Run("should get unresolved upstream", func(t *testing.T) {