Skip to content

Commit

Permalink
[Addon #603] Add Apache Spark as a experimental addon (#629)
Browse files Browse the repository at this point in the history
* [Addon #603] Add Apache Spark as a experimental addon

Signed-off-by: yanghua <yanghua1127@gmail.com>

* [Addon #579] Reformat cue file via cue fmt command

Signed-off-by: yanghua <yanghua1127@gmail.com>

* [Addon #579] Address the review suggestion

Signed-off-by: yanghua <yanghua1127@gmail.com>

* [Addon #579] Address the review suggestion

Signed-off-by: yanghua <yanghua1127@gmail.com>

* [Addon #579] Address the review suggestion

Signed-off-by: yanghua <yanghua1127@gmail.com>

* [Addon #579] Address the review suggestion

Signed-off-by: yanghua <yanghua1127@gmail.com>

* [Addon #579] Add more details about mount volume

Signed-off-by: yanghua <yanghua1127@gmail.com>

* [Addon #579] Refactor the spark-workload parameter definition

Signed-off-by: yanghua <yanghua1127@gmail.com>

* [Addon #579] Refactor the spark-workload parameter definition and add spark-py example

Signed-off-by: yanghua <yanghua1127@gmail.com>

* [Addon #603] Refactor the spark-workload parameter definition (add proxyUser option)

Signed-off-by: yanghua <yanghua1127@gmail.com>

---------

Signed-off-by: yanghua <yanghua1127@gmail.com>
  • Loading branch information
yanghua authored Mar 7, 2023
1 parent a40d216 commit 670b685
Show file tree
Hide file tree
Showing 7 changed files with 682 additions and 0 deletions.
49 changes: 49 additions & 0 deletions examples/spark-kubernetes-operator/sparkapp-py.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
apiVersion: core.oam.dev/v1beta1
kind: Application
metadata:
name: spark-app-v1
namespace: spark-cluster
spec:
components:
- name: spark-workload-component
type: spark-workload
properties:
name: my-spark-py-app
namespace: spark-cluster
type: Python
pythonVersion: "3"
mode: cluster
image: "gcr.io/spark-operator/spark-py:v3.1.1"
imagePullPolicy: Always
mainClass: org.apache.spark.examples.streaming.JavaQueueStream
mainApplicationFile: "local:///opt/spark/examples/src/main/python/pi.py"
sparkVersion: "3.1.1"
restartPolicy:
type: OnFailure
onFailureRetries: 3
onFailureRetryInterval: 10
onSubmissionFailureRetries: 5
onSubmissionFailureRetryInterval: 20
volumes:
- name: "test-volume"
hostPath:
path: "/tmp"
type: Directory
driver:
cores: 1
coreLimit: "1200m"
memory: "1024m"
labels:
version: 3.1.1
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
executor:
cores: 1
instances: 1
memory: "1024m"
labels:
version: 3.1.1
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
44 changes: 44 additions & 0 deletions examples/spark-kubernetes-operator/sparkapp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
apiVersion: core.oam.dev/v1beta1
kind: Application
metadata:
name: spark-app-v1
namespace: spark-cluster
spec:
components:
- name: spark-workload-component
type: spark-workload
properties:
name: my-spark-app
namespace: spark-cluster
type: Scala
mode: cluster
image: "gcr.io/spark-operator/spark:v3.1.1"
imagePullPolicy: Always
mainClass: org.apache.spark.examples.streaming.JavaQueueStream
mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
sparkVersion: "3.1.1"
restartPolicy:
type: Never
volumes:
- name: "test-volume"
hostPath:
path: "/tmp"
type: Directory
driver:
cores: 1
coreLimit: "1200m"
memory: "1024m"
labels:
version: 3.1.1
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
executor:
cores: 1
instances: 1
memory: "1024m"
labels:
version: 3.1.1
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
295 changes: 295 additions & 0 deletions experimental/addons/spark-kubernetes-operator/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"spark-workload": {
annotations: {}
attributes: workload: type: "autodetects.core.oam.dev"
description: "Describes a containerized spark application that can specify resource spec."
labels: {}
type: "component"
}

template: {
parameter: {
// +usage=Specify the spark application name
name: string
// +usage=Specify the namespace for spark application to install
namespace: string
// +usage=Specify the application language type, e.g. "Scala", "Python", "Java" or "R"
type: string
// +usage=Specify the python version
pythonVersion?: string
// +usage=Specify the deploy mode, e.go "cluster", "client" or "in-cluster-client"
mode: string
// +usage=Specify the container image for the driver, executor, and init-container
image: string
// +usage=Specify the image pull policy for the driver, executor, and init-container
imagePullPolicy: string
// +usage=Specify the fully-qualified main class of the Spark application
mainClass: string
// +usage=Specify the path to a bundled JAR, Python, or R file of the application
mainApplicationFile: string
// +usage=Specify the version of Spark the application uses
sparkVersion: string
// +usage=Specify the user to impersonate when submitting the application. It maps to the command-line flag “–proxy-user” in spark-submit
proxyUser?: string
// +usage=Specify the policy on if and in which conditions the controller should restart an application
restartPolicy?: {
// +usage=Type value option: "Always", "Never", "OnFailure"
type: string
// +usage=Specify the number of times to retry submitting an application before giving up. This is best effort and actual retry attempts can be >= the value specified due to caching. These are required if RestartPolicy is OnFailure
onSubmissionFailureRetries?: int
// +usage=Specify the number of times to retry running an application before giving up
onFailureRetries?: int
// +usage=Specify the interval in seconds between retries on failed submissions
onSubmissionFailureRetryInterval?: int
// +usage=Specify the interval in seconds between retries on failed runs
onFailureRetryInterval?: int
}
// +usage=Specify the driver sepc request for the driver pod
driver: {
// +usage=Specify the cores maps to spark.driver.cores or spark.executor.cores for the driver and executors, respectively
cores?: int
// +usage=Specify a hard limit on CPU cores for the pod
coreLimit?: string
// +usage=Specify the amount of memory to request for the pod
memory?: string
// +usage=Specify the Kubernetes labels to be added to the pod
labels?: [string]: string
// +usage=Specify the volumes listed in “.spec.volumes” to mount into the main container’s filesystem
volumeMounts?: [...{
name: string
mountPath: string
}]
}
// +usage=Specify the executor spec request for the executor pod
executor: {
// +usage=Specify the cores maps to spark.driver.cores or spark.executor.cores for the driver and executors, respectively
cores?: int
// +usage=Specify a hard limit on CPU cores for the pod
coreLimit?: string
// +usage=Specify the amount of memory to request for the pod
memory?: string
// +usage=Specify the number of executor instances
instances?: int
// +usage=Specify the Kubernetes labels to be added to the pod
labels?: [string]: string
// +usage=Specify the volumes listed in “.spec.volumes” to mount into the main container’s filesystem
volumeMounts?: [...{
name: string
mountPath: string
}]
}
// +usage=Specify a list of arguments to be passed to the application
arguments?: [...string]
// +usage=Specify the config information carries user-specified Spark configuration properties as they would use the "--conf" option in spark-submit
sparkConf?: [string]: string
// +usage=Specify the config information carries user-specified Hadoop configuration properties as they would use the the "--conf" option in spark-submit. The SparkApplication controller automatically adds prefix "spark.hadoop." to Hadoop configuration properties
hadoopConf?: [string]: string
// +usage=Specify the name of the ConfigMap containing Spark configuration files such as log4j.properties. The controller will add environment variable SPARK_CONF_DIR to the path where the ConfigMap is mounted to
sparkConfigMap?: string
// +usage=Specify the name of the ConfigMap containing Hadoop configuration files such as core-site.xml. The controller will add environment variable HADOOP_CONF_DIR to the path where the ConfigMap is mounted to
hadoopConfigMap?: string
// +usage=Specify the list of Kubernetes volumes that can be mounted by the driver and/or executors
volumes?: [...{
name: string
hostPath: {
path: string
type: *"Directory" | string
}
}]
// +usage=Specify the dependencies captures all possible types of dependencies of a Spark application
deps?: {
// +usage=Specify a list of JAR files the Spark application depends on
jars?: [...string]
// +usage=Specify a list of files the Spark application depends on
files?: [...string]
// +usage=Specify a list of Python files the Spark application depends on
pyFiles?: [...string]
// +usage=Specify a list of maven coordinates of jars to include on the driver and executor classpaths. This will search the local maven repo, then maven central and any additional remote repositories given by the “repositories” option. Each package should be of the form “groupId:artifactId:version”
packages?: [...string]
// +usage=Specify a list of “groupId:artifactId”, to exclude while resolving the dependencies provided in Packages to avoid dependency conflicts
excludePackages?: [...string]
// +usage=Specify a list of additional remote repositories to search for the maven coordinate given with the “packages” option
repositories?: [...string]
}
}

output: {
kind: "ClusterRoleBinding"
apiVersion: "rbac.authorization.k8s.io/v1"
metadata: name: parameter.name
roleRef: {
name: "edit"
apiGroup: "rbac.authorization.k8s.io"
kind: "ClusterRole"
}
subjects: [{
name: "default"
kind: "ServiceAccount"
namespace: parameter.namespace
}]
}

outputs: {

"spark": {
kind: "SparkApplication"
apiVersion: "sparkoperator.k8s.io/v1beta2"
metadata: {
name: parameter.name
namespace: parameter.namespace
}
spec: {
type: parameter.type
mode: parameter.mode
image: parameter.image
imagePullPolicy: parameter.imagePullPolicy
mainClass: parameter.mainClass
mainApplicationFile: parameter.mainApplicationFile
sparkVersion: parameter.sparkVersion
driver: parameter.driver
executor: parameter.executor
if parameter.volumes != _|_ {
volumes: parameter.volumes
}

}
}
}
}
15 changes: 15 additions & 0 deletions experimental/addons/spark-kubernetes-operator/metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
description: A kubernetes operator for Apache Spark
icon: "https://spark.apache.org/images/spark-logo.png"
invisible: false
name: spark-kubernetes-operator
tags:
- GoogleCloudPlatform/spark-on-k8s-operator
version: v1beta2-1.3.8-3.1.1
url: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator

dependencies:
- name: fluxcd

system:
vela: ">=1.5.0-beta.3"
kubernetes: ">=1.16"
20 changes: 20 additions & 0 deletions experimental/addons/spark-kubernetes-operator/parameter.cue
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// parameter.cue is used to store addon parameters.
//
// You can use these parameters in template.cue or in resources/ by 'parameter.myparam'
//
// For example, you can use parameters to allow the user to customize
// container images, ports, and etc.
parameter: {
// +usage=Deploy to specified clusters. Leave empty to deploy to all clusters.
clusters?: [...string]
// +usage=Namespace to deploy to
namespace: *"spark-operator" | string
// +usage=Specify if create the webhook or not
"createWebhook": *false | bool
// +usage=Specify the image repository
"imageRepository": *"ghcr.io/googlecloudplatform/spark-operator" | string
// +usage=Specify the image tag
"imageTag": *"v1beta2-1.3.8-3.1.1" | string
// +usage=Specify if create the sa for job or not
"createSparkServiceAccount": *false | bool
}
102 changes: 102 additions & 0 deletions experimental/addons/spark-kubernetes-operator/template.cue
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package main

output: {
apiVersion: "core.oam.dev/v1beta1"
kind: "Application"
spec: {
components: [
{
type: "k8s-objects"
name: "spark-operator-ns"
properties: objects: [{
apiVersion: "v1"
kind: "Namespace"
metadata: name: parameter.namespace
}]
},
{
type: "k8s-objects"
name: "spark-cluster-ns"
properties: objects: [{
apiVersion: "v1"
kind: "Namespace"
metadata: name: "spark-cluster"
},
{
apiVersion: "v1"
kind: "ServiceAccount"
metadata: {
name: "spark"
namespace: "spark-cluster"
}
}]
},
{
name: "spark-operator-helm"
type: "helm"
dependsOn: ["spark-operator-ns"]
type: "helm"
properties: {
repoType: "helm"
url: "https://googlecloudplatform.github.io/spark-on-k8s-operator/"
chart: "spark-operator"
targetNamespace: parameter["namespace"]
version: "1.1.26"
values: {
image: {
repository: parameter["imageRepository"]
tag: parameter["imageTag"]
}

serviceAccounts: {
spark: {
create: parameter["createSparkServiceAccount"]
}
}

serviceAccounts: {
sparkoperator: {
name: "spark-kubernetes-operator"
}
}

webhook: {
enable: parameter["createWebhook"]
}
}
}
},
]

policies: [
{
name: "gc-dependency"
type: "garbage-collect"
properties: {
order: "dependency"
}
},
{
type: "shared-resource"
name: "shared-resource-via-namespace"
properties: rules: [{
selector: resourceTypes: ["Namespace"]
}]
},
{
type: "topology"
name: "deploy-operator"
properties: {
namespace: parameter.namespace
if parameter.clusters != _|_ {
clusters: parameter.clusters
}

if parameter.clusters == _|_ {
clusterLabelSelector: {}
}
}
},
]
}
}

0 comments on commit 670b685

Please sign in to comment.