diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go index 113bf4054167..d6250ad2fdcd 100644 --- a/sdks/python/container/piputil.go +++ b/sdks/python/container/piputil.go @@ -32,6 +32,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx" ) +const pipLogFlushInterval time.Duration = 15 * time.Second const unrecoverableURL string = "https://beam.apache.org/documentation/sdks/python-unrecoverable-errors/index.html#pip-dependency-resolution-failures" // pipInstallRequirements installs the given requirement, if present. @@ -40,7 +41,7 @@ func pipInstallRequirements(ctx context.Context, logger *tools.Logger, files []s if err != nil { return err } - bufLogger := tools.NewBufferedLogger(logger) + bufLogger := tools.NewBufferedLoggerWithFlushInterval(ctx, logger, pipLogFlushInterval) for _, file := range files { if file == name { // We run the install process in two rounds in order to avoid as much @@ -48,7 +49,7 @@ func pipInstallRequirements(ctx context.Context, logger *tools.Logger, files []s // option will make sure that only things staged in the worker will be // used without following their dependencies. args := []string{"-m", "pip", "install", "-r", filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check", "--no-index", "--no-deps", "--find-links", dir} - if err := execx.Execute(pythonVersion, args...); err != nil { + if err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...); err != nil { bufLogger.Printf(ctx, "Some packages could not be installed solely from the requirements cache. Installing packages from PyPI.") } // The second install round opens up the search for packages on PyPI and @@ -79,8 +80,6 @@ func isPackageInstalled(pkgName string) bool { return true } -const pipLogFlushInterval time.Duration = 15 * time.Second - // pipInstallPackage installs the given package, if present. func pipInstallPackage(ctx context.Context, logger *tools.Logger, files []string, dir, name string, force, optional bool, extras []string) error { pythonVersion, err := expansionx.GetPythonVersion() @@ -150,7 +149,7 @@ func pipInstallPackage(ctx context.Context, logger *tools.Logger, files []string // installExtraPackages installs all the packages declared in the extra // packages manifest file. func installExtraPackages(ctx context.Context, logger *tools.Logger, files []string, extraPackagesFile, dir string) error { - bufLogger := tools.NewBufferedLogger(logger) + bufLogger := tools.NewBufferedLoggerWithFlushInterval(ctx, logger, pipLogFlushInterval) // First check that extra packages manifest file is present. for _, file := range files { if file != extraPackagesFile { @@ -179,7 +178,7 @@ func installExtraPackages(ctx context.Context, logger *tools.Logger, files []str } func findBeamSdkWhl(ctx context.Context, logger *tools.Logger, files []string, acceptableWhlSpecs []string) string { - bufLogger := tools.NewBufferedLogger(logger) + bufLogger := tools.NewBufferedLoggerWithFlushInterval(ctx, logger, pipLogFlushInterval) for _, file := range files { if strings.HasPrefix(file, "apache_beam") { for _, s := range acceptableWhlSpecs { @@ -200,7 +199,7 @@ func findBeamSdkWhl(ctx context.Context, logger *tools.Logger, files []string, a // SDK from source tarball provided in sdkSrcFile. func installSdk(ctx context.Context, logger *tools.Logger, files []string, workDir string, sdkSrcFile string, acceptableWhlSpecs []string, required bool) error { sdkWhlFile := findBeamSdkWhl(ctx, logger, files, acceptableWhlSpecs) - bufLogger := tools.NewBufferedLogger(logger) + bufLogger := tools.NewBufferedLoggerWithFlushInterval(ctx, logger, pipLogFlushInterval) if sdkWhlFile != "" { // by default, pip rejects to install wheel if same version already installed isDev := strings.Contains(sdkWhlFile, ".dev")