Skip to content

Commit

Permalink
gc namespace for k8sspark k8sflink (#3392)
Browse files Browse the repository at this point in the history
  • Loading branch information
chengjoey authored Dec 17, 2021
1 parent e3df8f5 commit b7ac3d8
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ import (
)

const (
DiceRootDomain = "DICE_ROOT_DOMAIN"
DiceClusterInfo = "dice-cluster-info"
DiceRootDomain = "DICE_ROOT_DOMAIN"
DiceClusterInfo = "dice-cluster-info"
K8SFlinkLogPrefix = "[k8sflink]"
)

var Kind = types.Kind("k8sflink")
Expand Down Expand Up @@ -182,12 +183,56 @@ func (k *K8sFlink) Remove(ctx context.Context, task *spec.PipelineTask) (interfa
if k8serrors.IsNotFound(err) {
return nil, nil
}
return nil, fmt.Errorf("%s failed to get flink cluster: %s, err: %v", K8SFlinkLogPrefix, task.Name, err)
}

err = k.client.CRClient.Delete(ctx, flinkCluster)
if err != nil {
return nil, fmt.Errorf("delete flink cluster %s err: %s", bigDataConf.Name, err.Error())
}

// delete namespace after gc flinkcluster
namespace := task.Extra.Namespace
if !task.Extra.NotPipelineControlledNs {
flinkClusters := flinkoperatorv1beta1.FlinkClusterList{}
err = k.client.CRClient.List(context.Background(), &flinkClusters, &client.ListOptions{
Namespace: namespace,
})
if err != nil {
return nil, fmt.Errorf("%s list k8sflink clusters error: %+v, namespace: %s", K8SFlinkLogPrefix, err, namespace)
}
remainCount := 0
if len(flinkClusters.Items) != 0 {
for _, f := range flinkClusters.Items {
if f.DeletionTimestamp == nil {
remainCount++
}
}
}

if remainCount < 1 {
ns, err := k.client.ClientSet.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
return nil, nil
}
return nil, fmt.Errorf("%s get the namespace %s, error: %+v", K8SFlinkLogPrefix, namespace, err)
}

if ns.DeletionTimestamp == nil {
logrus.Debugf("%s start to delete the namespace %s", K8SFlinkLogPrefix, namespace)
err = k.client.ClientSet.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{})
if err != nil {
if !k8serrors.IsNotFound(err) {
errMsg := fmt.Errorf("%s delete the namespace %s, error: %+v", K8SFlinkLogPrefix, namespace, err)
return nil, errMsg
}
logrus.Warningf("%s not found the namespace %s", K8SFlinkLogPrefix, namespace)
}
logrus.Debugf("%s clean namespace %s successfully", K8SFlinkLogPrefix, namespace)
}
}
}
return nil, nil
}

Expand All @@ -213,7 +258,6 @@ func (k *K8sFlink) GetFlinkClusterInfo(ctx context.Context, data apistructs.Bigd
logrus.Debugf("get flinkCluster name %s in ns %s", data.Name, data.Namespace)

flinkCluster := flinkoperatorv1beta1.FlinkCluster{}

err := k.client.CRClient.Get(context.Background(), client.ObjectKey{
Name: data.Name,
Namespace: data.Namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/erda-project/erda/modules/pipeline/pkg/containers"

"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/modules/pipeline/pipengine/actionexecutor/plugins/scheduler/executor/types"
"github.com/erda-project/erda/modules/pipeline/pipengine/actionexecutor/plugins/scheduler/logic"
"github.com/erda-project/erda/modules/pipeline/pkg/containers"
"github.com/erda-project/erda/modules/pipeline/spec"
"github.com/erda-project/erda/pkg/strutil"
)

const (
sparkDriverType = "driver"
sparkExecutorType = "executor"
K8SSparkLogPrefix = "[k8sspark]"
)

func init() {
Expand Down Expand Up @@ -192,6 +192,45 @@ func (k *K8sSpark) Remove(ctx context.Context, task *spec.PipelineTask) (interfa
}
return nil, errors.Errorf("failed to remove spark application, namespace: %s, name: %s, err: %v", task.Extra.Namespace, task.Extra.UUID, err)
}

sparkApps := sparkv1beta2.SparkApplicationList{}
namespace := task.Extra.Namespace
if !task.Extra.NotPipelineControlledNs {
err = k.client.CRClient.List(ctx, &sparkApps, &client.ListOptions{Namespace: namespace})
if err != nil {
return nil, fmt.Errorf("%s list k8sspark apps error: %+v, namespace: %s", K8SSparkLogPrefix, err, namespace)
}
remainCount := 0
if len(sparkApps.Items) != 0 {
for _, app := range sparkApps.Items {
if app.DeletionTimestamp == nil {
remainCount++
}
}
}
if remainCount < 1 {
ns, err := k.client.ClientSet.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
return nil, nil
}
return nil, fmt.Errorf("%s get the namespace: %s, error: %+v", K8SSparkLogPrefix, namespace, err)
}

if ns.DeletionTimestamp == nil {
logrus.Debugf(" %s start to delete the namespace %s", K8SSparkLogPrefix, namespace)
err = k.client.ClientSet.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{})
if err != nil {
if !k8serrors.IsNotFound(err) {
errMsg := fmt.Errorf("%s delete the namespace: %s, error: %+v", K8SSparkLogPrefix, namespace, err)
return nil, errMsg
}
logrus.Warningf("%s not found the namespace %s", K8SSparkLogPrefix, namespace)
}
logrus.Debugf("%s clean namespace %s successfully", K8SSparkLogPrefix, namespace)
}
}
}
return nil, nil
}

Expand Down Expand Up @@ -522,7 +561,6 @@ func (k *K8sSpark) appendEnvs(podSpec *sparkv1beta2.SparkPodSpec, resource *apis
Value: v,
})
}
podSpec.EnvVars = envMap

clusterInfo, err := logic.GetCLusterInfo(k.clusterName)
if err != nil {
Expand All @@ -537,4 +575,9 @@ func (k *K8sSpark) appendEnvs(podSpec *sparkv1beta2.SparkPodSpec, resource *apis
})
}
}

podSpec.EnvVars = map[string]string{}
for _, v := range podSpec.Env {
podSpec.EnvVars[v.Name] = v.Value
}
}

0 comments on commit b7ac3d8

Please sign in to comment.