feat: Add async support #76

Closed
creatorrr wants to merge 3 commits


creatorrr

Closes #74

Example:

In [2]: import replicate

In [3]: model = replicate.models.get("creatorrr/instructor-large")

In [4]: version = await model.versions.get_async("bd2701dac1aea9d598bda71e6ae56b204287c0
   ...: a79e2cadf96b1393127d044495")

In [5]: inputs = {
   ...:     # Text to embed
   ...:     'text': "Hello world! How are you doing?",
   ...:
   ...:     # Embedding instruction
   ...:     'instruction': "Represent the following text",
   ...: }

In [6]: output = await version.predict_async(**inputs)

In [7]: output
Out[7]:
{'result': [[-0.008334644138813019, ... ]]}

Signed-off-by: Diwank Singh Tomer <diwank.singh@gmail.com>
@proksi21

proksi21 commented Mar 6, 2023

Hey guys, please approve this. We need the async API.

@creatorrr
Author

@bfirsh request for a review

@ZentixUA

ZentixUA commented Mar 24, 2023

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/usr/local/lib/python3.10/dist-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_async/http11.py", line 112, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_async/http11.py", line 91, in handle_async_request
    ) = await self._receive_response_headers(**kwargs)
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_async/http11.py", line 155, in _receive_response_headers
    event = await self._receive_event(timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_async/http11.py", line 191, in _receive_event
    data = await self._network_stream.read(
  File "/usr/local/lib/python3.10/dist-packages/httpcore/backends/asyncio.py", line 31, in read
    with map_exceptions(exc_map):
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc)
httpcore.ReadError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/host/servers/bots/adventgpt/main.py", line 437, in improve
    output = await improve_ver.predict_async(**inputs)
  File "/home/host/servers/bots/adventgpt/replicate-python/replicate/version.py", line 37, in predict_async
    prediction = await self._client.predictions.create_async(version=self, input=kwargs)
  File "/home/host/servers/bots/adventgpt/replicate-python/replicate/prediction.py", line 144, in create_async
    resp = await self._client._request_async(
  File "/home/host/servers/bots/adventgpt/replicate-python/replicate/client.py", line 101, in _request_async
    resp = await client.request(method, self.base_url + path, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/httpx/_client.py", line 1533, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/dist-packages/httpx/_client.py", line 1620, in send
    response = await self._send_handling_auth(
  File "/usr/local/lib/python3.10/dist-packages/httpx/_client.py", line 1648, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/lib/python3.10/dist-packages/httpx/_client.py", line 1685, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/lib/python3.10/dist-packages/httpx/_client.py", line 1722, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/lib/python3.10/dist-packages/httpx/_transports/default.py", line 352, in handle_async_request
    with map_httpcore_exceptions():
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/dist-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ReadError

At the moment, it can't handle even 2 tasks running at the same time.

I'm not sure I'm explaining this well, so I'm attaching my code:

CODE:

os.environ["REPLICATE_API_TOKEN"] = "..."
...

async def replicate_load():
    global improve_ver
    improve_model = replicate.models.get("xinntao/realesrgan")
    improve_ver = await improve_model.versions.get_async("1b976a4d456ed9e4d1a846597b7614e79eadad3032e9124fa63859db0fd59b56")

@bot.event
async def on_ready():
    await replicate_load()
    
async def....
    try:
        inputs = {
            # Input
            'img': f'{attachment.url}',

            'version': "General - RealESRGANplus",

            'scale': 8,

            'face_enhance': True,

            'tile': 0,
        }

        output = await improve_ver.predict_async(**inputs)
        embed = discord.Embed(title="Photo:", color=discord.Colour.blurple()).set_image(url=output)
        await msg.edit(embed=embed)
    except Exception as e:
        print(traceback.format_exc())

@ZentixUA

I reinstalled all dependencies, but the problem remained.

Signed-off-by: Diwank Singh Tomer <diwank.singh@gmail.com>
@creatorrr
Author

@GenifeG can you please share a GitHub gist to reproduce the error? I wrote a simple test, and it seems to be passing.

@ZentixUA

At the moment I can't.

Please use the model from my code in the test, and run the function twice about 2 seconds apart, so that the second call starts while the first is still running.

@ZentixUA

@creatorrr

import asyncio
import os
import traceback
import replicate

os.environ["REPLICATE_API_TOKEN"] = "YOUR_API_KEY"


async def replicate_load():
    global improve_ver
    improve_model = replicate.models.get("xinntao/realesrgan")
    improve_ver = await improve_model.versions.get_async(
        "1b976a4d456ed9e4d1a846597b7614e79eadad3032e9124fa63859db0fd59b56")


asyncio.run(replicate_load())


async def process():
    await asyncio.sleep(3)
    print(66)
    try:
        inputs = {
            # Input
            'img': 'https://media.discordapp.net/attachments/640960911204548648/1089223567746551889/image.png',

            'version': "General - RealESRGANplus",

            'scale': 3,
            'face_enhance': True,
            'tile': 0,
        }

        output = await improve_ver.predict_async(**inputs)
        print(output)
    except Exception:
        print(traceback.format_exc())


async def main():
    tasks = []
    for i in range(2):
        tasks.append(asyncio.create_task(process()))
        await asyncio.sleep(2)
    await asyncio.gather(*tasks)


asyncio.run(main())

Use this for testing. Thanks!

@ZentixUA

Any changes?

@issafuad

Hi, is this going to be merged soon? I could really use the async support. Many thanks.

@haoming29

Hey @creatorrr, any updates on this PR? Is it idle? Many thanks.

@creatorrr
Author

@Techming Idle, unfortunately. I'm not using this lib anymore, so I can't find the time to update this. Any takers?

@mattt
Contributor

mattt commented Sep 28, 2023

Thanks for your contribution, @creatorrr, and to @Techming and everyone else for your interest. I apologize for not responding sooner to this.

I have an alternative implementation for async support in the async branch. It's effectively a rewrite of the current library, so I'm working to incrementally pull in functionality to minimize breaking changes. The first step towards that is #147, which replaces requests with httpx, which has async support.

Please take a look and let me know what you think. I hope to get this all wrapped up soon.

@haoming29

Thanks @creatorrr for your previous work, and thank you @mattt for the updates and for taking over. It seems that #147 has been ready for review for a couple of weeks. Will new features be added to that PR, or are we simply waiting for review and approval? Thanks.

@mattt mattt mentioned this pull request Nov 8, 2023
mattt added a commit that referenced this pull request Nov 9, 2023
This PR adds support for async operations to the Replicate client.
Namespace operations like `predictions.list` and `models.create` now
have async variants with the `async_` prefix (`predictions.async_list`
and `models.async_create`).

Here's an example of what that looks like in practice:

```python
import replicate

model = await replicate.models.async_get("stability-ai/sdxl")

input = {
  "prompt": "A chariot pulled by a team of rainbow unicorns, driven by an astronaut, dramatic lighting",
}

output = await replicate.async_run(f"stability-ai/sdxl:{model.latest_version.id}", input)
```

<details>

<summary>Output</summary>

<img
src="https://github.com/replicate/replicate-python/assets/7659/6927f8b4-5f92-495d-a87c-135f31aa1847"/>


</details>

One of the most common questions I hear is how to run a bunch of
predictions in parallel. The async functionality provided by this PR
makes it really straightforward:

```python
import asyncio
import replicate

# https://replicate.com/stability-ai/sdxl
model_version = "stability-ai/sdxl:39ed52f2a78e934b3ba6e2a89f5b1c712de7dfea535525255b1aa35c5565e08b"
prompts = [
    f"A chariot pulled by a team of {count} rainbow unicorns"
    for count in ["two", "four", "six", "eight"]
]

async with asyncio.TaskGroup() as tg:
    tasks = [
        tg.create_task(replicate.async_run(model_version, input={"prompt": prompt}))
        for prompt in prompts
    ]

results = await asyncio.gather(*tasks)
print(results)
```
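
The snippet above relies on top-level `await`, which works in IPython or `python -m asyncio` but not in a plain script. As a rough sketch of the same pattern in script form, the example below wraps the work in a `main()` coroutine and uses `asyncio.gather` alone. Note that `fake_run` and the `"owner/model:version"` string are hypothetical stand-ins (not part of this library) so the example runs without a network connection or API token; in real use you would presumably call `replicate.async_run` in its place.

```python
import asyncio


# Hypothetical stand-in for `replicate.async_run`, so the sketch runs
# without a network connection or an API token.
async def fake_run(model_version: str, input: dict) -> str:
    await asyncio.sleep(0.01)  # simulate waiting on the API
    return f"output for: {input['prompt']}"


async def main() -> list:
    model_version = "owner/model:version"  # placeholder, not a real model
    prompts = [
        f"A chariot pulled by a team of {count} rainbow unicorns"
        for count in ["two", "four", "six", "eight"]
    ]
    # gather() runs the coroutines concurrently and returns their results
    # in the same order as the inputs.
    return await asyncio.gather(
        *(fake_run(model_version, input={"prompt": p}) for p in prompts)
    )


results = asyncio.run(main())
print(results)
```

Calling `asyncio.run(main())` once at the entry point keeps every request on a single event loop.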

Under the hood, `Client` manages an `httpx.Client` and an
`httpx.AsyncClient`, which handle calls to `_request` and
`_async_request`, respectively. Both are created lazily, so API
consumers using only sync or only async functionality won't be affected
by functionality they aren't using.
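
To illustrate what "created lazily" could look like, here is a minimal sketch, not the library's actual code. `SyncTransport`, `AsyncTransport`, and `LazyClient` are hypothetical names standing in for `httpx.Client`, `httpx.AsyncClient`, and `Client`; each transport is only constructed the first time it is accessed.

```python
from typing import Optional


class SyncTransport:
    """Hypothetical stand-in for a sync HTTP client such as httpx.Client."""


class AsyncTransport:
    """Hypothetical stand-in for an async HTTP client such as httpx.AsyncClient."""


class LazyClient:
    """Holds one sync and one async transport, each built on first use."""

    def __init__(self) -> None:
        self._sync: Optional[SyncTransport] = None
        self._async: Optional[AsyncTransport] = None

    @property
    def sync_transport(self) -> SyncTransport:
        # Construct the sync transport only when something asks for it.
        if self._sync is None:
            self._sync = SyncTransport()
        return self._sync

    @property
    def async_transport(self) -> AsyncTransport:
        # Same idea for the async transport.
        if self._async is None:
            self._async = AsyncTransport()
        return self._async


client = LazyClient()
first = client.sync_transport       # builds the sync transport
second = client.sync_transport      # reuses the same instance
async_unused = client._async is None  # async transport was never built
```

A caller that only touches `sync_transport` never pays for constructing the async one, and vice versa.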

Implementation-wise, sync and async variants have separate code paths.
This creates nontrivial amounts of duplication, but its benefits to
clarity and performance justify those costs. For instance, it'd have
been nice if the sync variant were implemented as a blocking call to the
async variant, but that would require starting an event loop, which has
additional overhead and causes problems if done within an existing event
loop.
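
The "existing event loop" problem can be reproduced with the standard library alone. In this sketch, `fetch_async` and `fetch_sync` are hypothetical names (not part of this library): `fetch_sync` blocks on the async variant with `asyncio.run`, which works from ordinary sync code but raises `RuntimeError` when called from inside a running loop.

```python
import asyncio


async def fetch_async() -> str:
    await asyncio.sleep(0)
    return "ok"


def fetch_sync() -> str:
    """Hypothetical sync wrapper that blocks on the async variant."""
    coro = fetch_async()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise


async def caller_inside_loop() -> str:
    # We are already inside an event loop here, so asyncio.run() refuses
    # to start a second one.
    try:
        return fetch_sync()
    except RuntimeError as exc:
        return f"failed: {exc}"


outside = fetch_sync()                      # fine: no loop is running yet
inside = asyncio.run(caller_inside_loop())  # the wrapper fails inside a loop
print(outside)
print(inside)
```

This is the situation a sync-over-async wrapper would hit in notebooks or async web frameworks, where a loop is already running.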

Alternative to #76
Resolves #145
Resolves #107
Resolves #74

---------

Signed-off-by: Mattt Zmuda <mattt@replicate.com>
@mattt
Contributor

mattt commented Nov 9, 2023

Hi @creatorrr. Following up on #76 (comment)

I just merged #193, which implements async support. One notable difference in this API is that the naming convention uses `async_` as a method name prefix rather than `_async` as a suffix (for example, `async_get` instead of `get_async`). Otherwise, it should be a drop-in replacement for your fork or existing synchronous usage.

Thank you again for contributing this PR. Please let me know if you see any ways we can improve our implementation of async.

@mattt mattt closed this Nov 9, 2023
Chris000102 added a commit to Chris000102/replicate-python that referenced this pull request May 3, 2024
Successfully merging this pull request may close these issues.

Request: Add async support