Experimental functions for resuming a task run #910

Merged: 7 commits into main from experimental-resume-task-run on Jul 2, 2024

Conversation

@JackUrb (Contributor) commented Oct 4, 2022

Overview

This PR acts as a starting point for implementing the path of resuming incomplete or interrupted TaskRuns. It creates two new experimental functions (denoted with an EXP_ prefix) that power the functionality. The basic flow (a rough sketch follows the list) is to:

  1. Clone configuration from the previous TaskRun, and use it to initialize a CrowdProvider, Architect, and Blueprint.
  2. Query for all assignments of that task run, and find ones with missing Units.
  3. Load up all of the Assignments, then force-reset their incomplete units from EXPIRED to CREATED.
  4. Push the assignments through the launch_units flow.
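
For orientation, here is a minimal sketch of what driving that flow from a script might look like. The Operator-side entry point is not named in this description, so EXP_launch_resumed_task_run below is purely illustrative (the PR only names TaskLauncher.EXP_resume_assignments):

    from mephisto.abstractions.databases.local_database import LocalMephistoDB
    from mephisto.operations.operator import Operator

    db = LocalMephistoDB()
    operator = Operator(db)

    # Steps 1-3 happen inside the resume entry point: the old TaskRun's
    # config is cloned to rebuild the CrowdProvider, Architect, and
    # Blueprint, and incomplete units are reset from EXPIRED to CREATED.
    operator.EXP_launch_resumed_task_run(task_run_id="<previous-run-id>")  # illustrative name

    # Step 4: the reset units go through the usual launch_units flow,
    # and we wait on them as in a normal run.
    operator.wait_for_runs_then_shutdown(skip_input=True)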

Implementation

The first portion is a version of Operator.launch_task_run_or_die that, instead of creating a new task run from a given config, loads an existing task run by ID and reuses its config. It then establishes the same type of local state that an operator would normally have, and tries to execute the run.
The second part is TaskLauncher.EXP_resume_assignments, which replaces the create_assignments step and instead looks for existing assignments that have incomplete units. It moves those incomplete units (whether expired or (soft-)rejected) back to the CREATED state, and stores them locally so that the launch_units call from the Operator relaunches only these particular units.
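
A rough sketch of that resume step, assuming the standard Assignment/Unit accessors (get_assignments, get_units, get_status, set_db_status); this illustrates the described behavior and is not the actual diff:

    from mephisto.data_model.constants.assignment_state import AssignmentState

    def EXP_resume_assignments(self, task_run):
        """Queue existing incomplete units for relaunch instead of
        creating fresh assignments."""
        resumable_units = []
        for assignment in task_run.get_assignments():
            for unit in assignment.get_units():
                # Units that never completed (expired, or soft-rejected)
                # are forced back to CREATED so launch_units picks them up.
                if unit.get_status() in (
                    AssignmentState.EXPIRED,
                    AssignmentState.SOFT_REJECTED,
                ):
                    unit.set_db_status(AssignmentState.CREATED)
                    resumable_units.append(unit)
        # Stored locally so the Operator's launch_units call relaunches
        # only these particular units.
        self.units = resumable_units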

Testing

No testing yet

TODO

  • Do some testing in sandbox/local, discover some bugs, resolve those bugs
  • Commit this to main, even though it's experimental. Some people can use it as-is, but I'd like to standardize pre-releasing functionality that may unblock people (or lead them to contribute the remaining parts needed to bring it to core functionality)

TODO to move to core

  • Add validation that a job can be resumed. Perhaps we need metadata? Any task run with a non-standard SharedTaskState, for instance, is going to struggle to resume properly, but it's possible we could have a script wrapper that restores the screening unit validator, onboarding functions, etc.
  • Create tests
    • Create a simple automated test that actually runs this functionality
    • Ensure that nothing is broken with regard to shutting down after the new tasks are completed
    • Ensure that there's no weird linking if we go back and re-action other workers
    • Ensure that our review scripts still work on task runs that go through this flow
  • Fix anything that was surfaced writing new tests
  • Remove experimental prefix

@facebook-github-bot added the CLA Signed label on Oct 4, 2022. (This label is managed by the Facebook bot; authors need to sign the CLA before a PR can be reviewed.)
@ajyl commented Oct 5, 2022

Can you point me toward how to create automated tests? In particular, I think "Create a simple automated test that actually runs this functionality" would be a good place for me to familiarize myself with this PR.

@JackUrb (Contributor, Author) commented Oct 5, 2022

Sure! Here's a good example of an operator test you could build from. The flow you'd look to build would likely need to (a rough skeleton follows the list):

  1. Create a task run with 3 Assignments, each with 2 units.
  2. Create 3 agents using at least 2 workers (to make sure you can complete both units of the first assignment, then one of the second). Let's say two for w0 and one for w1.
  3. Run the 3 agents to completion (with send_agent_act and submit_mock_unit).
  4. Quit the job (with Operator.force_shutdown()).
  5. Check that one assignment is completed, one is mixed, and one is purely expired.
  6. Relaunch this task run.
  7. Complete one more unit as w0.
  8. Try to accept another as w0, and assert that it fails (they've already done 3).
  9. Complete two more units as w1.
  10. Assert that the operator exits (_wait_for_runs_in_testing).
  11. Assert that all three assignments are now completed.
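
Here's a rough skeleton of that flow, modeled on the existing operator tests. send_agent_act, submit_mock_unit, force_shutdown, and _wait_for_runs_in_testing come from the steps above, while register_mock_agent, get_config, relaunch_task_run, and get_task_run are hypothetical stand-ins for whatever setup/resume helpers the final test uses:

    from mephisto.data_model.constants.assignment_state import AssignmentState

    def test_resume_job(self):
        # 1. Launch a run with 3 Assignments of 2 Units each
        self.operator.launch_task_run(self.get_config(num_assignments=3))

        # 2-3. Three agents across two workers, run to completion:
        #      both units of assignment 0 as w0, one unit of assignment 1 as w1
        for worker, count in (("w0", 2), ("w1", 1)):
            for _ in range(count):
                agent = self.register_mock_agent(worker)
                self.send_agent_act(agent, {"text": "done"})
                self.submit_mock_unit(agent)

        # 4-5. Interrupt the run; expect one completed, one mixed,
        #      and one purely expired assignment
        self.operator.force_shutdown()

        # 6. Relaunch the same task run through the EXP_ resume path
        self.operator = self.relaunch_task_run()

        # 7-8. w0 completes one more unit, then can't accept a fourth
        agent = self.register_mock_agent("w0")
        self.send_agent_act(agent, {"text": "done"})
        self.submit_mock_unit(agent)
        with self.assertRaises(Exception):
            self.register_mock_agent("w0")

        # 9. w1 completes the remaining two units
        for _ in range(2):
            agent = self.register_mock_agent("w1")
            self.send_agent_act(agent, {"text": "done"})
            self.submit_mock_unit(agent)

        # 10-11. Operator exits and all three assignments are completed
        self._wait_for_runs_in_testing()
        for assignment in self.get_task_run().get_assignments():
            self.assertEqual(assignment.get_status(), AssignmentState.COMPLETED)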

@ajyl commented Oct 5, 2022

Great! I'll likely take a stab at it over the weekend.
Noob question - how do I run these tests? Like any other Python unit test, or do I need to do anything special?

@JackUrb (Contributor, Author) commented Oct 5, 2022

Just like any other unit test. I tend to use pytest -k <my_test_name> when writing a single test case.
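
For example, assuming you name the new test test_resume_job and add it to the existing operator test file (path assumed from the repo's test layout):

    pytest -k test_resume_job test/core/test_operator.py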

@ajyl commented Oct 7, 2022

Hi - I have a new unit test created that's an exact copy of test_run_job_not_concurrent; the only change I've made is changing num_assignment for MockBlueprintArgs from 1 to 3, and I'm getting a huge traceback.
I think the most informative parts are the following:

____________________ ERROR at teardown of TestOperatorLocal.test_resume_job ____________________

self = <test.core.test_operator.TestOperatorLocal testMethod=test_resume_job>

    def tearDown(self):
        if self.operator is not None:
            self.operator.force_shutdown(timeout=10)
        self.db.shutdown()
        shutil.rmtree(self.data_dir, ignore_errors=True)
        SHUTDOWN_TIMEOUT = 10
        threads = threading.enumerate()
        target_threads = [
            t for t in threads if not isinstance(t, TMonitor) and not t.daemon
        ]
        start_time = time.time()
        while len(target_threads) > 1 and time.time() - start_time < SHUTDOWN_TIMEOUT:
            threads = threading.enumerate()
            target_threads = [
                t for t in threads if not isinstance(t, TMonitor) and not t.daemon
            ]
            time.sleep(0.3)
>       self.assertTrue(
            time.time() - start_time < SHUTDOWN_TIMEOUT,
            f"Expected only main thread at teardown after {SHUTDOWN_TIMEOUT} seconds, found {target_threads}",
        )
E       AssertionError: False is not true : Expected only main thread at teardown after 10 seconds, found [<_MainThread(MainThread, started 140144099612480)>, <Thread(socket-thread-ws://localhost:3000/socket, started 140142232729344)>, <Thread(force-shutdown-cleanup-runs, started 140142213834496)>]
ERROR    mephisto.utils.metrics:metrics.py:124 Could not launch prometheus metrics client, perhaps a process is already running on 3031? Mephisto metrics currently only supports one Operator class at a time at the moment
Traceback (most recent call last):                                                                                             
  File "/home/repos/Mephisto/mephisto/utils/metrics.py", line 122, in start_metrics_server                                                                                                                                                                    
    start_http_server(3031)                                                                                                    
  File "/home/repos/venv_mephisto/lib/python3.8/site-packages/prometheus_client/exposition.py", line 144, in start_wsgi_server                                                                                                                                
    httpd = make_server(addr, port, app, ThreadingWSGIServer, handler_class=_SilentHandler)                                                                                                                                                                   
  File "/usr/lib/python3.8/wsgiref/simple_server.py", line 154, in make_server                                                                                                                                                                                
    server = server_class((host, port), handler_class)                                                                         
  File "/usr/lib/python3.8/socketserver.py", line 452, in __init__                                                             
    self.server_bind()                                                                                                         
  File "/usr/lib/python3.8/wsgiref/simple_server.py", line 50, in server_bind                                                                                                                                                                                 
    HTTPServer.server_bind(self)                                                                                               
  File "/usr/lib/python3.8/http/server.py", line 138, in server_bind                                                           
    socketserver.TCPServer.server_bind(self)                                                                                   
  File "/usr/lib/python3.8/socketserver.py", line 466, in server_bind                                                          
    self.socket.bind(self.server_address)                                                                                      
OSError: [Errno 98] Address already in use  

I see that test_run_jobs_with_restrictions also launches 3 assignments, with 2 max units per worker. I'm able to run that test, and I'm guessing I want to move some of its functionality over as well, but I'm not sure what's missing/needed.
I've tried some other combinations of configurations, but as far as I can tell, changing the number of assignments from 1 to 3 seems to be the culprit. Have you seen this error before?

@JackUrb (Contributor, Author) commented Oct 17, 2022

@ajyl Sorry I missed this for so long... The issue is that Mephisto in these tests expects to run its tasks to completion in order to shut down. If you've increased the number of assignments without any other changes, the task isn't complete by the time the test tries to shut down.
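
Concretely (reusing send_agent_act/submit_mock_unit from the plan above, plus the hypothetical register_mock_agent helper), the copied test would need to drive every launched unit to completion before teardown, something like:

    # 3 assignments x 2 units each = 6 units must complete before shutdown
    for _ in range(6):
        agent = self.register_mock_agent("w0")  # hypothetical helper
        self.send_agent_act(agent, {"text": "done"})
        self.submit_mock_unit(agent)
    self._wait_for_runs_in_testing()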

@facebook-github-bot (Contributor) commented

Hi @JackUrb!

Thank you for your pull request.

We require contributors to sign our Contributor License Agreement, and yours needs attention.

You currently have a record in our system, but the CLA is no longer valid, and will need to be resubmitted.

Process

In order for us to review and merge your suggested changes, please sign at https://code.facebook.com/cla. If you are contributing on behalf of someone else (e.g. your employer), the individual CLA may not be sufficient and your employer may need to sign the corporate CLA.

Once the CLA is signed, our tooling will perform checks and validations. Afterwards, the pull request will be tagged with CLA signed. The tagging process may take up to 1 hour after signing. Please give it that time before contacting us about it.

If you have received this in error or have any questions, please contact us at cla@meta.com. Thanks!

@meta-paul force-pushed the experimental-resume-task-run branch from 5ea9892 to 7c052aa on May 15, 2024
@meta-paul force-pushed the experimental-resume-task-run branch from b3dcb56 to 8ca310b on June 8, 2024
@meta-paul force-pushed the experimental-resume-task-run branch from 8ca310b to b262670 on June 27, 2024
@meta-paul force-pushed the experimental-resume-task-run branch from 21c3bb3 to 555bbe5 on June 27, 2024
@meta-paul force-pushed the experimental-resume-task-run branch from 101edc5 to b68e93c on July 2, 2024
@meta-paul merged commit 0ec72ff into main on Jul 2, 2024
13 of 14 checks passed