-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Play with how a contrib module could look like #312
Conversation
one note for @oeway : note for @constantinpape (to bring you up to speed, because you are probably wondering what the Halloween this is about): The current state of this draft (which should maybe rather live in a new bioimageio.contrib repo...) changes ops to be awaitable and adapts the workflow accordingly. This (and smth like FAQ ;-):
|
this could of course simply run in CI or we don't unparse (and black) and simple compile the updated ast. I just thought (for debugging) it's nice to see the changed Python code as code and not just ast.dump printout. Reasons not to use
|
mixing @oeway can we orchastrate the workflow execution using Considering |
Nope, concurrent.futures won't work for the browser where we don't support multi-threading yet, and asyncio is more efficient.
I think it's much harder than that, different executors may not allow direct data reference, sometimes, it requires them to be pickable for example.
That's possible if we separate the execution and the workflow orchestration. Let's say we allow distributed workers to run ops, then we should go for asyncio only, since we won't run cpu-bound tasks (so no threading needed, which is perfect for the browser too).
Very good point, in fact, you reminded me that the imjoy-rpc is exactly for this purpose. In fact, we even support language-agnostic (to be exact, we have only python and javascript supported so far). In the imjoy/hypha realm, we group the Let's say, if we have the bioimageio.core contrib ops, we can basically export them as a service in a worker: import asyncio
from imjoy_rpc.hypha import connect_to_server
from bioimageio.core.contrib import all_ops
async def start_server(server_url):
server = await connect_to_server({"server_url": server_url})
def hello(name):
print("Hello " + name)
return "Hello " + name
await server.register_service({
"name": "BioImageIO Contrib Module",
"id": "bioimageio-contrib",
"config": {
"visibility": "public",
"run_in_executor": True # This will make sure all the sync functions run in a separate thread
},
"hello": hello,
"ops": all_ops
})
print(f"hello world service regisered at workspace: {server.config.workspace}")
print(f"Test it with the http proxy: {server_url}/{server.config.workspace}/services/bioimageio-contrib/hello?name=John")
if __name__ == "__main__":
server_url = "http://localhost:9000"
loop = asyncio.get_event_loop()
loop.create_task(start_server(server_url))
loop.run_forever() Now to create a workflow with the contrib ops, we can run it like this: import asyncio
from imjoy_rpc.hypha import connect_to_server
async def main():
server = await connect_to_server({"server_url": "http://localhost:9000"})
# get an existing service
# since bioimageio-contrib is registered as a public service, we can access it with only the name "bioimageio-contrib"
contrib = await server.get_service("bioimageio-contrib")
ret = await contrib.hello("John")
print(ret)
# sequential execution
result1 = await contrib.ops.run_model("affable-shark-1", ...)
result2 = await contrib.ops.run_model("affable-shark-2", ...)
# parallel execution
p1 = contrib.ops.run_model("affable-shark-1", ...)
p2 = contrib.ops.run_model("affable-shark-2", ...)
result1, result2 = await asyncio.gather(p1, p2)
# Interact with the BioEngine [working code]
triton = await server.get_service("triton-client")
results = await triton.execute(inputs=[image, {'diameter': 30}], model_name='cellpose-python', decode_json=True)
mask = results['mask'][0]
...
asyncio.run(main()) Here you can find more details for trying the hypha server: https://ha.amun.ai/ and live demos: https://slides.imjoy.io/?slides=https://raw.githubusercontent.com/oeway/slides/master/2022/i2k-2022-hypha-introduction.md#/7 For custom io, here you can find an example on how we can encode the itkImage for imjoy-rpc: https://github.com/InsightSoftwareConsortium/itkwidgets/blob/main/itkwidgets/imjoy.py#L48-L49 |
FYI: If we going for the route of hypha/imjoy-rpc, we don't need much effort to develop it, since we have it already supported in the bioimage.io website and also the BioEngine ;) We need however, to standardize it and create documentation for it. |
great example snippets to play with :-) |
when trying to start a serve I get this error:
|
is |
I guess I found how: https://github.com/imjoy-team/hypha#usage |
with the hypha server running I now get this error when trying to connect
nvm: you just forgot an |
@FynnBe I just tried it with a new conda env and I cannot reproduce the issue, it all works for me. Could you please try it again by following the example here: https://ha.amun.ai/ (Note: You don't need the server-apps for now.)
Alternatively, you can use our public server by replacing Once you have the server running or changed the url to the public server, in another terminal, start the worker:
Now the workflow client:
Nope, you don't have to define function as async, it will convert it automatically. But when you call the function, you need to treat it as async function since it's a remote function, always. |
Thanks @oeway !
Interesting, I did remove the |
I've been trying to transfer dask arrays/zarr arrays, so I wanted to add
Any idea what's going wrong? |
I guess that's a bug, because we changed the function name to register_codec() and forgot to update the utils function for hypha. I think you can just do something like: ''' ''' And you can find the encode_zarr_store function here: https://github.com/imjoy-team/imjoy-rpc/blob/645a7f1450fe3af65cd9746c6a2d08ed05d2bf83/python/imjoy_rpc/hypha/utils.py#L238 |
still doesn't work for me though...
conda list
(another) conda list
|
Hi, sorry for the confusion, we are in the process of migrating from imjoy-rpc to hypha, so you should import stuff from imjoy_rpc.hypha. And here the usage of the register_codecs function has changed too, you can only call than when you connected to the server. server = await connect_to_server({"server_url": server_url})
# Register the codecs
server.register_codec(
{"name": "zarr-group", "type": zarr.Group, "encoder": encode_zarr_store}
)
z = zarr.array(np.arange(100))
# Use the echo function to do a round-trip with the zarr object
# Note: Since we didn't create a decoder, so we won't get the zarr object, but a zarr store interface
z2 = await server.echo(z)
print(z2) For more detailed usage for the register_codecs, see here: https://github.com/imjoy-team/imjoy-rpc#encoding-and-decoding-custom-objects Here is a complete implementation: Worker with zarr codecsimport asyncio
from imjoy_rpc.hypha import connect_to_server
# from bioimageio.core.contrib import all_ops
import zarr
import numpy as np
def encode_zarr_store(zobj):
"""Encode the zarr store."""
import zarr
path_prefix = f"{zobj.path}/" if zobj.path else ""
def getItem(key, options=None):
return zobj.store[path_prefix + key]
def setItem(key, value):
zobj.store[path_prefix + key] = value
def containsItem(key, options=None):
if path_prefix + key in zobj.store:
return True
return {
"_rintf": True,
"_rtype": "zarr-array" if isinstance(zobj, zarr.Array) else "zarr-group",
"getItem": getItem,
"setItem": setItem,
"containsItem": containsItem,
}
def register_codecs(server):
"""Register default codecs."""
server.register_codec(
{"name": "zarr-array", "type": zarr.Array, "encoder": encode_zarr_store}
)
server.register_codec(
{"name": "zarr-group", "type": zarr.Group, "encoder": encode_zarr_store}
)
async def start_server(server_url):
server = await connect_to_server({"server_url": server_url})
# Register the codecs
register_codecs(server)
z = zarr.array(np.arange(100))
# Use the echo function to do a round-trip with the zarr object
# Note: Since we didn't create a decoder, so we won't get the zarr object, but a zarr store interface
z2 = await server.echo(z)
print(z2)
def hello(name):
print("Hello " + name)
return "Hello " + name
await server.register_service({
"name": "BioImageIO Contrib Module",
"id": "bioimageio-contrib",
"config": {
"visibility": "public",
"run_in_executor": True # This will make sure all the sync functions run in a separate thread
},
"hello": hello,
# "ops": all_ops
})
print(f"hello world service regisered at workspace: {server.config.workspace}")
print(f"Test it with the http proxy: {server_url}/{server.config.workspace}/services/bioimageio-contrib/hello?name=John")
if __name__ == "__main__":
server_url = "https://ai.imjoy.io"
loop = asyncio.get_event_loop()
loop.create_task(start_server(server_url))
loop.run_forever() |
FYI: I updated the imjoy-rpc docs for the new version (now called v2) here: https://github.com/imjoy-team/imjoy-rpc/blob/master/imjoy-rpc-v2.md |
@oeway How about decoding? Ultimately in this context I'm interested in xarray.DataArray (has either numpy.ndarray or dask.array.DataArray as data storage). The most elegant would probably to go through fsspec, which both dask and zarr are compatible with. They also support an https interface. https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations What would your approach to encoding dask/zarr be (and why is it not (yet?) included in the examples 😇 )? |
This dummy encoding/decoding for zarr works, but is not asynchronous...import asyncio
from collections.abc import MutableMapping
from typing import Literal, TypedDict
import numpy as np
import zarr
class IZarr(TypedDict):
_rintf: bool
_rtype: Literal["zarr-array", "zarr_group"]
getItem: callable
setItem: callable
containsItem: callable
def encode_zarr_store(zobj) -> IZarr:
"""Encode the zarr store."""
path_prefix = f"{zobj.path}/" if zobj.path else ""
def getItem(key, options=None):
return zobj.store[path_prefix + key]
def setItem(key, value):
zobj.store[path_prefix + key] = value
def containsItem(key, options=None):
return path_prefix + key in zobj.store
return dict(
_rintf=True,
_rtype="zarr-array" if isinstance(zobj, zarr.Array) else "zarr-group",
getItem=getItem,
setItem=setItem,
containsItem=containsItem,
)
class RPC_Store(MutableMapping):
def __init__(self, get_item, set_item, contains_item): # , fill_value, nchunks: int):
super().__init__()
self.get_item = get_item
self.set_item = set_item
self.contains = contains_item
def __getitem__(self, key):
return self.get_item(key)
def __setitem__(self, key, value):
self.set_item(key, value)
def __contains__(self, key):
return self.contains(key)
def __delitem__(self, key):
raise NotImplementedError
self[key] = self.fill_value
def __iter__(self):
raise NotImplementedError
def __len__(self):
raise NotImplementedError
return self.nchunks
def decode_zarr_store(iz: IZarr):
return zarr.convenience.open(
RPC_Store(
get_item=iz["getItem"],
set_item=iz["setItem"],
contains_item=iz["containsItem"],
# fill_value=iz["fill_value"],
# nchunks=iz["nchunks"],
)
)
if __name__ == "__main__":
z = zarr.array(np.arange(100))
e = encode_zarr_store(z)
z2 = decode_zarr_store(e)
print(z2.shape)
print(z2[:10])
print(z2[10]) with for now I'll hack it to run synchronously, blocking the event loop. But this could be optimized. |
While I found working options for calling async functions synchronously (e.g. unsync), these don't work nested, i.e. within another async function, which we would need here... |
For decoding, we need to convert the async functions into sync so Zarr can work with it. This can be achieved by using threading(something similar has been implemented in fsspec too). Or, we can potentially use azarr, see martindurant/async-zarr#1 |
Hi @FynnBe If you want to just encode/decode xarray, you can already do it without any async conversion. async conversion is only needed when we want to do lazy encoding (meaning the encoded object contains functions). Here you can find a working example for encoding/decoding xarray: import asyncio
from imjoy_rpc.hypha import connect_to_server
import xarray as xr
import numpy as np
def encode_xarray(obj):
"""Encode the zarr store."""
assert isinstance(obj, xr.DataArray)
return {
"_rintf": True,
"_rtype": "xarray",
"data": obj.data,
"dims": obj.dims,
"attrs": obj.attrs,
"name": obj.name,
}
def decode_xarray(obj):
assert obj["_rtype"] == "xarray"
return xr.DataArray(
data=obj["data"],
dims=obj["dims"],
attrs=obj.get("attrs", {}),
name=obj.get("name", None),
)
async def start_server(server_url):
server = await connect_to_server({"server_url": server_url})
# Register the codecs
server.register_codec(
{"name": "xarray", "type": xr.DataArray, "encoder": encode_xarray, "decoder": decode_xarray}
)
z = xr.DataArray(data=np.arange(100), dims=["x"], attrs={"test": "test"}, name="mydata")
# Use the echo function to do a round-trip with the xarray object
z2 = await server.echo(z)
assert isinstance(z2, xr.DataArray)
assert z2.attrs["test"] == "test"
assert z2.dims == ("x",)
assert z2.data[0] == 0
assert z2.data[99] == 99
assert z2.name == "mydata"
print("Success!")
if __name__ == "__main__":
server_url = "https://ai.imjoy.io"
loop = asyncio.get_event_loop()
loop.create_task(start_server(server_url))
loop.run_forever() Also see here: https://github.com/imjoy-team/imjoy-rpc/blob/master/imjoy-rpc-v2.md#example-1-encode-and-decode-xarray |
I saw that, but I would like to decode lazy xarrays with a dask array as their data property... |
closed in favor of new repo: https://github.com/bioimage-io/workflows-bioimage-io-python |
as discussed today with @oeway we could 'compile' operator and workflow functions to use asyncio, etc...
With
ast
we can also validate (with useful error messages) user code wrt to other criteria, e.g. third party imports.This example does not substitute an op call with another function atm.. (as discussed) But this is of course also possible.
Note that
compiled_ops.py
andcompiled_wfs.py
are 100% generated.Dependencies for the
compile.py
script are: Python 3.9 (forast.unparse
andblack
)