Skip to content

Commit

Permalink
refactor: simplify fetching of secrets for compilation (#735)
Browse files Browse the repository at this point in the history
  • Loading branch information
sbchaos authored Feb 10, 2023
1 parent 6d766d7 commit c4cf585
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 299 deletions.
9 changes: 7 additions & 2 deletions core/job/service/job_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,20 @@ func TestJobService(t *testing.T) {
map[string]string{
"bucket": "gs://ns_bucket",
})
secret1, err := tenant.NewPlainTextSecret("table_name", "secret_table")
assert.Nil(t, err)

sampleTenant, _ := tenant.NewTenant(project.Name().String(), namespace.Name().String())
detailedTenant, _ := tenant.NewTenantDetails(project, namespace)
detailedTenant, _ := tenant.NewTenantDetails(project, namespace, []*tenant.PlainTextSecret{secret1})

otherNamespace, _ := tenant.NewNamespace("other-ns", project.Name(),
map[string]string{
"bucket": "gs://other_ns_bucket",
})
otherTenant, _ := tenant.NewTenant(project.Name().String(), otherNamespace.Name().String())
detailedOtherTenant, _ := tenant.NewTenantDetails(project, otherNamespace)
secret2, err := tenant.NewPlainTextSecret("bucket", "gs://some_secret_bucket")
assert.Nil(t, err)
detailedOtherTenant, _ := tenant.NewTenantDetails(project, otherNamespace, []*tenant.PlainTextSecret{secret2})

jobVersion := 1
startDate, err := job.ScheduleDateFrom("2022-10-01")
Expand Down
34 changes: 8 additions & 26 deletions core/job/service/plugin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ var (
ErrYamlModNotExist = errors.New("yaml mod not found for plugin")
)

type SecretsGetter interface {
Get(ctx context.Context, projName tenant.ProjectName, namespaceName, name string) (*tenant.PlainTextSecret, error)
GetAll(ctx context.Context, projName tenant.ProjectName, namespaceName string) ([]*tenant.PlainTextSecret, error)
}

type PluginRepo interface {
GetByName(string) (*plugin.Plugin, error)
}
Expand All @@ -45,8 +40,6 @@ type Engine interface {
}

type JobPluginService struct {
secretsGetter SecretsGetter

pluginRepo PluginRepo
engine Engine

Expand All @@ -55,8 +48,8 @@ type JobPluginService struct {
logger log.Logger
}

func NewJobPluginService(secretsGetter SecretsGetter, pluginRepo PluginRepo, engine Engine, logger log.Logger) *JobPluginService {
return &JobPluginService{secretsGetter: secretsGetter, pluginRepo: pluginRepo, engine: engine, logger: logger, now: time.Now}
func NewJobPluginService(pluginRepo PluginRepo, engine Engine, logger log.Logger) *JobPluginService {
return &JobPluginService{pluginRepo: pluginRepo, engine: engine, logger: logger, now: time.Now}
}

func (p JobPluginService) Info(_ context.Context, taskName job.TaskName) (*plugin.Info, error) {
Expand All @@ -82,10 +75,7 @@ func (p JobPluginService) GenerateDestination(ctx context.Context, tnnt *tenant.
return "", ErrUpstreamModNotFound
}

compiledConfig, err := p.compileConfig(ctx, task.Config().Map(), tnnt)
if err != nil {
return "", err
}
compiledConfig := p.compileConfig(task.Config().Map(), tnnt)

destination, err := taskPlugin.DependencyMod.GenerateDestination(ctx, plugin.GenerateDestinationRequest{
Config: compiledConfig,
Expand Down Expand Up @@ -114,10 +104,7 @@ func (p JobPluginService) GenerateUpstreams(ctx context.Context, jobTenant *tena
return nil, fmt.Errorf("asset compilation failure: %w", err)
}

compiledConfigs, err := p.compileConfig(ctx, spec.Task().Config(), jobTenant)
if err != nil {
return nil, err
}
compiledConfigs := p.compileConfig(spec.Task().Config(), jobTenant)

resp, err := taskPlugin.DependencyMod.GenerateDependencies(ctx, plugin.GenerateDependenciesRequest{
Config: compiledConfigs,
Expand All @@ -139,30 +126,25 @@ func (p JobPluginService) GenerateUpstreams(ctx context.Context, jobTenant *tena
return upstreamURNs, nil
}

func (p JobPluginService) compileConfig(ctx context.Context, configs job.Config, tnnt *tenant.WithDetails) (plugin.Configs, error) {
jobTenant := tnnt.ToTenant()
secrets, err := p.secretsGetter.GetAll(ctx, jobTenant.ProjectName(), jobTenant.NamespaceName().String())
if err != nil {
return nil, err
}

func (p JobPluginService) compileConfig(configs job.Config, tnnt *tenant.WithDetails) plugin.Configs {
tmplCtx := compiler.PrepareContext(
compiler.From(tnnt.GetConfigs()).WithName("proj").WithKeyPrefix(projectConfigPrefix),
compiler.From(tenant.PlainTextSecrets(secrets).ToMap()).WithName("secret"),
compiler.From(tnnt.SecretsMap()).WithName("secret"),
)

var pluginConfigs plugin.Configs
for key, val := range configs {
compiledConf, err := p.engine.CompileString(val, tmplCtx)
if err != nil {
p.logger.Warn("error in template compilation: ", err.Error())
compiledConf = val
}
pluginConfigs = append(pluginConfigs, plugin.Config{
Name: key,
Value: compiledConf,
})
}
return pluginConfigs, nil
return pluginConfigs
}

func (p JobPluginService) compileAsset(ctx context.Context, taskPlugin *plugin.Plugin, spec *job.Spec, scheduledAt time.Time) (map[string]string, error) {
Expand Down
Loading

0 comments on commit c4cf585

Please sign in to comment.