Add Spark application #1723

Merged · 11 commits · Jun 23, 2023
Changes from all commits
14 changes: 14 additions & 0 deletions spark-sql-application/.gitignore
@@ -0,0 +1,14 @@
# Compiled output
target/
project/target/

# sbt-specific files
.sbtserver
.sbt/
.bsp/

# Miscellaneous
.DS_Store
*.class
*.log
*.zip
107 changes: 107 additions & 0 deletions spark-sql-application/README.md
@@ -0,0 +1,107 @@
# Spark SQL Application

This application executes a SQL query and stores the result in an OpenSearch index in the following format:
```
"stepId":"<emr-step-id>",
"schema": "json blob",
"result": "json blob"
```

## Prerequisites

+ Spark 3.3.2
+ Scala 2.12.15
+ flint-spark-integration

## Usage

To use this application, you can run Spark with the Flint extension:

```
./bin/spark-submit \
  --class org.opensearch.sql.SQLJob \
  --jars <flint-spark-integration-jar> \
  sql-job.jar \
  <spark-sql-query> \
  <opensearch-index> \
  <opensearch-host> \
  <opensearch-port> \
  <opensearch-scheme> \
  <opensearch-auth> \
  <opensearch-region>
```
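
For example, a hypothetical invocation might look like this (every value below is a placeholder for illustration, not a real jar path, endpoint, or auth setting):

```
./bin/spark-submit \
  --class org.opensearch.sql.SQLJob \
  --jars flint-spark-integration.jar \
  sql-job.jar \
  "SELECT 1 AS Number" \
  .query_execution_result \
  localhost \
  9200 \
  http \
  noauth \
  us-west-2
```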

## Result Specifications

The following example shows how the result is written to the OpenSearch index after query execution.

Let's assume the SQL query result is:
```
+------+------+
|Letter|Number|
+------+------+
|A     |1     |
|B     |2     |
|C     |3     |
+------+------+
```
The OpenSearch index document will look like:
```json
{
  "_index" : ".query_execution_result",
  "_id" : "A2WOsYgBMUoqCqlDJHrn",
  "_score" : 1.0,
  "_source" : {
    "result" : [
      "{'Letter':'A','Number':1}",
      "{'Letter':'B','Number':2}",
      "{'Letter':'C','Number':3}"
    ],
    "schema" : [
      "{'column_name':'Letter','data_type':'string'}",
      "{'column_name':'Number','data_type':'integer'}"
    ],
    "stepId" : "s-JZSB1139WIVU"
  }
}
```
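
Each entry in `result` and `schema` is a JSON string whose double quotes have been replaced with single quotes. A minimal sketch of that conversion, assuming an active `SparkSession` named `spark` (the full logic lives in `SQLJob.getFormattedData`):

```scala
// Sketch: each row is serialized to JSON, then double quotes are swapped
// for single quotes, mirroring the replaceAll in SQLJob.getFormattedData.
val df = spark.sql("SELECT 'A' AS Letter, 1 AS Number")
val jsonRows: List[String] = df.toJSON.collect().toList.map(_.replaceAll("\"", "'"))
// jsonRows == List("{'Letter':'A','Number':1}")
```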

## Build

To build this application and publish the jar to your local repository, you can run:

```
sbt clean publishLocal
```

## Test

To run tests, you can use:

```
sbt test
```
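
A unit test for `SQLJob.getFormattedData` might look like the sketch below; the test class, data, and assertions are hypothetical, but the method signature matches the one in `SQLJob.scala` and ScalaTest is already a declared test dependency:

```scala
package org.opensearch.sql

import org.apache.spark.sql.SparkSession
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical test sketch; runs against a local SparkSession, no cluster needed.
class SQLJobTest extends AnyFunSuite {

  test("getFormattedData wraps result, schema and stepId into a single row") {
    val spark = SparkSession.builder().master("local[1]").appName("SQLJobTest").getOrCreate()
    import spark.implicits._
    try {
      val input = Seq(("A", 1), ("B", 2)).toDF("Letter", "Number")

      val formatted = SQLJob.getFormattedData(input, spark)

      // One document with the three top-level fields written to OpenSearch
      assert(formatted.columns.toSeq == Seq("result", "schema", "stepId"))
      assert(formatted.count() == 1)
    } finally {
      spark.stop()
    }
  }
}
```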

## Scalastyle

To check the code with Scalastyle, you can run:

```
sbt scalastyle
```

## Code of Conduct

This project has adopted an [Open Source Code of Conduct](../CODE_OF_CONDUCT.md).

## Security

If you discover a potential security issue in this project, we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). Please do **not** create a public GitHub issue.

## License

See the [LICENSE](../LICENSE.txt) file for our project's licensing. We will ask you to confirm the licensing of your contribution.

## Copyright

Copyright OpenSearch Contributors. See [NOTICE](../NOTICE) for details.
28 changes: 28 additions & 0 deletions spark-sql-application/build.sbt
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

name := "sql-job"

version := "1.0"

scalaVersion := "2.12.15"

val sparkVersion = "3.3.2"

mainClass := Some("org.opensearch.sql.SQLJob")

artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
  "sql-job.jar"
}

resolvers ++= Seq(
  ("apache-snapshots" at "http://repository.apache.org/snapshots/").withAllowInsecureProtocol(true)
)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.scalatest" %% "scalatest" % "3.2.15" % Test
)
1 change: 1 addition & 0 deletions spark-sql-application/project/build.properties
@@ -0,0 +1 @@
sbt.version=1.8.2
6 changes: 6 additions & 0 deletions spark-sql-application/project/plugins.sbt
@@ -0,0 +1,6 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
106 changes: 106 additions & 0 deletions spark-sql-application/scalastyle-config.xml
@@ -0,0 +1,106 @@
<scalastyle>
  <name>Scalastyle standard configuration</name>
  <check level="warning" class="org.scalastyle.file.FileTabChecker" enabled="true"></check>
  <check level="warning" class="org.scalastyle.file.FileLengthChecker" enabled="true">
    <parameters>
      <parameter name="maxFileLength"><![CDATA[800]]></parameter>
    </parameters>
  </check>
  <check level="warning" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true">
    <parameters>
      <parameter name="header"><![CDATA[/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */]]></parameter>
    </parameters>
  </check>
  <check level="warning" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check>
  <check level="warning" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"></check>
  <check level="warning" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check>
  <check level="warning" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
    <parameters>
      <parameter name="maxLineLength"><![CDATA[160]]></parameter>
      <parameter name="tabSize"><![CDATA[4]]></parameter>
    </parameters>
  </check>
  <check level="warning" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true">
    <parameters>
      <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
    </parameters>
  </check>
  <check level="warning" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true">
    <parameters>
      <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
    </parameters>
  </check>
  <check level="warning" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
    <parameters>
      <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
    </parameters>
  </check>
  <check level="warning" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"></check>
  <check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
    <parameters>
      <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter>
    </parameters>
  </check>
  <check level="warning" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
    <parameters>
      <parameter name="maxParameters"><![CDATA[8]]></parameter>
    </parameters>
  </check>
  <check level="warning" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true">
    <parameters>
      <parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter>
    </parameters>
  </check>
  <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="true"></check>
  <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="true"></check>
  <check level="warning" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check>
  <check level="warning" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check>
  <check level="warning" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check>
  <check level="warning" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check>
  <check level="warning" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check>
  <check level="warning" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check>
  <check level="warning" class="org.scalastyle.file.RegexChecker" enabled="true">
    <parameters>
      <parameter name="regex"><![CDATA[println]]></parameter>
    </parameters>
  </check>
  <check level="warning" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true">
    <parameters>
      <parameter name="maxTypes"><![CDATA[30]]></parameter>
    </parameters>
  </check>
  <check level="warning" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true">
    <parameters>
      <parameter name="maximum"><![CDATA[10]]></parameter>
    </parameters>
  </check>
  <check level="warning" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check>
  <check level="warning" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="true"></check>
  <check level="warning" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true">
    <parameters>
      <parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
      <parameter name="doubleLineAllowed"><![CDATA[false]]></parameter>
    </parameters>
  </check>
  <check level="warning" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true">
    <parameters>
      <parameter name="maxLength"><![CDATA[50]]></parameter>
    </parameters>
  </check>
  <check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true">
    <parameters>
      <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter>
    </parameters>
  </check>
  <check level="warning" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true">
    <parameters>
      <parameter name="maxMethods"><![CDATA[30]]></parameter>
    </parameters>
  </check>
  <check level="warning" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check>
  <check level="warning" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
  <check level="warning" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
</scalastyle>
98 changes: 98 additions & 0 deletions spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala
@@ -0,0 +1,98 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._

/**
 * Spark SQL Application entrypoint.
 *
 * Expected arguments:
 *   args(0): SQL query
 *   args(1): OpenSearch index name
 *   args(2-6): OpenSearch connection values required by the flint-spark-integration jar:
 *     host, port, scheme, auth and region, respectively.
 *
 * The SQL query result is written to the given OpenSearch index.
 */
object SQLJob {
  def main(args: Array[String]): Unit = {
    // Get the SQL query and OpenSearch config from the command line arguments
    val query = args(0)
    val index = args(1)
    val host = args(2)
    val port = args(3)
    val scheme = args(4)
    val auth = args(5)
    val region = args(6)

    // Create a SparkSession
    val spark = SparkSession.builder().appName("SQLJob").getOrCreate()

    try {
      // Execute SQL query
      val result: DataFrame = spark.sql(query)

      // Get data
      val data = getFormattedData(result, spark)

      // Write data to OpenSearch index
      val aos = Map(
        "host" -> host,
        "port" -> port,
        "scheme" -> scheme,
        "auth" -> auth,
        "region" -> region)

      data.write
        .format("flint")
        .options(aos)
        .mode("append")
        .save(index)

    } finally {
      // Stop SparkSession
      spark.stop()
    }
  }

  /**
   * Create a new formatted dataframe with the JSON result, the JSON schema and the EMR step ID.
   *
   * @param result
   *   sql query result dataframe
   * @param spark
   *   spark session
   * @return
   *   dataframe with result, schema and EMR step id
   */
  def getFormattedData(result: DataFrame, spark: SparkSession): DataFrame = {
    // Create the schema dataframe
    val schemaRows = result.schema.fields.map { field =>
      Row(field.name, field.dataType.typeName)
    }
    val resultSchema = spark.createDataFrame(
      spark.sparkContext.parallelize(schemaRows),
      StructType(Seq(
        StructField("column_name", StringType, nullable = false),
        StructField("data_type", StringType, nullable = false))))

    // Define the data schema
    val schema = StructType(Seq(
      StructField("result", ArrayType(StringType, containsNull = true), nullable = true),
      StructField("schema", ArrayType(StringType, containsNull = true), nullable = true),
      StructField("stepId", StringType, nullable = true)))

    // Create the data rows
    val rows = Seq((
      result.toJSON.collect().toList.map(_.replaceAll("\"", "'")),
      resultSchema.toJSON.collect().toList.map(_.replaceAll("\"", "'")),
      sys.env.getOrElse("EMR_STEP_ID", "")))

    // Create the DataFrame for data
    spark.createDataFrame(rows).toDF(schema.fields.map(_.name): _*)
  }
}