`azure-cosmosdb-spark` is the official connector for Azure Cosmos DB and Apache Spark. The connector allows you to easily read from and write to Azure Cosmos DB via Apache Spark DataFrames in Python and Scala. It also allows you to easily create a lambda architecture for batch processing, stream processing, and a serving layer while being globally replicated and minimizing the latency involved in working with big data.
Below are excerpts in Python and Scala on how to create a Spark DataFrame to read from Cosmos DB:
# Read Configuration
readConfig = {
"Endpoint" : "https://doctorwho.documents.azure.com:443/",
"Masterkey" : "SPSVkSfA7f6vMgMvnYdzc1MaWb65v4VQNcI2Tp1WfSP2vtgmAwGXEPcxoYra5QBHHyjDGYuHKSkguHIz1vvmWQ==",
"Database" : "DepartureDelays",
"preferredRegions" : "Central US;East US2",
"Collection" : "flights_pcoll",
"SamplingRatio" : "1.0",
"schema_samplesize" : "1000",
"query_pagesize" : "2147483647",
"query_custom" : "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'"
}
# Connect via azure-cosmosdb-spark to create Spark DataFrame
flights = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**readConfig).load()
flights.count()
Scala excerpt:
// Import Necessary Libraries
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config
// Configure connection to your collection
val readConfig = Config(Map(
"Endpoint" -> "https://doctorwho.documents.azure.com:443/",
"Masterkey" -> "SPSVkSfA7f6vMgMvnYdzc1MaWb65v4VQNcI2Tp1WfSP2vtgmAwGXEPcxoYra5QBHHyjDGYuHKSkguHIz1vvmWQ==",
"Database" -> "DepartureDelays",
"PreferredRegions" -> "Central US;East US2;",
"Collection" -> "flights_pcoll",
"SamplingRatio" -> "1.0",
"query_custom" -> "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'"
))
// Connect via azure-cosmosdb-spark to create Spark DataFrame
val flights = spark.read.cosmosDB(readConfig)
flights.count()
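The excerpts above hard-code the endpoint and master key. As a minimal sketch of one alternative, the Python helper below assembles the same kind of options dictionary while reading the secrets from environment variables. The helper name `build_read_config`, the variable names `COSMOS_ENDPOINT` and `COSMOS_MASTERKEY`, and the choice of which keys count as required are assumptions made for illustration; they are not part of the connector.

```python
import os

# Keys that every excerpt above supplies; treating them as required is an
# assumption of this sketch, not a rule taken from the connector.
REQUIRED_KEYS = ("Endpoint", "Masterkey", "Database", "Collection")


def build_read_config(database, collection, query=None, env=os.environ):
    """Build an options dict for spark.read, taking secrets from the environment."""
    config = {
        # COSMOS_ENDPOINT and COSMOS_MASTERKEY are hypothetical variable names.
        "Endpoint": env.get("COSMOS_ENDPOINT", ""),
        "Masterkey": env.get("COSMOS_MASTERKEY", ""),
        "Database": database,
        "Collection": collection,
    }
    if query is not None:
        config["query_custom"] = query

    # Fail early, before Spark is involved, if anything is empty.
    missing = [key for key in REQUIRED_KEYS if not config[key]]
    if missing:
        raise ValueError("Missing connector settings: " + ", ".join(missing))
    return config


# Intended use inside a Spark session (not executed here):
# readConfig = build_read_config("DepartureDelays", "flights_pcoll")
# flights = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**readConfig).load()
```

Passing `env` explicitly keeps the helper easy to exercise without touching the real process environment.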
Below are excerpts in Python and Scala on how to write a Spark DataFrame to Cosmos DB:
# Write configuration
writeConfig = {
"Endpoint" : "https://doctorwho.documents.azure.com:443/",
"Masterkey" : "SPSVkSfA7f6vMgMvnYdzc1MaWb65v4VQNcI2Tp1WfSP2vtgmAwGXEPcxoYra5QBHHyjDGYuHKSkguHIz1vvmWQ==",
"Database" : "DepartureDelays",
"Collection" : "flights_fromsea",
"Upsert" : "true"
}
# Write to Cosmos DB from the flights DataFrame
flights.write.format("com.microsoft.azure.cosmosdb.spark").options(**writeConfig).save()
Scala excerpt:
// Configure connection to the sink collection
val writeConfig = Config(Map(
"Endpoint" -> "https://doctorwho.documents.azure.com:443/",
"Masterkey" -> "SPSVkSfA7f6vMgMvnYdzc1MaWb65v4VQNcI2Tp1WfSP2vtgmAwGXEPcxoYra5QBHHyjDGYuHKSkguHIz1vvmWQ==",
"Database" -> "DepartureDelays",
"PreferredRegions" -> "Central US;East US2;",
"Collection" -> "flights_fromsea",
"WritingBatchSize" -> "100"
))
// Upsert the dataframe to Cosmos DB
import org.apache.spark.sql.SaveMode
flights.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)
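The read and write configurations above share their connection settings and differ mainly in the target collection and a few write options. The following Python sketch derives a sink configuration from a source one. The helper name `to_write_config` is hypothetical, and the decision to drop the query and sampling options for writes is an assumption of this sketch rather than documented connector behaviour.

```python
# Options that appear only in the read excerpts above. Dropping them for
# writes is an assumption of this sketch, not documented connector behaviour.
READ_ONLY_KEYS = {"query_custom", "samplingratio", "schema_samplesize", "query_pagesize"}

# Options this helper always sets itself, so any existing copy is discarded.
OVERRIDDEN_KEYS = {"collection", "upsert", "writingbatchsize"}


def to_write_config(read_config, sink_collection, upsert=True, batch_size=None):
    """Derive a sink configuration that reuses the source connection settings."""
    config = {
        key: value
        for key, value in read_config.items()
        if key.lower() not in READ_ONLY_KEYS | OVERRIDDEN_KEYS
    }
    config["Collection"] = sink_collection
    # The excerpts pass every option value as a string, so do the same here.
    config["Upsert"] = "true" if upsert else "false"
    if batch_size is not None:
        config["WritingBatchSize"] = str(batch_size)
    return config


# Intended use inside a Spark session (not executed here):
# writeConfig = to_write_config(readConfig, "flights_fromsea")
# flights.write.format("com.microsoft.azure.cosmosdb.spark").options(**writeConfig).save()
```

The helper returns a new dictionary and leaves the source configuration untouched, so the same `readConfig` can still be used for further reads.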
See other sample Jupyter and Databricks notebooks as well as PySpark and Spark scripts.
`azure-cosmosdb-spark` has been regularly tested using Azure Databricks Runtime 3.5 (Spark 2.2.1), 4.0 (Spark 2.3.0), HDInsight 3.6 (Spark 2.1), and 3.7 (Spark 2.2).
Review supported component versions
Component | Versions Supported |
---|---|
Apache Spark | 2.2.1, 2.3.X, 2.4.X |
Scala | 2.11 |
Python | 2.7, 3.6 |
Azure Cosmos DB Java SDK | 1.16.1, 1.16.2 |
You can build the connector yourself and/or use its Maven coordinates to work with `azure-cosmosdb-spark`.
Review the connector's maven versions
Spark | Scala | Latest version |
---|---|---|
2.4.0 | 2.11 | azure-cosmosdb-spark_2.4.0_2.11_1.3.5 |
2.3.0 | 2.11 | azure-cosmosdb-spark_2.3.0_2.11_1.3.3 |
2.2.0 | 2.11 | azure-cosmosdb-spark_2.2.0_2.11_1.1.1 |
2.1.0 | 2.11 | azure-cosmosdb-spark_2.1.0_2.11_1.2.2 |
Create a library within your Databricks workspace by following the guidance in the Azure Databricks Guide > Use the Azure Cosmos DB Spark connector.
Note: the Use the Azure Cosmos DB Spark Connector page is currently not up to date; the issue is assigned to @dennyglee. Instead of downloading the six separate JARs into six different libraries, you can download the uber JAR from Maven (https://search.maven.org/artifact/com.microsoft.azure/azure-cosmosdb-spark_2.4.0_2.11/1.3.5/jar) and install this single JAR as one library.
To work with the connector using the Spark CLI (i.e. `spark-shell`, `pyspark`, `spark-submit`), you can use the `--packages` parameter with the connector's Maven coordinates.
spark-shell --master yarn --packages "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.3.5"
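The coordinate passed to `--packages` follows the pattern `group:artifact:version`, where the artifact name embeds the Spark and Scala versions. As a small sketch, the Python snippet below builds that string from the versions table above. The function name `maven_coordinate` is hypothetical, and only the Spark 2.4.0 coordinate appears verbatim in this README; the other rows are assumed to follow the same naming pattern.

```python
GROUP_ID = "com.microsoft.azure"

# Latest connector release per Spark version, copied from the versions table.
LATEST_CONNECTOR = {
    "2.4.0": "1.3.5",
    "2.3.0": "1.3.3",
    "2.2.0": "1.1.1",
    "2.1.0": "1.2.2",
}


def maven_coordinate(spark_version, scala_version="2.11"):
    """Return 'group:artifact:version' for the given Spark build."""
    if spark_version not in LATEST_CONNECTOR:
        raise ValueError("No connector build listed for Spark " + spark_version)
    artifact = "azure-cosmosdb-spark_{}_{}".format(spark_version, scala_version)
    return "{}:{}:{}".format(GROUP_ID, artifact, LATEST_CONNECTOR[spark_version])


print(maven_coordinate("2.4.0"))
# prints: com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.3.5
```

Keeping the version table in one place makes it harder to pair a Spark build with a connector release that was published for a different Spark version.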
If you're using Jupyter notebooks within HDInsight, you can use the sparkmagic `%%configure` cell to specify the connector's Maven coordinates.
{ "name":"Spark-to-Cosmos_DB_Connector",
"conf": {
"spark.jars.packages": "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.3.5",
"spark.jars.excludes": "org.scala-lang:scala-reflect"
}
...
}
Note that `spark.jars.excludes` is included specifically to remove potential conflicts between the connector, Apache Spark, and Livy.
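If you generate notebooks programmatically, the JSON body for the `%%configure` cell can be produced rather than typed by hand. The Python sketch below emits only the fields shown above; any further session settings elided in the original example are not reproduced. The function name `configure_body` is hypothetical.

```python
import json


def configure_body(packages,
                   excludes=("org.scala-lang:scala-reflect",),
                   name="Spark-to-Cosmos_DB_Connector"):
    """Render the JSON body for a %%configure cell as a string."""
    # Spark expects comma-separated lists for both of these settings.
    conf = {"spark.jars.packages": ",".join(packages)}
    if excludes:
        conf["spark.jars.excludes"] = ",".join(excludes)
    return json.dumps({"name": name, "conf": conf}, indent=2)


print(configure_body(["com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.3.5"]))
```

The printed text can be pasted directly beneath a `%%configure` line in the notebook cell.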
Currently, this connector project uses Maven, so to build without dependencies you can run:
mvn clean package
Included in this GitHub repository are a number of sample notebooks and scripts that you can use:
- On-Time Flight Performance with Spark and Cosmos DB (Seattle) ipynb | html: This notebook uses `azure-cosmosdb-spark` to connect Spark to Cosmos DB using the HDInsight Jupyter notebook service to showcase Spark SQL, GraphFrames, and predicting flight delays using ML pipelines.
- Connecting Spark with Cosmos DB Change Feed: A quick showcase on how to connect Spark to the Cosmos DB Change Feed.
- Twitter Source with Apache Spark and Azure Cosmos DB Change Feed: ipynb | html
- Using Apache Spark to query Cosmos DB Graphs: ipynb | html
- Connecting Azure Databricks to Azure Cosmos DB using `azure-cosmosdb-spark`. Linked here is also an Azure Databricks version of the On-Time Flight Performance notebook.
- Lambda Architecture with Azure Cosmos DB and HDInsight (Apache Spark): Combining Azure Cosmos DB and HDInsight not only allows you to accelerate real-time big data analytics, but also allows you to benefit from a Lambda Architecture while simplifying its operations.
We have more information in the `azure-cosmosdb-spark` wiki, including:
Configuration and Setup
- Spark Connector Configuration
- Spark to Cosmos DB Connector Setup (In progress)
- Configuring Power BI Direct Query to Azure Cosmos DB via Apache Spark (HDI)
Troubleshooting
Performance
Change Feed
- Stream Processing Changes using Azure Cosmos DB Change Feed and Apache Spark
- Change Feed Demos
- Structured Stream Demos
Monitoring
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.
See CONTRIBUTING.md for contribution guidelines.
To give feedback and/or report an issue, open a GitHub Issue.
Apache®, Apache Spark, and Spark® are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.