Skip to content

Commit

Permalink
Add Spark application (opensearch-project#1723)
Browse files Browse the repository at this point in the history
* Initial spark application draft

Signed-off-by: Rupal Mahajan <maharup@amazon.com>

* Remove temp table

Signed-off-by: Rupal Mahajan <maharup@amazon.com>

* Add license header

Signed-off-by: Rupal Mahajan <maharup@amazon.com>

* Add scalastyle-config and update readme

Signed-off-by: Rupal Mahajan <maharup@amazon.com>

* Fix datatype for result and schema

Signed-off-by: Rupal Mahajan <maharup@amazon.com>

* Add test

Signed-off-by: Rupal Mahajan <maharup@amazon.com>

* Simplify code using toJSON.collect.toList

Signed-off-by: Rupal Mahajan <maharup@amazon.com>

* Add example in readme

Signed-off-by: Rupal Mahajan <maharup@amazon.com>

* Fix triple quotes issue

Signed-off-by: Rupal Mahajan <maharup@amazon.com>

* Update method name and description

Signed-off-by: Rupal Mahajan <maharup@amazon.com>

---------

Signed-off-by: Rupal Mahajan <maharup@amazon.com>
  • Loading branch information
rupal-bq authored Jun 23, 2023
1 parent 34cad6e commit 6c3744e
Show file tree
Hide file tree
Showing 8 changed files with 414 additions and 0 deletions.
14 changes: 14 additions & 0 deletions spark-sql-application/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Compiled output
target/
project/target/

# sbt-specific files
.sbtserver
.sbt/
.bsp/

# Miscellaneous
.DS_Store
*.class
*.log
*.zip
107 changes: 107 additions & 0 deletions spark-sql-application/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Spark SQL Application

This application execute sql query and store the result in OpenSearch index in following format
```
"stepId":"<emr-step-id>",
"schema": "json blob",
"result": "json blob"
```

## Prerequisites

+ Spark 3.3.1
+ Scala 2.12.15
+ flint-spark-integration

## Usage

To use this application, you can run Spark with Flint extension:

```
./bin/spark-submit \
--class org.opensearch.sql.SQLJob \
--jars <flint-spark-integration-jar> \
sql-job.jar \
<spark-sql-query> \
<opensearch-index> \
<opensearch-host> \
<opensearch-port> \
<opensearch-scheme> \
<opensearch-auth> \
<opensearch-region> \
```

## Result Specifications

Following example shows how the result is written to OpenSearch index after query execution.

Let's assume sql query result is
```
+------+------+
|Letter|Number|
+------+------+
|A |1 |
|B |2 |
|C |3 |
+------+------+
```
OpenSearch index document will look like
```json
{
"_index" : ".query_execution_result",
"_id" : "A2WOsYgBMUoqCqlDJHrn",
"_score" : 1.0,
"_source" : {
"result" : [
"{'Letter':'A','Number':1}",
"{'Letter':'B','Number':2}",
"{'Letter':'C','Number':3}"
],
"schema" : [
"{'column_name':'Letter','data_type':'string'}",
"{'column_name':'Number','data_type':'integer'}"
],
"stepId" : "s-JZSB1139WIVU"
}
}
```

## Build

To build and run this application with Spark, you can run:

```
sbt clean publishLocal
```

## Test

To run tests, you can use:

```
sbt test
```

## Scalastyle

To check code with scalastyle, you can run:

```
sbt scalastyle
```

## Code of Conduct

This project has adopted an [Open Source Code of Conduct](../CODE_OF_CONDUCT.md).

## Security

If you discover a potential security issue in this project we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). Please do **not** create a public GitHub issue.

## License

See the [LICENSE](../LICENSE.txt) file for our project's licensing. We will ask you to confirm the licensing of your contribution.

## Copyright

Copyright OpenSearch Contributors. See [NOTICE](../NOTICE) for details.
28 changes: 28 additions & 0 deletions spark-sql-application/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

name := "sql-job"

version := "1.0"

scalaVersion := "2.12.15"

val sparkVersion = "3.3.2"

mainClass := Some("org.opensearch.sql.SQLJob")

artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
"sql-job.jar"
}

resolvers ++= Seq(
("apache-snapshots" at "http://repository.apache.org/snapshots/").withAllowInsecureProtocol(true)
)

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.scalatest" %% "scalatest" % "3.2.15" % Test
)
1 change: 1 addition & 0 deletions spark-sql-application/project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=1.8.2
6 changes: 6 additions & 0 deletions spark-sql-application/project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
106 changes: 106 additions & 0 deletions spark-sql-application/scalastyle-config.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
<scalastyle>
<name>Scalastyle standard configuration</name>
<check level="warning" class="org.scalastyle.file.FileTabChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.file.FileLengthChecker" enabled="true">
<parameters>
<parameter name="maxFileLength"><![CDATA[800]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true">
<parameters>
<parameter name="header"><![CDATA[/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
<parameters>
<parameter name="maxLineLength"><![CDATA[160]]></parameter>
<parameter name="tabSize"><![CDATA[4]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true">
<parameters>
<parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true">
<parameters>
<parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
<parameters>
<parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
<parameters>
<parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
<parameters>
<parameter name="maxParameters"><![CDATA[8]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true">
<parameters>
<parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.file.RegexChecker" enabled="true">
<parameters>
<parameter name="regex"><![CDATA[println]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true">
<parameters>
<parameter name="maxTypes"><![CDATA[30]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true">
<parameters>
<parameter name="maximum"><![CDATA[10]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true">
<parameters>
<parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
<parameter name="doubleLineAllowed"><![CDATA[false]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true">
<parameters>
<parameter name="maxLength"><![CDATA[50]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true">
<parameters>
<parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true">
<parameters>
<parameter name="maxMethods"><![CDATA[30]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
</scalastyle>
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql

import org.apache.spark.sql.{DataFrame, SparkSession, Row}
import org.apache.spark.sql.types._

/**
* Spark SQL Application entrypoint
*
* @param args(0)
* sql query
* @param args(1)
* opensearch index name
* @param args(2-6)
* opensearch connection values required for flint-integration jar. host, port, scheme, auth, region respectively.
* @return
* write sql query result to given opensearch index
*/
object SQLJob {
def main(args: Array[String]) {
// Get the SQL query and Opensearch Config from the command line arguments
val query = args(0)
val index = args(1)
val host = args(2)
val port = args(3)
val scheme = args(4)
val auth = args(5)
val region = args(6)

// Create a SparkSession
val spark = SparkSession.builder().appName("SQLJob").getOrCreate()

try {
// Execute SQL query
val result: DataFrame = spark.sql(query)

// Get Data
val data = getFormattedData(result, spark)

// Write data to OpenSearch index
val aos = Map(
"host" -> host,
"port" -> port,
"scheme" -> scheme,
"auth" -> auth,
"region" -> region)

data.write
.format("flint")
.options(aos)
.mode("append")
.save(index)

} finally {
// Stop SparkSession
spark.stop()
}
}

/**
* Create a new formatted dataframe with json result, json schema and EMR_STEP_ID.
*
* @param result
* sql query result dataframe
* @param spark
* spark session
* @return
* dataframe with result, schema and emr step id
*/
def getFormattedData(result: DataFrame, spark: SparkSession): DataFrame = {
// Create the schema dataframe
val schemaRows = result.schema.fields.map { field =>
Row(field.name, field.dataType.typeName)
}
val resultSchema = spark.createDataFrame(spark.sparkContext.parallelize(schemaRows), StructType(Seq(
StructField("column_name", StringType, nullable = false),
StructField("data_type", StringType, nullable = false))))

// Define the data schema
val schema = StructType(Seq(
StructField("result", ArrayType(StringType, containsNull = true), nullable = true),
StructField("schema", ArrayType(StringType, containsNull = true), nullable = true),
StructField("stepId", StringType, nullable = true)))

// Create the data rows
val rows = Seq((
result.toJSON.collect.toList.map(_.replaceAll("\"", "'")),
resultSchema.toJSON.collect.toList.map(_.replaceAll("\"", "'")),
sys.env.getOrElse("EMR_STEP_ID", "")))

// Create the DataFrame for data
spark.createDataFrame(rows).toDF(schema.fields.map(_.name): _*)
}
}
Loading

0 comments on commit 6c3744e

Please sign in to comment.