Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup: Use black formatter #161

Merged
merged 1 commit into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 85 additions & 59 deletions runtimes/aleph-alpine-3.13-python/init1.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/python3 -OO

import logging

logging.basicConfig(
level=logging.DEBUG,
format="%(relativeCreated)4f |V %(levelname)s | %(message)s",
Expand Down Expand Up @@ -29,7 +30,7 @@

logger.debug("Imports finished")

ASGIApplication = NewType('AsgiApplication', Any)
ASGIApplication = NewType("AsgiApplication", Any)


class Encoding(str, Enum):
Expand Down Expand Up @@ -105,8 +106,9 @@ def setup_variables(variables: Optional[Dict[str, str]]):
os.environ[key] = value


def setup_network(ip: Optional[str], route: Optional[str],
dns_servers: Optional[List[str]] = None):
def setup_network(
ip: Optional[str], route: Optional[str], dns_servers: Optional[List[str]] = None
):
"""Setup the system with info from the host."""
dns_servers = dns_servers or []
if not os.path.exists("/sys/class/net/eth0"):
Expand Down Expand Up @@ -157,7 +159,9 @@ def setup_volumes(volumes: List[Volume]):
system("mount")


def setup_code_asgi(code: bytes, encoding: Encoding, entrypoint: str) -> ASGIApplication:
def setup_code_asgi(
code: bytes, encoding: Encoding, entrypoint: str
) -> ASGIApplication:
# Allow importing packages from /opt/packages
sys.path.append("/opt/packages")

Expand All @@ -167,7 +171,7 @@ def setup_code_asgi(code: bytes, encoding: Encoding, entrypoint: str) -> ASGIApp
module_name, app_name = entrypoint.split(":", 1)
logger.debug("import module")
module = __import__(module_name)
for level in module_name.split('.')[1:]:
for level in module_name.split(".")[1:]:
module = getattr(module, level)
app: ASGIApplication = getattr(module, app_name)
elif encoding == Encoding.zip:
Expand All @@ -180,7 +184,7 @@ def setup_code_asgi(code: bytes, encoding: Encoding, entrypoint: str) -> ASGIApp
module_name, app_name = entrypoint.split(":", 1)
logger.debug("import module")
module = __import__(module_name)
for level in module_name.split('.')[1:]:
for level in module_name.split(".")[1:]:
module = getattr(module, level)
app: ASGIApplication = getattr(module, app_name)
elif encoding == Encoding.plain:
Expand All @@ -193,7 +197,9 @@ def setup_code_asgi(code: bytes, encoding: Encoding, entrypoint: str) -> ASGIApp
return app


def setup_code_executable(code: bytes, encoding: Encoding, entrypoint: str) -> subprocess.Popen:
def setup_code_executable(
code: bytes, encoding: Encoding, entrypoint: str
) -> subprocess.Popen:
logger.debug("Extracting code")
if encoding == Encoding.squashfs:
path = f"/opt/code/{entrypoint}"
Expand Down Expand Up @@ -223,32 +229,38 @@ def setup_code_executable(code: bytes, encoding: Encoding, entrypoint: str) -> s
return process


def setup_code(code: bytes, encoding: Encoding, entrypoint: str, interface: Interface
) -> Union[ASGIApplication, subprocess.Popen]:
def setup_code(
code: bytes, encoding: Encoding, entrypoint: str, interface: Interface
) -> Union[ASGIApplication, subprocess.Popen]:

if interface == Interface.asgi:
return setup_code_asgi(code=code, encoding=encoding, entrypoint=entrypoint)
elif interface == Interface.executable:
return setup_code_executable(code=code, encoding=encoding, entrypoint=entrypoint)
return setup_code_executable(
code=code, encoding=encoding, entrypoint=entrypoint
)
else:
raise ValueError("Invalid interface. This should never happen.")


async def run_python_code_http(application: ASGIApplication, scope: dict
) -> Tuple[Dict, Dict, str, Optional[bytes]]:
async def run_python_code_http(
application: ASGIApplication, scope: dict
) -> Tuple[Dict, Dict, str, Optional[bytes]]:

logger.debug("Running code")
with StringIO() as buf, redirect_stdout(buf):
# Execute in the same process, saves ~20ms than a subprocess

# The body should not be part of the ASGI scope itself
body: bytes = scope.pop('body')
body: bytes = scope.pop("body")

async def receive():
type_ = 'http.request' if scope['type'] in ('http', 'websocket') else 'aleph.message'
return {'type': type_,
'body': body,
'more_body': False}
type_ = (
"http.request"
if scope["type"] in ("http", "websocket")
else "aleph.message"
)
return {"type": type_, "body": body, "more_body": False}

send_queue: asyncio.Queue = asyncio.Queue()

Expand All @@ -261,7 +273,7 @@ async def send(dico):

logger.debug("Waiting for headers")
headers: Dict
if scope['type'] == 'http':
if scope["type"] == "http":
headers = await send_queue.get()
else:
headers = {}
Expand All @@ -278,34 +290,32 @@ async def send(dico):

logger.debug("Getting output data")
output_data: bytes
if os.path.isdir('/data') and os.listdir('/data'):
make_archive("/opt/output", 'zip', "/data")
if os.path.isdir("/data") and os.listdir("/data"):
make_archive("/opt/output", "zip", "/data")
with open("/opt/output.zip", "rb") as output_zipfile:
output_data = output_zipfile.read()
else:
output_data = b''
output_data = b""

logger.debug("Returning result")
return headers, body, output, output_data


async def make_request(session, scope):
async with session.request(
scope["method"],
url="http://localhost:8080{}".format(scope["path"]),
params=scope["query_string"],
headers=[(a.decode('utf-8'), b.decode('utf-8'))
for a, b in scope['headers']],
data=scope.get("body", None)
) as resp:
scope["method"],
url="http://localhost:8080{}".format(scope["path"]),
params=scope["query_string"],
headers=[(a.decode("utf-8"), b.decode("utf-8")) for a, b in scope["headers"]],
data=scope.get("body", None),
) as resp:
headers = {
'headers': [(a.encode('utf-8'), b.encode('utf-8'))
for a, b in resp.headers.items()],
'status': resp.status
}
body = {
'body': await resp.content.read()
"headers": [
(a.encode("utf-8"), b.encode("utf-8")) for a, b in resp.headers.items()
],
"status": resp.status,
}
body = {"body": await resp.content.read()}
return headers, body


Expand All @@ -325,7 +335,7 @@ async def run_executable_http(scope: dict) -> Tuple[Dict, Dict, str, Optional[by
except aiohttp.ClientConnectorError:
if tries > 20:
raise
await asyncio.sleep(.05)
await asyncio.sleep(0.05)

output = "" # Process stdout is not captured per request
output_data = None
Expand All @@ -334,7 +344,9 @@ async def run_executable_http(scope: dict) -> Tuple[Dict, Dict, str, Optional[by


async def process_instruction(
instruction: bytes, interface: Interface, application: Union[ASGIApplication, subprocess.Popen]
instruction: bytes,
interface: Interface,
application: Union[ASGIApplication, subprocess.Popen],
) -> AsyncIterable[bytes]:

if instruction == b"halt":
Expand All @@ -348,6 +360,7 @@ async def process_instruction(
else:
# Close the cached session in aleph_client:
from aleph_client.asynchronous import get_fallback_session

session: aiohttp.ClientSession = get_fallback_session()
await session.close()
logger.debug("Aiohttp cached session closed")
Expand All @@ -358,7 +371,9 @@ async def process_instruction(
# Execute shell commands in the form `!ls /`
msg = instruction[1:].decode()
try:
process_output = subprocess.check_output(msg, stderr=subprocess.STDOUT, shell=True)
process_output = subprocess.check_output(
msg, stderr=subprocess.STDOUT, shell=True
)
yield process_output
except subprocess.CalledProcessError as error:
yield str(error).encode() + b"\n" + error.output
Expand All @@ -376,11 +391,13 @@ async def process_instruction(
output_data: Optional[bytes]

if interface == Interface.asgi:
headers, body, output, output_data = \
await run_python_code_http(application=application, scope=payload.scope)
headers, body, output, output_data = await run_python_code_http(
application=application, scope=payload.scope
)
elif interface == Interface.executable:
headers, body, output, output_data = \
await run_executable_http(scope=payload.scope)
headers, body, output, output_data = await run_executable_http(
scope=payload.scope
)
else:
raise ValueError("Unknown interface. This should never happen")

Expand All @@ -392,11 +409,13 @@ async def process_instruction(
}
yield msgpack.dumps(result, use_bin_type=True)
except Exception as error:
yield msgpack.dumps({
"error": str(error),
"traceback": str(traceback.format_exc()),
"output": output
})
yield msgpack.dumps(
{
"error": str(error),
"traceback": str(traceback.format_exc()),
"output": output,
}
)


def receive_data_length(client) -> int:
Expand All @@ -413,8 +432,7 @@ def receive_data_length(client) -> int:

def load_configuration(data: bytes) -> ConfigurationPayload:
msg_ = msgpack.loads(data, raw=False)
msg_['volumes'] = [Volume(**volume_dict)
for volume_dict in msg_.get('volumes')]
msg_["volumes"] = [Volume(**volume_dict) for volume_dict in msg_.get("volumes")]
return ConfigurationPayload(**msg_)


Expand Down Expand Up @@ -452,20 +470,26 @@ async def main():

try:
app: Union[ASGIApplication, subprocess.Popen] = setup_code(
config.code, config.encoding, config.entrypoint, config.interface)
config.code, config.encoding, config.entrypoint, config.interface
)
client.send(msgpack.dumps({"success": True}))
except Exception as error:
client.send(msgpack.dumps({
"success": False,
"error": str(error),
"traceback": str(traceback.format_exc()),
}))
client.send(
msgpack.dumps(
{
"success": False,
"error": str(error),
"traceback": str(traceback.format_exc()),
}
)
)
logger.exception("Program could not be started")
raise

class ServerReference:
"Reference used to close the server from within `handle_instruction"
server: asyncio.AbstractServer

server_reference = ServerReference()

async def handle_instruction(reader, writer):
Expand All @@ -477,8 +501,9 @@ async def handle_instruction(reader, writer):
logger.debug(f"<<<\n\n{data_to_print}\n\n>>>")

try:
async for result in process_instruction(instruction=data, interface=config.interface,
application=app):
async for result in process_instruction(
instruction=data, interface=config.interface, application=app
):
writer.write(result)
await writer.drain()

Expand All @@ -497,7 +522,7 @@ async def handle_instruction(reader, writer):
server_reference.server = server

addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
print(f"Serving on {addr}")

try:
async with server:
Expand All @@ -511,7 +536,8 @@ async def handle_instruction(reader, writer):
umount_volumes(config.volumes)
logger.debug("User volumes unmounted")

if __name__ == '__main__':

if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main())

Expand All @@ -524,6 +550,6 @@ async def handle_instruction(reader, writer):
# Send reboot syscall, see man page
# https://man7.org/linux/man-pages/man2/reboot.2.html
libc = ctypes.CDLL(None)
libc.syscall(169, 0xfee1dead, 672274793, 0x4321fedc, None)
libc.syscall(169, 0xFEE1DEAD, 672274793, 0x4321FEDC, None)
# The exit should not happen due to system halt.
sys.exit(0)
10 changes: 5 additions & 5 deletions vm_connector/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ async def download_message(
ref: str, use_latest: Optional[bool] = True
) -> Union[Dict, Response]:
"""
Fetch on Aleph and return a VM function message, after checking its validity.
Used by the VM Supervisor run the code.
Fetch on Aleph and return a VM function message, after checking its validity.
Used by the VM Supervisor run the code.

:param ref: item_hash of the code file
:param use_latest: should the last amend to the code be used
:return: a file containing the code file
:param ref: item_hash of the code file
:param use_latest: should the last amend to the code be used
:return: a file containing the code file
"""

msg = await get_message(hash_=ref)
Expand Down
9 changes: 5 additions & 4 deletions vm_supervisor/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,17 +218,18 @@ def main():
sentry_sdk.init(
dsn=settings.SENTRY_DSN,
server_name=settings.DOMAIN_NAME,

# Set traces_sample_rate to 1.0 to capture 100%
# of transactions for performance monitoring.
# We recommend adjusting this value in production.
traces_sample_rate=1.0
traces_sample_rate=1.0,
)
else:
logger.debug("Sentry SDK found with no DNS configured.")
else:
logger.debug("Sentry SDK not found. \n"
"Use `pip install sentry-sdk` and configure SENTRY_DSN if you'd like to monitor errors.")
logger.debug(
"Sentry SDK not found. \n"
"Use `pip install sentry-sdk` and configure SENTRY_DSN if you'd like to monitor errors."
)

settings.setup()
if args.print_settings:
Expand Down
Loading