Feature: Run Aleph VMs in reaction to Aleph messages
hoh committed Sep 27, 2021
1 parent c81647a commit b7f1317
Showing 10 changed files with 245 additions and 12 deletions.
5 changes: 5 additions & 0 deletions .gitignore
@@ -11,3 +11,8 @@ __pycache__
/pydantic/
node_modules
*.squashfs
/examples/example_http_rust/target/
/examples/example_django/static/admin/
/runtimes/aleph-debian-11-python/rootfs/
/packaging/aleph-vm/opt/
/packaging/target/
21 changes: 20 additions & 1 deletion examples/example_fastapi_2/main.py
@@ -14,12 +14,14 @@
from aleph_client.asynchronous import get_messages, create_post
from aleph_client.chains.remote import RemoteAccount
from aleph_client.vm.cache import VmCache
from aleph_client.vm.app import AlephApp

logger.debug("import fastapi")
from fastapi import FastAPI
logger.debug("imports done")

app = FastAPI()
http_app = FastAPI()
app = AlephApp(http_app=http_app)
cache = VmCache()


@@ -129,3 +131,20 @@ class Data(BaseModel):
@app.post("/post")
async def receive_post(data: Data):
return str(data)


filters = [{
# "sender": "0xB31B787AdA86c6067701d4C0A250c89C7f1f29A5",
"channel": "TEST"
}]

@app.event(filters=filters)
async def aleph_event(event):
print("aleph_event", event)
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector()) as session:
async with session.get("https://api2.aleph.im/api/v0/info/public.json") as resp:
print('RESP', resp)
resp.raise_for_status()
return {
"result": "Good"
}
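
A message posted on channel "TEST" (as declared in the filters above) is delivered to aleph_event() inside the VM. Below is a minimal, hedged sketch of publishing such a message with aleph_client; the import paths and create_post() parameters are assumed from aleph_client 0.3.x and may differ in other versions.

# Hedged sketch, not part of the commit: post a message matching the
# filters above, which should trigger aleph_event() in the running VM.
# Import paths and create_post() arguments are assumed from aleph_client 0.3.x.
import asyncio

from aleph_client.asynchronous import create_post
from aleph_client.chains.common import get_fallback_private_key
from aleph_client.chains.ethereum import ETHAccount


async def trigger_event():
    account = ETHAccount(private_key=get_fallback_private_key())
    # Any post on channel "TEST" matches the subscription declared above.
    await create_post(
        account=account,
        post_content={"hello": "world"},
        post_type="test",
        channel="TEST",
    )


if __name__ == "__main__":
    asyncio.run(trigger_event())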
15 changes: 13 additions & 2 deletions examples/message_from_aleph.json
@@ -19,7 +19,18 @@
"use_latest": false
},
"on": {
"http": true
"http": true,
"message": [
{
"sender": "0xb5F010860b0964090d5414406273E6b3A8726E96",
"channel": "TEST"
},
{
"content": {
"ref": "4d4db19afca380fdf06ba7f916153d0f740db9de9eee23ad26ba96a90d8a2920"
}
}
]
},
"environment": {
"reproducible": true,
@@ -64,7 +75,7 @@
"replaces": "0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba",
"time": 1619017773.8950517
},
"item_content": "{\"type\": \"vm-function\", \"address\": \"0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba\", \"allow_amend\": false, \"code\": {\"encoding\": \"squashfs\", \"entrypoint\": \"__init__:app\", \"ref\": \"7eb2eca2378ea8855336ed76c8b26219f1cb90234d04441de9cf8cb1c649d003\", \"use_latest\": false}, \"on\": {\"http\": true}, \"environment\": {\"reproducible\": true, \"internet\": true, \"aleph_api\": true, \"shared_cache\": false}, \"resources\": {\"vcpus\": 1, \"memory\": 128, \"seconds\": 30}, \"runtime\": {\"ref\": \"5f31b0706f59404fad3d0bff97ef89ddf24da4761608ea0646329362c662ba51\", \"use_latest\": false, \"comment\": \"Aleph Alpine Linux with Python 3.8\"}, \"volumes\": [{\"mount\": \"/opt/venv\", \"ref\": \"5f31b0706f59404fad3d0bff97ef89ddf24da4761608ea0646329362c662ba51\", \"use_latest\": false}, {\"comment\": \"Working data persisted on the VM supervisor, not available on other nodes\", \"mount\": \"/var/lib/sqlite\", \"name\": \"database\", \"persistence\": \"host\"}], \"data\": {\"encoding\": \"zip\", \"mount\": \"/data\", \"ref\": \"7eb2eca2378ea8855336ed76c8b26219f1cb90234d04441de9cf8cb1c649d003\", \"use_latest\": false}, \"export\": {\"encoding\": \"zip\", \"mount\": \"/data\"}, \"replaces\": \"0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba\", \"time\": 1619017773.8950517}",
"item_content": "{\"type\": \"vm-function\", \"address\": \"0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba\", \"allow_amend\": false, \"code\": {\"encoding\": \"squashfs\", \"entrypoint\": \"main:app\", \"ref\": \"7eb2eca2378ea8855336ed76c8b26219f1cb90234d04441de9cf8cb1c649d003\", \"use_latest\": false}, \"on\": {\"http\": true, \"message\": [{\"sender\": \"0xB31B787AdA86c6067701d4C0A250c89C7f1f29A5\", \"channel\": \"TEST\"}, {\"content\": {\"ref\": \"4d4db19afca380fdf06ba7f916153d0f740db9de9eee23ad26ba96a90d8a2920\"}}]}, \"environment\": {\"reproducible\": true, \"internet\": true, \"aleph_api\": true, \"shared_cache\": false}, \"resources\": {\"vcpus\": 1, \"memory\": 128, \"seconds\": 30}, \"runtime\": {\"ref\": \"5f31b0706f59404fad3d0bff97ef89ddf24da4761608ea0646329362c662ba51\", \"use_latest\": false, \"comment\": \"Aleph Alpine Linux with Python 3.8\"}, \"volumes\": [{\"mount\": \"/opt/venv\", \"ref\": \"5f31b0706f59404fad3d0bff97ef89ddf24da4761608ea0646329362c662ba51\", \"use_latest\": false}, {\"comment\": \"Working data persisted on the VM supervisor, not available on other nodes\", \"mount\": \"/var/lib/sqlite\", \"name\": \"database\", \"persistence\": \"host\", \"size_mib\": 5}], \"data\": {\"encoding\": \"zip\", \"mount\": \"/data\", \"ref\": \"7eb2eca2378ea8855336ed76c8b26219f1cb90234d04441de9cf8cb1c649d003\", \"use_latest\": false}, \"export\": {\"encoding\": \"zip\", \"mount\": \"/data\"}, \"replaces\": \"0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba\", \"time\": 1619017773.8950517}",
"item_type": "inline",
"signature": "0x372da8230552b8c3e65c05b31a0ff3a24666d66c575f8e11019f62579bf48c2b7fe2f0bbe907a2a5bf8050989cdaf8a59ff8a1cbcafcdef0656c54279b4aa0c71b",
"size": 749,
20 changes: 18 additions & 2 deletions runtimes/aleph-alpine-3.13-python/init1.py
@@ -227,7 +227,8 @@ async def run_python_code_http(application: ASGIApplication, scope: dict
body: bytes = scope.pop('body')

async def receive():
return {'type': 'http.request',
type_ = 'http.request' if scope['type'] in ('http', 'websocket') else 'aleph.message'
return {'type': type_,
'body': body,
'more_body': False}

@@ -237,11 +238,26 @@ async def send(dico):
await send_queue.put(dico)

# TODO: Better error handling
logger.debug("Awaiting application...")
await application(scope, receive, send)
headers: Dict = await send_queue.get()

logger.debug("Waiting for headers")
headers: Dict
if scope['type'] == 'http':
headers = await send_queue.get()
else:
headers = {}

logger.debug("Waiting for body")
body: Dict = await send_queue.get()

logger.debug("Waiting for buffer")
output = buf.getvalue()

logger.debug(f"Headers {headers}")
logger.debug(f"Body {body}")
logger.debug(f"Output {output}")

logger.debug("Getting output data")
output_data: bytes
if os.path.isdir('/data') and os.listdir('/data'):
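
The change above passes a scope of type "aleph.message" to the guest application when the VM is triggered by a message instead of an HTTP request, and skips waiting for HTTP response headers in that case. Below is an illustrative sketch of an ASGI-style callable compatible with this receive/send protocol; in practice the AlephApp wrapper from aleph_client is expected to handle this dispatch, and the key names of the dict sent back for message events are illustrative only.

# Illustrative sketch, not from the commit: an ASGI-style callable matching the
# receive/send protocol used by init1.py above. For "http" scopes the runtime
# expects two send() calls (response start, then body); for other scopes
# (e.g. "aleph.message") it waits for a single dict, used as the result body.
async def application(scope, receive, send):
    event = await receive()  # {'type': ..., 'body': ..., 'more_body': False}

    if scope["type"] == "http":
        await send({
            "type": "http.response.start",
            "status": 200,
            "headers": [(b"content-type", b"text/plain")],
        })
        await send({"type": "http.response.body", "body": b"OK", "more_body": False})
    else:
        # scope["type"] is "aleph.message": event["body"] carries the Aleph
        # message that triggered the VM (see build_event_scope() in run.py).
        # The key names of this single response dict are illustrative; the
        # AlephApp wrapper handles the real format.
        await send({
            "type": "aleph.response.body",  # illustrative type name
            "body": b'{"result": "Good"}',
        })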
4 changes: 2 additions & 2 deletions runtimes/aleph-debian-11-python/create_disk_image.sh
@@ -19,12 +19,12 @@ apt-get install -y --no-install-recommends --no-install-suggests \
python3-setuptools \
python3-pip python3-cytoolz python3-pydantic \
iproute2 unzip \
nodejs
nodejs npm
pip3 install fastapi django
echo "Pip installing aleph-client"
pip3 install 'aleph-client>=0.2.7' 'coincurve==15.0.0' 'eth_account>=0.4.0'
pip3 install 'aleph-client>=0.3.2' 'coincurve==15.0.0'
# Compile all Python bytecode
python3 -m compileall -f /usr/local/lib/python3.9
1 change: 1 addition & 0 deletions vm_connector/main.py
@@ -219,6 +219,7 @@ async def properties(request: Request):
@app.post("/sign")
async def sign_message(request: Request):
"""Sign a message"""
# TODO: Check
private_key = get_fallback_private_key()
account: ETHAccount = ETHAccount(private_key=private_key)

10 changes: 8 additions & 2 deletions vm_supervisor/__main__.py
@@ -9,7 +9,8 @@

from aiohttp.web import Response

from .run import run_code_on_request
from vm_supervisor.pubsub import PubSub
from .run import run_code_on_request, run_code_on_event
from .models import VmHash
from . import supervisor
from .conf import settings
@@ -78,6 +79,7 @@ def parse_args(args):
"-n",
"--do-not-run",
dest="do_not_run",
action="store_true",
default=False,
)
parser.add_argument(
@@ -101,7 +103,7 @@ async def benchmark(runs: int):
"""Measure performance by immediately running the supervisor
with fake requests.
"""
ref = VmHash("TEST_HASH")
ref = VmHash("cad11970efe9b7478300fd04d7cc91c646ca0a792b9cc718650f86e1ccfac73e")

class FakeRequest:
headers: Dict[str, str]
@@ -162,6 +164,10 @@ class FakeRequest:
)
logger.info(bench)

event = None
result = await run_code_on_event(vm_hash=ref, event=event, pubsub=PubSub())
print("Event result", result)


def main():
args = parse_args(sys.argv[1:])
73 changes: 73 additions & 0 deletions vm_supervisor/reactor.py
@@ -0,0 +1,73 @@
import asyncio
import logging
from typing import List, Dict, Coroutine

from aleph_message.models import Message, ProgramMessage
from vm_supervisor.pubsub import PubSub
from vm_supervisor.run import run_code_on_event

logger = logging.getLogger(__name__)


def is_equal_or_includes(value, compare_to) -> bool:
if isinstance(value, str):
return value == compare_to
elif isinstance(value, dict):
for subkey, subvalue in value.items():
if not hasattr(compare_to, subkey):
return False
if not is_equal_or_includes(subvalue, getattr(compare_to, subkey)):
return False
return True
else:
raise ValueError("Unsupported value")


def subscription_matches(subscription: Dict, message: ProgramMessage) -> bool:
if not subscription:
# Require at least one value to match
return False
for key, value in subscription.items():
if not is_equal_or_includes(value, getattr(message, key)):
return False
return True


class Reactor:

pubsub: PubSub
listeners: List[ProgramMessage]

def __init__(self, pubsub: PubSub):
self.pubsub = pubsub
self.listeners = []

async def trigger(self, message: Message):
coroutines: List[Coroutine] = []

for listener in self.listeners:
if not listener.content.on.message:
logger.warning("Program with no subscription was registered in reactor listeners: "
f"{listener.item_hash}")
continue

for subscription in listener.content.on.message:
if subscription_matches(subscription, message):
vm_hash = listener.item_hash
event = message.json()
# Register the listener in the list of coroutines to run asynchronously:
coroutines.append(run_code_on_event(vm_hash, event, self.pubsub))
break

# Call all listeners asynchronously from the event loop:
loop = asyncio.get_event_loop()
for coroutine in coroutines:
loop.create_task(coroutine)

def register(self, message: ProgramMessage):
if message.content.on.message:
self.listeners.append(message)
else:
logger.debug(
"Program with no subscription cannot be registered in reactor listeners: "
f"{message.item_hash}")
82 changes: 82 additions & 0 deletions vm_supervisor/run.py
@@ -11,6 +11,7 @@
from .messages import load_updated_message
from .models import VmHash, VmExecution
from .pool import VmPool
from .pubsub import PubSub
from .vm.firecracker_microvm import ResourceDownloadError, VmSetupError

logger = logging.getLogger(__name__)
@@ -31,6 +32,13 @@ async def build_asgi_scope(path: str, request: web.Request) -> Dict[str, Any]:
}


async def build_event_scope(event) -> Dict[str, Any]:
return {
"type": "aleph.message",
"body": event,
}


async def run_code_on_request(vm_hash: VmHash, path: str, request: web.Request) -> web.Response:
"""
Execute the code corresponding to the 'code id' in the path.
@@ -105,3 +113,77 @@ async def run_code_on_request(vm_hash: VmHash, path: str, request: web.Request)
execution.stop_after_timeout(timeout=settings.REUSE_TIMEOUT)
else:
await execution.stop()


async def run_code_on_event(vm_hash: VmHash, event, pubsub: PubSub):
"""
Execute code in response to an event.
"""

try:
execution: VmExecution = await pool.get_running_vm(vm_hash=vm_hash)
except Exception as error:
logger.exception(error)
raise

if not execution:
message, original_message = await load_updated_message(vm_hash)
pool.message_cache[vm_hash] = message

try:
execution = await pool.create_a_vm(
vm_hash=vm_hash,
program=message.content,
original=original_message.content,
)
except ResourceDownloadError as error:
logger.exception(error)
pool.forget_vm(vm_hash=vm_hash)
raise HTTPBadRequest(reason="Code, runtime or data not available")
except VmSetupError as error:
logger.exception(error)
pool.forget_vm(vm_hash=vm_hash)
raise HTTPInternalServerError(reason="Error during program initialisation")
except MicroVMFailedInit as error:
logger.exception(error)
pool.forget_vm(vm_hash=vm_hash)
raise HTTPInternalServerError(reason="Error during runtime initialisation")

logger.debug(f"Using vm={execution.vm.vm_id}")

scope: Dict = await build_event_scope(event)

try:
await execution.becomes_ready()
result_raw: bytes = await execution.run_code(scope=scope)
except UnpackValueError as error:
logger.exception(error)
return web.Response(status=502, reason="Invalid response from VM")

try:
result = msgpack.loads(result_raw, raw=False)

logger.debug(f"Result from VM: <<<\n\n{str(result)[:1000]}\n\n>>>")

if "traceback" in result:
logger.warning(result["traceback"])
return web.Response(
status=500,
reason="Error in VM execution",
body=result["traceback"],
content_type="text/plain",
)

logger.info(f"Result: {result['body']}")
return result['body']

except UnpackValueError as error:
logger.exception(error)
return web.Response(status=502, reason="Invalid response from VM")
finally:
if settings.REUSE_TIMEOUT > 0:
if settings.WATCH_FOR_UPDATES:
execution.start_watching_for_updates(pubsub=pubsub)
execution.stop_after_timeout(timeout=settings.REUSE_TIMEOUT)
else:
await execution.stop()
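
run_code_on_event() is the coroutine the Reactor schedules for every matching listener. The sketch below is a hedged illustration of how a supervisor loop could wire incoming Aleph messages through the Reactor; new_messages and known_programs are hypothetical placeholders for a message feed and the ProgramMessages known to the node.

# Hedged sketch, not from the commit: wiring incoming Aleph messages to
# registered programs. `new_messages` (an async iterator of Message objects)
# and `known_programs` (ProgramMessages) are hypothetical placeholders.
from vm_supervisor.pubsub import PubSub
from vm_supervisor.reactor import Reactor


async def react_to_messages(new_messages, known_programs):
    reactor = Reactor(pubsub=PubSub())
    for program in known_programs:
        # register() keeps only programs that declare on.message subscriptions.
        reactor.register(program)
    async for message in new_messages:
        # trigger() schedules run_code_on_event() for every matching listener.
        await reactor.trigger(message)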