Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Add support raw container in the map task #329

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open

Conversation

pingsutw
Copy link
Member

@pingsutw pingsutw commented Mar 9, 2023

TL;DR

When using regular task in the map task, the entrypoint is pyflyte-map-execute. it does two things before running the task.

  1. Update sub task input / output interface to list interface
  2. Append job id to output prefix

However, when using raw container, the entrypoint will not be pyflyte-map-execute.

Therefore, we should update the output prefix for copilot, and support upload collection in the copilot.

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

image

Tracking Issue

https://flyte-org.slack.com/archives/CP2HDHKE1/p1678230956906899

Follow-up issue

NA

Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Comment on lines 189 to 191
for sidecarIndex, container := range pod.Spec.Containers {
if container.Name == config.GetK8sPluginConfig().CoPilot.NamePrefix+flytek8s.Sidecar {
for i, arg := range pod.Spec.Containers[sidecarIndex].Args {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @hamersaw should we pass env FlyteK8sArrayIndex to copilot, and construct final output prefix in the copilot?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have strong feelings here. How are we passing the array index to the inputs downloader? Because in flytekit we pass the input data ref and a subtask index, IIUC it reads the full list of inputs and only uses the value at the subtask index. We need to do the same thing here right?

Copy link
Member Author

@pingsutw pingsutw Mar 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we pass array index to primary container instead of downloader, and raw container task will read the value at subtask index. here is an example. flyteorg/flytekit#1547.

The problem is that in the regular map task, we construct the final output prefix in the flytekit (output_prefix + array index.), but the raw container doesn't know the output prefix, it write to a local share dir instead. uploader will read the data in the share dir and upload to s3.

Signed-off-by: Kevin Su <pingsutw@apache.org>
@codecov
Copy link

codecov bot commented Mar 10, 2023

Codecov Report

Merging #329 (3a6cc0b) into master (18a594e) will increase coverage by 1.39%.
The diff coverage is 68.75%.

❗ Current head 3a6cc0b differs from pull request most recent head b80c8a9. Consider uploading reports for the commit b80c8a9 to get more accurate results

@@            Coverage Diff             @@
##           master     #329      +/-   ##
==========================================
+ Coverage   62.65%   64.05%   +1.39%     
==========================================
  Files         146      146              
  Lines       12220     9929    -2291     
==========================================
- Hits         7657     6360    -1297     
+ Misses       3981     2985     -996     
- Partials      582      584       +2     
Flag Coverage Δ
unittests ?

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
go/tasks/pluginmachinery/core/transition.go 100.00% <ø> (ø)
go/tasks/pluginmachinery/flytek8s/pod_helper.go 69.81% <ø> (-3.21%) ⬇️
go/tasks/pluginmachinery/internal/webapi/core.go 25.00% <0.00%> (+2.27%) ⬆️
go/tasks/plugins/hive/executor.go 10.12% <0.00%> (+1.87%) ⬆️
go/tasks/plugins/presto/executor.go 11.26% <0.00%> (+1.96%) ⬆️
...tasks/pluginmachinery/flytek8s/container_helper.go 87.41% <40.00%> (+0.23%) ⬆️
go/tasks/plugins/array/k8s/subtask_exec_context.go 80.80% <66.66%> (+0.97%) ⬆️
go/tasks/plugins/array/catalog.go 46.98% <68.18%> (+1.93%) ⬆️
go/tasks/plugins/array/k8s/executor.go 40.67% <100.00%> (+1.09%) ⬆️
go/tasks/plugins/array/k8s/management.go 57.43% <100.00%> (-0.90%) ⬇️
... and 1 more

... and 120 files with indirect coverage changes

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

Signed-off-by: Kevin Su <pingsutw@apache.org>
Copy link
Contributor

@hamersaw hamersaw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is looking very good - thanks for diving into it! My general feedback is I think we can leave the pod plugin code as is. If we can encapsulate the code for maptasks in the k8s_array package then when we implement ArrayNode and ultimately refactor this all out it doesn't leave nasty one-off code to support legacy functionality. What are your thoughts on this? Very open for discussion.

Comment on lines 189 to 191
for sidecarIndex, container := range pod.Spec.Containers {
if container.Name == config.GetK8sPluginConfig().CoPilot.NamePrefix+flytek8s.Sidecar {
for i, arg := range pod.Spec.Containers[sidecarIndex].Args {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have strong feelings here. How are we passing the array index to the inputs downloader? Because in flytekit we pass the input data ref and a subtask index, IIUC it reads the full list of inputs and only uses the value at the subtask index. We need to do the same thing here right?

Comment on lines 194 to 202
// When the copilot is running, we should wait until the data is uploaded by the copilot.
copilotContainerName, exists := r.GetAnnotations()[flytek8s.FlyteCopilotName]
if exists {
copilotContainerPhase := flytek8s.DetermineContainerPhase(copilotContainerName, pod.Status.ContainerStatuses, &info)
if copilotContainerPhase.Phase() == pluginsCore.PhaseRunning && len(info.Logs) > 0 {
return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion+1, copilotContainerPhase.Info()), nil
}
}

Copy link
Contributor

@hamersaw hamersaw Mar 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So right now this is done in ContainerTasks (not using map task) by just not setting a primaryContainerName on the Pod, then this code waits for the entire Pod to complete. It seems like this is what we should do for subtasks as well.

I think the issue is that here we always add a PrimaryContainerName to the pod annotation. Maybe it makes sense to update the code in the subtask.go so that this annotation is only added if necessary, then we shouldn't need to add to flytek8s.FlyteCopilotName annotation above either.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is the propeller will only pass array index to primary container, so we have to set raw-container to primary. However, if we set it to primary, propeller won't wait for the uploader complete, so I added flytek8s.FlyteCopilotName to annotation, and wait for copilot first if we find the uploader container in the pod here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so we should probably update this logic then to support ContainerTask (ie. add array index to non-copilot containers) so that it doesn't rely on the primaryContainerName annotation.

This means we could keep the logic in PodPlugin so that if the primaryContainerName annotation exists, it waits for that container to completed. If it doesn't then it waits for the Pod to complete. It helps if this logic is simple because we have a few perf ideas to layer on top of it.

Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
@@ -330,6 +346,10 @@ func getTaskContainerIndex(pod *v1.Pod) (int, error) {
if len(pod.Spec.Containers) == 1 {
return 0, nil
}
// Copilot is always the second container if it is enabled.
if len(pod.Spec.Containers) == 2 && pod.Spec.Containers[1].Name == config.GetK8sPluginConfig().CoPilot.NamePrefix+flytek8s.Sidecar {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is reasonable to assume this. Could there a scenario where the second container was not copilot?

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants