Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplify fetching of secrets for compilation #735

Merged
merged 1 commit into from
Feb 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions core/job/service/job_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,20 @@ func TestJobService(t *testing.T) {
map[string]string{
"bucket": "gs://ns_bucket",
})
secret1, err := tenant.NewPlainTextSecret("table_name", "secret_table")
assert.Nil(t, err)

sampleTenant, _ := tenant.NewTenant(project.Name().String(), namespace.Name().String())
detailedTenant, _ := tenant.NewTenantDetails(project, namespace)
detailedTenant, _ := tenant.NewTenantDetails(project, namespace, []*tenant.PlainTextSecret{secret1})

otherNamespace, _ := tenant.NewNamespace("other-ns", project.Name(),
map[string]string{
"bucket": "gs://other_ns_bucket",
})
otherTenant, _ := tenant.NewTenant(project.Name().String(), otherNamespace.Name().String())
detailedOtherTenant, _ := tenant.NewTenantDetails(project, otherNamespace)
secret2, err := tenant.NewPlainTextSecret("bucket", "gs://some_secret_bucket")
assert.Nil(t, err)
detailedOtherTenant, _ := tenant.NewTenantDetails(project, otherNamespace, []*tenant.PlainTextSecret{secret2})

jobVersion := 1
startDate, err := job.ScheduleDateFrom("2022-10-01")
Expand Down
34 changes: 8 additions & 26 deletions core/job/service/plugin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ var (
ErrYamlModNotExist = errors.New("yaml mod not found for plugin")
)

type SecretsGetter interface {
Get(ctx context.Context, projName tenant.ProjectName, namespaceName, name string) (*tenant.PlainTextSecret, error)
GetAll(ctx context.Context, projName tenant.ProjectName, namespaceName string) ([]*tenant.PlainTextSecret, error)
}

type PluginRepo interface {
GetByName(string) (*plugin.Plugin, error)
}
Expand All @@ -45,8 +40,6 @@ type Engine interface {
}

type JobPluginService struct {
secretsGetter SecretsGetter

pluginRepo PluginRepo
engine Engine

Expand All @@ -55,8 +48,8 @@ type JobPluginService struct {
logger log.Logger
}

func NewJobPluginService(secretsGetter SecretsGetter, pluginRepo PluginRepo, engine Engine, logger log.Logger) *JobPluginService {
return &JobPluginService{secretsGetter: secretsGetter, pluginRepo: pluginRepo, engine: engine, logger: logger, now: time.Now}
func NewJobPluginService(pluginRepo PluginRepo, engine Engine, logger log.Logger) *JobPluginService {
return &JobPluginService{pluginRepo: pluginRepo, engine: engine, logger: logger, now: time.Now}
}

func (p JobPluginService) Info(_ context.Context, taskName job.TaskName) (*plugin.Info, error) {
Expand All @@ -82,10 +75,7 @@ func (p JobPluginService) GenerateDestination(ctx context.Context, tnnt *tenant.
return "", ErrUpstreamModNotFound
}

compiledConfig, err := p.compileConfig(ctx, task.Config().Map(), tnnt)
if err != nil {
return "", err
}
compiledConfig := p.compileConfig(task.Config().Map(), tnnt)

destination, err := taskPlugin.DependencyMod.GenerateDestination(ctx, plugin.GenerateDestinationRequest{
Config: compiledConfig,
Expand Down Expand Up @@ -114,10 +104,7 @@ func (p JobPluginService) GenerateUpstreams(ctx context.Context, jobTenant *tena
return nil, fmt.Errorf("asset compilation failure: %w", err)
}

compiledConfigs, err := p.compileConfig(ctx, spec.Task().Config(), jobTenant)
if err != nil {
return nil, err
}
compiledConfigs := p.compileConfig(spec.Task().Config(), jobTenant)

resp, err := taskPlugin.DependencyMod.GenerateDependencies(ctx, plugin.GenerateDependenciesRequest{
Config: compiledConfigs,
Expand All @@ -139,30 +126,25 @@ func (p JobPluginService) GenerateUpstreams(ctx context.Context, jobTenant *tena
return upstreamURNs, nil
}

func (p JobPluginService) compileConfig(ctx context.Context, configs job.Config, tnnt *tenant.WithDetails) (plugin.Configs, error) {
jobTenant := tnnt.ToTenant()
secrets, err := p.secretsGetter.GetAll(ctx, jobTenant.ProjectName(), jobTenant.NamespaceName().String())
if err != nil {
return nil, err
}

func (p JobPluginService) compileConfig(configs job.Config, tnnt *tenant.WithDetails) plugin.Configs {
tmplCtx := compiler.PrepareContext(
compiler.From(tnnt.GetConfigs()).WithName("proj").WithKeyPrefix(projectConfigPrefix),
compiler.From(tenant.PlainTextSecrets(secrets).ToMap()).WithName("secret"),
compiler.From(tnnt.SecretsMap()).WithName("secret"),
)

var pluginConfigs plugin.Configs
for key, val := range configs {
compiledConf, err := p.engine.CompileString(val, tmplCtx)
if err != nil {
p.logger.Warn("error in template compilation: ", err.Error())
compiledConf = val
}
pluginConfigs = append(pluginConfigs, plugin.Config{
Name: key,
Value: compiledConf,
})
}
return pluginConfigs, nil
return pluginConfigs
}

func (p JobPluginService) compileAsset(ctx context.Context, taskPlugin *plugin.Plugin, spec *job.Spec, scheduledAt time.Time) (map[string]string, error) {
Expand Down
Loading