Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: duplicated upstream issue & job BC unclear logs #719

Merged
merged 16 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions core/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ func (w WithUpstreams) GetSubjectJobNames() []Name {
return names
}

func (w WithUpstreams) MergeWithResolvedUpstreams(resolvedUpstreamMap map[Name][]*Upstream) []*WithUpstream {
func (w WithUpstreams) MergeWithResolvedUpstreams(resolvedUpstreamsBySubjectJobMap map[Name][]*Upstream) []*WithUpstream {
var jobsWithMergedUpstream []*WithUpstream
for _, jobWithUnresolvedUpstream := range w {
resolvedUpstreams := resolvedUpstreamMap[jobWithUnresolvedUpstream.Name()]
resolvedUpstreams := resolvedUpstreamsBySubjectJobMap[jobWithUnresolvedUpstream.Name()]
resolvedUpstreamMapByFullName := Upstreams(resolvedUpstreams).ToFullNameAndUpstreamMap()
resolvedUpstreamMapByDestination := Upstreams(resolvedUpstreams).ToResourceDestinationAndUpstreamMap()

Expand All @@ -183,7 +183,8 @@ func (w WithUpstreams) MergeWithResolvedUpstreams(resolvedUpstreamMap map[Name][
}
mergedUpstream = append(mergedUpstream, unresolvedUpstream)
}
jobsWithMergedUpstream = append(jobsWithMergedUpstream, NewWithUpstream(jobWithUnresolvedUpstream.Job(), mergedUpstream))
distinctMergedUpstream := Upstreams(mergedUpstream).Deduplicate()
jobsWithMergedUpstream = append(jobsWithMergedUpstream, NewWithUpstream(jobWithUnresolvedUpstream.Job(), distinctMergedUpstream))
}
return jobsWithMergedUpstream
}
Expand Down Expand Up @@ -310,6 +311,26 @@ func (u Upstreams) ToResourceDestinationAndUpstreamMap() map[string]*Upstream {
return resourceDestinationUpstreamMap
}

func (u Upstreams) Deduplicate() []*Upstream {
distinctUpstreamsMap := make(map[string]*Upstream)
for _, upstream := range u {
if upstreamInMap, ok := distinctUpstreamsMap[upstream.FullName()]; ok {
// prioritize static upstreams
if upstreamInMap._type == UpstreamTypeStatic {
continue
}
}
distinctUpstreamsMap[upstream.FullName()] = upstream
}

var distinctUpstreams []*Upstream
for _, upstream := range distinctUpstreamsMap {
distinctUpstreams = append(distinctUpstreams, upstream)
}

return distinctUpstreams
}

type FullName string

func FullNameFrom(projectName tenant.ProjectName, jobName Name) FullName {
Expand Down
45 changes: 34 additions & 11 deletions core/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,24 @@ func TestEntityJob(t *testing.T) {
})
})

t.Run("Deduplicate", func(t *testing.T) {
t.Run("should return upstreams with static being prioritized if duplication is found", func(t *testing.T) {
upstreamResolved1Inferred := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeInferred, "", false)
upstreamResolved1Static := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeStatic, "", false)
upstreamResolved2 := job.NewUpstreamResolved("job-b", "host-sample", "project.dataset.sample-b", sampleTenant, job.UpstreamTypeInferred, "", false)

expectedMap := []*job.Upstream{
upstreamResolved1Static,
upstreamResolved2,
}

upstreams := job.Upstreams([]*job.Upstream{upstreamResolved1Inferred, upstreamResolved1Static, upstreamResolved2})
resultMap := upstreams.Deduplicate()

assert.EqualValues(t, expectedMap, resultMap)
})
})

t.Run("FullNameFrom", func(t *testing.T) {
t.Run("should return the job full name given project and job name", func(t *testing.T) {
fullName := job.FullNameFrom(project.Name(), specA.Name())
Expand Down Expand Up @@ -273,31 +291,36 @@ func TestEntityJob(t *testing.T) {
})
})
t.Run("MergeWithResolvedUpstreams", func(t *testing.T) {
upstreamCUnresolved := job.NewUpstreamUnresolvedStatic("job-C", project.Name())
upstreamDUnresolved := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d")
upstreamCUnresolved := job.NewUpstreamUnresolvedInferred("project.dataset.sample-c")
upstreamDUnresolvedInferred := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d")
upstreamDUnresolvedStatic := job.NewUpstreamUnresolvedStatic("job-D", project.Name())
upstreamEUnresolved := job.NewUpstreamUnresolvedStatic("job-E", project.Name())
upstreamFUnresolved := job.NewUpstreamUnresolvedInferred("project.dataset.sample-f")

upstreamCResolved := job.NewUpstreamResolved("job-C", "host-sample", "project.dataset.sample-c", sampleTenant, job.UpstreamTypeStatic, "bq2bq", false)
upstreamDResolved := job.NewUpstreamResolved("job-D", "host-sample", "project.dataset.sample-d", sampleTenant, job.UpstreamTypeInferred, "bq2bq", false)
upstreamCResolved := job.NewUpstreamResolved("job-C", "host-sample", "project.dataset.sample-c", sampleTenant, job.UpstreamTypeInferred, "bq2bq", false)
upstreamDResolvedStatic := job.NewUpstreamResolved("job-D", "host-sample", "project.dataset.sample-d", sampleTenant, job.UpstreamTypeStatic, "bq2bq", false)
upstreamDResolvedInferred := job.NewUpstreamResolved("job-D", "host-sample", "project.dataset.sample-d", sampleTenant, job.UpstreamTypeInferred, "bq2bq", false)

resolvedUpstreamMap := map[job.Name][]*job.Upstream{
"job-A": {upstreamCResolved, upstreamDResolved},
"job-B": {upstreamDResolved},
"job-A": {upstreamCResolved, upstreamDResolvedInferred},
"job-B": {upstreamDResolvedStatic},
}

expected := []*job.WithUpstream{
job.NewWithUpstream(jobA, []*job.Upstream{upstreamCResolved, upstreamDResolved, upstreamEUnresolved}),
job.NewWithUpstream(jobB, []*job.Upstream{upstreamDResolved, upstreamEUnresolved, upstreamFUnresolved}),
job.NewWithUpstream(jobA, []*job.Upstream{upstreamCResolved, upstreamDResolvedInferred, upstreamEUnresolved}),
job.NewWithUpstream(jobB, []*job.Upstream{upstreamDResolvedStatic, upstreamEUnresolved, upstreamFUnresolved}),
}

jobsWithUnresolvedUpstream := []*job.WithUpstream{
job.NewWithUpstream(jobA, []*job.Upstream{upstreamCUnresolved, upstreamDUnresolved, upstreamEUnresolved}),
job.NewWithUpstream(jobB, []*job.Upstream{upstreamDUnresolved, upstreamEUnresolved, upstreamFUnresolved}),
job.NewWithUpstream(jobA, []*job.Upstream{upstreamCUnresolved, upstreamDUnresolvedInferred, upstreamEUnresolved}),
job.NewWithUpstream(jobB, []*job.Upstream{upstreamDUnresolvedStatic, upstreamDUnresolvedInferred, upstreamEUnresolved, upstreamFUnresolved}),
}

result := job.WithUpstreams(jobsWithUnresolvedUpstream).MergeWithResolvedUpstreams(resolvedUpstreamMap)
assert.EqualValues(t, expected, result)
assert.Equal(t, expected[0].Job(), result[0].Job())
assert.Equal(t, expected[1].Job(), result[1].Job())
assert.ElementsMatch(t, expected[0].Upstreams(), result[0].Upstreams())
assert.ElementsMatch(t, expected[1].Upstreams(), result[1].Upstreams())
})
})

Expand Down
9 changes: 6 additions & 3 deletions core/job/resolver/external_upstream_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ type ResourceManager interface {
}

func (e *extUpstreamResolver) Resolve(ctx context.Context, jobWithUpstream *job.WithUpstream, lw writer.LogWriter) (*job.WithUpstream, error) {
me := errors.NewMultiError(fmt.Sprintf("external upstream resolution errors for job %s", jobWithUpstream.Name().String()))
me := errors.NewMultiError(fmt.Sprintf("[%s] external upstream resolution errors for job %s", jobWithUpstream.Job().Tenant().NamespaceName().String(), jobWithUpstream.Name().String()))
mergedUpstreams := jobWithUpstream.GetResolvedUpstreams()
unresolvedUpstreams := jobWithUpstream.GetUnresolvedUpstreams()
resolvedExternally := false
for _, unresolvedUpstream := range unresolvedUpstreams {
externalUpstream, err := e.fetchOptimusUpstreams(ctx, unresolvedUpstream)
if err != nil || len(externalUpstream) == 0 {
Expand All @@ -53,11 +54,13 @@ func (e *extUpstreamResolver) Resolve(ctx context.Context, jobWithUpstream *job.
continue
}
mergedUpstreams = append(mergedUpstreams, externalUpstream...)
resolvedExternally = true
}
if len(me.Errors) > 0 {
lw.Write(writer.LogLevelError, errors.MultiToError(me).Error())
} else {
lw.Write(writer.LogLevelDebug, fmt.Sprintf("resolved job %s upstream from external", jobWithUpstream.Name().String()))
}
if resolvedExternally {
lw.Write(writer.LogLevelDebug, fmt.Sprintf("[%s] resolved job %s upstream from external", jobWithUpstream.Job().Tenant().NamespaceName().String(), jobWithUpstream.Name().String()))
}
return job.NewWithUpstream(jobWithUpstream.Job(), mergedUpstreams), errors.MultiToError(me)
}
Expand Down
5 changes: 4 additions & 1 deletion core/job/resolver/internal_upstream_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,10 @@ func TestInternalUpstreamResolver(t *testing.T) {
internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo)
result, err := internalUpstreamResolver.BulkResolve(ctx, sampleTenant.ProjectName(), jobsWithUnresolvedUpstream)
assert.NoError(t, err)
assert.EqualValues(t, expectedJobsWithUpstream, result)
assert.Equal(t, expectedJobsWithUpstream[0].Job(), result[0].Job())
assert.Equal(t, expectedJobsWithUpstream[1].Job(), result[1].Job())
assert.ElementsMatch(t, expectedJobsWithUpstream[0].Upstreams(), result[0].Upstreams())
assert.ElementsMatch(t, expectedJobsWithUpstream[1].Upstreams(), result[1].Upstreams())
})
t.Run("returns error if unable to resolve upstream internally", func(t *testing.T) {
jobRepo := new(JobRepository)
Expand Down
12 changes: 10 additions & 2 deletions core/job/resolver/upstream_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,20 @@ func (u UpstreamResolver) BulkResolve(ctx context.Context, projectName tenant.Pr
var jobsWithUnresolvedUpstream []*job.WithUpstream
for _, subjectJob := range jobs {
jobWithUnresolvedUpstream, err := u.getJobWithUnresolvedUpstream(subjectJob)
me.Append(err)
if err != nil {
errorMsg := fmt.Sprintf("[%s] %s", subjectJob.Tenant().NamespaceName().String(), err.Error())
logWriter.Write(writer.LogLevelError, errorMsg)
me.Append(err)
}
jobsWithUnresolvedUpstream = append(jobsWithUnresolvedUpstream, jobWithUnresolvedUpstream)
}

jobsWithResolvedInternalUpstreams, err := u.internalUpstreamResolver.BulkResolve(ctx, projectName, jobsWithUnresolvedUpstream)
if err != nil {
errorMsg := fmt.Sprintf("unable to resolve upstream: %s", err.Error())
logWriter.Write(writer.LogLevelError, errorMsg)
return nil, errors.NewError(errors.ErrInternalError, job.EntityJob, errorMsg)
me.Append(errors.NewError(errors.ErrInternalError, job.EntityJob, errorMsg))
return nil, errors.MultiToError(me)
}

jobsWithResolvedExternalUpstreams, err := u.externalUpstreamResolver.BulkResolve(ctx, jobsWithResolvedInternalUpstreams, logWriter)
Expand Down Expand Up @@ -84,6 +89,9 @@ func (u UpstreamResolver) Resolve(ctx context.Context, subjectJob *job.Job, logW

func (u UpstreamResolver) getJobWithUnresolvedUpstream(subjectJob *job.Job) (*job.WithUpstream, error) {
unresolvedStaticUpstreams, err := u.getStaticUpstreamsToResolve(subjectJob.StaticUpstreamNames(), subjectJob.ProjectName())
if err != nil {
err = errors.InvalidArgument(job.EntityJob, fmt.Sprintf("failed to get static upstreams to resolve for job %s", subjectJob.GetName()))
}

unresolvedInferredUpstreams := u.getInferredUpstreamsToResolve(subjectJob.Sources())

Expand Down
2 changes: 1 addition & 1 deletion core/job/resolver/upstream_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func TestUpstreamResolver(t *testing.T) {

upstreamResolver := resolver.NewUpstreamResolver(jobRepo, externalUpstreamResolver, internalUpstreamResolver)
result, err := upstreamResolver.Resolve(ctx, jobA, logWriter)
assert.ErrorContains(t, err, "name is empty")
assert.ErrorContains(t, err, "failed to get static upstreams to resolve")
assert.EqualValues(t, expectedUpstream, result)
})
t.Run("should not break process but still return error if failed to resolve some static upstream internally", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion core/job/service/job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func (j JobService) bulkDelete(ctx context.Context, jobTenant tenant.Tenant, toD
if len(downstreamList) > 0 {
downstreamFullNames := job.DownstreamList(downstreamList).GetDownstreamFullNames()
errorMsg := fmt.Sprintf("deleting job %s failed. job is being used by %s", spec.Name().String(), downstreamFullNames.String())
logWriter.Write(writer.LogLevelError, fmt.Sprintf("[%s] %s", jobTenant.NamespaceName().String(), spec.Name().String()))
logWriter.Write(writer.LogLevelError, fmt.Sprintf("[%s] %s", jobTenant.NamespaceName().String(), errorMsg))
me.Append(errors.NewError(errors.ErrFailedPrecond, job.EntityJob, errorMsg))
continue
}
Expand Down
2 changes: 1 addition & 1 deletion core/job/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ type Hook struct {

func NewHook(name string, config Config) (*Hook, error) {
if name == "" {
return nil, errors.InvalidArgument(EntityJob, "name is empty")
return nil, errors.InvalidArgument(EntityJob, "hook name is empty")
}
return &Hook{name: name, config: config}, nil
}
Expand Down
6 changes: 3 additions & 3 deletions internal/store/postgres/job/job_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (JobRepository) toUpstreams(storeUpstreams []JobWithUpstream) ([]*job.Upstr
if storeUpstream.UpstreamProjectName.Valid && storeUpstream.UpstreamNamespaceName.Valid {
upstreamTenant, err = tenant.NewTenant(storeUpstream.UpstreamProjectName.String, storeUpstream.UpstreamNamespaceName.String)
if err != nil {
me.Append(err)
me.Append(errors.Wrap(job.EntityJob, fmt.Sprintf("failed to get tenant for upstream %s of job %s", storeUpstream.UpstreamJobName.String, storeUpstream.JobName), err))
continue
}
}
Expand All @@ -333,7 +333,7 @@ func (JobRepository) toUpstreams(storeUpstreams []JobWithUpstream) ([]*job.Upstr
if storeUpstream.UpstreamTaskName.Valid {
taskName, err = job.TaskNameFrom(storeUpstream.UpstreamTaskName.String)
if err != nil {
me.Append(err)
me.Append(errors.Wrap(job.EntityJob, fmt.Sprintf("failed to get task for upstream %s of job %s", storeUpstream.UpstreamJobName.String, storeUpstream.JobName), err))
continue
}
}
Expand All @@ -359,7 +359,7 @@ func (JobRepository) toUpstreams(storeUpstreams []JobWithUpstream) ([]*job.Upstr
if err := me.ToErr(); err != nil {
return nil, err
}
return upstreams, nil
return job.Upstreams(upstreams).Deduplicate(), nil
arinda-arif marked this conversation as resolved.
Show resolved Hide resolved
}

func (j JobRepository) GetByJobName(ctx context.Context, projectName tenant.ProjectName, jobName job.Name) (*job.Job, error) {
Expand Down
32 changes: 32 additions & 0 deletions internal/store/postgres/job/job_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,38 @@ func TestPostgresJobRepository(t *testing.T) {
upstreamE,
}

upstreams, err := jobRepo.ResolveUpstreams(ctx, proj.Name(), []job.Name{jobSpecA.Name()})
assert.NoError(t, err)
assert.EqualValues(t, expectedUpstreams, upstreams[jobSpecA.Name()])
})
t.Run("returns job with static upstream if found duplicated upstream from static and inferred", func(t *testing.T) {
db := dbSetup()

tenantDetails, err := tenant.NewTenantDetails(proj, namespace)
assert.NoError(t, err)

upstreamName := job.SpecUpstreamNameFrom("test-proj/sample-job-B")
jobAUpstream, _ := job.NewSpecUpstreamBuilder().WithUpstreamNames([]job.SpecUpstreamName{upstreamName}).Build()
jobSpecA, _ := job.NewSpecBuilder(jobVersion, "sample-job-A", jobOwner, jobSchedule, jobWindow, jobTask).
WithDescription(jobDescription).
WithSpecUpstream(jobAUpstream).
Build()
jobA := job.NewJob(sampleTenant, jobSpecA, "dev.resource.sample_a", []job.ResourceURN{"dev.resource.sample_b"})

jobSpecB, err := job.NewSpecBuilder(jobVersion, "sample-job-B", jobOwner, jobSchedule, jobWindow, jobTask).WithDescription(jobDescription).Build()
assert.NoError(t, err)
jobB := job.NewJob(sampleTenant, jobSpecB, "dev.resource.sample_b", nil)

jobRepo := postgres.NewJobRepository(db)
_, err = jobRepo.Add(ctx, []*job.Job{jobA, jobB})
assert.NoError(t, err)

upstreamB := job.NewUpstreamResolved(jobSpecB.Name(), "", jobB.Destination(), tenantDetails.ToTenant(), "static", taskName, false)

expectedUpstreams := []*job.Upstream{
upstreamB,
}

upstreams, err := jobRepo.ResolveUpstreams(ctx, proj.Name(), []job.Name{jobSpecA.Name()})
assert.NoError(t, err)
assert.EqualValues(t, expectedUpstreams, upstreams[jobSpecA.Name()])
Expand Down