Skip to content

Commit

Permalink
[Cosmos] CosmosDB asynchronous client (#21404)
Browse files Browse the repository at this point in the history
* initial commit

* Client Constructor (#20310)

* Removed some stuff

* Looking at constructors

* Updated request

* Added client close

* working client creation

Co-authored-by: simorenoh <simonmorenohe@gmail.com>

* read database

database read works, but ignored exception is returned:
Fatal error on SSL transport
NoneType has no attribute 'send' (_loop._proactor.send)
RuntimeError: Event loop is closed
Unclosed connector/ connection

* Update simon_testfile.py

* with coroutine

Added methods needed to use async with when initializing client, but logs output "Exception ignored... Runtime Error: Event loop is closed"

* Update simon_testfile.py

* small changes

* async with returns no exceptions

* async read container

* async item read

* cleaning up

* create item/ database methods

* item delete working

* docs replace functionality

missing upsert and other resources

* upsert functionality

missing read_all_items and both query methods for container class

* missing query methods

* CRUD for udf, sproc, triggers

* initial query logic + container methods

* missing some execution logic and tests

* oops

* fully working queries

* small fix to query_items()

also fixed README and added examples_async

* Update _cosmos_client_connection_async.py

* Update _cosmos_client_connection.py

* documentation update

* updated MIT dates and get_user_client() description

* Update CHANGELOG.md

* Delete simon_testfile.py

* leftover retry utility

* Update README.md

* docs and removed six package

* changes based on comments

still missing discussion resolution on SSL verification and tests for async functionality under test module (apart from samples which are basically end to end tests)

* small change in type hints

* updated readme

* fixes based on conversations

* added missing type comments

* update changelog for ci pipeline

* added typehints, moved params into keywords, added decorators, made _connection_policy private

* changes based on sync with central sdk

* remove is_system_key from scripts (only used in execute_sproc)

is_system_key verifies that an empty partition key is properly dealt with if ['partitionKey']['systemKey'] exists in the container options - however, we do not allow containers to be created with empty partition key values in the python sdk, so the functionality is needless

* Revert "remove is_system_key from scripts (only used in execute_sproc)"

Reverting last commit, will find way to init is_system_key for now

* async script proxy using composition

* pylint

* capitalized constants

* Apply suggestions from code review

Clarifying comments for README

Co-authored-by: Gahl Levy <75269480+gahl-levy@users.noreply.github.com>

* closing python code snippet

* last doc updates

* Update sdk/cosmos/azure-cosmos/CHANGELOG.md

Co-authored-by: Simon Moreno <30335873+simorenoh@users.noreply.github.com>

Co-authored-by: annatisch <antisch@microsoft.com>
Co-authored-by: Gahl Levy <75269480+gahl-levy@users.noreply.github.com>
Co-authored-by: Travis Prescott <tjprescott@users.noreply.github.com>
  • Loading branch information
4 people authored Dec 7, 2021
1 parent 88393df commit 014e077
Show file tree
Hide file tree
Showing 35 changed files with 9,298 additions and 77 deletions.
5 changes: 3 additions & 2 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## 4.2.1 (Unreleased)

## 4.3.0b1 (Unreleased)
**New features**
- Added language native async i/o client

## 4.2.0 (2020-10-08)

Expand Down
229 changes: 158 additions & 71 deletions sdk/cosmos/azure-cosmos/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ Once you've populated the `ACCOUNT_URI` and `ACCOUNT_KEY` environment variables,
from azure.cosmos import CosmosClient

import os
url = os.environ['ACCOUNT_URI']
key = os.environ['ACCOUNT_KEY']
client = CosmosClient(url, credential=key)
URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
```

## Key concepts
Expand All @@ -90,14 +90,17 @@ For more information about these resources, see [Working with Azure Cosmos datab

The keyword-argument `enable_cross_partition_query` accepts 2 options: `None` (default) or `True`.

## Note on using queries by id

When using queries that try to find items based on an **id** value, always make sure you are passing in a string type variable. Azure Cosmos DB only allows string id values and if you use any other datatype, this SDK will return no results and no error messages.

## Limitations

Currently the features below are **not supported**. For alternatives options, check the **Workarounds** section below.

### Data Plane Limitations:

* Group By queries
* Language Native async i/o
* Queries with COUNT from a DISTINCT subquery: SELECT COUNT (1) FROM (SELECT DISTINCT C.ID FROM C)
* Bulk/Transactional batch processing
* Direct TCP Mode access
Expand Down Expand Up @@ -177,6 +180,7 @@ The following sections provide several code snippets covering some of the most c
* [Get database properties](#get-database-properties "Get database properties")
* [Get database and container throughputs](#get-database-and-container-throughputs "Get database and container throughputs")
* [Modify container properties](#modify-container-properties "Modify container properties")
* [Using the asynchronous client](#using-the-asynchronous-client "Using the asynchronous client")

### Create a database

Expand All @@ -186,14 +190,14 @@ After authenticating your [CosmosClient][ref_cosmosclient], you can work with an
from azure.cosmos import CosmosClient, exceptions
import os

url = os.environ['ACCOUNT_URI']
key = os.environ['ACCOUNT_KEY']
client = CosmosClient(url, credential=key)
database_name = 'testDatabase'
URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
try:
database = client.create_database(database_name)
database = client.create_database(DATABASE_NAME)
except exceptions.CosmosResourceExistsError:
database = client.get_database_client(database_name)
database = client.get_database_client(DATABASE_NAME)
```

### Create a container
Expand All @@ -204,17 +208,17 @@ This example creates a container with default settings. If a container with the
from azure.cosmos import CosmosClient, PartitionKey, exceptions
import os

url = os.environ['ACCOUNT_URI']
key = os.environ['ACCOUNT_KEY']
client = CosmosClient(url, credential=key)
database_name = 'testDatabase'
database = client.get_database_client(database_name)
container_name = 'products'
URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'products'

try:
container = database.create_container(id=container_name, partition_key=PartitionKey(path="/productName"))
container = database.create_container(id=CONTAINER_NAME, partition_key=PartitionKey(path="/productName"))
except exceptions.CosmosResourceExistsError:
container = database.get_container_client(container_name)
container = database.get_container_client(CONTAINER_NAME)
except exceptions.CosmosHttpResponseError:
raise
```
Expand All @@ -231,11 +235,11 @@ The options for analytical_storage_ttl are:


```Python
container_name = 'products'
CONTAINER_NAME = 'products'
try:
container = database.create_container(id=container_name, partition_key=PartitionKey(path="/productName"),analytical_storage_ttl=-1)
container = database.create_container(id=CONTAINER_NAME, partition_key=PartitionKey(path="/productName"),analytical_storage_ttl=-1)
except exceptions.CosmosResourceExistsError:
container = database.get_container_client(container_name)
container = database.get_container_client(CONTAINER_NAME)
except exceptions.CosmosHttpResponseError:
raise
```
Expand All @@ -250,13 +254,13 @@ Retrieve an existing container from the database:
from azure.cosmos import CosmosClient
import os

url = os.environ['ACCOUNT_URI']
key = os.environ['ACCOUNT_KEY']
client = CosmosClient(url, credential=key)
database_name = 'testDatabase'
database = client.get_database_client(database_name)
container_name = 'products'
container = database.get_container_client(container_name)
URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'products'
container = database.get_container_client(CONTAINER_NAME)
```

### Insert data
Expand All @@ -269,13 +273,13 @@ This example inserts several items into the container, each with a unique `id`:
from azure.cosmos import CosmosClient
import os

url = os.environ['ACCOUNT_URI']
key = os.environ['ACCOUNT_KEY']
client = CosmosClient(url, credential=key)
database_name = 'testDatabase'
database = client.get_database_client(database_name)
container_name = 'products'
container = database.get_container_client(container_name)
URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'products'
container = database.get_container_client(CONTAINER_NAME)

for i in range(1, 10):
container.upsert_item({
Expand All @@ -294,13 +298,13 @@ To delete items from a container, use [ContainerProxy.delete_item][ref_container
from azure.cosmos import CosmosClient
import os

url = os.environ['ACCOUNT_URI']
key = os.environ['ACCOUNT_KEY']
client = CosmosClient(url, credential=key)
database_name = 'testDatabase'
database = client.get_database_client(database_name)
container_name = 'products'
container = database.get_container_client(container_name)
URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'products'
container = database.get_container_client(CONTAINER_NAME)

for item in container.query_items(
query='SELECT * FROM products p WHERE p.productModel = "Model 2"',
Expand All @@ -320,13 +324,13 @@ This example queries a container for items with a specific `id`:
from azure.cosmos import CosmosClient
import os

url = os.environ['ACCOUNT_URI']
key = os.environ['ACCOUNT_KEY']
client = CosmosClient(url, credential=key)
database_name = 'testDatabase'
database = client.get_database_client(database_name)
container_name = 'products'
container = database.get_container_client(container_name)
URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'products'
container = database.get_container_client(CONTAINER_NAME)

# Enumerate the returned items
import json
Expand Down Expand Up @@ -363,11 +367,11 @@ from azure.cosmos import CosmosClient
import os
import json

url = os.environ['ACCOUNT_URI']
key = os.environ['ACCOUNT_KEY']
client = CosmosClient(url, credential=key)
database_name = 'testDatabase'
database = client.get_database_client(database_name)
URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
properties = database.read()
print(json.dumps(properties))
```
Expand All @@ -381,19 +385,19 @@ from azure.cosmos import CosmosClient
import os
import json

url = os.environ['ACCOUNT_URI']
key = os.environ['ACCOUNT_KEY']
client = CosmosClient(url, credential=key)
URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)

# Database
database_name = 'testDatabase'
database = client.get_database_client(database_name)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
db_offer = database.read_offer()
print('Found Offer \'{0}\' for Database \'{1}\' and its throughput is \'{2}\''.format(db_offer.properties['id'], database.id, db_offer.properties['content']['offerThroughput']))

# Container with dedicated throughput only. Will return error "offer not found" for containers without dedicated throughput
container_name = 'testContainer'
container = database.get_container_client(container_name)
CONTAINER_NAME = 'testContainer'
container = database.get_container_client(CONTAINER_NAME)
container_offer = container.read_offer()
print('Found Offer \'{0}\' for Container \'{1}\' and its throughput is \'{2}\''.format(container_offer.properties['id'], container.id, container_offer.properties['content']['offerThroughput']))
```
Expand All @@ -408,13 +412,13 @@ from azure.cosmos import CosmosClient, PartitionKey
import os
import json

url = os.environ['ACCOUNT_URI']
key = os.environ['ACCOUNT_KEY']
client = CosmosClient(url, credential=key)
database_name = 'testDatabase'
database = client.get_database_client(database_name)
container_name = 'products'
container = database.get_container_client(container_name)
URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'products'
container = database.get_container_client(CONTAINER_NAME)

database.replace_container(
container,
Expand All @@ -428,7 +432,90 @@ print(json.dumps(container_props['defaultTtl']))

For more information on TTL, see [Time to Live for Azure Cosmos DB data][cosmos_ttl].

### Using the asynchronous client

The asynchronous cosmos client is a separate client that looks and works in a similar fashion to the existing synchronous client. However, the async client needs to be imported separately and its methods need to be used with the async/await keywords.

```Python
from azure.cosmos.aio import CosmosClient
import os

URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'products'
container = database.get_container_client(CONTAINER_NAME)

async def create_items():
for i in range(10):
await container.upsert_item({
'id': 'item{0}'.format(i),
'productName': 'Widget',
'productModel': 'Model {0}'.format(i)
}
)
await client.close() # the async client must be closed manually if it's not initialized in a with statement
```

It is also worth pointing out that the asynchronous client has to be closed manually after its use, either by initializing it using async with or calling the close() method directly like shown above.

```Python
from azure.cosmos.aio import CosmosClient
import os

URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
DATABASE_NAME = 'testDatabase'
CONTAINER_NAME = 'products'

async with CosmosClient(URL, credential=KEY) as client: # the with statement will automatically close the async client
database = client.get_database_client(DATABASE_NAME)
container = database.get_container_client(CONTAINER_NAME)
for i in range(10):
await container.upsert_item({
'id': 'item{0}'.format(i),
'productName': 'Widget',
'productModel': 'Model {0}'.format(i)
}
)
```

### Queries with the asynchronous client

Unlike the synchronous client, the async client does not have an `enable_cross_partition` flag in the request. Queries without a specified partition key value will attempt to do a cross partition query by default.

Query results can be iterated, but the query's raw output returns an asynchronous iterator. This means that each object from the iterator is an awaitable object, and does not yet contain the true query result. In order to obtain the query results you can use an async for loop, which awaits each result as you iterate on the object, or manually await each query result as you iterate over the asynchronous iterator.

Since the query results are an asynchronous iterator, they can't be cast into lists directly; instead, if you need to create lists from your results, use an async for loop or Python's list comprehension to populate a list:

```Python
from azure.cosmos.aio import CosmosClient
import os

URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'products'
container = database.get_container_client(CONTAINER_NAME)

async def create_lists():
results = container.query_items(
query='SELECT * FROM products p WHERE p.productModel = "Model 2"')

# iterates on "results" iterator to asynchronously create a complete list of the actual query results

item_list = []
async for item in results:
item_list.append(item)

# Asynchronously creates a complete list of the actual query results. This code performs the same action as the for-loop example above.
item_list = [item async for item in results]
await client.close()
```
## Troubleshooting

### General
Expand All @@ -441,7 +528,7 @@ For example, if you try to create a container using an ID (name) that's already

```Python
try:
database.create_container(id=container_name, partition_key=PartitionKey(path="/productName"))
database.create_container(id=CONTAINER_NAME, partition_key=PartitionKey(path="/productName"))
except exceptions.CosmosResourceExistsError:
print("""Error creating container
HTTP status code 409: The ID (name) provided for the container is already in use.
Expand Down Expand Up @@ -471,13 +558,13 @@ handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

# This client will log detailed information about its HTTP sessions, at DEBUG level
client = CosmosClient(url, credential=key, logging_enable=True)
client = CosmosClient(URL, credential=KEY, logging_enable=True)
```

Similarly, `logging_enable` can enable detailed logging for a single operation,
even when it isn't enabled for the client:
```py
database = client.create_database(database_name, logging_enable=True)
database = client.create_database(DATABASE_NAME, logging_enable=True)
```

## Next steps
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# The MIT License (MIT)
# Copyright (c) 2021 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
Loading

0 comments on commit 014e077

Please sign in to comment.