diff --git a/modules/pipeline/pipengine/actionexecutor/plugins/scheduler/executor/plugins/k8sflink/flink.go b/modules/pipeline/pipengine/actionexecutor/plugins/scheduler/executor/plugins/k8sflink/flink.go index 00e70d713fd..3572ae91130 100644 --- a/modules/pipeline/pipengine/actionexecutor/plugins/scheduler/executor/plugins/k8sflink/flink.go +++ b/modules/pipeline/pipengine/actionexecutor/plugins/scheduler/executor/plugins/k8sflink/flink.go @@ -35,8 +35,9 @@ import ( ) const ( - DiceRootDomain = "DICE_ROOT_DOMAIN" - DiceClusterInfo = "dice-cluster-info" + DiceRootDomain = "DICE_ROOT_DOMAIN" + DiceClusterInfo = "dice-cluster-info" + K8SFlinkLogPrefix = "[k8sflink]" ) var Kind = types.Kind("k8sflink") @@ -182,12 +183,56 @@ func (k *K8sFlink) Remove(ctx context.Context, task *spec.PipelineTask) (interfa if k8serrors.IsNotFound(err) { return nil, nil } + return nil, fmt.Errorf("%s failed to get flink cluster: %s, err: %v", K8SFlinkLogPrefix, task.Name, err) } err = k.client.CRClient.Delete(ctx, flinkCluster) if err != nil { return nil, fmt.Errorf("delete flink cluster %s err: %s", bigDataConf.Name, err.Error()) } + + // delete namespace after gc flinkcluster + namespace := task.Extra.Namespace + if !task.Extra.NotPipelineControlledNs { + flinkClusters := flinkoperatorv1beta1.FlinkClusterList{} + err = k.client.CRClient.List(context.Background(), &flinkClusters, &client.ListOptions{ + Namespace: namespace, + }) + if err != nil { + return nil, fmt.Errorf("%s list k8sflink clusters error: %+v, namespace: %s", K8SFlinkLogPrefix, err, namespace) + } + remainCount := 0 + if len(flinkClusters.Items) != 0 { + for _, f := range flinkClusters.Items { + if f.DeletionTimestamp == nil { + remainCount++ + } + } + } + + if remainCount < 1 { + ns, err := k.client.ClientSet.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{}) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil, nil + } + return nil, fmt.Errorf("%s get the namespace %s, error: %+v", K8SFlinkLogPrefix, namespace, err) + } + + if ns.DeletionTimestamp == nil { + logrus.Debugf("%s start to delete the namespace %s", K8SFlinkLogPrefix, namespace) + err = k.client.ClientSet.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{}) + if err != nil { + if !k8serrors.IsNotFound(err) { + errMsg := fmt.Errorf("%s delete the namespace %s, error: %+v", K8SFlinkLogPrefix, namespace, err) + return nil, errMsg + } + logrus.Warningf("%s not found the namespace %s", K8SFlinkLogPrefix, namespace) + } + logrus.Debugf("%s clean namespace %s successfully", K8SFlinkLogPrefix, namespace) + } + } + } return nil, nil } @@ -213,7 +258,6 @@ func (k *K8sFlink) GetFlinkClusterInfo(ctx context.Context, data apistructs.Bigd logrus.Debugf("get flinkCluster name %s in ns %s", data.Name, data.Namespace) flinkCluster := flinkoperatorv1beta1.FlinkCluster{} - err := k.client.CRClient.Get(context.Background(), client.ObjectKey{ Name: data.Name, Namespace: data.Namespace, diff --git a/modules/pipeline/pipengine/actionexecutor/plugins/scheduler/executor/plugins/k8sspark/k8sspark.go b/modules/pipeline/pipengine/actionexecutor/plugins/scheduler/executor/plugins/k8sspark/k8sspark.go index fa8452506a0..46197a3c7e8 100644 --- a/modules/pipeline/pipengine/actionexecutor/plugins/scheduler/executor/plugins/k8sspark/k8sspark.go +++ b/modules/pipeline/pipengine/actionexecutor/plugins/scheduler/executor/plugins/k8sspark/k8sspark.go @@ -28,11 +28,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/erda-project/erda/modules/pipeline/pkg/containers" - "github.com/erda-project/erda/apistructs" "github.com/erda-project/erda/modules/pipeline/pipengine/actionexecutor/plugins/scheduler/executor/types" "github.com/erda-project/erda/modules/pipeline/pipengine/actionexecutor/plugins/scheduler/logic" + "github.com/erda-project/erda/modules/pipeline/pkg/containers" "github.com/erda-project/erda/modules/pipeline/spec" "github.com/erda-project/erda/pkg/strutil" ) @@ -40,6 +39,7 @@ import ( const ( sparkDriverType = "driver" sparkExecutorType = "executor" + K8SSparkLogPrefix = "[k8sspark]" ) func init() { @@ -192,6 +192,45 @@ func (k *K8sSpark) Remove(ctx context.Context, task *spec.PipelineTask) (interfa } return nil, errors.Errorf("failed to remove spark application, namespace: %s, name: %s, err: %v", task.Extra.Namespace, task.Extra.UUID, err) } + + sparkApps := sparkv1beta2.SparkApplicationList{} + namespace := task.Extra.Namespace + if !task.Extra.NotPipelineControlledNs { + err = k.client.CRClient.List(ctx, &sparkApps, &client.ListOptions{Namespace: namespace}) + if err != nil { + return nil, fmt.Errorf("%s list k8sspark apps error: %+v, namespace: %s", K8SSparkLogPrefix, err, namespace) + } + remainCount := 0 + if len(sparkApps.Items) != 0 { + for _, app := range sparkApps.Items { + if app.DeletionTimestamp == nil { + remainCount++ + } + } + } + if remainCount < 1 { + ns, err := k.client.ClientSet.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{}) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil, nil + } + return nil, fmt.Errorf("%s get the namespace: %s, error: %+v", K8SSparkLogPrefix, namespace, err) + } + + if ns.DeletionTimestamp == nil { + logrus.Debugf(" %s start to delete the namespace %s", K8SSparkLogPrefix, namespace) + err = k.client.ClientSet.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{}) + if err != nil { + if !k8serrors.IsNotFound(err) { + errMsg := fmt.Errorf("%s delete the namespace: %s, error: %+v", K8SSparkLogPrefix, namespace, err) + return nil, errMsg + } + logrus.Warningf("%s not found the namespace %s", K8SSparkLogPrefix, namespace) + } + logrus.Debugf("%s clean namespace %s successfully", K8SSparkLogPrefix, namespace) + } + } + } return nil, nil } @@ -522,7 +561,6 @@ func (k *K8sSpark) appendEnvs(podSpec *sparkv1beta2.SparkPodSpec, resource *apis Value: v, }) } - podSpec.EnvVars = envMap clusterInfo, err := logic.GetCLusterInfo(k.clusterName) if err != nil { @@ -537,4 +575,9 @@ func (k *K8sSpark) appendEnvs(podSpec *sparkv1beta2.SparkPodSpec, resource *apis }) } } + + podSpec.EnvVars = map[string]string{} + for _, v := range podSpec.Env { + podSpec.EnvVars[v.Name] = v.Value + } }