Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: duplicated upstream issue & job BC unclear logs #719

Merged
merged 16 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 46 additions & 3 deletions core/job/job.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package job

import (
"fmt"
"strings"

"github.com/odpf/optimus/core/tenant"
Expand Down Expand Up @@ -44,6 +45,9 @@ func (j Job) FullName() string {

func (j Job) GetJobWithUnresolvedUpstream() (*WithUpstream, error) {
unresolvedStaticUpstreams, err := j.getStaticUpstreamsToResolve()
if err != nil {
err = errors.InvalidArgument(EntityJob, fmt.Sprintf("failed to get static upstreams to resolve for job %s", j.GetName()))
}
unresolvedInferredUpstreams := j.getInferredUpstreamsToResolve()
allUpstreams := unresolvedStaticUpstreams
allUpstreams = append(allUpstreams, unresolvedInferredUpstreams...)
Expand Down Expand Up @@ -225,10 +229,10 @@ func (w WithUpstreams) GetSubjectJobNames() []Name {
return names
}

func (w WithUpstreams) MergeWithResolvedUpstreams(resolvedUpstreamMap map[Name][]*Upstream) []*WithUpstream {
func (w WithUpstreams) MergeWithResolvedUpstreams(resolvedUpstreamsBySubjectJobMap map[Name][]*Upstream) []*WithUpstream {
var jobsWithMergedUpstream []*WithUpstream
for _, jobWithUnresolvedUpstream := range w {
resolvedUpstreams := resolvedUpstreamMap[jobWithUnresolvedUpstream.Name()]
resolvedUpstreams := resolvedUpstreamsBySubjectJobMap[jobWithUnresolvedUpstream.Name()]
resolvedUpstreamMapByFullName := Upstreams(resolvedUpstreams).ToFullNameAndUpstreamMap()
resolvedUpstreamMapByDestination := Upstreams(resolvedUpstreams).ToResourceDestinationAndUpstreamMap()

Expand All @@ -244,7 +248,8 @@ func (w WithUpstreams) MergeWithResolvedUpstreams(resolvedUpstreamMap map[Name][
}
mergedUpstream = append(mergedUpstream, unresolvedUpstream)
}
jobsWithMergedUpstream = append(jobsWithMergedUpstream, NewWithUpstream(jobWithUnresolvedUpstream.Job(), mergedUpstream))
distinctMergedUpstream := Upstreams(mergedUpstream).Deduplicate()
jobsWithMergedUpstream = append(jobsWithMergedUpstream, NewWithUpstream(jobWithUnresolvedUpstream.Job(), distinctMergedUpstream))
}
return jobsWithMergedUpstream
}
Expand Down Expand Up @@ -371,6 +376,44 @@ func (u Upstreams) ToResourceDestinationAndUpstreamMap() map[string]*Upstream {
return resourceDestinationUpstreamMap
}

func (u Upstreams) Deduplicate() []*Upstream {
resolvedUpstreamMap := make(map[string]*Upstream)
unresolvedStaticUpstreamMap := make(map[string]*Upstream)
unresolvedInferredUpstreamMap := make(map[string]*Upstream)

for _, upstream := range u {
if upstream.state == UpstreamStateUnresolved && upstream._type == UpstreamTypeStatic {
unresolvedStaticUpstreamMap[upstream.FullName()] = upstream
continue
}

if upstream.state == UpstreamStateUnresolved && upstream._type == UpstreamTypeInferred {
unresolvedInferredUpstreamMap[upstream.resource.String()] = upstream
continue
}

if upstreamInMap, ok := resolvedUpstreamMap[upstream.FullName()]; ok {
// keep static upstreams in the map if exists
if upstreamInMap._type == UpstreamTypeStatic {
continue
}
}
resolvedUpstreamMap[upstream.FullName()] = upstream
}

return mapsToUpstreams(resolvedUpstreamMap, unresolvedInferredUpstreamMap, unresolvedStaticUpstreamMap)
}

func mapsToUpstreams(upstreamsMaps ...map[string]*Upstream) []*Upstream {
var result []*Upstream
for _, upstreamsMap := range upstreamsMaps {
for _, upstream := range upstreamsMap {
result = append(result, upstream)
}
}
return result
}

type FullName string

func FullNameFrom(projectName tenant.ProjectName, jobName Name) FullName {
Expand Down
103 changes: 90 additions & 13 deletions core/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,78 @@ func TestEntityJob(t *testing.T) {
})
})

t.Run("Deduplicate", func(t *testing.T) {
t.Run("should return upstreams with static being prioritized if duplication is found", func(t *testing.T) {
upstreamResolved1Inferred := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeInferred, "", false)
upstreamResolved1Static := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeStatic, "", false)
upstreamResolved2 := job.NewUpstreamResolved("job-b", "host-sample", "project.dataset.sample-b", sampleTenant, job.UpstreamTypeInferred, "", false)
upstreamUnresolved1 := job.NewUpstreamUnresolvedStatic("job-c", sampleTenant.ProjectName())
upstreamUnresolved2 := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d")
upstreamUnresolved3 := job.NewUpstreamUnresolvedStatic("job-c", sampleTenant.ProjectName())
upstreamUnresolved4 := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d")

expected := []*job.Upstream{
upstreamResolved1Static,
upstreamResolved2,
upstreamUnresolved1,
upstreamUnresolved2,
}

upstreams := job.Upstreams([]*job.Upstream{
upstreamResolved1Inferred,
upstreamResolved1Static,
upstreamResolved2,
upstreamUnresolved1,
upstreamUnresolved2,
upstreamUnresolved3,
upstreamUnresolved4,
})
result := upstreams.Deduplicate()

assert.ElementsMatch(t, expected, result)
})
t.Run("should successfully return distinct upstreams when only resolved upstream is present", func(t *testing.T) {
upstreamResolved1Inferred := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeInferred, "", false)
upstreamResolved1Static := job.NewUpstreamResolved("job-a", "host-sample", "project.dataset.sample-a", sampleTenant, job.UpstreamTypeStatic, "", false)
upstreamResolved2 := job.NewUpstreamResolved("job-b", "host-sample", "project.dataset.sample-b", sampleTenant, job.UpstreamTypeInferred, "", false)

expected := []*job.Upstream{
upstreamResolved1Static,
upstreamResolved2,
}

upstreams := job.Upstreams([]*job.Upstream{
upstreamResolved1Inferred,
upstreamResolved1Static,
upstreamResolved2,
})
result := upstreams.Deduplicate()

assert.ElementsMatch(t, expected, result)
})
t.Run("should successfully return distinct upstreams when only unresolved upstream is present", func(t *testing.T) {
upstreamUnresolved1 := job.NewUpstreamUnresolvedStatic("job-c", sampleTenant.ProjectName())
upstreamUnresolved2 := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d")
upstreamUnresolved3 := job.NewUpstreamUnresolvedStatic("job-c", sampleTenant.ProjectName())
upstreamUnresolved4 := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d")

expected := []*job.Upstream{
upstreamUnresolved1,
upstreamUnresolved2,
}

upstreams := job.Upstreams([]*job.Upstream{
upstreamUnresolved1,
upstreamUnresolved2,
upstreamUnresolved3,
upstreamUnresolved4,
})
result := upstreams.Deduplicate()

assert.ElementsMatch(t, expected, result)
})
})

t.Run("FullNameFrom", func(t *testing.T) {
t.Run("should return the job full name given project and job name", func(t *testing.T) {
fullName := job.FullNameFrom(project.Name(), specA.Name())
Expand Down Expand Up @@ -273,31 +345,36 @@ func TestEntityJob(t *testing.T) {
})
})
t.Run("MergeWithResolvedUpstreams", func(t *testing.T) {
upstreamCUnresolved := job.NewUpstreamUnresolvedStatic("job-C", project.Name())
upstreamDUnresolved := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d")
upstreamCUnresolved := job.NewUpstreamUnresolvedInferred("project.dataset.sample-c")
upstreamDUnresolvedInferred := job.NewUpstreamUnresolvedInferred("project.dataset.sample-d")
upstreamDUnresolvedStatic := job.NewUpstreamUnresolvedStatic("job-D", project.Name())
upstreamEUnresolved := job.NewUpstreamUnresolvedStatic("job-E", project.Name())
upstreamFUnresolved := job.NewUpstreamUnresolvedInferred("project.dataset.sample-f")

upstreamCResolved := job.NewUpstreamResolved("job-C", "host-sample", "project.dataset.sample-c", sampleTenant, job.UpstreamTypeStatic, "bq2bq", false)
upstreamDResolved := job.NewUpstreamResolved("job-D", "host-sample", "project.dataset.sample-d", sampleTenant, job.UpstreamTypeInferred, "bq2bq", false)
upstreamCResolved := job.NewUpstreamResolved("job-C", "host-sample", "project.dataset.sample-c", sampleTenant, job.UpstreamTypeInferred, "bq2bq", false)
upstreamDResolvedStatic := job.NewUpstreamResolved("job-D", "host-sample", "project.dataset.sample-d", sampleTenant, job.UpstreamTypeStatic, "bq2bq", false)
upstreamDResolvedInferred := job.NewUpstreamResolved("job-D", "host-sample", "project.dataset.sample-d", sampleTenant, job.UpstreamTypeInferred, "bq2bq", false)

resolvedUpstreamMap := map[job.Name][]*job.Upstream{
"job-A": {upstreamCResolved, upstreamDResolved},
"job-B": {upstreamDResolved},
"job-A": {upstreamCResolved, upstreamDResolvedInferred},
"job-B": {upstreamDResolvedStatic},
}

expected := []*job.WithUpstream{
job.NewWithUpstream(jobA, []*job.Upstream{upstreamCResolved, upstreamDResolved, upstreamEUnresolved}),
job.NewWithUpstream(jobB, []*job.Upstream{upstreamDResolved, upstreamEUnresolved, upstreamFUnresolved}),
job.NewWithUpstream(jobA, []*job.Upstream{upstreamCResolved, upstreamDResolvedInferred, upstreamEUnresolved}),
job.NewWithUpstream(jobB, []*job.Upstream{upstreamDResolvedStatic, upstreamEUnresolved, upstreamFUnresolved}),
}

jobsWithUnresolvedUpstream := []*job.WithUpstream{
job.NewWithUpstream(jobA, []*job.Upstream{upstreamCUnresolved, upstreamDUnresolved, upstreamEUnresolved}),
job.NewWithUpstream(jobB, []*job.Upstream{upstreamDUnresolved, upstreamEUnresolved, upstreamFUnresolved}),
job.NewWithUpstream(jobA, []*job.Upstream{upstreamCUnresolved, upstreamDUnresolvedInferred, upstreamEUnresolved}),
job.NewWithUpstream(jobB, []*job.Upstream{upstreamDUnresolvedStatic, upstreamDUnresolvedInferred, upstreamEUnresolved, upstreamFUnresolved}),
}

result := job.WithUpstreams(jobsWithUnresolvedUpstream).MergeWithResolvedUpstreams(resolvedUpstreamMap)
assert.EqualValues(t, expected, result)
assert.Equal(t, expected[0].Job(), result[0].Job())
assert.Equal(t, expected[1].Job(), result[1].Job())
assert.ElementsMatch(t, expected[0].Upstreams(), result[0].Upstreams())
assert.ElementsMatch(t, expected[1].Upstreams(), result[1].Upstreams())
})
})

Expand Down Expand Up @@ -333,7 +410,7 @@ func TestEntityJob(t *testing.T) {
jobA := job.NewJob(sampleTenant, specA, jobADestination, jobASources)

jobWithUpstream, err := jobA.GetJobWithUnresolvedUpstream()
assert.ErrorContains(t, err, "invalid argument for entity job: name is empty")
assert.ErrorContains(t, err, "invalid argument for entity job: failed to get static upstreams to resolve")
assert.Len(t, jobWithUpstream.Upstreams(), 1)
})
t.Run("should contains error when get static upstream failed because of project name empty", func(t *testing.T) {
Expand All @@ -343,7 +420,7 @@ func TestEntityJob(t *testing.T) {
jobA := job.NewJob(sampleTenant, specA, jobADestination, jobASources)

jobWithUpstream, err := jobA.GetJobWithUnresolvedUpstream()
assert.ErrorContains(t, err, "invalid argument for entity project: project name is empty")
assert.ErrorContains(t, err, "invalid argument for entity job: failed to get static upstreams to resolve")
assert.Len(t, jobWithUpstream.Upstreams(), 1)
})
t.Run("should get unresolved upstream", func(t *testing.T) {
Expand Down
9 changes: 6 additions & 3 deletions core/job/resolver/external_upstream_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ type ResourceManager interface {
}

func (e *extUpstreamResolver) Resolve(ctx context.Context, jobWithUpstream *job.WithUpstream, lw writer.LogWriter) (*job.WithUpstream, error) {
me := errors.NewMultiError(fmt.Sprintf("external upstream resolution errors for job %s", jobWithUpstream.Name().String()))
me := errors.NewMultiError(fmt.Sprintf("[%s] external upstream resolution errors for job %s", jobWithUpstream.Job().Tenant().NamespaceName().String(), jobWithUpstream.Name().String()))
mergedUpstreams := jobWithUpstream.GetResolvedUpstreams()
unresolvedUpstreams := jobWithUpstream.GetUnresolvedUpstreams()
resolvedExternally := false
for _, unresolvedUpstream := range unresolvedUpstreams {
externalUpstream, err := e.fetchOptimusUpstreams(ctx, unresolvedUpstream)
if err != nil || len(externalUpstream) == 0 {
Expand All @@ -53,11 +54,13 @@ func (e *extUpstreamResolver) Resolve(ctx context.Context, jobWithUpstream *job.
continue
}
mergedUpstreams = append(mergedUpstreams, externalUpstream...)
resolvedExternally = true
}
if len(me.Errors) > 0 {
lw.Write(writer.LogLevelError, errors.MultiToError(me).Error())
} else {
lw.Write(writer.LogLevelDebug, fmt.Sprintf("resolved job %s upstream from external", jobWithUpstream.Name().String()))
}
if resolvedExternally {
lw.Write(writer.LogLevelDebug, fmt.Sprintf("[%s] resolved job %s upstream from external", jobWithUpstream.Job().Tenant().NamespaceName().String(), jobWithUpstream.Name().String()))
}
return job.NewWithUpstream(jobWithUpstream.Job(), mergedUpstreams), errors.MultiToError(me)
}
Expand Down
8 changes: 5 additions & 3 deletions core/job/resolver/internal_upstream_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ func (i internalUpstreamResolver) Resolve(ctx context.Context, jobWithUnresolved
}

internalUpstream := mergeUpstreams(internalUpstreamInferred, internalUpstreamStatic)
fullNameUpstreamMap := job.Upstreams(internalUpstream).ToFullNameAndUpstreamMap()
resourceDestinationUpstreamMap := job.Upstreams(internalUpstream).ToResourceDestinationAndUpstreamMap()
distinctInternalUpstream := job.Upstreams(internalUpstream).Deduplicate()
fullNameUpstreamMap := job.Upstreams(distinctInternalUpstream).ToFullNameAndUpstreamMap()
resourceDestinationUpstreamMap := job.Upstreams(distinctInternalUpstream).ToResourceDestinationAndUpstreamMap()

var upstreamResults []*job.Upstream
for _, unresolvedUpstream := range jobWithUnresolvedUpstream.Upstreams() {
Expand All @@ -47,7 +48,8 @@ func (i internalUpstreamResolver) Resolve(ctx context.Context, jobWithUnresolved
upstreamResults = append(upstreamResults, unresolvedUpstream)
}

return job.NewWithUpstream(jobWithUnresolvedUpstream.Job(), upstreamResults), errors.MultiToError(me)
distinctUpstreams := job.Upstreams(upstreamResults).Deduplicate()
return job.NewWithUpstream(jobWithUnresolvedUpstream.Job(), distinctUpstreams), errors.MultiToError(me)
}

func (i internalUpstreamResolver) BulkResolve(ctx context.Context, projectName tenant.ProjectName, jobsWithUnresolvedUpstream []*job.WithUpstream) ([]*job.WithUpstream, error) {
Expand Down
43 changes: 36 additions & 7 deletions core/job/resolver/internal_upstream_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,33 @@ func TestInternalUpstreamResolver(t *testing.T) {
internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo)
result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream)
assert.NoError(t, err)
assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams())
assert.Equal(t, expectedJobWithUpstream.Job(), result.Job())
assert.ElementsMatch(t, expectedJobWithUpstream.Upstreams(), result.Upstreams())
})
t.Run("resolves inferred and static upstream internally and prioritize static upstream when duplication found", func(t *testing.T) {
jobRepo := new(JobRepository)
logWriter := new(mockWriter)
defer logWriter.AssertExpectations(t)

specD, _ := job.NewSpecBuilder(jobVersion, "job-D", "sample-owner", jobSchedule, jobWindow, jobTask).WithSpecUpstream(upstreamSpec).Build()
jobDDestination := job.ResourceURN("resource-D")
jobDSources := []job.ResourceURN{"resource-C"}
jobD := job.NewJob(sampleTenant, specD, jobDDestination, jobDSources)

unresolvedUpstreamCInferred := job.NewUpstreamUnresolvedInferred("resource-C")
unresolvedUpstreamCStatic := job.NewUpstreamUnresolvedStatic("job-C", sampleTenant.ProjectName())
internalUpstreamCStatic := job.NewUpstreamResolved("job-C", "", "resource-C", sampleTenant, "static", taskName, false)

jobRepo.On("GetAllByResourceDestination", ctx, jobDSources[0]).Return([]*job.Job{jobC}, nil)
jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil)

jobWithUnresolvedUpstream := job.NewWithUpstream(jobD, []*job.Upstream{unresolvedUpstreamCStatic, unresolvedUpstreamCInferred})
expectedJobWithUpstream := job.NewWithUpstream(jobD, []*job.Upstream{internalUpstreamCStatic})

internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo)
result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream)
assert.NoError(t, err)
assert.EqualValues(t, expectedJobWithUpstream, result)
})
t.Run("resolves inferred upstream internally", func(t *testing.T) {
jobRepo := new(JobRepository)
Expand All @@ -80,7 +106,7 @@ func TestInternalUpstreamResolver(t *testing.T) {
internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo)
result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream)
assert.NoError(t, err)
assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams())
assert.ElementsMatch(t, expectedJobWithUpstream.Upstreams(), result.Upstreams())
})
t.Run("resolves static upstream internally", func(t *testing.T) {
jobRepo := new(JobRepository)
Expand All @@ -99,7 +125,7 @@ func TestInternalUpstreamResolver(t *testing.T) {
internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo)
result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream)
assert.NoError(t, err)
assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams())
assert.ElementsMatch(t, expectedJobWithUpstream.Upstreams(), result.Upstreams())
})
t.Run("should not stop the process but keep appending error when unable to resolve inferred upstream", func(t *testing.T) {
jobRepo := new(JobRepository)
Expand All @@ -119,7 +145,7 @@ func TestInternalUpstreamResolver(t *testing.T) {
internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo)
result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream)
assert.ErrorContains(t, err, "internal error")
assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams())
assert.ElementsMatch(t, expectedJobWithUpstream.Upstreams(), result.Upstreams())
})
t.Run("should not stop the process but keep appending error when unable to resolve static upstream", func(t *testing.T) {
jobRepo := new(JobRepository)
Expand All @@ -144,7 +170,7 @@ func TestInternalUpstreamResolver(t *testing.T) {
internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo)
result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream)
assert.ErrorContains(t, err, "not found")
assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams())
assert.ElementsMatch(t, expectedJobWithUpstream.Upstreams(), result.Upstreams())
})
t.Run("should not stop the process but keep appending error when static upstream name is invalid", func(t *testing.T) {
jobRepo := new(JobRepository)
Expand All @@ -171,7 +197,7 @@ func TestInternalUpstreamResolver(t *testing.T) {
internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo)
result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream)
assert.ErrorContains(t, err, "name is empty")
assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams())
assert.ElementsMatch(t, expectedJobWithUpstream.Upstreams(), result.Upstreams())
})
})
t.Run("BulkResolve", func(t *testing.T) {
Expand Down Expand Up @@ -206,7 +232,10 @@ func TestInternalUpstreamResolver(t *testing.T) {
internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo)
result, err := internalUpstreamResolver.BulkResolve(ctx, sampleTenant.ProjectName(), jobsWithUnresolvedUpstream)
assert.NoError(t, err)
assert.EqualValues(t, expectedJobsWithUpstream, result)
assert.Equal(t, expectedJobsWithUpstream[0].Job(), result[0].Job())
assert.Equal(t, expectedJobsWithUpstream[1].Job(), result[1].Job())
assert.ElementsMatch(t, expectedJobsWithUpstream[0].Upstreams(), result[0].Upstreams())
assert.ElementsMatch(t, expectedJobsWithUpstream[1].Upstreams(), result[1].Upstreams())
})
t.Run("returns error if unable to resolve upstream internally", func(t *testing.T) {
jobRepo := new(JobRepository)
Expand Down
Loading