Add scheduler func for clearing batch scheduling on completed #1079
Conversation
If you want me to add some test code for the Volcano scheduler, I will write it :)
@@ -25,4 +25,5 @@ type BatchScheduler interface {
    ShouldSchedule(app *v1beta2.SparkApplication) bool
    DoBatchSchedulingOnSubmission(app *v1beta2.SparkApplication) error
    ClearBatchSchedulingOnCompleted(app *v1beta2.SparkApplication) error
Nit: CleanupOnCompletion.
func (v *VolcanoBatchScheduler) ClearBatchSchedulingOnCompleted(app *v1beta2.SparkApplication) error {
    // remove pod group
    podGroupName := v.getAppPodGroupName(app)
    if err := v.volcanoClient.SchedulingV1beta1().PodGroups(app.Namespace).Delete(podGroupName, &metav1.DeleteOptions{}); err != nil {
You can do:
return v.volcanoClient.SchedulingV1beta1().PodGroups(app.Namespace).Delete(podGroupName, &metav1.DeleteOptions{})
@@ -597,6 +597,11 @@ func (c *Controller) syncSparkApplication(key string) error {
        glog.Errorf("failed to update SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
        return err
    }

    if err := c.clearBatchScheduling(app, appCopy); err != nil {
        glog.Errorf("failed to clean up batch scheduling %s/%s: %v", app.Namespace, app.Name, err)
failed to clean up batch scheduling config for SparkApplication
@@ -1001,3 +1006,19 @@ func (c *Controller) hasApplicationExpired(app *v1beta2.SparkApplication) bool {
    return false
}

// Clean up batch scheduler if use
Clean up batch scheduling config if applicable.
@liyinan926 Thanks! I updated the code to address your comments.
Force-pushed from 629ceb8 to 6ed3470.
@liyinan926 PTAL
Sorry for the delay in review.
@@ -597,6 +597,11 @@ func (c *Controller) syncSparkApplication(key string) error {
        glog.Errorf("failed to update SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
        return err
    }

    if err := c.cleanUpBatchScheduling(app, appCopy); err != nil {
Just realized that we have a method for cleaning up application resources like the driver UI service. We should add deletion of this resource to that method.
Thanks for the review!
I wonder if the function you are talking about is deleteSparkResources.
As far as I understand, that function is called before a Spark application retry, i.e., in abnormal states (invalidating, failed submission, etc.).
For this cleanup, the pod group is deleted only when the application is in the completed or failed state, which means the Spark application will no longer restart. If the operator deleted the pod group in the invalidating or failed-submission state, the application would not be queued to Volcano and could not restart until it was re-queued.
Please comment if this is different from what you meant.
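To illustrate the distinction, a rough sketch in Go (the isTerminalState helper is hypothetical and not part of this PR; the state constants are the ones used in the diff below):

// Hypothetical helper, not in this PR: only these states are terminal for a
// SparkApplication, so only here is it safe to delete the Volcano pod group.
// Retryable states (invalidating, failed submission) keep their pod group so
// the application can be re-queued.
func isTerminalState(state v1beta2.ApplicationStateType) bool {
    return state == v1beta2.CompletedState || state == v1beta2.FailedState
}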
Makes sense.
@@ -25,4 +25,5 @@ type BatchScheduler interface {
    ShouldSchedule(app *v1beta2.SparkApplication) bool
    DoBatchSchedulingOnSubmission(app *v1beta2.SparkApplication) error
    CleanupOnCompleted(app *v1beta2.SparkApplication) error
To be consistent, let's rename this to CleanupOnCompletion.
Done
@@ -1001,3 +1006,19 @@ func (c *Controller) hasApplicationExpired(app *v1beta2.SparkApplication) bool {
    return false
}

// Clean up batch scheduling config if applicable.
func (c *Controller) cleanUpBatchScheduling(oldApp, newApp *v1beta2.SparkApplication) error {
Please rename this to cleanUpOnTermination, which sounds more generic and can be extended later to handle deletion of other resources, e.g., the UI service. Please update the comments also.
Fixed it.
state := newApp.Status.AppState.State
// If new app state is completed or failed, no more app is running,
// and only once needs to clean up on completed or failed
if (state == v1beta2.CompletedState || state == v1beta2.FailedState) &&
I suggest moving the state check to the caller of this method.
Thanks, I moved it to the caller.
@@ -1001,3 +1009,17 @@ func (c *Controller) hasApplicationExpired(app *v1beta2.SparkApplication) bool {
    return false
}

// Clean up resources such as batch scheduler config if applicable.
Comment needs to be updated.
Updated it to:
// Clean up when the spark application is terminated.
// Clean up resources such as batch scheduler config if applicable.
func (c *Controller) cleanUpOnTermination(oldApp, newApp *v1beta2.SparkApplication) error {
    if needScheduling, scheduler := c.shouldDoBatchScheduling(newApp); needScheduling {
        // If new app state is completed or failed, no more app is running,
Comment needs update.
Updated it to:
// batch scheduler is cleaned up only when state is changed to completion state
func (v *VolcanoBatchScheduler) CleanupOnCompletion(app *v1beta2.SparkApplication) error {
    podGroupName := v.getAppPodGroupName(app)
    //Remove pod group for Spark Application
    return v.volcanoClient.SchedulingV1beta1().PodGroups(app.Namespace).Delete(podGroupName, &metav1.DeleteOptions{})
We should ignore the error if it's NotFound.
I only saw this comment just now; I will update the code to ignore the NotFound error here.
@@ -160,6 +160,12 @@ func (v *VolcanoBatchScheduler) syncPodGroup(app *v1beta2.SparkApplication, size
    return nil
}

func (v *VolcanoBatchScheduler) CleanupOnCompletion(app *v1beta2.SparkApplication) error {
    podGroupName := v.getAppPodGroupName(app)
    //Remove pod group for Spark Application
This comment is not needed.
Removed it.
func (c *Controller) cleanUpOnTermination(oldApp, newApp *v1beta2.SparkApplication) error {
    if needScheduling, scheduler := c.shouldDoBatchScheduling(newApp); needScheduling {
        // batch scheduler is cleaned up only when state is changed to completion state
        if newApp.Status.AppState.State != oldApp.Status.AppState.State {
This check is not needed if deletion is idempotent, i.e., if we ignore NotFound error above.
Looks reasonable. Updated to:
if needScheduling, scheduler := c.shouldDoBatchScheduling(newApp); needScheduling {
    if err := scheduler.CleanupOnCompletion(newApp); err != nil && !errors.IsNotFound(err) {
        return err
    }
}
if err := scheduler.CleanupOnCompletion(newApp); err != nil {
    return err
}
if err := scheduler.CleanupOnCompletion(newApp); err != nil && !errors.IsNotFound(err) {
Instead of dealing with the NotFound error here, why not return a nil if so in CleanupOnCompletion?
You're right :)
Updated the NotFound check to live inside CleanupOnCompletion.
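The method presumably ends up roughly like the sketch below, assembled from the snippets above rather than quoted from the final commit (here errors refers to k8s.io/apimachinery/pkg/api/errors, as in the earlier snippets):

func (v *VolcanoBatchScheduler) CleanupOnCompletion(app *v1beta2.SparkApplication) error {
    podGroupName := v.getAppPodGroupName(app)
    err := v.volcanoClient.SchedulingV1beta1().PodGroups(app.Namespace).Delete(podGroupName, &metav1.DeleteOptions{})
    // A missing pod group means there is nothing left to clean up, so the
    // NotFound error is swallowed and only unexpected errors are propagated.
    if err != nil && !errors.IsNotFound(err) {
        return err
    }
    return nil
}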
Force-pushed from c35c566 to c13b1c3.
LGTM
…ow#1079)
* Add scheduler func for clearing batch scheduling on completed
* Rename batch scheduler func
* Rename spark app claen up func
* Update comment for clean up function
* Remove app state changed checking when clean up batch scheduler
Related to issue #1062
A Spark application submitted with the Volcano scheduler does not delete its pod group when its state changes to completed or failed.
The pod group can only be removed from the Volcano queue by deleting the Spark application or by TTL timeout.
This causes newly scheduled pod groups to get stuck (pending) in the queue until the Spark application is removed, which deletes the pod group via its ownerReference.
In my opinion, it would be better to remove the pod group when the Spark application changes to a completed or failed state.
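A rough sketch of the cleanup path this adds, assembled from the review discussion above rather than quoted verbatim from the merged change:

// Clean up when the Spark application is terminated (sketch; oldApp is kept
// only for signature compatibility with the existing controller code).
func (c *Controller) cleanUpOnTermination(oldApp, newApp *v1beta2.SparkApplication) error {
    if needScheduling, scheduler := c.shouldDoBatchScheduling(newApp); needScheduling {
        // CleanupOnCompletion deletes the Volcano pod group and treats
        // NotFound as success, so this call is idempotent.
        if err := scheduler.CleanupOnCompletion(newApp); err != nil {
            return err
        }
    }
    return nil
}

Because the deletion is idempotent, the caller does not need to compare old and new application states before invoking it; it only needs to call this once the application has reached a terminal (completed or failed) state.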