Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hotfix/pipeline support multi containers monitor #1777

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 24 additions & 23 deletions apistructs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,29 +65,30 @@ type Job struct {
}

// JobFromUser is the job specification as submitted by a caller
// (e.g. the pipeline engine) before it is turned into a runnable job.
// NOTE: the captured diff showed old and new field lists interleaved,
// redeclaring every field; this is the deduplicated post-change version.
type JobFromUser struct {
	Name         string   `json:"name"`
	Namespace    string   `json:"namespace"`    // the default namespace is "default"
	ID           string   `json:"id,omitempty"` // if Job has owner, e.g. jobflow, its ID can be specified.
	CallBackUrls []string `json:"callbackurls,omitempty"`
	Image        string   `json:"image,omitempty"`
	Resource     string   `json:"resource,omitempty"`  // for Flink: the jarId; for Spark: the jar URL
	MainClass    string   `json:"mainClass,omitempty"` // entry class, mainly for Flink/Spark
	MainArgs     []string `json:"mainArgs"`            // entry class arguments, mainly for Flink/Spark
	Cmd          string   `json:"cmd,omitempty"`
	CPU          float64  `json:"cpu,omitempty"`
	Memory       float64  `json:"memory,omitempty"`
	Labels       map[string]string `json:"labels,omitempty"`
	Extra        map[string]string `json:"extra,omitempty"`
	Env          map[string]string `json:"env,omitempty"`
	Binds        []Bind            `json:"binds,omitempty"`
	Volumes      []diceyml.Volume  `json:"volumes,omitempty"`
	Executor     string            `json:"executor,omitempty"`
	ClusterName  string            `json:"clusterName,omitempty"`
	Kind         string            `json:"kind"`              // Metronome/FLink/Spark/LocalDocker/Swarm/Kubernetes
	Depends      []string          `json:"depends,omitempty"` // names of jobs this job depends on (JobName)
	PreFetcher   *PreFetcher       `json:"preFetcher,omitempty"`
	BackoffLimit int               `json:"backoffLimit,omitempty"`
	Params       map[string]interface{} `json:"params,omitempty"`
	// TaskContainers lists the containers created for this job's task,
	// added so logs of multiple containers can be monitored per task.
	TaskContainers []TaskContainer `json:"taskContainers"`
}

// PreFetcher 用于 job 下载功能
Expand Down
15 changes: 13 additions & 2 deletions apistructs/pipeline_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import (
"time"
)

// TerminusDefineTag is the env/label key attached to a container so the
// log-collection pipeline can associate the container's logs with its task.
const TerminusDefineTag = "TERMINUS_DEFINE_TAG"

type PipelineTaskDTO struct {
ID uint64 `json:"id"`
PipelineID uint64 `json:"pipelineID"`
Expand All @@ -46,8 +51,14 @@ type PipelineTaskDTO struct {
}

type PipelineTaskExtra struct {
UUID string `json:"uuid"`
AllowFailure bool `json:"allowFailure"`
UUID string `json:"uuid"`
AllowFailure bool `json:"allowFailure"`
TaskContainers []TaskContainer `json:"taskContainers"`
}

// TaskContainer binds a pipeline task name to the ID of a container that
// runs it; used by the log endpoints to validate and select a container.
type TaskContainer struct {
// TaskName is the name of the pipeline task the container belongs to.
TaskName string `json:"taskName"`
// ContainerID identifies the container whose logs can be queried.
ContainerID string `json:"containerID"`
}

type PipelineTaskResult struct {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ require (

replace (
github.com/google/gnostic => github.com/googleapis/gnostic v0.4.0
github.com/googlecloudplatform/flink-operator => github.com/johnlanni/flink-on-k8s-operator v0.0.0-20210712093304-4d24aba33511
github.com/googlecloudplatform/flink-operator => github.com/erda-project/flink-on-k8s-operator v0.0.0-20210828094530-28e003581cf2
github.com/influxdata/influxql => github.com/erda-project/influxql v1.1.0-ex
github.com/olivere/elastic v6.2.35+incompatible => github.com/erda-project/elastic v0.0.1-ex
github.com/rancher/remotedialer => github.com/erda-project/remotedialer v0.2.6-0.20210713103000-da03eb9e4b23
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ github.com/erda-project/elastic v0.0.1-ex/go.mod h1:iAVsas6fcmt9pxtge1+dErMhecv+
github.com/erda-project/erda-infra v0.0.0-20210706133120-0a742437972c/go.mod h1:TUQYSZ60w9dk7m0q3U3AVg7U74APj/sdEVvRWR3wYv8=
github.com/erda-project/erda-infra v0.0.0-20210729162038-a2e798d921de h1:jI4eyZ0bMf0UmEeZY8yJBnetcSICpH8M6+FEEKadTjc=
github.com/erda-project/erda-infra v0.0.0-20210729162038-a2e798d921de/go.mod h1:L+fFQghY2po2P3H9pzwEOufDLAhL+mRRhPGdZ7vFnAw=
github.com/erda-project/flink-on-k8s-operator v0.0.0-20210828094530-28e003581cf2 h1:5QKXIq0yTPulGqXzEQx96fLK6HDCb8xC1H6kOjj4hsk=
github.com/erda-project/flink-on-k8s-operator v0.0.0-20210828094530-28e003581cf2/go.mod h1:QbDTwfKi8Nd8VR7cdOqRyWM2Ymul7m0hzSaoC3xSho8=
github.com/erda-project/erda-proto-go v1.2.0 h1:dVqEL7w8NkvU454BNeJ5gCfEKvNd9iAsfn/fTPD1bUI=
github.com/erda-project/erda-proto-go v1.2.0/go.mod h1:rSETXX3nKxxIhgrVn7fKDM3mla1nNlWcPz4AkepixaU=
github.com/erda-project/influxql v1.1.0-ex h1:NgP5+S5Qo234IVSIJ3N/egvzCNYJURfMAett3e8a9LE=
Expand Down Expand Up @@ -923,8 +925,6 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC
github.com/jmoiron/sqlx v1.3.1 h1:aLN7YINNZ7cYOPK3QC83dbM6KT0NMqVMw961TqrejlE=
github.com/jmoiron/sqlx v1.3.1/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak=
github.com/johnlanni/flink-on-k8s-operator v0.0.0-20210712093304-4d24aba33511 h1:hTsn9H037MvDBdBWbTlHw9nsoZPMilJTJdGTo8zfifo=
github.com/johnlanni/flink-on-k8s-operator v0.0.0-20210712093304-4d24aba33511/go.mod h1:QbDTwfKi8Nd8VR7cdOqRyWM2Ymul7m0hzSaoC3xSho8=
github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
Expand Down
44 changes: 42 additions & 2 deletions modules/dop/endpoints/cicd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package endpoints

import (
"context"
"fmt"
"net/http"
"strconv"

Expand Down Expand Up @@ -42,7 +43,22 @@ func (e *Endpoints) CICDTaskLog(ctx context.Context, r *http.Request, vars map[s
if err := queryStringDecoder.Decode(&logReq, r.URL.Query()); err != nil {
return apierrors.ErrGetCICDTaskLog.InvalidParameter(err).ToResp(), nil
}
logReq.ID = task.Extra.UUID
logID := task.Extra.UUID
if logReq.ID != "" {
var exist bool
for _, container := range task.Extra.TaskContainers {
if container.ContainerID == logReq.ID {
exist = true
}
}
if !exist {
return apierrors.ErrGetCICDTaskLog.InvalidParameter(
fmt.Errorf("container: %s don't exist", logReq.ID),
).ToResp(), nil
}
logID = logReq.ID
}
logReq.ID = logID
logReq.Source = apistructs.DashboardSpotLogSourceJob

log, err := e.bdl.GetLog(logReq)
Expand All @@ -59,14 +75,38 @@ func (e *Endpoints) ProxyCICDTaskLogDownload(ctx context.Context, r *http.Reques
return apierrors.ErrDownloadCICDTaskLog.InternalError(err)
}

var logReq apistructs.DashboardSpotLogRequest
if err := queryStringDecoder.Decode(&logReq, r.URL.Query()); err != nil {
return apierrors.ErrDownloadCICDTaskLog.InvalidParameter(err)
}

logID := task.Extra.UUID
if logReq.ID != "" {
var exist bool
for _, container := range task.Extra.TaskContainers {
if container.ContainerID == logReq.ID {
exist = true
}
}
if !exist {
return apierrors.ErrDownloadCICDTaskLog.InvalidParameter(
fmt.Errorf("container: %s don't exist", logReq.ID),
)
}
logID = logReq.ID
}

// proxy
r.URL.Scheme = "http"
r.Host = discover.Monitor()
r.URL.Host = discover.Monitor()
r.URL.Path = "/api/logs/actions/download"
q := r.URL.Query()
q.Add("source", string(apistructs.DashboardSpotLogSourceJob))
q.Add("id", task.Extra.UUID)
q.Add("id", logID)
q.Add("start", strconv.FormatInt(int64(logReq.Start), 10))
q.Add("end", strconv.FormatInt(int64(logReq.End), 10))
q.Add("stream", string(logReq.Stream))
r.URL.RawQuery = q.Encode()

return nil
Expand Down
4 changes: 4 additions & 0 deletions modules/pipeline/endpoints/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"context"
"net/http"

"github.com/sirupsen/logrus"

"github.com/erda-project/erda/modules/pipeline/services/apierrors"
"github.com/erda-project/erda/pkg/http/httpserver"
)
Expand All @@ -25,6 +27,8 @@ func (e *Endpoints) healthCheck(ctx context.Context, r *http.Request, vars map[s
httpserver.Responser, error) {
_, err := e.dbClient.Exec("select 1")
if err != nil {
logrus.Errorf("failed to health check, err: %v", err)

return apierrors.ErrPipelineHealthCheck.InternalError(err).ToResp(), nil
}
return httpserver.OkResp("success")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/erda-project/erda/modules/pipeline/pkg/containers"

"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/modules/pipeline/pipengine/actionexecutor/plugins/scheduler/logic"
)
Expand Down Expand Up @@ -144,8 +146,11 @@ func ComposeFlinkCluster(data apistructs.BigdataConf, hostURL string) *flinkoper
Ingress: &flinkoperatorv1beta1.JobManagerIngressSpec{
HostFormat: getStringPoints(hostURL),
},
Replicas: getInt32Points(data.Spec.FlinkConf.JobManagerResource.Replica),
Resources: composeResources(data.Spec.FlinkConf.JobManagerResource),
Replicas: getInt32Points(data.Spec.FlinkConf.JobManagerResource.Replica),
Resources: composeResources(data.Spec.FlinkConf.JobManagerResource),
PodLabels: map[string]string{
apistructs.TerminusDefineTag: containers.MakeFlinkJobManagerID(data.Name),
},
Volumes: nil,
VolumeMounts: nil,
InitContainers: nil,
Expand All @@ -155,8 +160,11 @@ func ComposeFlinkCluster(data apistructs.BigdataConf, hostURL string) *flinkoper
PodAnnotations: nil,
},
TaskManager: flinkoperatorv1beta1.TaskManagerSpec{
Replicas: data.Spec.FlinkConf.TaskManagerResource.Replica,
Resources: composeResources(data.Spec.FlinkConf.TaskManagerResource),
Replicas: data.Spec.FlinkConf.TaskManagerResource.Replica,
Resources: composeResources(data.Spec.FlinkConf.TaskManagerResource),
PodLabels: map[string]string{
apistructs.TerminusDefineTag: containers.MakeFLinkTaskManagerID(data.Name),
},
Volumes: nil,
VolumeMounts: nil,
InitContainers: nil,
Expand Down Expand Up @@ -197,6 +205,9 @@ func composeFlinkJob(data apistructs.BigdataConf) *flinkoperatorv1beta1.JobSpec
CancelRequested: nil,
PodAnnotations: nil,
Resources: corev1.ResourceRequirements{},
PodLabels: map[string]string{
apistructs.TerminusDefineTag: containers.MakeFlinkJobID(data.Name),
},
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,14 @@ func (k *K8sJob) generateContainerEnvs(job *apistructs.JobFromUser, clusterInfo
},
)

// add container TerminusDefineTag env
if len(job.TaskContainers) > 0 {
env = append(env, corev1.EnvVar{
Name: apistructs.TerminusDefineTag,
Value: job.TaskContainers[0].ContainerID,
})
}

if len(clusterInfo) > 0 {
for k, v := range clusterInfo {
env = append(env, corev1.EnvVar{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,16 @@ func TransferToSchedulerJob(task *spec.PipelineTask) (job apistructs.JobFromUser
}
return task.Extra.ClusterName
}(),
Image: task.Extra.Image,
Cmd: strings.Join(append([]string{task.Extra.Cmd}, task.Extra.CmdArgs...), " "),
CPU: task.Extra.RuntimeResource.CPU,
Memory: task.Extra.RuntimeResource.Memory,
Binds: task.Extra.Binds,
Volumes: MakeVolume(task),
PreFetcher: task.Extra.PreFetcher,
Env: task.Extra.PublicEnvs,
Labels: task.Extra.Labels,
Image: task.Extra.Image,
Cmd: strings.Join(append([]string{task.Extra.Cmd}, task.Extra.CmdArgs...), " "),
CPU: task.Extra.RuntimeResource.CPU,
Memory: task.Extra.RuntimeResource.Memory,
Binds: task.Extra.Binds,
Volumes: MakeVolume(task),
PreFetcher: task.Extra.PreFetcher,
Env: task.Extra.PublicEnvs,
Labels: task.Extra.Labels,
TaskContainers: task.Extra.TaskContainers,
// flink/spark
Resource: task.Extra.FlinkSparkConf.JarResource,
MainClass: task.Extra.FlinkSparkConf.MainClass,
Expand Down
14 changes: 11 additions & 3 deletions modules/pipeline/pipengine/reconciler/taskrun/taskop/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (

"github.com/sirupsen/logrus"

"github.com/erda-project/erda/modules/pipeline/pkg/containers"

"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/modules/actionagent"
"github.com/erda-project/erda/modules/pipeline/aop/aoptypes"
Expand Down Expand Up @@ -209,7 +211,6 @@ func (pre *prepare) makeTaskRun() (needRetry bool, err error) {
task.Extra.EncryptSecretKeys = p.Snapshot.EncryptSecretKeys

const (
TerminusDefineTag = "TERMINUS_DEFINE_TAG"
PipelineTaskLogID = "PIPELINE_TASK_LOG_ID"
PipelineDebugMode = "PIPELINE_DEBUG_MODE"
AgentEnvPrefix = "ACTIONAGENT_"
Expand Down Expand Up @@ -253,7 +254,6 @@ func (pre *prepare) makeTaskRun() (needRetry bool, err error) {
// If set to privateEnvs, cannot set to debug mode if agent invoke platform to fetch privateEnvs failed.
task.Extra.PublicEnvs[AgentEnvPrefix+k] = v
}
task.Extra.PublicEnvs[TerminusDefineTag] = task.Extra.UUID
task.Extra.PublicEnvs["PIPELINE_ID"] = strconv.FormatUint(p.ID, 10)
task.Extra.PublicEnvs["PIPELINE_TASK_ID"] = fmt.Sprintf("%v", task.ID)
task.Extra.PublicEnvs["PIPELINE_TASK_NAME"] = task.Name
Expand All @@ -279,7 +279,7 @@ func (pre *prepare) makeTaskRun() (needRetry bool, err error) {
if task.Extra.Labels == nil {
task.Extra.Labels = make(map[string]string)
}
task.Extra.Labels[TerminusDefineTag] = task.Extra.UUID
task.Extra.Labels[apistructs.TerminusDefineTag] = task.Extra.UUID

// --- image ---
// 所有 action,包括 custom-script,都需要在 ext market 注册;
Expand Down Expand Up @@ -317,6 +317,14 @@ func (pre *prepare) makeTaskRun() (needRetry bool, err error) {
}
}

// --- task containers ---
taskContainers, err := containers.GenContainers(task)
if err != nil {
return false, apierrors.ErrRunPipeline.InvalidState(
fmt.Sprintf("failed to make task containers err: %v", err))
}
task.Extra.TaskContainers = taskContainers

// --- resource ---
task.Extra.RuntimeResource = spec.RuntimeResource{
CPU: conf.TaskDefaultCPU(),
Expand Down
Loading