Make Job context aware #18

Merged
merged 7 commits into from
Oct 4, 2022


@inkel inkel commented Sep 28, 2022

Here we're changing the Job API to make it context aware, while also giving the caller a way to gracefully stop the goroutine that polls the job and its tasks.

The biggest change is in the Job.Start method, which now accepts a context.Context as its main parameter and returns an error. To stop the job manually, the caller uses Job.Stop.

With these changes, managing jobs becomes:

job := rnr.NewJob(task)
if err := job.Start(context.Background()); err != nil {
    log.Fatal("failed to start job:", err)
}

// Stop after 10 seconds
go func() {
    time.Sleep(10 * time.Second)
    job.Stop()
}()

// wait for job to finish
<-job.Wait()

// check if there was an error
if err := job.Err(); err != nil {
    log.Fatal("job error:", err)
}

Manually calling Job.Stop doesn't seem like the most interesting option. However, by making the job context aware we can also control the polling loop through the context. In the following example, the job is cancelled when the user presses ^C or after 10 seconds, whichever happens first:

ctx, stopNotify := signal.NotifyContext(context.Background(), os.Interrupt)
defer stopNotify()
ctx, stopTimeout := context.WithTimeout(ctx, 10 * time.Second)
defer stopTimeout()

job := rnr.NewJob(task)
job.Start(ctx)
<-job.Wait()

log.Printf("job finished with: %v", job.Err())

It is worth mentioning that Stop and Start can each return an error:

  • Job.Stop returns ErrJobNotRunning when calling Stop before calling Start.
  • Job.Start returns ErrJobAlreadyStarted when calling Start more than once, or after a job was stopped; yes, jobs are one use only.

Here we're changing the Job API to make it context aware while also
giving the calling user the possibility of gracefully stopping the
goroutine that manages polling the job and its tasks.

The biggest change is in the Job.Start method that now accepts a
context.Context as its main parameter, and returns a signal function
for the caller to execute when manually stopping the job.

With these changes, managing jobs becomes:

```go
job := rnr.NewJob(task)
stop := job.Start(context.Background())

// do something

// effectively stop the polling loop
stop()

// check if there was an error
if err := job.Err(); err != nil {
    log.Fatal("job error:", err)
}
```

Manually calling stop doesn't seem like the most interesting option.
However, by making it context aware we can also control the polling
loop. In the following example, the job can be cancelled by the user
pressing `^C` or after 10 seconds, whichever happens first:

```go
ctx, stopNotify := signal.NotifyContext(context.Background(), os.Interrupt)
defer stopNotify()
ctx, stopTimeout := context.WithTimeout(ctx, 10 * time.Second)
defer stopTimeout()

job := rnr.NewJob(task)
job.Start(ctx)

// Wait for job to complete

log.Fatalf("Job finished with: %v", job.Err())
```

Signed-off-by: Leandro López (inkel) <inkel.ar@gmail.com>
@inkel
Contributor Author

inkel commented Sep 28, 2022

After opening the PR I realized that, instead of returning the stop function, we could add two additional methods:

// Stop manually stops the Job polling loop.
func (j *Job) Stop() { }

// Wait waits for a Job to finish, returning a nil error if it was
// manually stopped, or an error if the context was cancelled.
func (j *Job) Wait() error { }

I'll add them in a few and adjust the examples.

@inkel
Contributor Author

inkel commented Sep 28, 2022

One other thing I realized is that nothing stops the caller from calling Start twice and creating a parallel polling loop. I'm going to fix that as well.

Signed-off-by: Leandro López (inkel) <inkel.ar@gmail.com>
This also removes the stop function returned by Job.Start. By doing
this we can later implement a waiting function and have more control
on the status of a job when instantiating it.

Signed-off-by: Leandro López (inkel) <inkel.ar@gmail.com>
Signed-off-by: Leandro López (inkel) <inkel.ar@gmail.com>
Signed-off-by: Leandro López (inkel) <inkel.ar@gmail.com>
This isn't by any means exhaustive, but checks most of what can be
done with the current API.

Signed-off-by: Leandro López (inkel) <inkel.ar@gmail.com>
@inkel
Contributor Author

inkel commented Sep 29, 2022

Ok, added some errors and tests as well. I'm going to update the PR description.

@inkel
Contributor Author

inkel commented Sep 29, 2022

One last thing worth noting, which is out of the scope of this PR but is made visible by these changes: you could have a never-ending job if you do something like this and never manually call Stop:

job := rnr.NewJob(task)
job.Start(context.Background())
<-job.Wait()

What I think we should do in this case is change the Job.Poll implementation to check the status of the job's task and, if done, whatever the state, call Stop internally.

@inkel
Contributor Author

inkel commented Sep 29, 2022

Oh, one last thing I forgot to mention: I think it would make more sense to pass the polling interval as an argument to Job.Start rather than using the fixed 5-second interval we have right now. This gives more control to the caller, and it will also speed up tests an awful lot 😬

I just made the change 😬

Signed-off-by: Leandro López (inkel) <inkel.ar@gmail.com>
@mplzik
Owner

mplzik commented Sep 30, 2022

This opens a few interesting questions, most notably the relationship between the root task and a job. I have slowly started moving towards a model where Poll() is assumed to be called all the time, regardless of the task's state. Although the initial design was fairly simplistic, with totally stateless polling, there are use cases where we need to maintain, e.g., a background task, and in these cases we need to properly handle the situation where someone switches the task's state from RUNNING to something else.

So, coming back to this PR. My personal opinion is to stay on the road of calling Poll() as long as possible, ideally across the whole lifetime of a Job (making Start likely obsolete). Having an option to start/stop a Job probably doesn't make much sense -- that can be done on task level. Still, rnr shouldn't prevent the end user from correctly terminating their code, so having a way to terminate a job definitely makes sense; having an API for that is definitely desirable.

Thus, my proposal would be:

  • let's make Start an integral part of the NewJob function and fix any race conditions that might show up there
  • We can keep the externally-supplied context and the Wait function -- this will allow the higher-level code to terminate a job.

nit: Now, another topic for a different PR would be how to time-limit an individual Task's Poll() call. I was thinking about providing tasks with a ctx as well, but we should also ensure that this ctx won't outlive the Job's context. Is there any library that implements a "composite" context that expires if any of its child contexts expires?

@inkel
Contributor Author

inkel commented Sep 30, 2022

I'm going to start replying backwards because why not? 😬

nit: Now, another topic for a different PR would be how to time-limit an individual Task's Poll() call. I was thinking about providing tasks with a ctx as well, but we should also ensure that this ctx won't outlive the Job's context. Is there any library that implements a "composite" context that expires if any of its child contexts expires?

That was going to be my next PR 🤓 I actually have some code already, where the job.Poll function passes down the context given in Start to each task. This would "fix" your concern of the context outliving the one of the job. Would that make sense?

As for a context that expires when any of its children expires, I don't know of any off the top of my head, but maybe there are some. And if not, I suppose we could implement it, though it feels a bit odd since cancellation normally flows the other way around. Could be a fun exercise 😉

  • let's make Start an integral part of the NewJob function and fix any race conditions that might show up there

I'm not sure what race conditions there might be in that case. This could be solved with the current API by adding a top-level function similar to this one:

func StartJob(task Task) *Job {
  job := NewJob(task)
  job.Start(context.Background()) // error ignored for brevity
  return job
}

Then you would use it in the code by doing:

job := rnr.StartJob(rootTask)

<-job.Wait()

In the end it's just semantics.

Having an option to start/stop a Job probably doesn't make much sense -- that can be done on task level.

Yeah, I suppose you're correct. However, for that to happen Task would have to be made aware of Job in some way, which introduces a sort of cyclic dependency between both types. While working on making Task more context-aware, I also started thinking about this. One possible way of achieving it (tasks stopping the job) would be to change Job.Poll to something like the following (mostly pseudo-code, with all non-relevant parts removed):

func (j *Job) Poll() {
  state := j.root.Poll(j.ctx)

  if state == DONE {
    j.Stop()
    j.err = j.root.Err()
  }
}

// We would also need to change the Task interface
type Task interface {
  // return the current task state after calling Poll
  Poll(context.Context) TaskState

  // return any potential error that could have happened in the task
  Err() error
}

With these changes now you could have something like this:

func main() {
  root := rnr.NewTaskNested("foo")
  root.Add(new(CountTask)) // CountTask is an int type, so &CountTask{} would not compile
  root.Add(&RandomFailureTask{})

  job := rnr.StartJob(root)

  <-job.Wait()

  fmt.Println("Job finished with error:", job.Err())
}

// Below are the custom task types used in the example above

type CountTask int

func (t *CountTask) Poll(context.Context) TaskState {
  *t++ // increment count
  if *t == 10 {
    return DONE
  }
  return RUNNING
}

func (t *CountTask) Err() error { return nil }

type RandomFailureTask struct{
  err error
}

func (t *RandomFailureTask) Poll(context.Context) TaskState {
  if n := rand.Int(); n % 3 == 0 {
    t.err = errors.New("I failed")
    return DONE // a failed task also counts as DONE
  }
  return RUNNING
}

func (t *RandomFailureTask) Err() error { return t.err }

These changes would allow the job to stop when its root task enters a DONE state, which currently means the task failed, was skipped, or succeeded.

Does this make sense?

This opens a few interesting questions, most notably the relationship between the root task and a job. I have slowly started moving towards a model where Poll() is assumed to be called all the time, regardless of the task's state. Although the initial design was fairly simplistic, with totally stateless polling, there are use cases where we need to maintain, e.g., a background task, and in these cases we need to properly handle the situation where someone switches the task's state from RUNNING to something else.

I believe all of the above match this idea. There are of course changes that could be done to the current PR, and I'll be happy to continue this discussion and make any adjustments.

@mplzik
Owner

mplzik commented Oct 4, 2022

This definitely makes sense.

As for returning state from Poll(); that's another interesting topic which might as well expand beyond this PR's scope (more reasonable task state handling). Let's start with small bits first :)

@inkel
Contributor Author

inkel commented Oct 4, 2022

might as well expand beyond this PR's scope (more reasonable task state handling)

Agreed! It definitely needs a PR of its own, we should carefully consider implementing this.

I see you've approved the PR, thanks! Feel free to merge it whenever you have the time 😸

@mplzik mplzik merged commit 6b180b6 into mplzik:master Oct 4, 2022
@inkel inkel deleted the inkel/job-context branch October 4, 2022 13:29
@inkel inkel mentioned this pull request Oct 4, 2022