Skip to content

Commit

Permalink
[Array-Jobs] Support separate namespace + Fix remote logs (flyteorg#172)
Browse files Browse the repository at this point in the history
* [Array-Jobs] Support separate namespace

Signed-off-by: Anmol Khurana <akhurana@lyft.com>

* [Array-Jobs] Support separate namespace

Signed-off-by: Anmol Khurana <akhurana@lyft.com>

* Fix log links

Signed-off-by: Anmol Khurana <akhurana@lyft.com>

* Fix log links

Signed-off-by: Anmol Khurana <akhurana@lyft.com>

* Add log tests

Signed-off-by: Anmol Khurana <akhurana@lyft.com>

* PR comments:

Signed-off-by: Anmol Khurana <akhurana@lyft.com>

* Update log links

Signed-off-by: Anmol Khurana <akhurana@lyft.com>

* Update log links

Signed-off-by: Anmol Khurana <akhurana@lyft.com>

* Update log links

Signed-off-by: Anmol Khurana <akhurana@lyft.com>

* PR comments

Signed-off-by: Anmol Khurana <akhurana@lyft.com>
  • Loading branch information
akhurana001 committed May 6, 2021
1 parent c1bc48d commit 83324d5
Show file tree
Hide file tree
Showing 7 changed files with 378 additions and 18 deletions.
8 changes: 8 additions & 0 deletions go/tasks/plugins/array/k8s/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"fmt"
"io/ioutil"

"github.com/flyteorg/flyteplugins/go/tasks/logs"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
restclient "k8s.io/client-go/rest"
Expand Down Expand Up @@ -113,8 +115,14 @@ type Config struct {
RemoteClusterConfig ClusterConfig `json:"remoteClusterConfig" pflag:"-,Configuration of remote K8s cluster for array jobs"`
NodeSelector map[string]string `json:"node-selector" pflag:"-,Defines a set of node selector labels to add to the pod."`
Tolerations []v1.Toleration `json:"tolerations" pflag:"-,Tolerations to be applied for k8s-array pods"`
NamespaceTemplate string `json:"namespaceTemplate" pflag:"-,Namespace pattern to spawn array-jobs in. Defaults to parent namespace if not set"`
OutputAssembler workqueue.Config
ErrorAssembler workqueue.Config
LogConfig LogConfig `json:"logs" pflag:",Config for log links for k8s array jobs."`
}

type LogConfig struct {
Config logs.LogConfig `json:"config" pflag:",Defines the log config for k8s logs."`
}

func GetConfig() *Config {
Expand Down
11 changes: 11 additions & 0 deletions go/tasks/plugins/array/k8s/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

242 changes: 242 additions & 0 deletions go/tasks/plugins/array/k8s/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 22 additions & 6 deletions go/tasks/plugins/array/k8s/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strconv"
"time"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/tasklog"

"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/storage"

Expand Down Expand Up @@ -59,6 +61,12 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
currentState.ArrayStatus = *newArrayStatus
}

logPlugin, err := logs.InitializeLogPlugins(&config.LogConfig.Config)
if err != nil {
logger.Errorf(ctx, "Error initializing LogPlugins: [%s]", err)
return currentState, logLinks, subTaskIDs, err
}

for childIdx, existingPhaseIdx := range currentState.GetArrayStatus().Detailed.GetItems() {
existingPhase := core.Phases[existingPhaseIdx]
indexStr := strconv.Itoa(childIdx)
Expand Down Expand Up @@ -113,7 +121,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
}

var monitorResult MonitorResult
monitorResult, err = task.Monitor(ctx, tCtx, kubeClient, dataStore, outputPrefix, baseOutputDataSandbox)
monitorResult, err = task.Monitor(ctx, tCtx, kubeClient, dataStore, outputPrefix, baseOutputDataSandbox, logPlugin)
logLinks = task.LogLinks
subTaskIDs = task.SubTaskIDs

Expand Down Expand Up @@ -157,7 +165,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
return newState, logLinks, subTaskIDs, nil
}

func CheckPodStatus(ctx context.Context, client core.KubeClient, name k8sTypes.NamespacedName) (
func FetchPodStatusAndLogs(ctx context.Context, client core.KubeClient, name k8sTypes.NamespacedName, index int, retryAttempt uint32, logPlugin tasklog.Plugin) (
info core.PhaseInfo, err error) {

pod := &v1.Pod{
Expand Down Expand Up @@ -192,11 +200,19 @@ func CheckPodStatus(ctx context.Context, client core.KubeClient, name k8sTypes.N
}

if pod.Status.Phase != v1.PodPending && pod.Status.Phase != v1.PodUnknown {
taskLogs, err := logs.GetLogsForContainerInPod(ctx, pod, 0, " (User)")
if err != nil {
return core.PhaseInfoUndefined, err

if logPlugin != nil {
o, err := logPlugin.GetTaskLogs(tasklog.Input{
PodName: pod.Name,
Namespace: pod.Namespace,
LogName: fmt.Sprintf(" #%d-%d", index, retryAttempt),
})

if err != nil {
return core.PhaseInfoUndefined, err
}
taskInfo.Logs = o.TaskLogs
}
taskInfo.Logs = taskLogs
}
switch pod.Status.Phase {
case v1.PodSucceeded:
Expand Down
Loading

0 comments on commit 83324d5

Please sign in to comment.