Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use buffered loggers that periodically flush. #31977

Merged
merged 1 commit into from
Jul 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions sdks/python/container/piputil.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
)

const pipLogFlushInterval time.Duration = 15 * time.Second
const unrecoverableURL string = "https://beam.apache.org/documentation/sdks/python-unrecoverable-errors/index.html#pip-dependency-resolution-failures"

// pipInstallRequirements installs the given requirement, if present.
Expand All @@ -40,15 +41,15 @@ func pipInstallRequirements(ctx context.Context, logger *tools.Logger, files []s
if err != nil {
return err
}
bufLogger := tools.NewBufferedLogger(logger)
bufLogger := tools.NewBufferedLoggerWithFlushInterval(ctx, logger, pipLogFlushInterval)
for _, file := range files {
if file == name {
// We run the install process in two rounds in order to avoid as much
// as possible PyPI downloads. In the first round the --find-links
// option will make sure that only things staged in the worker will be
// used without following their dependencies.
args := []string{"-m", "pip", "install", "-r", filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check", "--no-index", "--no-deps", "--find-links", dir}
if err := execx.Execute(pythonVersion, args...); err != nil {
if err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...); err != nil {
bufLogger.Printf(ctx, "Some packages could not be installed solely from the requirements cache. Installing packages from PyPI.")
}
// The second install round opens up the search for packages on PyPI and
Expand Down Expand Up @@ -79,8 +80,6 @@ func isPackageInstalled(pkgName string) bool {
return true
}

const pipLogFlushInterval time.Duration = 15 * time.Second

// pipInstallPackage installs the given package, if present.
func pipInstallPackage(ctx context.Context, logger *tools.Logger, files []string, dir, name string, force, optional bool, extras []string) error {
pythonVersion, err := expansionx.GetPythonVersion()
Expand Down Expand Up @@ -150,7 +149,7 @@ func pipInstallPackage(ctx context.Context, logger *tools.Logger, files []string
// installExtraPackages installs all the packages declared in the extra
// packages manifest file.
func installExtraPackages(ctx context.Context, logger *tools.Logger, files []string, extraPackagesFile, dir string) error {
bufLogger := tools.NewBufferedLogger(logger)
bufLogger := tools.NewBufferedLoggerWithFlushInterval(ctx, logger, pipLogFlushInterval)
// First check that extra packages manifest file is present.
for _, file := range files {
if file != extraPackagesFile {
Expand Down Expand Up @@ -179,7 +178,7 @@ func installExtraPackages(ctx context.Context, logger *tools.Logger, files []str
}

func findBeamSdkWhl(ctx context.Context, logger *tools.Logger, files []string, acceptableWhlSpecs []string) string {
bufLogger := tools.NewBufferedLogger(logger)
bufLogger := tools.NewBufferedLoggerWithFlushInterval(ctx, logger, pipLogFlushInterval)
for _, file := range files {
if strings.HasPrefix(file, "apache_beam") {
for _, s := range acceptableWhlSpecs {
Expand All @@ -200,7 +199,7 @@ func findBeamSdkWhl(ctx context.Context, logger *tools.Logger, files []string, a
// SDK from source tarball provided in sdkSrcFile.
func installSdk(ctx context.Context, logger *tools.Logger, files []string, workDir string, sdkSrcFile string, acceptableWhlSpecs []string, required bool) error {
sdkWhlFile := findBeamSdkWhl(ctx, logger, files, acceptableWhlSpecs)
bufLogger := tools.NewBufferedLogger(logger)
bufLogger := tools.NewBufferedLoggerWithFlushInterval(ctx, logger, pipLogFlushInterval)
if sdkWhlFile != "" {
// by default, pip rejects to install wheel if same version already installed
isDev := strings.Contains(sdkWhlFile, ".dev")
Expand Down
Loading