Skip to content

Commit

Permalink
Revert "Don't fail if creating venv didn't succeed. Also provide a wa…
Browse files Browse the repository at this point in the history
…y to disable creating a venv. (#26778)"

This reverts commit 3ec6e58.
  • Loading branch information
tvalentyn authored May 19, 2023
1 parent 14ba618 commit eb957b1
Showing 1 changed file with 11 additions and 14 deletions.
25 changes: 11 additions & 14 deletions sdks/python/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"log"
Expand Down Expand Up @@ -156,20 +157,15 @@ func launchSDKProcess() error {
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

// Create a separate virtual environment (with access to globally installed packages), unless disabled by the user.
// This improves usability on runners that persist the execution environment for the boot entrypoint between multiple pipeline executions.
if os.Getenv("RUN_PYTHON_SDK_IN_DEFAULT_ENVIRONMENT") == "" {
venvDir, err := setupVenv(ctx, logger, "/opt/apache/beam-venv", *id)
if err != nil {
logger.Printf(ctx, "Using default environment, since creating a virtual environment for the SDK harness didn't succeed: %v", err)
} else {
cleanupFunc := func() {
os.RemoveAll(venvDir)
logger.Printf(ctx, "Cleaned up temporary venv for worker %v.", *id)
}
defer cleanupFunc()
}
venvDir, err := setupVenv(ctx, logger, "/opt/apache/beam-venv", *id)
if err != nil {
return errors.New("failed to initialize Python venv")
}
cleanupFunc := func() {
os.RemoveAll(venvDir)
logger.Printf(ctx, "Cleaned up temporary venv for worker %v.", *id)
}
defer cleanupFunc()

dir := filepath.Join(*semiPersistDir, "staged")
files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
Expand Down Expand Up @@ -312,8 +308,9 @@ func StartCommandEnv(env map[string]string, prog string, args ...string) *exec.C

// setupVenv initializes a local Python venv and sets the corresponding env variables
func setupVenv(ctx context.Context, logger *tools.Logger, baseDir, workerId string) (string, error) {
logger.Printf(ctx, "Initializing temporary Python venv ...")

dir := filepath.Join(baseDir, "beam-venv-worker-"+workerId)
logger.Printf(ctx, "Initializing temporary Python venv in %v", dir)
if _, err := os.Stat(dir); !os.IsNotExist(err) {
// Probably leftovers from a previous run
logger.Printf(ctx, "Cleaning up previous venv ...")
Expand Down

0 comments on commit eb957b1

Please sign in to comment.