Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pipes] subprocess termination forwarding #18685

Merged

Conversation

alangenfeld
Copy link
Member

@alangenfeld alangenfeld commented Dec 12, 2023

When a pipes subprocess is interrupted, forward the termination to the external process to avoid leaving it orphaned.

Addresses #18570 for subprocess client

How I Tested These Changes

added test

@alangenfeld
Copy link
Member Author

Current dependencies on/for this PR:

This stack of pull requests is managed by Graphite.

@alangenfeld alangenfeld force-pushed the al/12-12-_pipes_subprocess_termination_forwarding branch 2 times, most recently from f8fd3ea to e2d5a47 Compare December 13, 2023 17:58
Comment on lines 123 to 124
process.send_signal(signal.SIGINT)
process.wait(timeout=INTERRUPT_FWD_WAIT_SECONDS)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[1]

Comment on lines +705 to +718
import os
import time

from dagster_pipes import open_dagster_pipes

with open_dagster_pipes() as pipes:
timeout = pipes.get_extra("timeout")
log_path = pipes.get_extra("log_path")
with open(log_path, "w") as f:
f.write(f"{os.getpid()}")
f.flush()
start = time.time()
while time.time() - start < timeout:
...
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[2]

except DagsterExecutionInterruptedError:
context.log.info("[pipes] execution interrupted, sending SIGINT to subprocess.")
# send sigint to give external process chance to exit gracefully
process.send_signal(signal.SIGINT)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the alternative choice of using process.terminate() which uses SIGTERM would be having open_dagster_pipes install a SIGTERM handler since the default no-handler behavior is immediate process halt

@alangenfeld alangenfeld force-pushed the al/12-12-_pipes_subprocess_termination_forwarding branch from e2d5a47 to 0230e7a Compare December 14, 2023 17:25
Copy link

Deploy preview for dagster-docs ready!

Preview available at https://dagster-docs-fv5c6ruhu-elementl.vercel.app
https://al-12-12--pipes-subprocess-termination-forwarding.dagster.dagster-docs.io

Direct link to changed pages:

@alangenfeld alangenfeld force-pushed the al/12-12-_pipes_subprocess_termination_forwarding branch from 0230e7a to 2c8c568 Compare December 14, 2023 17:32
Copy link
Collaborator

@smackesey smackesey left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool

@alangenfeld alangenfeld merged commit 2d667aa into master Dec 15, 2023
1 check passed
@alangenfeld alangenfeld deleted the al/12-12-_pipes_subprocess_termination_forwarding branch December 15, 2023 17:14
rexledesma added a commit that referenced this pull request Jan 29, 2024
## Summary & Motivation
The third attempt at interrupting the dbt subprocess on exit.

Instead of relying on `atexit`
(#15897) or overriding signal
handlers, just catch the execution interrupt exception like
`dagster-pipes` (#18685) and
terminate the subprocess when that's handled.

## How I Tested These Changes
pytest, local



https://github.com/dagster-io/dagster/assets/16431325/05d1e002-430b-49b3-a81e-c08743e3ed63
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants