fix(backend): Synced ScheduledWorkflow CRs on apiserver startup #11469
base: master
Conversation
Skipping CI for Draft Pull Request.
Force-pushed from 8ed1796 to 332cc47
[APPROVALNOTIFIER] This PR is NOT APPROVED.
This pull-request has been approved by:
The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing `/approve` in a comment.
Signed-off-by: Helber Belmiro <helber.belmiro@gmail.com>
Force-pushed from 50713d3 to 953426d
backend/src/apiserver/main.go
Outdated
@@ -106,6 +106,13 @@ func main() {
	}
	log.SetLevel(level)

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()
	err = resourceManager.SyncSwfCrs(ctx)
The persistence agent seems to already have a reconcile loop for scheduled workflows. If I'm reading the code right, on startup it'll reconcile everything and then handle creates, updates, and deletes.
Could the migration logic be added to the persistence agent instead?
It probably could. But that doesn't sound like a persistence agent's responsibility. It doesn't sound like the API server's responsibility either, but I think it fits better there since we have a `jobStore`.
My original plan was to do it in https://github.com/kubeflow/pipelines/tree/master/backend/src/crd/controller/scheduledworkflow, but we would have to make HTTP calls to the API server. Then we decided to leave it in the API server.
It would be nice to hear others' opinions about that.
@hbelmiro I agree that persistence agent isn't the best fit. That controller you linked could be a good fit.
I think we should consider adding a KFP version column to the `jobs` table so that you can skip generating and updating scheduled workflows if the version matches.
If we were to pursue using the scheduled workflow controller, here is an idea:
- Have the scheduled workflow controller query the API server health endpoint at startup to get the `tag_name` value to see what version of KFP we are on. In the background, it could keep querying the API server to see if the version changed.
- The scheduled workflow controller's reconcile loop checks a `pipelines.kubeflow.org/version` annotation, and if the value of that annotation doesn't match `tag_name`, then the workflow definition is updated and the annotation value is set to the current version.
- When the API server creates a `ScheduledWorkflow` object, it sets the `pipelines.kubeflow.org/version` annotation.
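A minimal sketch of that reconcile-loop check, assuming hypothetical names (`versionAnnotation`, `needsUpdate`) that don't exist in KFP today; only the annotation key comes from the idea above:

```go
// versionAnnotation records which KFP version generated the ScheduledWorkflow.
const versionAnnotation = "pipelines.kubeflow.org/version"

// needsUpdate reports whether the ScheduledWorkflow was generated by a KFP
// version other than the one the API server currently reports as tag_name.
func needsUpdate(annotations map[string]string, tagName string) bool {
	return annotations[versionAnnotation] != tagName
}
```

The reconcile loop would regenerate the workflow definition and rewrite the annotation only when `needsUpdate` returns true, so steady-state reconciles stay cheap.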
I love this idea. My only concern is about making HTTP calls to the API server.
How about implementing it in a follow-up PR?
Sure, no problem! What do you think of the other comment about adding a KFP version column to the `jobs` table so that you can skip generating and updating scheduled workflows if the version matches?
I like that too. But other than https://github.com/kubeflow/pipelines/blob/master/VERSION, I'm not aware of a field/file from which I can get the version. Are you?
Otherwise, maybe @HumairAK and @chensun can recommend something.
@hbelmiro I think `TAG_VERSION` can be used, since it is surfaced from the health endpoint.
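For illustration, a hedged sketch of how the controller could read the version from the API server; the endpoint path and the response field name here are assumptions and would need to be confirmed against the API server code:

```go
package main

import (
	"context"
	"encoding/json"
	"net/http"
)

// fetchTagName asks the API server health endpoint which KFP version it is
// running. The "/apis/v2beta1/healthz" path and the "tag_name" field are
// assumed for this sketch.
func fetchTagName(ctx context.Context, apiServerBase string) (string, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, apiServerBase+"/apis/v2beta1/healthz", nil)
	if err != nil {
		return "", err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	var health struct {
		TagName string `json:"tag_name"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&health); err != nil {
		return "", err
	}
	return health.TagName, nil
}
```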
Co-authored-by: Matt Prahl <mprahl@users.noreply.github.com>
Signed-off-by: Helber Belmiro <helber.belmiro@gmail.com>
Signed-off-by: Helber Belmiro <helber.belmiro@gmail.com>
	go reconcileSwfCrs(resourceManager)

	go startRpcServer(resourceManager)
	startHttpProxy(resourceManager)

	clientManager.Close()
}

func reconcileSwfCrs(resourceManager *resource.ResourceManager) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()
	err := resourceManager.ReconcileSwfCrs(ctx)
	if err != nil {
		log.Errorf("Could not reconcile the ScheduledWorkflow Kubernetes resources: %v", err)
	}
}
I know the rest of the code doesn't do this for separate goroutines, but I think the correct approach would be something like this, to properly wait for the goroutine to finish.
-	go reconcileSwfCrs(resourceManager)
-
-	go startRpcServer(resourceManager)
-	startHttpProxy(resourceManager)
-
-	clientManager.Close()
-}
-
-func reconcileSwfCrs(resourceManager *resource.ResourceManager) {
-	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
-	defer cancel()
-	err := resourceManager.ReconcileSwfCrs(ctx)
-	if err != nil {
-		log.Errorf("Could not reconcile the ScheduledWorkflow Kubernetes resources: %v", err)
-	}
-}
+	backgroundCtx, backgroundCancel := context.WithCancel(context.Background())
+	defer backgroundCancel()
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go reconcileSwfCrs(resourceManager, backgroundCtx, &wg)
+
+	go startRpcServer(resourceManager)
+	// This is blocking
+	startHttpProxy(resourceManager)
+
+	// Signal the reconcile goroutine to stop and wait for it before exiting.
+	backgroundCancel()
+	clientManager.Close()
+	wg.Wait()
+}
+
+func reconcileSwfCrs(resourceManager *resource.ResourceManager, ctx context.Context, wg *sync.WaitGroup) {
+	defer wg.Done()
+	err := resourceManager.ReconcileSwfCrs(ctx)
+	if err != nil {
+		log.Errorf("Could not reconcile the ScheduledWorkflow Kubernetes resources: %v", err)
+	}
+}
	go startRpcServer(resourceManager)
	startHttpProxy(resourceManager)

	clientManager.Close()
}

func reconcileSwfCrs(resourceManager *resource.ResourceManager) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
I don't think we need a timeout context here since we don't really care if it takes longer than 3 minutes when it's run asynchronously. A regular background context should be okay.
	defer cancel()
	err := resourceManager.ReconcileSwfCrs(ctx)
	if err != nil {
		log.Errorf("Could not reconcile the ScheduledWorkflow Kubernetes resources: %v", err)
I agree with your approach of not exiting with an error code in this case, but I'm curious what others think. My concern is that, as a KFP admin, it'd be hard to know that this failed; but it also doesn't seem warranted to keep the API server from running if this can't succeed.
If I were implementing this from scratch, I'd definitely exit with an error. But since the original behavior (the one that this PR aims to fix) is to run the API server with outdated `swf`s, existing deployments may start to fail after an upgrade even when they would work with outdated `swf`s.
At the same time, silently ignoring the failure and letting the outdated `swf`s keep "working" may be even more dangerous than just exiting with an error. (While writing this, I'm leaning more toward exiting with an error.)
Yeah, let's see what others think.
	if k8sNamespace == "" {
		k8sNamespace = common.GetPodNamespace()
	}
	if k8sNamespace == "" {
		return errors.New("Namespace cannot be empty when deleting a ScheduledWorkflow Kubernetes resource.")
	}
When is this code needed? It seems the jobs table always has the namespace set.
		return failedToReconcileSwfCrsError(err)
	}

	err = r.patchSwfCrSpec(ctx, jobs[i].Namespace, jobs[i].K8SName, scheduledWorkflow.Spec)
Could you use a `reflect.DeepEqual` check to compare the desired spec and the current spec, to avoid patches when the value is already correct?
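A rough sketch of that guard, assuming the loop over `jobs` visible in the diff, the existing `getScheduledWorkflowClient` and `failedToReconcileSwfCrsError` helpers, a generated-clientset-style `Get` signature, and the `reflect` and `metav1` (`k8s.io/apimachinery/pkg/apis/meta/v1`) imports:

```go
	// Fetch the live CR and skip the patch when the spec already matches.
	currentSwf, err := r.getScheduledWorkflowClient(jobs[i].Namespace).Get(ctx, jobs[i].K8SName, metav1.GetOptions{})
	if err != nil {
		return failedToReconcileSwfCrsError(err)
	}
	if reflect.DeepEqual(currentSwf.Spec, scheduledWorkflow.Spec) {
		continue // already up to date; nothing to patch
	}
	err = r.patchSwfCrSpec(ctx, jobs[i].Namespace, jobs[i].K8SName, scheduledWorkflow.Spec)
```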
"Failed to marshal patch spec") | ||
} | ||
|
||
_, err = r.getScheduledWorkflowClient(k8sNamespace).Patch( |
Could you perform an update instead of a patch? That way it'll catch `resourceVersion` mismatches (i.e. the ScheduledWorkflow was updated between when the object was retrieved and the update request). It'd be good to also retry the whole job iteration flow in that event. The `IsConflict` function from `"k8s.io/apimachinery/pkg/api/errors"` should help detect that specific case.
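One possible shape for that retry is `retry.RetryOnConflict` from `k8s.io/client-go/util/retry`, which detects conflicts via `errors.IsConflict` internally; in this sketch `swfName` and `desiredSpec` are placeholders, and the exact `Get`/`Update` signatures of the scheduled workflow client are assumptions:

```go
	// Re-read the CR and re-apply the desired spec until the update succeeds
	// or fails with a non-conflict error.
	err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
		current, getErr := r.getScheduledWorkflowClient(k8sNamespace).Get(ctx, swfName, metav1.GetOptions{})
		if getErr != nil {
			return getErr
		}
		current.Spec = desiredSpec
		_, updateErr := r.getScheduledWorkflowClient(k8sNamespace).Update(ctx, current)
		return updateErr
	})
```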
	}

	scheduledWorkflow, err := tmpl.ScheduledWorkflow(jobs[i])
	if err != nil {
Should this ignore not found errors? I could see the case where the database still has a reference to the scheduled workflow but the object no longer exists on the Kubernetes cluster.
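If so, something along these lines could tolerate the missing CR, using `IsNotFound` from `k8s.io/apimachinery/pkg/api/errors` (aliased here as `k8serrors`) and assuming `patchSwfCrSpec` surfaces the underlying Kubernetes API error:

```go
	err = r.patchSwfCrSpec(ctx, jobs[i].Namespace, jobs[i].K8SName, scheduledWorkflow.Spec)
	if k8serrors.IsNotFound(err) {
		// The database still references this job, but the CR no longer exists
		// on the cluster; skip it rather than failing the whole reconcile pass.
		continue
	}
	if err != nil {
		return failedToReconcileSwfCrsError(err)
	}
```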
Resolves: #11296

This PR changes the apiserver to patch existing `ScheduledWorkflow` CRs for each `Job` on startup to reflect the current KFP version deployed.

Testing
1. Create recurring runs
2. Make sure the recurring runs are running
3. Patch their `swf` CRs to force failures
   3.1 Get the `swf` CRs
   3.2 For each `swf`, patch them with an invalid workflow spec to force failures. At this point, the recurring runs will start to fail due to the invalid spec.
4. Build a new apiserver image
5. Edit the `ml-pipeline` deployment to use the new apiserver image
6. The new apiserver pod will fix the `swf` CRs and the recurring runs will run successfully again

See a video demonstrating the test:
kfp-issue-11296.mp4
Checklist: