[DPE-2164] Expose Spark SQL in Spark Client snap (#91)
theoctober19th authored Feb 21, 2024
1 parent d81049c commit 7773432
Showing 6 changed files with 163 additions and 22 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/ci.yaml
@@ -49,6 +49,11 @@ jobs:
      - name: Checkout repo
        uses: actions/checkout@v4

      - name: Setup MicroK8s
        run: |
          make microk8s
      - name: Run Integration Tests
        run: |
          make integration-tests
          sg microk8s -c "make integration-tests"
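The new Setup MicroK8s step presumably leaves the CI user in the microk8s group, so the integration tests are now launched through sg microk8s -c "…" to pick up that group membership in the same session instead of requiring a re-login. A minimal sketch of the same pattern outside this workflow (illustrative commands, not part of this diff):

# Add the current user to the group, then run one command with that group already applied
sudo usermod -a -G microk8s "$USER"
sg microk8s -c "microk8s status --wait-ready"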
6 changes: 6 additions & 0 deletions .gitignore
@@ -2,3 +2,9 @@
.make_cache
__pycache__
.coverage
*.snap
*.tar
env/
.vscode/
derby.log
metastore_db/
12 changes: 11 additions & 1 deletion snap/snapcraft.yaml
@@ -64,6 +64,16 @@ apps:
      - network-bind
      - home
      - dot-kube-config
  spark-sql:
    command: lib/python3.10/site-packages/spark8t/cli/spark_sql.py $SPARK8T_EXTRA_CONF
    environment:
      PYTHONPATH: $SNAP/python:$SNAP/lib/python3.10/site-packages:$PYTHONPATH
      SPARK8T_EXTRA_CONF: --conf spark.driver.extraJavaOptions="-Duser.home=$SNAP_USER_DATA" --conf spark.jars.ivy=/tmp
    plugs:
      - network
      - network-bind
      - home
      - dot-kube-config

parts:

@@ -74,7 +74,7 @@ parts:
  spark8t:
    plugin: python
    python-packages:
      - https://github.com/canonical/spark-k8s-toolkit-py/releases/download/v0.0.4/spark8t-0.0.4-py3-none-any.whl
      - https://github.com/canonical/spark-k8s-toolkit-py/releases/download/v0.0.5/spark8t-0.0.5-py3-none-any.whl
    source: .
    build-packages:
      - python3
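With the spark-sql app stanza above and the spark8t 0.0.5 wheel, the rebuilt snap should expose the wrapper as spark-client.spark-sql alongside the existing spark-client.pyspark entry point used in the tests below. A hedged invocation sketch, assuming the wrapper forwards standard spark-sql options such as -e (the image value is a placeholder):

# Illustrative only: run a one-off statement through the new snap app
spark-client.spark-sql \
  --username spark --namespace tests \
  --conf spark.kubernetes.container.image=$SPARK_IMAGE \
  -e "SELECT 1;"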
34 changes: 33 additions & 1 deletion tests/integration/config-aws-cli.sh
100644 → 100755
@@ -7,11 +7,43 @@ sudo snap install aws-cli --classic
ACCESS_KEY=$(kubectl get secret -n minio-operator microk8s-user-1 -o jsonpath='{.data.CONSOLE_ACCESS_KEY}' | base64 -d)
SECRET_KEY=$(kubectl get secret -n minio-operator microk8s-user-1 -o jsonpath='{.data.CONSOLE_SECRET_KEY}' | base64 -d)

S3_BUCKET="test"
get_s3_endpoint(){
# Get S3 endpoint from MinIO
kubectl get service minio -n minio-operator -o jsonpath='{.spec.clusterIP}'
}

wait_and_retry(){
# Retry a command a fixed number of times, waiting a few seconds between attempts.

command="$@"
retries=0
max_retries=50
until [ "$retries" -ge $max_retries ]
do
$command &> /dev/null && break
retries=$((retries+1))
echo "Trying to execute command ${command}..."
sleep 5
done

# If the command still did not succeed after the maximum number of retries
if [ "$retries" -ge $max_retries ]; then
echo "Maximum number of retries ($max_retries) reached. ${command} returned with a non-zero status."
exit 1
fi
}
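wait_and_retry simply re-runs whatever command line it is given, up to max_retries attempts five seconds apart with output discarded, and aborts the script if the command never succeeds. Any readiness check can be expressed this way; for example (illustrative, not part of this diff):

# Block until the MinIO credentials secret queried above exists
wait_and_retry kubectl get secret -n minio-operator microk8s-user-1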

# Wait for `minio` service to be ready and S3 endpoint to be available
wait_and_retry get_s3_endpoint
S3_ENDPOINT=$(get_s3_endpoint)

DEFAULT_REGION="us-east-2"

# Configure AWS CLI credentials
aws configure set aws_access_key_id $ACCESS_KEY
aws configure set aws_secret_access_key $SECRET_KEY
aws configure set default.region $DEFAULT_REGION
aws configure set endpoint_url "http://$S3_ENDPOINT"

wait_and_retry aws s3 ls
echo "AWS CLI credentials set successfully"
120 changes: 101 additions & 19 deletions tests/integration/ie-tests.sh
@@ -42,17 +42,16 @@ get_s3_endpoint(){

create_s3_bucket(){
# Creates an S3 bucket with the given name.
S3_ENDPOINT=$(get_s3_endpoint)
BUCKET_NAME=$1
aws --endpoint-url "http://$S3_ENDPOINT" s3api create-bucket --bucket "$BUCKET_NAME"
aws s3 mb "s3://$BUCKET_NAME"
echo "Created S3 bucket ${BUCKET_NAME}"
}

delete_s3_bucket(){
# Deletes an S3 bucket with the given name.
S3_ENDPOINT=$(get_s3_endpoint)
BUCKET_NAME=$1
aws --endpoint-url "http://$S3_ENDPOINT" s3 rb "s3://$BUCKET_NAME" --force
aws s3 rb "s3://$BUCKET_NAME" --force
echo "Deleted S3 bucket ${BUCKET_NAME}"
}

@@ -71,18 +70,6 @@ copy_file_to_s3_bucket(){
echo "Copied file ${FILE_PATH} to S3 bucket ${BUCKET_NAME}"
}

list_s3_bucket(){
# List files in a bucket.
# The bucket name and the path to file that is to be uploaded is to be provided as arguments
BUCKET_NAME=$1

S3_ENDPOINT=$(get_s3_endpoint)

# List files in the S3 bucket
aws --endpoint-url "http://$S3_ENDPOINT" s3 ls s3://"$BUCKET_NAME/"
echo "Listed files for bucket: ${BUCKET_NAME}"
}

run_example_job() {

KUBE_CONFIG=/home/${USER}/.kube/config
@@ -204,8 +191,6 @@ run_pyspark_s3() {
# Copy 'example.txt' script to 'test' bucket
copy_file_to_s3_bucket test ./tests/integration/resources/example.txt

list_s3_bucket test

echo -e "$(cat ./tests/integration/resources/test-pyspark-s3.py | spark-client.pyspark \
--username=${USERNAME} \
--conf spark.kubernetes.container.image=$SPARK_IMAGE \
@@ -221,9 +206,47 @@ run_pyspark_s3() {
l=$(cat pyspark.out | grep "Number of lines" | rev | cut -d' ' -f1 | rev | cut -c 1-3)
echo -e "Number of lines: \n ${l}"
rm pyspark.out
delete_s3_bucket test
validate_file_length $l
}

run_spark_sql() {
echo "run_spark_sql ${1} ${2}"

NAMESPACE=$1
USERNAME=$2

ACCESS_KEY="$(get_s3_access_key)"
SECRET_KEY="$(get_s3_secret_key)"
S3_ENDPOINT="$(get_s3_endpoint)"

# First, create an S3 bucket named 'test'
create_s3_bucket test

echo -e "$(cat ./tests/integration/resources/test-spark-sql.sql | spark-client.spark-sql \
--username=${USERNAME} --namespace ${NAMESPACE} \
--conf spark.kubernetes.container.image=$SPARK_IMAGE \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
--conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.hadoop.fs.s3a.endpoint=$S3_ENDPOINT \
--conf spark.hadoop.fs.s3a.access.key=$ACCESS_KEY \
--conf spark.hadoop.fs.s3a.secret.key=$SECRET_KEY \
--conf spark.sql.catalog.local.warehouse=s3a://spark/warehouse \
--conf spark.sql.warehouse.dir=s3a://test/warehouse \
--conf hive.metastore.warehouse.dir=s3a://test/hwarehouse \
--conf spark.executor.instances=2)" > spark_sql.out
cat spark_sql.out
l=$(cat spark_sql.out | grep "^Inserted Rows:" | rev | cut -d' ' -f1 | rev)
echo -e "Number of rows inserted: ${l}"
rm spark_sql.out
delete_s3_bucket test
if [ "$l" != "3" ]; then
echo "ERROR: Number of rows inserted: $l, Expected: 3. Aborting with exit code 1."
exit 1
fi
}
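The assertion in run_spark_sql keys off the last whitespace-separated field of the output line starting with "Inserted Rows:", extracted with the rev | cut -d' ' -f1 | rev idiom, and expects exactly 3 for the three rows inserted by test-spark-sql.sql. The extraction can be checked in isolation (illustrative):

# Prints 3: take the last space-separated field of the marker line
echo "Inserted Rows: 3" | grep "^Inserted Rows:" | rev | cut -d' ' -f1 | rev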

test_pyspark() {
run_pyspark tests spark
}
@@ -232,6 +255,10 @@ test_pyspark_s3() {
run_pyspark_s3 tests spark
}

test_spark_sql() {
run_spark_sql tests spark
}

test_restricted_account() {

kubectl config set-context spark-context --namespace=tests --cluster=prod --user=spark
@@ -418,13 +445,62 @@ run_pyspark_in_pod() {
validate_pi_value $pi
}

run_spark_sql_in_pod() {
echo "run_spark_sql_in_pod ${1} ${2}"

NAMESPACE=$1
USERNAME=$2

create_s3_bucket test

SPARK_SQL_COMMANDS=$(cat ./tests/integration/resources/test-spark-sql.sql)

echo -e "$(kubectl exec testpod -- \
env \
UU="$USERNAME" \
NN="$NAMESPACE" \
CMDS="$SPARK_SQL_COMMANDS" \
IM="$SPARK_IMAGE" \
ACCESS_KEY="$(get_s3_access_key)" \
SECRET_KEY="$(get_s3_secret_key)" \
S3_ENDPOINT="$(get_s3_endpoint)" \
/bin/bash -c 'echo "$CMDS" | spark-client.spark-sql \
--username $UU \
--namespace $NN \
--conf spark.kubernetes.container.image=$IM \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
--conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.hadoop.fs.s3a.endpoint=$S3_ENDPOINT \
--conf spark.hadoop.fs.s3a.access.key=$ACCESS_KEY \
--conf spark.hadoop.fs.s3a.secret.key=$SECRET_KEY \
--conf spark.sql.warehouse.dir=s3a://test/warehouse \
--conf spark.driver.extraJavaOptions='-Dderby.system.home=/tmp/derby' \
')" > spark_sql.out
cat spark_sql.out
num_rows=$(cat spark_sql.out | grep "^Inserted Rows:" | rev | cut -d' ' -f1 | rev )
echo -e "Inserted Rows: ${num_rows}"
rm spark_sql.out
delete_s3_bucket test
if [ "$num_rows" != "3" ]; then
echo "ERROR: Number of rows inserted: $num_rows, Expected: 3. Aborting with exit code 1."
exit 1
fi
}
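run_spark_sql_in_pod hands the username, namespace, SQL text, image, and S3 credentials to the pod as environment variables on kubectl exec, so the single-quoted remote command expands them inside the pod rather than on the host. The same pattern in miniature (the pod name reuses the test pod; the variable is illustrative):

# Host-side value, expanded only by the shell running inside the pod
kubectl exec testpod -- env GREETING="hello from the host" /bin/bash -c 'echo "$GREETING"'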


run_pyspark_s3_in_pod() {
echo "run_pyspark_s3_in_pod ${1} ${2}"

NAMESPACE=$1
USERNAME=$2

# Create bucket
create_s3_bucket test

# Copy example.txt file to the bucket
copy_file_to_s3_bucket test ./tests/integration/resources/example.txt

PYSPARK_COMMANDS=$(cat ./tests/integration/resources/test-pyspark-s3.py)

# Check output of pyspark process with s3
@@ -466,6 +542,10 @@ test_pyspark_in_pod() {
run_pyspark_in_pod tests spark
}

test_spark_sql_in_pod() {
run_spark_sql_in_pod tests spark
}

test_restricted_account_in_pod() {

kubectl config set-context spark-context --namespace=tests --cluster=prod --user=spark
@@ -486,6 +566,8 @@ setup_tests

(setup_user_admin_context && test_pyspark && cleanup_user_success) || cleanup_user_failure

(setup_user_admin_context && test_spark_sql && cleanup_user_success) || cleanup_user_failure

(setup_user_admin_context && test_pyspark_s3 && cleanup_user_success) || cleanup_user_failure

(setup_user_restricted_context && test_restricted_account && cleanup_user_success) || cleanup_user_failure
Expand All @@ -498,8 +580,8 @@ setup_test_pod

(setup_user_admin_context && test_pyspark_in_pod && cleanup_user_success) || cleanup_user_failure_in_pod

(setup_user_admin_context && test_pyspark_s3_in_pod && cleanup_user_success) || cleanup_user_failure_in_pod
(setup_user_admin_context && test_spark_sql_in_pod && cleanup_user_success) || cleanup_user_failure_in_pod

#(setup_user_restricted_context && test_restricted_account_in_pod && cleanup_user_success) || cleanup_user_failure_in_pod
(setup_user_admin_context && test_pyspark_s3_in_pod && cleanup_user_success) || cleanup_user_failure_in_pod

teardown_test_pod
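Each test line above runs its steps in a subshell chained with &&, so a failure at any step falls through to the matching cleanup handler via || without aborting the surrounding script. The control flow in miniature (illustrative):

# Prints "recover": the failing first step short-circuits the && chain
(false && echo "never runs") || echo "recover"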
6 changes: 6 additions & 0 deletions tests/integration/resources/test-spark-sql.sql
@@ -0,0 +1,6 @@
CREATE DATABASE IF NOT EXISTS sparkdb;
USE sparkdb;
CREATE TABLE IF NOT EXISTS sparkdb.testTable (number Int, word String);
INSERT INTO sparkdb.testTable VALUES (1, "foo"), (2, "bar"), (3, "grok");
SELECT CONCAT("Inserted Rows: ", COUNT(*)) FROM sparkdb.testTable;
EXIT;
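The final SELECT emits the marker line the harness greps for; with the three inserted rows it should read "Inserted Rows: 3". To iterate on the file outside CI, it can be piped into the same CLI the tests use (user and namespace mirror the test defaults; the extra --conf flags from ie-tests.sh may still be needed for an S3-backed warehouse):

# Run the SQL file by hand the way ie-tests.sh does
cat tests/integration/resources/test-spark-sql.sql | spark-client.spark-sql --username spark --namespace tests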
