Skip to content

Commit

Permalink
Fix: Diagnostic API was not updated
Browse files Browse the repository at this point in the history
We published multiple changes to the diagnostic VM recently but none of these was released.

This provides a new diagnostic VM, based on a new runtime [1], with fixes:

- Reading messages with the newer SDK
- Better handling of IPv6 detection errors
- Two different tests for signing messages (local and remote)
- aleph-message version was not specified
- fetching a single message was not tested
  • Loading branch information
hoh committed Apr 26, 2024
1 parent ab79b77 commit fe9235a
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 41 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/test-on-droplets-matrix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,11 @@ jobs:
- alias: "runtime-6770" # Old runtime, using Debian 11
item_hash: "67705389842a0a1b95eaa408b009741027964edc805997475e95c505d642edd8"
query_params: "?retro-compatibility=true"
- alias: "runtime-3fc0" # New runtime, using Debian 12
- alias: "runtime-3fc0" # Newer runtime, using Debian 12 but now old SDK
item_hash: "3fc0aa9569da840c43e7bd2033c3c580abb46b007527d6d20f2d4e98e867f7af"
query_params: "?retro-compatibility=true"
- alias: "runtime-63fa" # Latest runtime, using Debian 12 and SDK 0.9.0
item_hash: "63faf8b5db1cf8d965e6a464a0cb8062af8e7df131729e48738342d956f29ace"
query_params: ""

steps:
Expand Down
6 changes: 6 additions & 0 deletions examples/example_fastapi/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Publish using:

```shell
aleph program upload ../aleph-vm/examples/example_fastapi main:app \
--persistent-volume "persistence=host,size_mib=1,mount=/var/lib/example,name=increment-storage,comment=Persistence"
```
187 changes: 153 additions & 34 deletions examples/example_fastapi/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,30 @@
import socket
import subprocess
import sys
from datetime import datetime
from datetime import datetime, timezone
from os import listdir
from pathlib import Path
from typing import List, Optional
from typing import Any, Optional

import aiohttp
from aleph_message.models import (
MessagesResponse,
PostMessage,
ProgramMessage,
StoreMessage,
)
from aleph_message.status import MessageStatus
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import PlainTextResponse
from pip._internal.operations.freeze import freeze
from pydantic import BaseModel, HttpUrl
from starlette.responses import JSONResponse

from aleph.sdk.chains.ethereum import get_fallback_account
from aleph.sdk.chains.remote import RemoteAccount
from aleph.sdk.client import AlephClient, AuthenticatedAlephClient
from aleph.sdk.client import AlephHttpClient, AuthenticatedAlephHttpClient
from aleph.sdk.query.filters import MessageFilter
from aleph.sdk.types import StorageEnum
from aleph.sdk.vm.app import AlephApp
from aleph.sdk.vm.cache import VmCache
Expand All @@ -42,30 +51,47 @@


@app.on_event("startup")
async def startup_event():
async def startup_event() -> None:
global startup_lifespan_executed
startup_lifespan_executed = True


@app.get("/")
async def index():
async def index() -> dict[str, Any]:
if os.path.exists("/opt/venv"):
opt_venv = list(listdir("/opt/venv"))
else:
opt_venv = []
return {
"Example": "example_fastapi",
"endpoints": [
# Features
"/lifespan",
"/environ",
"/messages",
"/state/increment",
"/wait-for/{delay}",
# Local cache
"/cache/get/{key}",
"/cache/set/{key}/{value}",
"/cache/remove/{key}",
"/cache/keys",
# Networking
"/dns",
"ip/address",
"/ip/address",
"/ip/4",
"/ip/6",
"/internet",
# Error handling
"/raise",
"/crash",
# Aleph.im
"/messages",
"/get_a_message",
"/post_a_message",
"/state/increment",
"/wait-for/{delay}",
"/post_a_message_local_account",
"/post_a_file",
"/sign_a_message",
# Platform properties
"/platform/os",
"/platform/python",
"/platform/pip-freeze",
Expand All @@ -91,10 +117,11 @@ async def environ() -> dict[str, str]:


@app.get("/messages")
async def read_aleph_messages():
async def read_aleph_messages() -> dict[str, MessagesResponse]:
"""Read data from Aleph using the Aleph Client library."""
async with AlephClient() as client:
data = await client.get_messages(hashes=["f246f873c3e0f637a15c566e7a465d2ecbb83eaa024d54ccb8fb566b549a929e"])
async with AlephHttpClient() as client:
message_filter = MessageFilter(hashes=["f246f873c3e0f637a15c566e7a465d2ecbb83eaa024d54ccb8fb566b549a929e"])
data = await client.get_messages(message_filter=message_filter)
return {"Messages": data}


Expand Down Expand Up @@ -163,9 +190,13 @@ async def connect_ipv6():
if resp.status != 404:
resp.raise_for_status()
return {"result": True, "headers": resp.headers}
except aiohttp.ClientTimeout:
logger.warning(f"Session connection for host {ipv6_host} failed")
return {"result": False, "headers": resp.headers}
except TimeoutError:
logger.warning(f"Session connection to host {ipv6_host} timed out")
return {"result": False, "reason": "Timeout"}
except aiohttp.ClientConnectionError as error:
logger.warning(f"Client connection to host {ipv6_host} failed: {error}")
# Get a string that describes the error
return {"result": False, "reason": str(error.args[0])}


async def check_url(internet_host: HttpUrl, timeout_seconds: int = 5):
Expand All @@ -184,15 +215,15 @@ async def check_url(internet_host: HttpUrl, timeout_seconds: int = 5):
@app.get("/internet")
async def read_internet():
"""Check Internet connectivity of the system, requiring IP connectivity, domain resolution and HTTPS/TLS."""
internet_hosts: List[HttpUrl] = [
internet_hosts: list[HttpUrl] = [
HttpUrl(url="https://aleph.im/", scheme="https"),
HttpUrl(url="https://ethereum.org", scheme="https"),
HttpUrl(url="https://ipfs.io/", scheme="https"),
]
timeout_seconds = 5

# Create a list of tasks to check the URLs in parallel
tasks: set[asyncio.Task] = set(asyncio.create_task(check_url(host, timeout_seconds)) for host in internet_hosts)
tasks: set[asyncio.Task] = {asyncio.create_task(check_url(host, timeout_seconds)) for host in internet_hosts}

# While no tasks have completed, keep waiting for the next one to finish
while tasks:
Expand All @@ -211,34 +242,121 @@ async def read_internet():
return {"result": False}


@app.get("/post_a_message")
async def post_a_message():
"""Post a message on the Aleph network"""
@app.get("/get_a_message")
async def get_a_message():
"""Get a message from the Aleph.im network"""
item_hash = "3fc0aa9569da840c43e7bd2033c3c580abb46b007527d6d20f2d4e98e867f7af"
async with AlephHttpClient() as client:
message = await client.get_message(
item_hash=item_hash,
message_type=ProgramMessage,
)
return message.dict()

account = await RemoteAccount.from_crypto_host(host="http://localhost", unix_socket="/tmp/socat-socket")

@app.post("/post_a_message")
async def post_with_remote_account():
"""Post a message on the Aleph.im network using the remote account of the host."""
try:
account = await RemoteAccount.from_crypto_host(host="http://localhost", unix_socket="/tmp/socat-socket")

content = {
"date": datetime.now(tz=timezone.utc).isoformat(),
"test": True,
"answer": 42,
"something": "interesting",
}
async with AuthenticatedAlephHttpClient(
account=account,
) as client:
message: PostMessage
status: MessageStatus
message, status = await client.create_post(
post_content=content,
post_type="test",
ref=None,
channel="TEST",
inline=True,
storage_engine=StorageEnum.storage,
sync=True,
)
if status != MessageStatus.PROCESSED:
return JSONResponse(status_code=500, content={"error": status})
return {
"message": message,
}
except aiohttp.client_exceptions.UnixClientConnectorError:
return JSONResponse(status_code=500, content={"error": "Could not connect to the remote account"})


@app.post("/post_a_message_local_account")
async def post_with_local_account():
"""Post a message on the Aleph.im network using a local private key."""

account = get_fallback_account()

content = {
"date": datetime.utcnow().isoformat(),
"date": datetime.now(tz=timezone.utc).isoformat(),
"test": True,
"answer": 42,
"something": "interesting",
}
async with AuthenticatedAlephClient(
async with AuthenticatedAlephHttpClient(
account=account,
api_server="https://api2.aleph.im",
allow_unix_sockets=False,
) as client:
response = await client.create_post(
message: PostMessage
status: MessageStatus
message, status = await client.create_post(
post_content=content,
post_type="test",
ref=None,
channel="TEST",
inline=True,
storage_engine=StorageEnum.storage,
sync=True,
)
if status != MessageStatus.PROCESSED:
return JSONResponse(status_code=500, content={"error": status})
return {
"message": message,
}


@app.post("/post_a_file")
async def post_a_file():
account = get_fallback_account()
file_path = Path(__file__).absolute()
async with AuthenticatedAlephHttpClient(
account=account,
) as client:
message: StoreMessage
status: MessageStatus
message, status = await client.create_store(
file_path=file_path,
ref=None,
channel="TEST",
storage_engine=StorageEnum.storage,
sync=True,
)
if status != MessageStatus.PROCESSED:
return JSONResponse(status_code=500, content={"error": status})
return {
"response": response,
"message": message,
}


@app.get("/sign_a_message")
async def sign_a_message():
"""Sign a message using a locally managed account within the virtual machine."""
# FIXME: Broken, fixing this depends on https://github.com/aleph-im/aleph-sdk-python/pull/120
account = get_fallback_account()
message = {"hello": "world", "chain": "ETH"}
signed_message = await account.sign_message(message)
return {"message": signed_message}


@app.get("/cache/get/{key}")
async def get_from_cache(key: str):
"""Get data in the VM cache"""
Expand All @@ -265,7 +383,7 @@ async def keys_from_cache(pattern: str = "*"):


@app.get("/state/increment")
async def increment():
async def increment() -> dict[str, int]:
path = "/var/lib/example/storage.json"
try:
with open(path) as fd:
Expand All @@ -284,7 +402,7 @@ class Data(BaseModel):


@app.post("/post")
async def receive_post(data: Data):
async def receive_post(data: Data) -> str:
return str(data)


Expand All @@ -293,13 +411,14 @@ class CustomError(Exception):


@app.get("/raise")
def raise_error():
def raise_error() -> None:
"""Raises an error to check that the init handles it properly without crashing"""
raise CustomError("Whoops")
error_message = "Whoops"
raise CustomError(error_message)


@app.get("/crash")
def crash():
def crash() -> None:
"""Crash the entire VM in order to check that the supervisor can handle it"""
sys.exit(1)

Expand All @@ -313,22 +432,22 @@ def crash():


@app.get("/platform/os")
def platform_os():
def platform_os() -> PlainTextResponse:
return PlainTextResponse(content=Path("/etc/os-release").read_text())


@app.get("/platform/python")
def platform_python():
def platform_python() -> PlainTextResponse:
return PlainTextResponse(content=sys.version)


@app.get("/platform/pip-freeze")
def platform_pip_freeze():
def platform_pip_freeze() -> list[str]:
return list(freeze())


@app.event(filters=filters)
async def aleph_event(event):
async def aleph_event(event) -> dict[str, str]:
print("aleph_event", event)
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector()) as session:
async with session.get("https://official.aleph.cloud/api/v0/info/public.json") as resp:
Expand Down
2 changes: 1 addition & 1 deletion runtimes/aleph-debian-12-python/create_disk_image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ locale-gen en_US.UTF-8
echo "Pip installing aleph-sdk-python"
mkdir -p /opt/aleph/libs
pip3 install --target /opt/aleph/libs 'aleph-sdk-python==0.9.0' 'fastapi~=0.109.2'
pip3 install --target /opt/aleph/libs 'aleph-sdk-python==0.9.0' 'aleph-message==0.4.4' 'fastapi~=0.109.2'
# Compile Python code to bytecode for faster execution
# -o2 is needed to compile with optimization level 2 which is what we launch init1.py (`python -OO`)
Expand Down
2 changes: 1 addition & 1 deletion src/aleph/vm/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ class Settings(BaseSettings):
)
FAKE_INSTANCE_MESSAGE = Path(abspath(join(__file__, "../../../../examples/instance_message_from_aleph.json")))

CHECK_FASTAPI_VM_ID = "3fc0aa9569da840c43e7bd2033c3c580abb46b007527d6d20f2d4e98e867f7af"
CHECK_FASTAPI_VM_ID = "63faf8b5db1cf8d965e6a464a0cb8062af8e7df131729e48738342d956f29ace"
LEGACY_CHECK_FASTAPI_VM_ID = "67705389842a0a1b95eaa408b009741027964edc805997475e95c505d642edd8"

# Developer options
Expand Down
1 change: 1 addition & 0 deletions src/aleph/vm/orchestrator/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ async def build_asgi_scope(path: str, request: web.Request) -> dict[str, Any]:


async def build_event_scope(event) -> dict[str, Any]:
"""Build an ASGI scope for an event."""
return {
"type": "aleph.message",
"body": event,
Expand Down
Loading

0 comments on commit fe9235a

Please sign in to comment.