From c4cf58544876da29daab5376e9f7793f7719bc6c Mon Sep 17 00:00:00 2001 From: Sandeep Bhardwaj Date: Fri, 10 Feb 2023 11:30:20 +0530 Subject: [PATCH] refactor: simplify fetching of secrets for compilation (#735) --- core/job/service/job_service_test.go | 9 +- core/job/service/plugin_service.go | 34 +-- core/job/service/plugin_service_test.go | 196 ++---------------- .../service/executor_input_compiler.go | 15 +- .../service/executor_input_compiler_test.go | 59 +----- .../scheduler/service/notification_service.go | 5 +- core/tenant/service/tenant_service.go | 9 +- core/tenant/service/tenant_service_test.go | 32 ++- core/tenant/tenant.go | 16 +- core/tenant/tenant_test.go | 28 ++- .../store/postgres/job/job_repository_test.go | 14 +- server/optimus.go | 2 +- 12 files changed, 120 insertions(+), 299 deletions(-) diff --git a/core/job/service/job_service_test.go b/core/job/service/job_service_test.go index 0cea699ece..4acc5e202c 100644 --- a/core/job/service/job_service_test.go +++ b/core/job/service/job_service_test.go @@ -31,15 +31,20 @@ func TestJobService(t *testing.T) { map[string]string{ "bucket": "gs://ns_bucket", }) + secret1, err := tenant.NewPlainTextSecret("table_name", "secret_table") + assert.Nil(t, err) + sampleTenant, _ := tenant.NewTenant(project.Name().String(), namespace.Name().String()) - detailedTenant, _ := tenant.NewTenantDetails(project, namespace) + detailedTenant, _ := tenant.NewTenantDetails(project, namespace, []*tenant.PlainTextSecret{secret1}) otherNamespace, _ := tenant.NewNamespace("other-ns", project.Name(), map[string]string{ "bucket": "gs://other_ns_bucket", }) otherTenant, _ := tenant.NewTenant(project.Name().String(), otherNamespace.Name().String()) - detailedOtherTenant, _ := tenant.NewTenantDetails(project, otherNamespace) + secret2, err := tenant.NewPlainTextSecret("bucket", "gs://some_secret_bucket") + assert.Nil(t, err) + detailedOtherTenant, _ := tenant.NewTenantDetails(project, otherNamespace, []*tenant.PlainTextSecret{secret2}) jobVersion := 1 startDate, err := job.ScheduleDateFrom("2022-10-01") diff --git a/core/job/service/plugin_service.go b/core/job/service/plugin_service.go index 6cbda54977..568ab9ce5a 100644 --- a/core/job/service/plugin_service.go +++ b/core/job/service/plugin_service.go @@ -30,11 +30,6 @@ var ( ErrYamlModNotExist = errors.New("yaml mod not found for plugin") ) -type SecretsGetter interface { - Get(ctx context.Context, projName tenant.ProjectName, namespaceName, name string) (*tenant.PlainTextSecret, error) - GetAll(ctx context.Context, projName tenant.ProjectName, namespaceName string) ([]*tenant.PlainTextSecret, error) -} - type PluginRepo interface { GetByName(string) (*plugin.Plugin, error) } @@ -45,8 +40,6 @@ type Engine interface { } type JobPluginService struct { - secretsGetter SecretsGetter - pluginRepo PluginRepo engine Engine @@ -55,8 +48,8 @@ type JobPluginService struct { logger log.Logger } -func NewJobPluginService(secretsGetter SecretsGetter, pluginRepo PluginRepo, engine Engine, logger log.Logger) *JobPluginService { - return &JobPluginService{secretsGetter: secretsGetter, pluginRepo: pluginRepo, engine: engine, logger: logger, now: time.Now} +func NewJobPluginService(pluginRepo PluginRepo, engine Engine, logger log.Logger) *JobPluginService { + return &JobPluginService{pluginRepo: pluginRepo, engine: engine, logger: logger, now: time.Now} } func (p JobPluginService) Info(_ context.Context, taskName job.TaskName) (*plugin.Info, error) { @@ -82,10 +75,7 @@ func (p JobPluginService) GenerateDestination(ctx context.Context, tnnt *tenant. return "", ErrUpstreamModNotFound } - compiledConfig, err := p.compileConfig(ctx, task.Config().Map(), tnnt) - if err != nil { - return "", err - } + compiledConfig := p.compileConfig(task.Config().Map(), tnnt) destination, err := taskPlugin.DependencyMod.GenerateDestination(ctx, plugin.GenerateDestinationRequest{ Config: compiledConfig, @@ -114,10 +104,7 @@ func (p JobPluginService) GenerateUpstreams(ctx context.Context, jobTenant *tena return nil, fmt.Errorf("asset compilation failure: %w", err) } - compiledConfigs, err := p.compileConfig(ctx, spec.Task().Config(), jobTenant) - if err != nil { - return nil, err - } + compiledConfigs := p.compileConfig(spec.Task().Config(), jobTenant) resp, err := taskPlugin.DependencyMod.GenerateDependencies(ctx, plugin.GenerateDependenciesRequest{ Config: compiledConfigs, @@ -139,16 +126,10 @@ func (p JobPluginService) GenerateUpstreams(ctx context.Context, jobTenant *tena return upstreamURNs, nil } -func (p JobPluginService) compileConfig(ctx context.Context, configs job.Config, tnnt *tenant.WithDetails) (plugin.Configs, error) { - jobTenant := tnnt.ToTenant() - secrets, err := p.secretsGetter.GetAll(ctx, jobTenant.ProjectName(), jobTenant.NamespaceName().String()) - if err != nil { - return nil, err - } - +func (p JobPluginService) compileConfig(configs job.Config, tnnt *tenant.WithDetails) plugin.Configs { tmplCtx := compiler.PrepareContext( compiler.From(tnnt.GetConfigs()).WithName("proj").WithKeyPrefix(projectConfigPrefix), - compiler.From(tenant.PlainTextSecrets(secrets).ToMap()).WithName("secret"), + compiler.From(tnnt.SecretsMap()).WithName("secret"), ) var pluginConfigs plugin.Configs @@ -156,13 +137,14 @@ func (p JobPluginService) compileConfig(ctx context.Context, configs job.Config, compiledConf, err := p.engine.CompileString(val, tmplCtx) if err != nil { p.logger.Warn("error in template compilation: ", err.Error()) + compiledConf = val } pluginConfigs = append(pluginConfigs, plugin.Config{ Name: key, Value: compiledConf, }) } - return pluginConfigs, nil + return pluginConfigs } func (p JobPluginService) compileAsset(ctx context.Context, taskPlugin *plugin.Plugin, spec *job.Spec, scheduledAt time.Time) (map[string]string, error) { diff --git a/core/job/service/plugin_service_test.go b/core/job/service/plugin_service_test.go index aa86a8b9ef..411f52c0dc 100644 --- a/core/job/service/plugin_service_test.go +++ b/core/job/service/plugin_service_test.go @@ -30,7 +30,14 @@ func TestPluginService(t *testing.T) { map[string]string{ "bucket": "gs://ns_bucket", }) - tenantDetails, _ := tenant.NewTenantDetails(project, namespace) + + secret1, err := tenant.NewPlainTextSecret("table_name", "secret_table") + assert.Nil(t, err) + + secret2, err := tenant.NewPlainTextSecret("bucket", "gs://some_secret_bucket") + assert.Nil(t, err) + + tenantDetails, _ := tenant.NewTenantDetails(project, namespace, tenant.PlainTextSecrets{secret1, secret2}) startDate, err := job.ScheduleDateFrom("2022-10-01") assert.NoError(t, err) jobSchedule, err := job.NewScheduleBuilder(startDate).Build() @@ -51,7 +58,7 @@ func TestPluginService(t *testing.T) { pluginRepo.On("GetByName", jobTask.Name().String()).Return(nil, errors.New("some error when fetch plugin")) defer pluginRepo.AssertExpectations(t) - pluginService := service.NewJobPluginService(nil, pluginRepo, nil, nil) + pluginService := service.NewJobPluginService(pluginRepo, nil, nil) result, err := pluginService.Info(ctx, jobTask.Name()) assert.Error(t, err) assert.Nil(t, result) @@ -70,7 +77,7 @@ func TestPluginService(t *testing.T) { newPlugin := &plugin.Plugin{DependencyMod: depMod} pluginRepo.On("GetByName", jobTask.Name().String()).Return(newPlugin, nil) - pluginService := service.NewJobPluginService(nil, pluginRepo, nil, nil) + pluginService := service.NewJobPluginService(pluginRepo, nil, nil) result, err := pluginService.Info(ctx, jobTask.Name()) assert.Error(t, err) assert.Nil(t, result) @@ -96,7 +103,7 @@ func TestPluginService(t *testing.T) { }, nil) defer yamlMod.AssertExpectations(t) - pluginService := service.NewJobPluginService(nil, pluginRepo, nil, nil) + pluginService := service.NewJobPluginService(pluginRepo, nil, nil) result, err := pluginService.Info(ctx, jobTask.Name()) assert.NoError(t, err) assert.NotNil(t, result) @@ -110,9 +117,6 @@ func TestPluginService(t *testing.T) { t.Run("returns destination", func(t *testing.T) { logger := log.NewLogrus() - secretsGetter := new(SecretsGetter) - defer secretsGetter.AssertExpectations(t) - pluginRepo := new(mockPluginRepo) defer pluginRepo.AssertExpectations(t) @@ -128,14 +132,6 @@ func TestPluginService(t *testing.T) { taskPlugin := &plugin.Plugin{DependencyMod: depMod, YamlMod: yamlMod} pluginRepo.On("GetByName", jobTask.Name().String()).Return(taskPlugin, nil) - secret1, err := tenant.NewPlainTextSecret("table_name", "secret_table") - assert.Nil(t, err) - - secret2, err := tenant.NewPlainTextSecret("bucket", "gs://some_secret_bucket") - assert.Nil(t, err) - - secretsGetter.On("GetAll", ctx, project.Name(), namespace.Name().String()).Return([]*tenant.PlainTextSecret{secret1, secret2}, nil) - destination := "project.dataset.table" destinationURN := job.ResourceURN("bigquery://project.dataset.table") depMod.On("GenerateDestination", ctx, mock.Anything).Return(&plugin.GenerateDestinationResponse{ @@ -143,7 +139,7 @@ func TestPluginService(t *testing.T) { Type: "bigquery", }, nil) - pluginService := service.NewJobPluginService(secretsGetter, pluginRepo, engine, logger) + pluginService := service.NewJobPluginService(pluginRepo, engine, logger) result, err := pluginService.GenerateDestination(ctx, tenantDetails, jobTask) assert.Nil(t, err) assert.Equal(t, destinationURN, result) @@ -151,9 +147,6 @@ func TestPluginService(t *testing.T) { t.Run("returns error if unable to find the plugin", func(t *testing.T) { logger := log.NewLogrus() - secretsGetter := new(SecretsGetter) - defer secretsGetter.AssertExpectations(t) - pluginRepo := new(mockPluginRepo) defer pluginRepo.AssertExpectations(t) @@ -162,7 +155,7 @@ func TestPluginService(t *testing.T) { pluginRepo.On("GetByName", jobTask.Name().String()).Return(nil, errors.New("not found")) - pluginService := service.NewJobPluginService(secretsGetter, pluginRepo, engine, logger) + pluginService := service.NewJobPluginService(pluginRepo, engine, logger) result, err := pluginService.GenerateDestination(ctx, tenantDetails, jobTask) assert.ErrorContains(t, err, "not found") assert.Equal(t, "", result.String()) @@ -170,9 +163,6 @@ func TestPluginService(t *testing.T) { t.Run("returns proper error if the upstream mod is not found", func(t *testing.T) { logger := log.NewLogrus() - secretsGetter := new(SecretsGetter) - defer secretsGetter.AssertExpectations(t) - pluginRepo := new(mockPluginRepo) defer pluginRepo.AssertExpectations(t) @@ -188,7 +178,7 @@ func TestPluginService(t *testing.T) { pluginWithoutDependencyMod := &plugin.Plugin{YamlMod: yamlMod} pluginRepo.On("GetByName", jobTask.Name().String()).Return(pluginWithoutDependencyMod, nil) - pluginService := service.NewJobPluginService(secretsGetter, pluginRepo, engine, logger) + pluginService := service.NewJobPluginService(pluginRepo, engine, logger) result, err := pluginService.GenerateDestination(ctx, tenantDetails, jobTask) assert.ErrorIs(t, err, service.ErrUpstreamModNotFound) assert.Equal(t, "", result.String()) @@ -196,9 +186,6 @@ func TestPluginService(t *testing.T) { t.Run("returns error if generate destination failed", func(t *testing.T) { logger := log.NewLogrus() - secretsGetter := new(SecretsGetter) - defer secretsGetter.AssertExpectations(t) - pluginRepo := new(mockPluginRepo) defer pluginRepo.AssertExpectations(t) @@ -213,58 +200,19 @@ func TestPluginService(t *testing.T) { taskPlugin := &plugin.Plugin{DependencyMod: depMod, YamlMod: yamlMod} pluginRepo.On("GetByName", jobTask.Name().String()).Return(taskPlugin, nil) - secret1, err := tenant.NewPlainTextSecret("table_name", "secret_table") - assert.Nil(t, err) - - secret2, err := tenant.NewPlainTextSecret("bucket", "gs://some_secret_bucket") - assert.Nil(t, err) - - secretsGetter.On("GetAll", ctx, project.Name(), namespace.Name().String()).Return([]*tenant.PlainTextSecret{secret1, secret2}, nil) - depMod.On("GenerateDestination", ctx, mock.Anything).Return(&plugin.GenerateDestinationResponse{}, errors.New("generate destination error")) - pluginService := service.NewJobPluginService(secretsGetter, pluginRepo, engine, logger) + pluginService := service.NewJobPluginService(pluginRepo, engine, logger) result, err := pluginService.GenerateDestination(ctx, tenantDetails, jobTask) assert.ErrorContains(t, err, "generate destination error") assert.Equal(t, "", result.String()) }) - t.Run("returns error if unable to get secrets", func(t *testing.T) { - logger := log.NewLogrus() - - secretsGetter := new(SecretsGetter) - defer secretsGetter.AssertExpectations(t) - - pluginRepo := new(mockPluginRepo) - defer pluginRepo.AssertExpectations(t) - - engine := compiler.NewEngine() - defer pluginRepo.AssertExpectations(t) - - depMod := new(mockOpt.DependencyResolverMod) - defer depMod.AssertExpectations(t) - - yamlMod := new(mockOpt.YamlMod) - defer yamlMod.AssertExpectations(t) - - taskPlugin := &plugin.Plugin{DependencyMod: depMod, YamlMod: yamlMod} - pluginRepo.On("GetByName", jobTask.Name().String()).Return(taskPlugin, nil) - - secretsGetter.On("GetAll", ctx, project.Name(), namespace.Name().String()).Return([]*tenant.PlainTextSecret{}, errors.New("getting secret error")) - - pluginService := service.NewJobPluginService(secretsGetter, pluginRepo, engine, logger) - result, err := pluginService.GenerateDestination(ctx, tenantDetails, jobTask) - assert.ErrorContains(t, err, "getting secret error") - assert.Equal(t, "", result.String()) - }) }) t.Run("GenerateUpstreams", func(t *testing.T) { t.Run("returns upstreams", func(t *testing.T) { logger := log.NewLogrus() - secretsGetter := new(SecretsGetter) - defer secretsGetter.AssertExpectations(t) - pluginRepo := new(mockPluginRepo) defer pluginRepo.AssertExpectations(t) @@ -280,8 +228,6 @@ func TestPluginService(t *testing.T) { taskPlugin := &plugin.Plugin{DependencyMod: depMod, YamlMod: yamlMod} pluginRepo.On("GetByName", jobTask.Name().String()).Return(taskPlugin, nil) - secretsGetter.On("GetAll", ctx, project.Name(), namespace.Name().String()).Return(nil, nil) - destination := "project.dataset.table" depMod.On("GenerateDestination", ctx, mock.Anything).Return(&plugin.GenerateDestinationResponse{ Destination: destination, @@ -298,7 +244,7 @@ func TestPluginService(t *testing.T) { specA, err := job.NewSpecBuilder(jobVersion, "job-A", "sample-owner", jobSchedule, jobWindow, jobTask).WithAsset(asset).Build() assert.NoError(t, err) - pluginService := service.NewJobPluginService(secretsGetter, pluginRepo, engine, logger) + pluginService := service.NewJobPluginService(pluginRepo, engine, logger) result, err := pluginService.GenerateUpstreams(ctx, tenantDetails, specA, false) assert.Nil(t, err) assert.Equal(t, []job.ResourceURN{jobSource}, result) @@ -306,9 +252,6 @@ func TestPluginService(t *testing.T) { t.Run("returns error if unable to find the plugin", func(t *testing.T) { logger := log.NewLogrus() - secretsGetter := new(SecretsGetter) - defer secretsGetter.AssertExpectations(t) - pluginRepo := new(mockPluginRepo) defer pluginRepo.AssertExpectations(t) @@ -320,7 +263,7 @@ func TestPluginService(t *testing.T) { specA, err := job.NewSpecBuilder(jobVersion, "job-A", "sample-owner", jobSchedule, jobWindow, jobTask).Build() assert.NoError(t, err) - pluginService := service.NewJobPluginService(secretsGetter, pluginRepo, engine, logger) + pluginService := service.NewJobPluginService(pluginRepo, engine, logger) result, err := pluginService.GenerateUpstreams(ctx, tenantDetails, specA, false) assert.ErrorContains(t, err, "not found") assert.Nil(t, result) @@ -328,9 +271,6 @@ func TestPluginService(t *testing.T) { t.Run("returns proper error if the upstream mod is not found", func(t *testing.T) { logger := log.NewLogrus() - secretsGetter := new(SecretsGetter) - defer secretsGetter.AssertExpectations(t) - pluginRepo := new(mockPluginRepo) defer pluginRepo.AssertExpectations(t) @@ -349,54 +289,14 @@ func TestPluginService(t *testing.T) { specA, err := job.NewSpecBuilder(jobVersion, "job-A", "sample-owner", jobSchedule, jobWindow, jobTask).Build() assert.NoError(t, err) - pluginService := service.NewJobPluginService(secretsGetter, pluginRepo, engine, logger) + pluginService := service.NewJobPluginService(pluginRepo, engine, logger) result, err := pluginService.GenerateUpstreams(ctx, tenantDetails, specA, false) assert.ErrorContains(t, err, "not found") assert.Nil(t, result) }) - t.Run("returns error if unable to get secrets", func(t *testing.T) { - logger := log.NewLogrus() - - secretsGetter := new(SecretsGetter) - defer secretsGetter.AssertExpectations(t) - - pluginRepo := new(mockPluginRepo) - defer pluginRepo.AssertExpectations(t) - - engine := compiler.NewEngine() - defer pluginRepo.AssertExpectations(t) - - depMod := new(mockOpt.DependencyResolverMod) - defer depMod.AssertExpectations(t) - - yamlMod := new(mockOpt.YamlMod) - defer yamlMod.AssertExpectations(t) - - taskPlugin := &plugin.Plugin{DependencyMod: depMod, YamlMod: yamlMod} - pluginRepo.On("GetByName", jobTask.Name().String()).Return(taskPlugin, nil) - - destination := "project.dataset.table" - depMod.On("GenerateDestination", ctx, mock.Anything).Return(&plugin.GenerateDestinationResponse{ - Destination: destination, - Type: "bigquery", - }, nil) - - secretsGetter.On("GetAll", ctx, project.Name(), namespace.Name().String()).Return([]*tenant.PlainTextSecret{}, errors.New("getting secret error")) - - specA, err := job.NewSpecBuilder(jobVersion, "job-A", "sample-owner", jobSchedule, jobWindow, jobTask).Build() - assert.NoError(t, err) - - pluginService := service.NewJobPluginService(secretsGetter, pluginRepo, engine, logger) - result, err := pluginService.GenerateUpstreams(ctx, tenantDetails, specA, false) - assert.ErrorContains(t, err, "getting secret error") - assert.Nil(t, result) - }) t.Run("returns error if unable to generate destination successfully", func(t *testing.T) { logger := log.NewLogrus() - secretsGetter := new(SecretsGetter) - defer secretsGetter.AssertExpectations(t) - pluginRepo := new(mockPluginRepo) defer pluginRepo.AssertExpectations(t) @@ -417,7 +317,7 @@ func TestPluginService(t *testing.T) { specA, err := job.NewSpecBuilder(jobVersion, "job-A", "sample-owner", jobSchedule, jobWindow, jobTask).Build() assert.NoError(t, err) - pluginService := service.NewJobPluginService(secretsGetter, pluginRepo, engine, logger) + pluginService := service.NewJobPluginService(pluginRepo, engine, logger) result, err := pluginService.GenerateUpstreams(ctx, tenantDetails, specA, false) assert.ErrorContains(t, err, "generate destination error") assert.Nil(t, result) @@ -425,9 +325,6 @@ func TestPluginService(t *testing.T) { t.Run("returns error if unable to generate dependencies successfully", func(t *testing.T) { logger := log.NewLogrus() - secretsGetter := new(SecretsGetter) - defer secretsGetter.AssertExpectations(t) - pluginRepo := new(mockPluginRepo) defer pluginRepo.AssertExpectations(t) @@ -443,8 +340,6 @@ func TestPluginService(t *testing.T) { taskPlugin := &plugin.Plugin{DependencyMod: depMod, YamlMod: yamlMod} pluginRepo.On("GetByName", jobTask.Name().String()).Return(taskPlugin, nil) - secretsGetter.On("GetAll", ctx, project.Name(), namespace.Name().String()).Return(nil, nil) - destination := "project.dataset.table" depMod.On("GenerateDestination", ctx, mock.Anything).Return(&plugin.GenerateDestinationResponse{ Destination: destination, @@ -457,7 +352,7 @@ func TestPluginService(t *testing.T) { specA, err := job.NewSpecBuilder(jobVersion, "job-A", "sample-owner", jobSchedule, jobWindow, jobTask).Build() assert.NoError(t, err) - pluginService := service.NewJobPluginService(secretsGetter, pluginRepo, engine, logger) + pluginService := service.NewJobPluginService(pluginRepo, engine, logger) result, err := pluginService.GenerateUpstreams(ctx, tenantDetails, specA, false) assert.ErrorContains(t, err, "generate dependencies error") assert.Nil(t, result) @@ -465,57 +360,6 @@ func TestPluginService(t *testing.T) { }) } -// SecretsGetter is an autogenerated mock type for the SecretsGetter type -type SecretsGetter struct { - mock.Mock -} - -// Get provides a mock function with given fields: ctx, projName, namespaceName, name -func (_m *SecretsGetter) Get(ctx context.Context, projName tenant.ProjectName, namespaceName string, name string) (*tenant.PlainTextSecret, error) { - ret := _m.Called(ctx, projName, namespaceName, name) - - var r0 *tenant.PlainTextSecret - if rf, ok := ret.Get(0).(func(context.Context, tenant.ProjectName, string, string) *tenant.PlainTextSecret); ok { - r0 = rf(ctx, projName, namespaceName, name) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*tenant.PlainTextSecret) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, tenant.ProjectName, string, string) error); ok { - r1 = rf(ctx, projName, namespaceName, name) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// GetAll provides a mock function with given fields: ctx, projName, namespaceName -func (_m *SecretsGetter) GetAll(ctx context.Context, projName tenant.ProjectName, namespaceName string) ([]*tenant.PlainTextSecret, error) { - ret := _m.Called(ctx, projName, namespaceName) - - var r0 []*tenant.PlainTextSecret - if rf, ok := ret.Get(0).(func(context.Context, tenant.ProjectName, string) []*tenant.PlainTextSecret); ok { - r0 = rf(ctx, projName, namespaceName) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*tenant.PlainTextSecret) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, tenant.ProjectName, string) error); ok { - r1 = rf(ctx, projName, namespaceName) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - type mockPluginRepo struct { mock.Mock } diff --git a/core/scheduler/service/executor_input_compiler.go b/core/scheduler/service/executor_input_compiler.go index 1876a5375e..2d7f455275 100644 --- a/core/scheduler/service/executor_input_compiler.go +++ b/core/scheduler/service/executor_input_compiler.go @@ -61,11 +61,6 @@ func (i InputCompiler) Compile(ctx context.Context, job *scheduler.Job, config s return nil, err } - secrets, err := i.tenantService.GetSecrets(ctx, job.Tenant) - if err != nil { - return nil, err - } - systemDefinedVars, err := getSystemDefinedConfigs(job, config, executedAt) if err != nil { return nil, err @@ -74,7 +69,7 @@ func (i InputCompiler) Compile(ctx context.Context, job *scheduler.Job, config s // Prepare template context and compile task config taskContext := compiler.PrepareContext( compiler.From(tenantDetails.GetConfigs()).WithName(contextProject).WithKeyPrefix(projectConfigPrefix), - compiler.From(SecretsToMap(secrets)).WithName(contextSecret), + compiler.From(tenantDetails.SecretsMap()).WithName(contextSecret), compiler.From(systemDefinedVars).WithName(contextSystemDefined).AddToContext(), ) @@ -167,14 +162,6 @@ func splitConfigWithSecrets(conf map[string]string) (map[string]string, map[stri return configs, configWithSecrets } -func SecretsToMap(secrets []*tenant.PlainTextSecret) map[string]string { - mapping := make(map[string]string, len(secrets)) - for _, s := range secrets { - mapping[s.Name().String()] = s.Value() - } - return mapping -} - func NewJobInputCompiler(tenantService TenantService, compiler TemplateCompiler, assetCompiler AssetCompiler) *InputCompiler { return &InputCompiler{ tenantService: tenantService, diff --git a/core/scheduler/service/executor_input_compiler_test.go b/core/scheduler/service/executor_input_compiler_test.go index b821f87f75..3c573d919d 100644 --- a/core/scheduler/service/executor_input_compiler_test.go +++ b/core/scheduler/service/executor_input_compiler_test.go @@ -23,7 +23,12 @@ func TestExecutorCompiler(t *testing.T) { "SCHEDULER_HOST": "localhost", }) namespace, _ := tenant.NewNamespace("ns1", project.Name(), map[string]string{}) - tenantDetails, _ := tenant.NewTenantDetails(project, namespace) + + secret1, _ := tenant.NewPlainTextSecret("secretName", "secretValue") + secret2, _ := tenant.NewPlainTextSecret("secret2Name", "secret2Value") + secretsArray := []*tenant.PlainTextSecret{secret1, secret2} + + tenantDetails, _ := tenant.NewTenantDetails(project, namespace, secretsArray) tnnt, _ := tenant.NewTenant(project.Name().String(), namespace.Name().String()) currentTime := time.Now() @@ -55,32 +60,6 @@ func TestExecutorCompiler(t *testing.T) { assert.EqualError(t, err, "get details error") assert.Nil(t, inputExecutor) }) - t.Run("should give error if tenant service GetSecrets fails", func(t *testing.T) { - job := scheduler.Job{ - Name: "job1", - Tenant: tnnt, - } - config := scheduler.RunConfig{ - Executor: scheduler.Executor{ - Name: "transformer", - Type: "bq2bq", - }, - ScheduledAt: currentTime.Add(-time.Hour), - JobRunID: scheduler.JobRunID{}, - } - - tenantService := new(mockTenantService) - tenantService.On("GetDetails", ctx, tnnt).Return(tenantDetails, nil) - tenantService.On("GetSecrets", ctx, tnnt).Return(nil, fmt.Errorf("get secrets error")) - defer tenantService.AssertExpectations(t) - - inputCompiler := service.NewJobInputCompiler(tenantService, nil, nil) - inputExecutor, err := inputCompiler.Compile(ctx, &job, config, currentTime.Add(time.Hour)) - - assert.NotNil(t, err) - assert.EqualError(t, err, "get secrets error") - assert.Nil(t, inputExecutor) - }) t.Run("should give error if getSystemDefinedConfigs fails", func(t *testing.T) { window1, _ := models.NewWindow(1, "d", "2", "2") job := scheduler.Job{ @@ -99,7 +78,6 @@ func TestExecutorCompiler(t *testing.T) { tenantService := new(mockTenantService) tenantService.On("GetDetails", ctx, tnnt).Return(tenantDetails, nil) - tenantService.On("GetSecrets", ctx, tnnt).Return([]*tenant.PlainTextSecret{}, nil) defer tenantService.AssertExpectations(t) inputCompiler := service.NewJobInputCompiler(tenantService, nil, nil) @@ -126,13 +104,8 @@ func TestExecutorCompiler(t *testing.T) { JobRunID: scheduler.JobRunID{}, } - secret1, _ := tenant.NewPlainTextSecret("secretName", "secretValue") - secret2, _ := tenant.NewPlainTextSecret("secret2Name", "secret2Value") - secretsArray := []*tenant.PlainTextSecret{secret1, secret2} - tenantService := new(mockTenantService) tenantService.On("GetDetails", ctx, tnnt).Return(tenantDetails, nil) - tenantService.On("GetSecrets", ctx, tnnt).Return(secretsArray, nil) defer tenantService.AssertExpectations(t) startTime, _ := job.Window.GetStartTime(config.ScheduledAt) @@ -183,13 +156,8 @@ func TestExecutorCompiler(t *testing.T) { JobRunID: scheduler.JobRunID{}, } - secret1, _ := tenant.NewPlainTextSecret("secretName", "secretValue") - secret2, _ := tenant.NewPlainTextSecret("secret2Name", "secret2Value") - secretsArray := []*tenant.PlainTextSecret{secret1, secret2} - tenantService := new(mockTenantService) tenantService.On("GetDetails", ctx, tnnt).Return(tenantDetails, nil) - tenantService.On("GetSecrets", ctx, tnnt).Return(secretsArray, nil) defer tenantService.AssertExpectations(t) startTime, _ := job.Window.GetStartTime(config.ScheduledAt) @@ -298,13 +266,8 @@ func TestExecutorCompiler(t *testing.T) { JobRunID: scheduler.JobRunID{}, } - secret1, _ := tenant.NewPlainTextSecret("secretName", "secretValue") - secret2, _ := tenant.NewPlainTextSecret("secret2Name", "secret2Value") - secretsArray := []*tenant.PlainTextSecret{secret1, secret2} - tenantService := new(mockTenantService) tenantService.On("GetDetails", ctx, tnnt).Return(tenantDetails, nil) - tenantService.On("GetSecrets", ctx, tnnt).Return(secretsArray, nil) defer tenantService.AssertExpectations(t) startTime, _ := job.Window.GetStartTime(config.ScheduledAt) @@ -387,13 +350,8 @@ func TestExecutorCompiler(t *testing.T) { JobRunID: scheduler.JobRunID{}, } - secret1, _ := tenant.NewPlainTextSecret("secretName", "secretValue") - secret2, _ := tenant.NewPlainTextSecret("secret2Name", "secret2Value") - secretsArray := []*tenant.PlainTextSecret{secret1, secret2} - tenantService := new(mockTenantService) tenantService.On("GetDetails", ctx, tnnt).Return(tenantDetails, nil) - tenantService.On("GetSecrets", ctx, tnnt).Return(secretsArray, nil) defer tenantService.AssertExpectations(t) startTime, _ := job.Window.GetStartTime(config.ScheduledAt) @@ -457,13 +415,8 @@ func TestExecutorCompiler(t *testing.T) { JobRunID: scheduler.JobRunID{}, } - secret1, _ := tenant.NewPlainTextSecret("secretName", "secretValue") - secret2, _ := tenant.NewPlainTextSecret("secret2Name", "secret2Value") - secretsArray := []*tenant.PlainTextSecret{secret1, secret2} - tenantService := new(mockTenantService) tenantService.On("GetDetails", ctx, tnnt).Return(tenantDetails, nil) - tenantService.On("GetSecrets", ctx, tnnt).Return(secretsArray, nil) defer tenantService.AssertExpectations(t) startTime, _ := job.Window.GetStartTime(config.ScheduledAt) diff --git a/core/scheduler/service/notification_service.go b/core/scheduler/service/notification_service.go index f78ffe4a75..9f308cd9eb 100644 --- a/core/scheduler/service/notification_service.go +++ b/core/scheduler/service/notification_service.go @@ -61,13 +61,12 @@ func (n NotifyService) Push(ctx context.Context, event scheduler.Event) error { n.l.Debug("notification event for job", "job spec name", event.JobName, "event", fmt.Sprintf("%v", event)) if plainTextSecretsList == nil { - plainTextSecretsList, err = n.tenantService.GetSecrets(ctx, - event.Tenant) + plainTextSecretsList, err = n.tenantService.GetSecrets(ctx, event.Tenant) if err != nil { multierror.Append(err) continue } - secretMap = SecretsToMap(plainTextSecretsList) + secretMap = tenant.PlainTextSecrets(plainTextSecretsList).ToMap() } var secret string diff --git a/core/tenant/service/tenant_service.go b/core/tenant/service/tenant_service.go index 0f53d9d24d..23a8931118 100644 --- a/core/tenant/service/tenant_service.go +++ b/core/tenant/service/tenant_service.go @@ -27,7 +27,7 @@ type TenantService struct { } func (t TenantService) GetDetails(ctx context.Context, tnnt tenant.Tenant) (*tenant.WithDetails, error) { - if tnnt.ProjectName() == "" { + if tnnt.IsInvalid() { return nil, errors.InvalidArgument(tenant.EntityTenant, "invalid tenant details provided") } @@ -41,7 +41,12 @@ func (t TenantService) GetDetails(ctx context.Context, tnnt tenant.Tenant) (*ten return nil, err } - return tenant.NewTenantDetails(proj, namespace) + secrets, err := t.secretsGetter.GetAll(ctx, tnnt.ProjectName(), tnnt.NamespaceName().String()) + if err != nil { + return nil, err + } + + return tenant.NewTenantDetails(proj, namespace, secrets) } func (t TenantService) GetProject(ctx context.Context, name tenant.ProjectName) (*tenant.Project, error) { diff --git a/core/tenant/service/tenant_service_test.go b/core/tenant/service/tenant_service_test.go index eb3582277b..21d980e6c2 100644 --- a/core/tenant/service/tenant_service_test.go +++ b/core/tenant/service/tenant_service_test.go @@ -57,7 +57,7 @@ func TestTenantService(t *testing.T) { assert.NotNil(t, err) assert.EqualError(t, err, "unable to get ns") }) - t.Run("returns both project and namespace", func(t *testing.T) { + t.Run("returns error when unable to get secrets", func(t *testing.T) { projGetter := new(projectGetter) projGetter.On("Get", ctx, tnnt.ProjectName()).Return(proj, nil) defer projGetter.AssertExpectations(t) @@ -66,13 +66,41 @@ func TestTenantService(t *testing.T) { nsGetter.On("Get", ctx, tnnt.ProjectName(), tnnt.NamespaceName()).Return(ns, nil) defer nsGetter.AssertExpectations(t) - tenantService := service.NewTenantService(projGetter, nsGetter, nil) + secGetter := new(secretGetter) + secGetter.On("GetAll", ctx, tnnt.ProjectName(), tnnt.NamespaceName().String()).Return(nil, errors.New("unable to get secrets")) + defer secGetter.AssertExpectations(t) + + tenantService := service.NewTenantService(projGetter, nsGetter, secGetter) + + _, err := tenantService.GetDetails(ctx, tnnt) + assert.NotNil(t, err) + assert.EqualError(t, err, "unable to get secrets") + }) + t.Run("returns project, namespace and secrets", func(t *testing.T) { + projGetter := new(projectGetter) + projGetter.On("Get", ctx, tnnt.ProjectName()).Return(proj, nil) + defer projGetter.AssertExpectations(t) + + nsGetter := new(namespaceGetter) + nsGetter.On("Get", ctx, tnnt.ProjectName(), tnnt.NamespaceName()).Return(ns, nil) + defer nsGetter.AssertExpectations(t) + + pts, _ := tenant.NewPlainTextSecret("key1", "value1") + secGetter := new(secretGetter) + secGetter.On("GetAll", ctx, tnnt.ProjectName(), tnnt.NamespaceName().String()). + Return([]*tenant.PlainTextSecret{pts}, nil) + defer secGetter.AssertExpectations(t) + + tenantService := service.NewTenantService(projGetter, nsGetter, secGetter) d, err := tenantService.GetDetails(ctx, tnnt) assert.Nil(t, err) assert.Equal(t, proj.Name().String(), d.Project().Name().String()) receivedNS := d.Namespace() assert.Equal(t, receivedNS.Name(), ns.Name()) + sec := d.SecretsMap() + assert.Equal(t, 1, len(sec)) + assert.Equal(t, "value1", sec[pts.Name().String()]) }) }) t.Run("GetProject", func(t *testing.T) { diff --git a/core/tenant/tenant.go b/core/tenant/tenant.go index fb25b658ca..d90d0aa4f0 100644 --- a/core/tenant/tenant.go +++ b/core/tenant/tenant.go @@ -42,11 +42,12 @@ func NewTenant(projectName string, namespaceName string) (Tenant, error) { } type WithDetails struct { - project Project - namespace Namespace + project Project + namespace Namespace + secretsMap map[string]string } -func NewTenantDetails(proj *Project, namespace *Namespace) (*WithDetails, error) { +func NewTenantDetails(proj *Project, namespace *Namespace, secrets PlainTextSecrets) (*WithDetails, error) { if proj == nil { return nil, errors.InvalidArgument(EntityTenant, "project is nil") } @@ -55,8 +56,9 @@ func NewTenantDetails(proj *Project, namespace *Namespace) (*WithDetails, error) } return &WithDetails{ - project: *proj, - namespace: *namespace, + project: *proj, + namespace: *namespace, + secretsMap: secrets.ToMap(), }, nil } @@ -94,3 +96,7 @@ func (w *WithDetails) Project() *Project { func (w *WithDetails) Namespace() *Namespace { return &w.namespace } + +func (w *WithDetails) SecretsMap() map[string]string { + return w.secretsMap +} diff --git a/core/tenant/tenant_test.go b/core/tenant/tenant_test.go index 0025030a75..349d4cff79 100644 --- a/core/tenant/tenant_test.go +++ b/core/tenant/tenant_test.go @@ -43,18 +43,18 @@ func TestAggregateRootTenant(t *testing.T) { }) t.Run("return error when project not present", func(t *testing.T) { - _, err := tenant.NewTenantDetails(nil, nil) + _, err := tenant.NewTenantDetails(nil, nil, nil) assert.NotNil(t, err) assert.EqualError(t, err, "invalid argument for entity tenant: project is nil") }) t.Run("return error when namespace not present", func(t *testing.T) { - _, err := tenant.NewTenantDetails(project, nil) + _, err := tenant.NewTenantDetails(project, nil, nil) assert.NotNil(t, err) assert.EqualError(t, err, "invalid argument for entity tenant: namespace is nil") }) t.Run("when both project and namespace are present", func(t *testing.T) { t.Run("return withDetails with project and namespace", func(t *testing.T) { - tnnt, err := tenant.NewTenantDetails(project, namespace) + tnnt, err := tenant.NewTenantDetails(project, namespace, nil) assert.Nil(t, err) p := tnnt.Project() @@ -66,13 +66,13 @@ func TestAggregateRootTenant(t *testing.T) { assert.Equal(t, "test-ns", ns.Name().String()) }) t.Run("returns merged config", func(t *testing.T) { - tnnt, err := tenant.NewTenantDetails(project, namespace) + tnnt, err := tenant.NewTenantDetails(project, namespace, nil) assert.Nil(t, err) assert.Equal(t, 4, len(tnnt.GetConfigs())) }) t.Run("returns an error when key not present", func(t *testing.T) { - tnnt, err := tenant.NewTenantDetails(project, namespace) + tnnt, err := tenant.NewTenantDetails(project, namespace, nil) assert.Nil(t, err) _, err = tnnt.GetConfig("NON-EXISTENT") @@ -80,7 +80,7 @@ func TestAggregateRootTenant(t *testing.T) { assert.EqualError(t, err, "not found for entity tenant: config not present in tenant NON-EXISTENT") }) t.Run("returns a config giving priority to namespace", func(t *testing.T) { - tnnt, err := tenant.NewTenantDetails(project, namespace) + tnnt, err := tenant.NewTenantDetails(project, namespace, nil) assert.Nil(t, err) val, err := tnnt.GetConfig("BUCKET") @@ -88,7 +88,7 @@ func TestAggregateRootTenant(t *testing.T) { assert.Equal(t, "gs://ns_folder", val) }) t.Run("returns a config from project", func(t *testing.T) { - details, err := tenant.NewTenantDetails(project, namespace) + details, err := tenant.NewTenantDetails(project, namespace, nil) assert.Nil(t, err) val, err := details.GetConfig(tenant.ProjectStoragePathKey) @@ -96,7 +96,7 @@ func TestAggregateRootTenant(t *testing.T) { assert.Equal(t, "gs://location", val) }) t.Run("returns tenant", func(t *testing.T) { - details, err := tenant.NewTenantDetails(project, namespace) + details, err := tenant.NewTenantDetails(project, namespace, nil) assert.Nil(t, err) tnnt := details.ToTenant() @@ -105,6 +105,18 @@ func TestAggregateRootTenant(t *testing.T) { ns := tnnt.NamespaceName() assert.Equal(t, "test-ns", ns.String()) }) + t.Run("returns secrets", func(t *testing.T) { + p1, _ := tenant.NewPlainTextSecret("key", "value") + p2, _ := tenant.NewPlainTextSecret("key2", "value2") + secrets := tenant.PlainTextSecrets{p1, p2} + + details, err := tenant.NewTenantDetails(project, namespace, secrets) + assert.NoError(t, err) + + secMap := details.SecretsMap() + assert.Len(t, secMap, 2) + assert.Equal(t, "value2", secMap[p2.Name().String()]) + }) }) }) } diff --git a/internal/store/postgres/job/job_repository_test.go b/internal/store/postgres/job/job_repository_test.go index 8f937ff7d3..971435156e 100644 --- a/internal/store/postgres/job/job_repository_test.go +++ b/internal/store/postgres/job/job_repository_test.go @@ -471,7 +471,7 @@ func TestPostgresJobRepository(t *testing.T) { t.Run("returns job with inferred upstreams", func(t *testing.T) { db := dbSetup() - tenantDetails, err := tenant.NewTenantDetails(proj, namespace) + tnnt, err := tenant.NewTenant(proj.Name().String(), namespace.Name().String()) assert.NoError(t, err) jobSpecA, err := job.NewSpecBuilder(jobVersion, "sample-job-A", jobOwner, jobSchedule, jobWindow, jobTask).WithDescription(jobDescription).Build() @@ -486,7 +486,7 @@ func TestPostgresJobRepository(t *testing.T) { _, err = jobRepo.Add(ctx, []*job.Job{jobA, jobB}) assert.NoError(t, err) - expectedUpstream := job.NewUpstreamResolved(jobSpecB.Name(), "", jobB.Destination(), tenantDetails.ToTenant(), "inferred", taskName, false) + expectedUpstream := job.NewUpstreamResolved(jobSpecB.Name(), "", jobB.Destination(), tnnt, "inferred", taskName, false) upstreams, err := jobRepo.ResolveUpstreams(ctx, proj.Name(), []job.Name{jobSpecA.Name()}) assert.NoError(t, err) @@ -495,7 +495,7 @@ func TestPostgresJobRepository(t *testing.T) { t.Run("returns job with static upstreams", func(t *testing.T) { db := dbSetup() - tenantDetails, err := tenant.NewTenantDetails(proj, namespace) + tnnt, err := tenant.NewTenant(proj.Name().String(), namespace.Name().String()) assert.NoError(t, err) upstreamName := job.SpecUpstreamNameFrom("sample-job-B") @@ -514,7 +514,7 @@ func TestPostgresJobRepository(t *testing.T) { _, err = jobRepo.Add(ctx, []*job.Job{jobA, jobB}) assert.NoError(t, err) - expectedUpstream := job.NewUpstreamResolved(jobSpecB.Name(), "", jobB.Destination(), tenantDetails.ToTenant(), "static", taskName, false) + expectedUpstream := job.NewUpstreamResolved(jobSpecB.Name(), "", jobB.Destination(), tnnt, "static", taskName, false) upstreams, err := jobRepo.ResolveUpstreams(ctx, proj.Name(), []job.Name{jobSpecA.Name()}) assert.NoError(t, err) @@ -523,7 +523,7 @@ func TestPostgresJobRepository(t *testing.T) { t.Run("returns job with static and inferred upstreams", func(t *testing.T) { db := dbSetup() - tenantDetails, err := tenant.NewTenantDetails(proj, namespace) + tenantDetails, err := tenant.NewTenantDetails(proj, namespace, nil) assert.NoError(t, err) upstreamName := job.SpecUpstreamNameFrom("test-proj/sample-job-B") @@ -611,7 +611,7 @@ func TestPostgresJobRepository(t *testing.T) { t.Run("returns job with static upstream if found duplicated upstream from static and inferred", func(t *testing.T) { db := dbSetup() - tenantDetails, err := tenant.NewTenantDetails(proj, namespace) + tnnt, err := tenant.NewTenant(proj.Name().String(), namespace.Name().String()) assert.NoError(t, err) upstreamName := job.SpecUpstreamNameFrom("test-proj/sample-job-B") @@ -630,7 +630,7 @@ func TestPostgresJobRepository(t *testing.T) { _, err = jobRepo.Add(ctx, []*job.Job{jobA, jobB}) assert.NoError(t, err) - upstreamB := job.NewUpstreamResolved(jobSpecB.Name(), "", jobB.Destination(), tenantDetails.ToTenant(), "static", taskName, false) + upstreamB := job.NewUpstreamResolved(jobSpecB.Name(), "", jobB.Destination(), tnnt, "static", taskName, false) expectedUpstreams := []*job.Upstream{ upstreamB, diff --git a/server/optimus.go b/server/optimus.go index 88f7407955..a1af71656a 100644 --- a/server/optimus.go +++ b/server/optimus.go @@ -322,7 +322,7 @@ func (s *OptimusServer) setupHandlers() error { // Job Bounded Context Setup jJobRepo := jRepo.NewJobRepository(s.dbPool) - jPluginService := jService.NewJobPluginService(tSecretService, s.pluginRepo, newEngine, s.logger) + jPluginService := jService.NewJobPluginService(s.pluginRepo, newEngine, s.logger) jExternalUpstreamResolver, _ := jResolver.NewExternalUpstreamResolver(s.conf.ResourceManagers) jInternalUpstreamResolver := jResolver.NewInternalUpstreamResolver(jJobRepo) jUpstreamResolver := jResolver.NewUpstreamResolver(jJobRepo, jExternalUpstreamResolver, jInternalUpstreamResolver)