Skip to content

Commit

Permalink
PythonMutator: replace stdin/stdout with files (#1512)
Browse files Browse the repository at this point in the history
## Changes
Replace stdin/stdout with files in `PythonMutator`. Files are created in
a temporary directory.

Rename `ApplyPythonMutator` to `PythonMutator`.

Add test for `dyn.Location` behavior during the "load" stage.

## Tests
Unit tests
  • Loading branch information
kanterov authored Jun 24, 2024
1 parent 068c7cf commit 5ff0657
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 56 deletions.
2 changes: 1 addition & 1 deletion bundle/config/mutator/mutator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ func DefaultMutators() []bundle.Mutator {
InitializeVariables(),
DefineDefaultTarget(),
LoadGitDetails(),
pythonmutator.ApplyPythonMutator(pythonmutator.ApplyPythonMutatorPhaseLoad),
pythonmutator.PythonMutator(pythonmutator.PythonMutatorPhaseLoad),
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package python

import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"runtime"

"github.com/databricks/cli/bundle/env"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/libs/diag"
Expand All @@ -23,17 +24,17 @@ import (
type phase string

const (
// ApplyPythonMutatorPhaseLoad is the phase in which bundle configuration is loaded.
// PythonMutatorPhaseLoad is the phase in which bundle configuration is loaded.
//
// At this stage, PyDABs adds statically defined resources to the bundle configuration.
// Which resources are added should be deterministic and not depend on the bundle configuration.
//
// We also leave open the possibility of appending other sections of the bundle configuration,
// for example, adding new variables. However, this is not supported yet, and CLI rejects
// such changes.
ApplyPythonMutatorPhaseLoad phase = "load"
PythonMutatorPhaseLoad phase = "load"

// ApplyPythonMutatorPhaseInit is the phase after bundle configuration was loaded, and
// PythonMutatorPhaseInit is the phase after bundle configuration was loaded, and
// the list of statically declared resources is known.
//
// At this stage, PyDABs adds resources defined using generators, or mutates existing resources,
Expand All @@ -50,21 +51,21 @@ const (
// PyDABs can output YAML containing references to variables, and CLI should resolve them.
//
// Existing resources can't be removed, and CLI rejects such changes.
ApplyPythonMutatorPhaseInit phase = "init"
PythonMutatorPhaseInit phase = "init"
)

type applyPythonMutator struct {
type pythonMutator struct {
phase phase
}

func ApplyPythonMutator(phase phase) bundle.Mutator {
return &applyPythonMutator{
func PythonMutator(phase phase) bundle.Mutator {
return &pythonMutator{
phase: phase,
}
}

func (m *applyPythonMutator) Name() string {
return fmt.Sprintf("ApplyPythonMutator(%s)", m.phase)
func (m *pythonMutator) Name() string {
return fmt.Sprintf("PythonMutator(%s)", m.phase)
}

func getExperimental(b *bundle.Bundle) config.Experimental {
Expand All @@ -75,7 +76,7 @@ func getExperimental(b *bundle.Bundle) config.Experimental {
return *b.Config.Experimental
}

func (m *applyPythonMutator) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
func (m *pythonMutator) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
experimental := getExperimental(b)

if !experimental.PyDABs.Enabled {
Expand All @@ -97,7 +98,12 @@ func (m *applyPythonMutator) Apply(ctx context.Context, b *bundle.Bundle) diag.D
}
}

rightRoot, err := m.runPythonMutator(ctx, b.RootPath, pythonPath, leftRoot)
cacheDir, err := createCacheDir(ctx)
if err != nil {
return dyn.InvalidValue, fmt.Errorf("failed to create cache dir: %w", err)
}

rightRoot, err := m.runPythonMutator(ctx, cacheDir, b.RootPath, pythonPath, leftRoot)
if err != nil {
return dyn.InvalidValue, err
}
Expand All @@ -113,13 +119,39 @@ func (m *applyPythonMutator) Apply(ctx context.Context, b *bundle.Bundle) diag.D
return diag.FromErr(err)
}

func (m *applyPythonMutator) runPythonMutator(ctx context.Context, rootPath string, pythonPath string, root dyn.Value) (dyn.Value, error) {
// createCacheDir returns the path of a directory that the Python mutator can
// use for its files, creating the directory if necessary.
//
// b.CacheDir can't be used here because the target isn't selected yet, so the
// location is picked as follows:
//
//   - if env.TempDir reports a value (the same env variable b.CacheDir
//     supports), the directory is <tempDir>/default/pydabs, where 'default'
//     stands in for the target name;
//   - otherwise a new directory is created by os.MkdirTemp in the default
//     temporary location.
//
// Any error from creating the directory is returned unchanged.
func createCacheDir(ctx context.Context) (string, error) {
	tempDir, exists := env.TempDir(ctx)
	if !exists {
		// nothing configured: fall back to a fresh temporary directory
		return os.MkdirTemp("", "-pydabs")
	}

	// target isn't known at this point, so 'default' is used in its place
	cacheDir := filepath.Join(tempDir, "default", "pydabs")
	if err := os.MkdirAll(cacheDir, 0700); err != nil {
		return "", err
	}

	return cacheDir, nil
}

func (m *pythonMutator) runPythonMutator(ctx context.Context, cacheDir string, rootPath string, pythonPath string, root dyn.Value) (dyn.Value, error) {
inputPath := filepath.Join(cacheDir, "input.json")
outputPath := filepath.Join(cacheDir, "output.json")

args := []string{
pythonPath,
"-m",
"databricks.bundles.build",
"--phase",
string(m.phase),
"--input",
inputPath,
"--output",
outputPath,
}

// we need to marshal dyn.Value instead of bundle.Config to JSON to support
Expand All @@ -129,27 +161,48 @@ func (m *applyPythonMutator) runPythonMutator(ctx context.Context, rootPath stri
return dyn.InvalidValue, fmt.Errorf("failed to marshal root config: %w", err)
}

logWriter := newLogWriter(ctx, "stderr: ")
err = os.WriteFile(inputPath, rootConfigJson, 0600)
if err != nil {
return dyn.InvalidValue, fmt.Errorf("failed to write input file: %w", err)
}

stderrWriter := newLogWriter(ctx, "stderr: ")
stdoutWriter := newLogWriter(ctx, "stdout: ")

stdout, err := process.Background(
_, err = process.Background(
ctx,
args,
process.WithDir(rootPath),
process.WithStderrWriter(logWriter),
process.WithStdinReader(bytes.NewBuffer(rootConfigJson)),
process.WithStderrWriter(stderrWriter),
process.WithStdoutWriter(stdoutWriter),
)
if err != nil {
return dyn.InvalidValue, fmt.Errorf("python mutator process failed: %w", err)
}

// we need absolute path, or because later parts of pipeline assume all paths are absolute
// and this file will be used as location
outputFile, err := os.Open(outputPath)
if err != nil {
return dyn.InvalidValue, fmt.Errorf("failed to open Python mutator output: %w", err)
}

defer func() {
_ = outputFile.Close()
}()

// we need an absolute path because later parts of the pipeline assume all paths are absolute,
// and this file will be used as the location to resolve relative paths.
//
// virtualPath has to stay in rootPath, because locations outside root path are not allowed:
//
// Error: path /var/folders/.../pydabs/dist/*.whl is not contained in bundle root path
//
// for that, we pass virtualPath instead of outputPath as file location
virtualPath, err := filepath.Abs(filepath.Join(rootPath, "__generated_by_pydabs__.yml"))
if err != nil {
return dyn.InvalidValue, fmt.Errorf("failed to get absolute path: %w", err)
}

generated, err := yamlloader.LoadYAML(virtualPath, bytes.NewReader([]byte(stdout)))
generated, err := yamlloader.LoadYAML(virtualPath, outputFile)
if err != nil {
return dyn.InvalidValue, fmt.Errorf("failed to parse Python mutator output: %w", err)
}
Expand All @@ -171,9 +224,9 @@ func (m *applyPythonMutator) runPythonMutator(ctx context.Context, rootPath stri

func createOverrideVisitor(ctx context.Context, phase phase) (merge.OverrideVisitor, error) {
switch phase {
case ApplyPythonMutatorPhaseLoad:
case PythonMutatorPhaseLoad:
return createLoadOverrideVisitor(ctx), nil
case ApplyPythonMutatorPhaseInit:
case PythonMutatorPhaseInit:
return createInitOverrideVisitor(ctx), nil
default:
return merge.OverrideVisitor{}, fmt.Errorf("unknown phase: %s", phase)
Expand Down
Loading

0 comments on commit 5ff0657

Please sign in to comment.