Added async samples.
Signed-off-by: dblock <dblock@amazon.com>
dblock committed Jul 24, 2023
1 parent cf856e0 commit 94ff485
Showing 7 changed files with 829 additions and 9 deletions.
9 changes: 3 additions & 6 deletions USER_GUIDE.md
@@ -26,11 +26,7 @@ Then import it like any other module:
from opensearchpy import OpenSearch
```

For better performance we recommend the async client. To add the async client to your project, install it using [pip](https://pip.pypa.io/):

```bash
pip install opensearch-py[async]
```
For better performance we recommend the async client. See [Asynchronous I/O](guides/async.md) for more information.

In general, we recommend using a package manager, such as [poetry](https://python-poetry.org/docs/), for your projects. This is the package manager used for [samples](samples).

@@ -61,7 +57,7 @@ info = client.info()
print(f"Welcome to {info['version']['distribution']} {info['version']['number']}!")
```

See [hello.py](samples/hello/hello.py) for a working sample, and [guides/ssl](guides/ssl.md) for how to setup SSL certificates.
See [hello.py](samples/hello/hello.py) for a working synchronous sample, and [guides/ssl](guides/ssl.md) for how to set up SSL certificates.

### Creating an Index

@@ -148,6 +144,7 @@ print(response)

## Advanced Features

- [Asynchronous I/O](guides/async.md)
- [Authentication (IAM, SigV4)](guides/auth.md)
- [Configuring SSL](guides/ssl.md)
- [Bulk Indexing](guides/bulk.md)
152 changes: 152 additions & 0 deletions guides/async.md
@@ -0,0 +1,152 @@
- [Asynchronous I/O](#asynchronous-io)
- [Setup](#setup)
- [Async Loop](#async-loop)
- [Connect to OpenSearch](#connect-to-opensearch)
- [Create an Index](#create-an-index)
- [Index Documents](#index-documents)
- [Refresh the Index](#refresh-the-index)
- [Search](#search)
- [Delete Documents](#delete-documents)
- [Delete the Index](#delete-the-index)

# Asynchronous I/O

This client supports asynchronous I/O, which can improve performance and increase throughput when many requests are in flight at once. See [hello-async.py](../samples/hello/hello-async.py) or [knn-async-basics.py](../samples/knn/knn-async-basics.py) for a working asynchronous sample.

## Setup

To add the async client to your project, install it using [pip](https://pip.pypa.io/):

```bash
pip install opensearch-py[async]
```

In general, we recommend using a package manager, such as [poetry](https://python-poetry.org/docs/), for your projects. This is the package manager used for [samples](../samples). The following example includes `opensearch-py[async]` in `pyproject.toml`.

```toml
[tool.poetry.dependencies]
opensearch-py = { path = "../", extras=["async"] }
```
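
After installing, a quick check can confirm that the async dependency is present. The following is a minimal sketch, assuming the `async` extra pulls in `aiohttp`; the helper name `async_extra_installed` is hypothetical and not part of the client.

```python
import importlib.util


def async_extra_installed() -> bool:
    """Return True if aiohttp is importable.

    Assumption: the `async` extra of opensearch-py installs aiohttp.
    """
    return importlib.util.find_spec("aiohttp") is not None


if __name__ == "__main__":
    if async_extra_installed():
        print("aiohttp found; the async client should be importable.")
    else:
        print("aiohttp missing; try: pip install opensearch-py[async]")
```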

## Async Loop

```python
import asyncio

from opensearchpy import AsyncOpenSearch

async def main():
    client = AsyncOpenSearch(...)
    try:
        # your code here
        pass
    finally:
        # close() is a coroutine on the async client, so it must be awaited
        await client.close()

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(main())
    loop.close()
```
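
On Python 3.7 and later, `asyncio.run()` creates the event loop, runs the coroutine, and closes the loop in one call, so it can stand in for the manual loop handling. The sketch below uses a hypothetical `StubAsyncClient` in place of `AsyncOpenSearch` so that it runs without a cluster.

```python
import asyncio


class StubAsyncClient:
    """Hypothetical stand-in for AsyncOpenSearch; not part of opensearch-py."""

    def __init__(self):
        self.closed = False

    async def info(self):
        return {"version": {"distribution": "stub", "number": "0.0.0"}}

    async def close(self):
        self.closed = True


async def main(client):
    try:
        info = await client.info()
        return info["version"]["distribution"]
    finally:
        # Always release the client's connections, even if the body raised.
        await client.close()


if __name__ == "__main__":
    print(asyncio.run(main(StubAsyncClient())))
```

With the real client, pass `AsyncOpenSearch(...)` instead of the stub.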

## Connect to OpenSearch

```python
host = 'localhost'
port = 9200
auth = ('admin', 'admin') # For testing only. Don't store credentials in code.

client = AsyncOpenSearch(
    hosts = [{'host': host, 'port': port}],
    http_auth = auth,
    use_ssl = True,
    verify_certs = False,
    ssl_show_warn = False
)

info = await client.info()
print(f"Welcome to {info['version']['distribution']} {info['version']['number']}!")
```
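
To keep credentials out of code, the connection settings can be read from the environment, as the k-NN async sample does with `HOST`, `PORT`, `USERNAME`, and `PASSWORD`. A minimal sketch; the helper name `connection_settings` is hypothetical.

```python
import os


def connection_settings(env=os.environ):
    """Build keyword arguments for AsyncOpenSearch from environment variables.

    `connection_settings` is an illustrative helper, not part of the client.
    """
    host = env.get("HOST", "localhost")
    port = int(env.get("PORT", 9200))
    auth = (env.get("USERNAME", "admin"), env.get("PASSWORD", "admin"))
    return {
        "hosts": [{"host": host, "port": port}],
        "http_auth": auth,
        "use_ssl": True,
        "verify_certs": False,  # for testing only
        "ssl_show_warn": False,
    }
```

The result can then be unpacked into the constructor, for example `AsyncOpenSearch(**connection_settings())`.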

## Create an Index

```python
index_name = 'test-index'

index_body = {
    'settings': {
        'index': {
            'number_of_shards': 4
        }
    }
}

if not await client.indices.exists(index=index_name):
    await client.indices.create(
        index_name,
        body=index_body
    )
```

## Index Documents

```python
await asyncio.gather(*[
    client.index(
        index = index_name,
        body = {
            'title': f"Moneyball {i}",
            'director': 'Bennett Miller',
            'year': '2011'
        },
        id = i
    ) for i in range(10)
])
```
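
`asyncio.gather` starts all ten requests at once. For much larger batches it may help to cap how many are in flight. The sketch below shows one way to do that with `asyncio.Semaphore`; `gather_bounded` and `fake_index` are hypothetical names, and `fake_index` stands in for `client.index(...)` so the example runs without a cluster.

```python
import asyncio


async def gather_bounded(coroutine_factories, limit=4):
    """Run zero-argument coroutine factories, at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def run(factory):
        async with semaphore:
            return await factory()

    # gather returns results in the same order as its arguments
    return await asyncio.gather(*(run(f) for f in coroutine_factories))


async def fake_index(i):
    """Hypothetical stand-in for client.index(...)."""
    await asyncio.sleep(0)
    return {"_id": i, "result": "created"}


async def demo():
    # i=i binds the loop variable at definition time
    factories = [lambda i=i: fake_index(i) for i in range(10)]
    return await gather_bounded(factories, limit=3)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Factories are used instead of coroutine objects so that no request is created before a semaphore slot is free.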

## Refresh the Index

```python
await client.indices.refresh(index=index_name)
```

## Search

```python
q = 'miller'

query = {
    'size': 5,
    'query': {
        'multi_match': {
            'query': q,
            'fields': ['title^2', 'director']
        }
    }
}

results = await client.search(
    body = query,
    index = index_name
)

for hit in results["hits"]["hits"]:
    print(hit)
```
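
Each hit is a dictionary that carries the document under `_source` alongside metadata such as `_id` and `_score`. A small sketch of pulling out selected fields; `summarize_hits` is a hypothetical helper, and `sample` is an illustrative fragment rather than real cluster output.

```python
def summarize_hits(results):
    """Return (id, score, title) tuples from a search response dictionary."""
    return [
        (hit["_id"], hit["_score"], hit["_source"].get("title"))
        for hit in results["hits"]["hits"]
    ]


# Illustrative response fragment, not real output from a cluster.
sample = {
    "hits": {
        "hits": [
            {"_id": "1", "_score": 0.5, "_source": {"title": "Moneyball 1"}},
            {"_id": "2", "_score": 0.4, "_source": {"title": "Moneyball 2"}},
        ]
    }
}

for doc_id, score, title in summarize_hits(sample):
    print(doc_id, score, title)
```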

## Delete Documents

```python
await asyncio.gather(*[
    client.delete(
        index = index_name,
        id = i
    ) for i in range(10)
])
```
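
By default, `asyncio.gather` raises the first exception from any of the awaited calls, for example when a document is already gone. Passing `return_exceptions=True` returns exceptions in the result list instead, so successes and failures can be separated afterwards. In this sketch `fake_delete` is a hypothetical stand-in for `client.delete(...)` that fails for odd ids; the real client raises its own exception types.

```python
import asyncio


async def fake_delete(i):
    """Hypothetical stand-in for client.delete(...); fails for odd ids."""
    await asyncio.sleep(0)
    if i % 2:
        raise LookupError(f"document {i} not found")
    return {"_id": i, "result": "deleted"}


async def delete_all(ids):
    ids = list(ids)
    outcomes = await asyncio.gather(
        *(fake_delete(i) for i in ids),
        return_exceptions=True,
    )
    # Outcomes come back in the same order as ids.
    deleted = [o["_id"] for o in outcomes if not isinstance(o, Exception)]
    failed = [i for i, o in zip(ids, outcomes) if isinstance(o, Exception)]
    return deleted, failed


if __name__ == "__main__":
    print(asyncio.run(delete_all(range(4))))
```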

## Delete the Index

```python
await client.indices.delete(
    index = index_name
)
```
2 changes: 1 addition & 1 deletion guides/plugins/knn.md
@@ -12,7 +12,7 @@ Short for k-nearest neighbors, the k-NN plugin enables users to search for the k

## Basic Approximate k-NN

In the following example we create a 5-dimensional k-NN index with random data. You can find this working sample in [samples/knn/knn-basics.py](../../samples/knn/knn-basics.py).
In the following example we create a 5-dimensional k-NN index with random data. You can find a synchronous version of this working sample in [samples/knn/knn-basics.py](../../samples/knn/knn-basics.py) and an asynchronous one in [samples/knn/knn-async-basics.py](../../samples/knn/knn-async-basics.py).

```bash
$ poetry run knn/knn-basics.py
107 changes: 107 additions & 0 deletions samples/hello/hello-async.py
@@ -0,0 +1,107 @@
#!/usr/bin/env python

# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.

import asyncio

from opensearchpy import AsyncOpenSearch

async def main():
    # connect to OpenSearch
    host = 'localhost'
    port = 9200
    auth = ('admin', 'admin') # For testing only. Don't store credentials in code.

    client = AsyncOpenSearch(
        hosts = [{'host': host, 'port': port}],
        http_auth = auth,
        use_ssl = True,
        verify_certs = False,
        ssl_show_warn = False
    )

    try:
        info = await client.info()
        print(f"Welcome to {info['version']['distribution']} {info['version']['number']}!")

        # create an index

        index_name = 'test-index'

        index_body = {
            'settings': {
                'index': {
                    'number_of_shards': 4
                }
            }
        }

        if not await client.indices.exists(index=index_name):
            await client.indices.create(
                index_name,
                body=index_body
            )

        # add some documents to the index, asynchronously
        await asyncio.gather(*[
            client.index(
                index = index_name,
                body = {
                    'title': f"Moneyball {i}",
                    'director': 'Bennett Miller',
                    'year': '2011'
                },
                id = i
            ) for i in range(10)
        ])

        # refresh the index
        await client.indices.refresh(index=index_name)

        # search for a document
        q = 'miller'

        query = {
            'size': 5,
            'query': {
                'multi_match': {
                    'query': q,
                    'fields': ['title^2', 'director']
                }
            }
        }

        results = await client.search(
            body = query,
            index = index_name
        )

        for hit in results["hits"]["hits"]:
            print(hit)

        # delete the documents
        await asyncio.gather(*[
            client.delete(
                index = index_name,
                id = i
            ) for i in range(10)
        ])

        # delete the index
        await client.indices.delete(
            index = index_name
        )

    finally:
        await client.close()

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(main())
    loop.close()

93 changes: 93 additions & 0 deletions samples/knn/knn-async-basics.py
@@ -0,0 +1,93 @@
#!/usr/bin/env python

# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.

import asyncio
import os
import random

from opensearchpy import AsyncOpenSearch, AsyncHttpConnection, helpers

async def main():
    # connect to an instance of OpenSearch
    host = os.getenv('HOST', default='localhost')
    port = int(os.getenv('PORT', 9200))
    auth = (
        os.getenv('USERNAME', 'admin'),
        os.getenv('PASSWORD', 'admin')
    )

    client = AsyncOpenSearch(
        hosts = [{'host': host, 'port': port}],
        http_auth = auth,
        use_ssl = True,
        verify_certs = False,
        connection_class=AsyncHttpConnection,
        ssl_show_warn = False
    )

    # check whether an index exists
    index_name = "my-index"
    dimensions = 5

    if not await client.indices.exists(index_name):
        await client.indices.create(index_name,
            body={
                "settings":{
                    "index.knn": True
                },
                "mappings":{
                    "properties": {
                        "values": {
                            "type": "knn_vector",
                            "dimension": dimensions
                        },
                    }
                }
            }
        )

    # index data
    vectors = []
    for i in range(10):
        vec = []
        for j in range(dimensions):
            vec.append(round(random.uniform(0, 1), 2))

        vectors.append({
            "_index": index_name,
            "_id": i,
            "values": vec,
        })

    # bulk index
    await helpers.async_bulk(client, vectors)

    await client.indices.refresh(index=index_name)

    # search
    vec = []
    for j in range(dimensions):
        vec.append(round(random.uniform(0, 1), 2))
    print(f"Searching for {vec} ...")

    search_query = {"query": {"knn": {"values": {"vector": vec, "k": 3}}}}
    results = await client.search(index=index_name, body=search_query)
    for hit in results["hits"]["hits"]:
        print(hit)

    # delete index
    await client.indices.delete(index=index_name)

    await client.close()

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(main())
    loop.close()
