Performance tips
Below are a few things to consider when configuring the CosmosDB Spark Connector for the best performance.
If the scenario only works on a subset of the data in the CosmosDB collection, you can specify a custom query by setting query_custom, which is used to fetch the data from each CosmosDB collection partition. By default, the query is constructed with the predicates from the SparkSQL query. This default query is overridden if a custom query is provided.
"query_custom" : "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c"
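When only a few fields are needed, the projection string for query_custom can be assembled from a list of field names. The helper below is a minimal sketch of that idea; projection_query and its alias parameter are hypothetical names used for illustration and are not part of the connector.

```python
def projection_query(fields, alias="c"):
    """Build a SELECT projection over the given document fields.

    Hypothetical helper for illustration; not part of the connector.
    """
    if not fields:
        raise ValueError("at least one field is required")
    columns = ", ".join(f"{alias}.{name}" for name in fields)
    return f"SELECT {columns} FROM {alias}"


# Reproduces the query_custom value shown above.
query = projection_query(["date", "delay", "distance", "origin", "destination"])
print(query)
```

The resulting string can be placed under the "query_custom" key of the configuration dictionary.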
When fetching data from a CosmosDB collection, the connector maps each CosmosDB collection partition to an executor on a Spark worker node and uses the CosmosDB Java SDK to send a query to the target partition. As a parameter in the query options, you can specify the number of documents that each query page should contain with query_pagesize. The larger the page size, the fewer network round trips are required to get the data, and hence the better the throughput. The backend will return as many documents as the specified page size allows while keeping the response size within a certain threshold.
# Configuration for PySpark
# Connection
flightsConfig = {
    "Endpoint" : "https://$account$.documents.azure.com:443/",
    "Masterkey" : "$masterkey$",
    "Database" : "$database$",
    "preferredRegions" : "$region1$, $region2$",
    "Collection" : "$collection$",
    "SamplingRatio" : "1.0",
    "schema_samplesize" : "1000",
    "query_pagesize" : "2147483647",
    "query_custom" : "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c"
}
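To see why a larger page size reduces round trips, the sketch below estimates a lower bound on the number of pages needed to read one partition. It assumes that only the page size limits a page; the backend's response-size threshold is ignored, so the real count can be higher. The function name and the document count are hypothetical.

```python
# 2147483647 is the largest 32-bit signed integer, the query_pagesize used above.
MAX_PAGE_SIZE = 2**31 - 1


def min_round_trips(documents_in_partition, page_size):
    """Lower bound on query pages needed to read one partition.

    Assumption: the response-size threshold is ignored, so this is
    only a best-case estimate.
    """
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    # Integer ceiling division.
    return (documents_in_partition + page_size - 1) // page_size


docs = 1_000_000  # hypothetical number of documents in one partition
for size in (100, 1000, MAX_PAGE_SIZE):
    print(size, min_round_trips(docs, size))
```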
When launching SparkContext, the executor parameters can be fine-tuned so that all cluster resources (number of cores and memory) are utilized. The notable parameters are --num-executors, --executor-cores, and --executor-memory, which specify the number of executors and their computing and memory capacity.
For applications that do more analytics, you can use more cores for each executor. Otherwise, you can increase the number of executors and use fewer cores for each of them to increase parallelism and throughput. Depending on the type of job, you may need to experiment to find the most suitable configuration.
For example,
- In an HDI Spark cluster with 4 D4 V2 worker nodes (8 cores, 28 GB RAM each), here are a few executor options:
--num-executors 4 --executor-cores 7 --executor-memory 24g
--num-executors 8 --executor-cores 4 --executor-memory 12g
--num-executors 32 --executor-cores 1 --executor-memory 6g
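A quick way to sanity-check options like these is to compute what each one requests per worker node. The sketch below assumes executors are spread evenly across the nodes and ignores per-executor memory overhead and driver resources; ExecutorPlan, per_node_demand, and fits are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class ExecutorPlan:
    num_executors: int
    executor_cores: int
    executor_memory_gb: int


def per_node_demand(plan, worker_nodes):
    """Return (executors, cores, memory_gb) requested per node.

    Assumption: executors are spread evenly across worker nodes.
    """
    executors = -(-plan.num_executors // worker_nodes)  # ceiling division
    return (executors,
            executors * plan.executor_cores,
            executors * plan.executor_memory_gb)


def fits(plan, worker_nodes, cores_per_node, memory_gb_per_node):
    """True if the per-node request stays within one node's cores and RAM."""
    _, cores, memory = per_node_demand(plan, worker_nodes)
    return cores <= cores_per_node and memory <= memory_gb_per_node


# The three options above on 4 worker nodes with 8 cores and 28 GB RAM each.
plans = [ExecutorPlan(4, 7, 24), ExecutorPlan(8, 4, 12), ExecutorPlan(32, 1, 6)]
for plan in plans:
    print(plan, per_node_demand(plan, 4), fits(plan, 4, 8, 28))
```

Under these assumptions, the first two options stay within 8 cores and 28 GB per node, while the third requests 8 x 6 GB = 48 GB per node, so its memory value may need to be lowered for all 32 executors to be scheduled.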
In the writing scenario, if we can provision the Spark cluster so that the number of cores is at least the number of tasks, all the tasks will run at once in parallel, giving the best throughput. If the number of Spark partitions is more than the number of executors, it usually helps to coalesce the data by executor or worker node before writing to avoid the scheduling overhead.
For example,
- With a 50-partition CosmosDB collection, we should set up a Spark cluster with at least 50 cores that can be used concurrently so that the 50 writing tasks can be executed in a single batch.
- On a cluster with 7 D13 V2 worker nodes (8 cores, 56 GB RAM each), we can run with:
--num-executors 7 --executor-cores 7 --executor-memory 50g
- On a cluster with 13 D12 V2 worker nodes (4 cores, 28 GB RAM each), we can run with:
--num-executors 12 --executor-cores 4 --executor-memory 20g
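The relationship between write tasks and available cores can be sketched as a count of scheduling waves: one wave means every task runs at once. The helpers below are a minimal illustration with hypothetical names and hypothetical cluster sizes; the coalesce target simply caps the partition count at the number of executors.

```python
def scheduling_waves(num_tasks, num_executors, executor_cores):
    """Number of batches needed to run all tasks, one task per core."""
    total_cores = num_executors * executor_cores
    if total_cores <= 0:
        raise ValueError("the cluster must have at least one core")
    return -(-num_tasks // total_cores)  # ceiling division


def coalesce_target(num_partitions, num_executors):
    """Partition count after coalescing by executor (never increases it)."""
    return min(num_partitions, num_executors)


# 50 write tasks on a hypothetical 10 x 5-core cluster run in one wave;
# on a hypothetical 5 x 5-core cluster they need two.
print(scheduling_waves(50, 10, 5))
print(scheduling_waves(50, 5, 5))

# With 200 Spark partitions and 10 executors, coalesce to 10 before writing.
# In PySpark this would be applied as df.coalesce(coalesce_target(200, 10)).
print(coalesce_target(200, 10))
```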