diff --git a/aleph_scoring/__main__.py b/aleph_scoring/__main__.py
index d361bd8..638f6a4 100644
--- a/aleph_scoring/__main__.py
+++ b/aleph_scoring/__main__.py
@@ -15,12 +15,14 @@ from aleph.sdk.types import Account
 from hexbytes import HexBytes
 
+from aleph_scoring.benchmarks import benchmark_node_performance_sync
+from aleph_scoring.benchmarks.models import BenchmarksPost, NodeBenchmarks
 from aleph_scoring.config import settings
 from aleph_scoring.metrics import measure_node_performance_sync
 from aleph_scoring.metrics.models import MetricsPost, NodeMetrics
 from aleph_scoring.scoring import compute_ccn_scores, compute_crn_scores
 from aleph_scoring.scoring.models import NodeScores, NodeScoresPost
-from aleph_scoring.utils import LogLevel, Period, get_latest_github_releases
+from aleph_scoring.utils import LogLevel, Period
 
 logger = logging.getLogger(__name__)
 
 aleph_account: Optional[ETHAccount] = None
@@ -90,6 +92,23 @@ async def publish_scores_on_aleph(
         "Published scores on Aleph with status %s: %s", status, scores_post.item_hash
     )
 
 
+async def publish_benchmarks_on_aleph(account: Account, node_benchmarks: NodeBenchmarks):
+    channel = settings.ALEPH_POST_TYPE_CHANNEL
+    aleph_api_server = settings.NODE_DATA_HOST
+
+    benchmarks_post_data = BenchmarksPost(tags=["devnet"], benchmarks=node_benchmarks)
+    async with AuthenticatedAlephClient(
+        account=account, api_server=aleph_api_server
+    ) as client:
+        metrics_post, status = await client.create_post(
+            post_content=benchmarks_post_data,
+            post_type=settings.ALEPH_POST_TYPE_BENCHMARKS,
+            channel=channel,
+        )
+    logger.info(
+        "Published benchmarks on Aleph with status %s: %s", status, metrics_post.item_hash
+    )
+
 
 def run_measurements(
     output: Optional[Path] = typer.Option(
@@ -114,6 +133,30 @@ def run_measurements(
     )
 
 
+def run_benchmarks(
+    output: Optional[Path] = typer.Option(
+        default=None, help="Path where to save the result in JSON format."
+    ),
+    stdout: bool = typer.Option(default=False, help="Print the result on stdout"),
+    publish: bool = typer.Option(
+        default=False,
+        help="Publish the results on Aleph.",
+    ),
+):
+    node_benchmarks = benchmark_node_performance_sync()
+
+    if output:
+        save_as_json(node_metrics=node_benchmarks, file=output)
+    if stdout:
+        print(node_benchmarks.json(indent=4))
+    if publish:
+        account = get_aleph_account()
+        asyncio.run(
+            publish_benchmarks_on_aleph(account=account, node_benchmarks=node_benchmarks)
+        )
+
+
+
 @app.command()
 def measure(
     output: Optional[Path] = typer.Option(
@@ -296,6 +339,23 @@ def compute_on_schedule(
 def export_as_html(input_file: Optional[Path]):
     os.system("jupyter nbconvert --execute Node\\ Score\\ Analysis.ipynb --to html")
 
 
+@app.command()
+def benchmark(
+    output: Optional[Path] = typer.Option(
+        default=None, help="Path where to save the result in JSON format."
+    ),
+    publish: bool = typer.Option(
+        default=False,
+        help="Publish the results on Aleph.",
+    ),
+    log_level: str = typer.Option(
+        default=LogLevel.INFO.name,
+        help="Logging level",
+    ),
+):
+    logging.basicConfig(level=LogLevel[log_level])
+    run_benchmarks(output=output, publish=publish)
+
 
 def main():
     if settings.SENTRY_DSN:
diff --git a/aleph_scoring/benchmarks/__init__.py b/aleph_scoring/benchmarks/__init__.py
new file mode 100644
index 0000000..5084dd8
--- /dev/null
+++ b/aleph_scoring/benchmarks/__init__.py
@@ -0,0 +1,135 @@
+import asyncio
+import logging
+import socket
+from datetime import datetime
+from random import random, shuffle
+from typing import (
+    Any,
+    Awaitable,
+    Callable,
+    Dict,
+    Iterable,
+    NewType,
+    Sequence,
+    TypeVar,
+    Union,
+)
+
+import aiohttp
+from aleph_scoring.config import settings
+from aleph_scoring.utils import (
+    NodeInfo,
+    get_aleph_nodes,
+    get_compute_resource_node_urls,
+    get_crn_version,
+    timeout_generator,
+)
+
+from .models import CrnBenchmarks, NodeBenchmarks
+
+logger = logging.getLogger(__name__)
+
+TimeoutGenerator = NewType("TimeoutGenerator", Callable[[], aiohttp.ClientTimeout])
+
+B = TypeVar("B")
+
+
+async def get_crn_benchmarks(
+    timeout_generator: TimeoutGenerator, node_info: NodeInfo
+) -> CrnBenchmarks:
+    # Avoid doing all the calls at the same time
+    await asyncio.sleep(random() * 30)
+
+    url = node_info.url.url
+    measured_at = datetime.utcnow()
+
+    # Get the version over IPv4 or IPv6
+    async with aiohttp.ClientSession(timeout=timeout_generator()) as session_any_ip:
+        for attempt in range(3):
+            version = await get_crn_version(session=session_any_ip, node_url=url)
+            if version:
+                break
+
+    async with aiohttp.ClientSession(
+        timeout=timeout_generator(),
+        connector=aiohttp.TCPConnector(
+            family=socket.AF_INET,
+            keepalive_timeout=300,
+            limit=1000,
+            limit_per_host=20,
+        ),
+    ) as session_ipv4:
+        try:
+            async with session_ipv4.get(f"{url}vm/{settings.BENCHMARK_VM_HASH}/sysbench/cpu") as resp:
+                if resp.status != 200:
+                    cpu_bench = None
+                else:
+                    cpu_bench = await resp.json()
+
+            async with session_ipv4.get(f"{url}vm/{settings.BENCHMARK_VM_HASH}/sysbench/memory") as resp:
+                if resp.status != 200:
+                    ram_bench = None
+                else:
+                    ram_bench = await resp.json()
+            async with session_ipv4.get(f"{url}vm/{settings.BENCHMARK_VM_HASH}/sysbench/disk") as resp:
+                if resp.status != 200:
+                    disk_bench = None
+                else:
+                    disk_bench = await resp.json()
+        except aiohttp.ClientResponseError:
+            logger.debug(f"Error when fetching {url}")
+            cpu_bench = None
+            ram_bench = None
+            disk_bench = None
+        except aiohttp.ClientConnectorError:
+            logger.debug(f"Error when fetching {url}")
+            cpu_bench = None
+            ram_bench = None
+            disk_bench = None
+        except asyncio.TimeoutError:
+            logger.debug(f"Timeout error when fetching {url}")
+            cpu_bench = None
+            ram_bench = None
+            disk_bench = None
+
+    return CrnBenchmarks(
+        measured_at=measured_at.timestamp(),
+        node_id=node_info.hash,
+        version=version,
+        cpu=cpu_bench,
+        ram=ram_bench,
+        disk=disk_bench
+    )
+
+
+async def collect_all_crn_benchmarks(node_data: Dict[str, Any]) -> Sequence[CrnBenchmarks]:
+    node_infos = list(get_compute_resource_node_urls(node_data))
+    shuffle(node_infos)  # Avoid artifacts from the order in the list
+    return await collect_node_benchmarks(
+        node_infos=node_infos, metrics_function=get_crn_benchmarks
+    )
+
+
+async def collect_node_benchmarks(
+    node_infos: Iterable[NodeInfo],
+    metrics_function: Callable[[TimeoutGenerator, NodeInfo], Awaitable[B]],
+) -> Sequence[Union[B, BaseException]]:
+    timeout = timeout_generator(
+        total=60.0, connect=10.0, sock_connect=10.0, sock_read=60.0
+    )
+    return await asyncio.gather(
+        *[metrics_function(timeout, node_info) for node_info in node_infos]
+    )
+
+
+async def benchmark_node_performance() -> NodeBenchmarks:
+    logger.debug("Benchmark node performance")
+    aleph_nodes = await get_aleph_nodes()
+    crn_benchmarks = await collect_all_crn_benchmarks(aleph_nodes)
+    return NodeBenchmarks(
+        crn=crn_benchmarks
+    )
+
+
+def benchmark_node_performance_sync() -> NodeBenchmarks:
+    return asyncio.run(benchmark_node_performance())
diff --git a/aleph_scoring/benchmarks/models.py b/aleph_scoring/benchmarks/models.py
new file mode 100644
index 0000000..2f423b4
--- /dev/null
+++ b/aleph_scoring/benchmarks/models.py
@@ -0,0 +1,22 @@
+from typing import List, Optional
+
+from pydantic import BaseModel
+
+
+class CrnBenchmarks(BaseModel):
+    measured_at: float
+    node_id: str
+    version: Optional[str]
+    cpu: Optional[dict]
+    ram: Optional[dict]
+    disk: Optional[dict]
+
+
+class NodeBenchmarks(BaseModel):
+    crn: List[CrnBenchmarks]
+
+
+class BenchmarksPost(BaseModel):
+    version: str = "1.0"
+    tags: List[str]
+    benchmarks: NodeBenchmarks
diff --git a/aleph_scoring/config.py b/aleph_scoring/config.py
index 02916af..960d70f 100644
--- a/aleph_scoring/config.py
+++ b/aleph_scoring/config.py
@@ -21,6 +21,7 @@ class Settings(BaseSettings):
     ALEPH_POST_TYPE_CHANNEL: Optional[str] = "aleph-scoring"
     ALEPH_POST_TYPE_METRICS: str = "test-aleph-network-metrics"
     ALEPH_POST_TYPE_SCORES: str = "test-aleph-scoring-scores"
+    ALEPH_POST_TYPE_BENCHMARKS: str = "test-aleph-scoring-benchmarks"
     ASN_DB_DIRECTORY: Path = "/srv/asn"
     ASN_DB_PATH: str = "/tmp/asn_db.bz2"
     ASN_DB_REFRESH_PERIOD_DAYS: int = 1
@@ -36,6 +37,8 @@ class Settings(BaseSettings):
     VERSION_GRACE_PERIOD: timedelta = timedelta(weeks=2)
     SCORE_METRICS_PERIOD: timedelta = timedelta(days=1)  # TODO: bring back to 2 weeks
 
+    BENCHMARK_VM_HASH: str = "80b48e93995b3f31ee624c085cc6fa4cee4ced0174ff33e95b32e5992d68f755"
+
     class Config:
         env_file = ".env"
         env_prefix = "ALEPH_SCORING_"
diff --git a/aleph_scoring/metrics/__init__.py b/aleph_scoring/metrics/__init__.py
index 0898927..2fec8a2 100644
--- a/aleph_scoring/metrics/__init__.py
+++ b/aleph_scoring/metrics/__init__.py
@@ -2,39 +2,43 @@
 import logging
 import re
 import socket
-import subprocess
 import time
 from datetime import datetime
-from ipaddress import IPv6Network, IPv6Address, IPv4Address
-from random import shuffle, random
+from ipaddress import IPv4Address, IPv6Address, IPv6Network
+from random import random, shuffle
 from typing import (
     Any,
     Awaitable,
     Callable,
     Dict,
-    Generator,
     Iterable,
-    List,
     Literal,
     Optional,
     Sequence,
     Tuple,
     TypeVar,
     Union,
-    NewType,
 )
 from urllib.parse import urlparse
-from icmplib import async_ping
+
 import aiohttp
 import async_timeout
 import pyasn
-from aleph.sdk import AlephClient
-from pydantic import BaseModel, validator
-from urllib3.util import Url, parse_url
-
 from aleph_scoring.config import settings
 from aleph_scoring.metrics.asn import get_asn_database
 from aleph_scoring.types.vm_type import VmType
+from aleph_scoring.utils import (
+    NodeInfo,
+    TimeoutGenerator,
+    get_aleph_nodes,
+    get_api_node_urls,
+    get_compute_resource_node_urls,
+    get_crn_version,
+    timeout_generator,
+)
+from icmplib import async_ping
+from pydantic import BaseModel
+
 from .models import AlephNodeMetrics, CcnMetrics, CrnMetrics, NodeMetrics
 
 logger = logging.getLogger(__name__)
@@ -60,69 +64,6 @@ IP4_SERVICE_URL = "https://v4.ident.me/"
 
 
-
-TimeoutGenerator = NewType("TimeoutGenerator", Callable[[], aiohttp.ClientTimeout])
-
-
-def timeout_generator(
-    total: float, connect: float, sock_connect: float, sock_read: float
-) -> TimeoutGenerator:
-    def randomize(value: float) -> float:
-        return value + value * 0.3 * random()
-
-    return lambda: aiohttp.ClientTimeout(
-        total=randomize(total),
-        connect=randomize(connect),
-        sock_connect=randomize(sock_connect),
-        sock_read=randomize(sock_read),
-    )
-
-
-class NodeInfo(BaseModel):
-    url: Url
-    hash: str
-
-    @validator("hash")
-    def hash_format(cls, v) -> str:
-        if len(v) != 64:
-            raise ValueError("must have a length of 64")
-        try:
-            # Parse as hexadecimal using int()
-            int(v, 16)
-        except ValueError:
-            raise ValueError("must be hexadecimal")
-        return v
-
-
-def get_api_node_urls(raw_data: Dict[str, Any]) -> Generator[NodeInfo, None, None]:
-    """Extract CCN urls from node data."""
-    for node in raw_data["nodes"]:
-        multiaddress = node["multiaddress"]
-        match = re.findall(r"/ip4/([\d\\.]+)/.*", multiaddress)
-        if match:
-            ip = match[0]
-            yield NodeInfo(
-                url=parse_url(f"http://{ip}:4024/"),
-                hash=node["hash"],
-            )
-
-
-def get_compute_resource_node_urls(
-    raw_data: Dict[str, Any]
-) -> Generator[NodeInfo, None, None]:
-    """Extract CRN node urls the node data."""
-    for node in raw_data["resource_nodes"]:
-        addr = node["address"].strip("/")
-        if addr:
-            if not addr.startswith("https://"):
-                addr = "https://" + addr
-            url: Url = parse_url(addr + "/")
-            if url.query:
-                logger.warning("Unsupported url for node %s", node["hash"])
-            yield NodeInfo(
-                url=url,
-                hash=node["hash"],
-            )
 
 async def measure_http_latency(
     session: aiohttp.ClientSession,
     node_url: str,
@@ -169,34 +110,6 @@ async def measure_http_latency(
     return None, None
 
 
-async def get_crn_version(
-    session: aiohttp.ClientSession, node_url: str
-) -> Optional[str]:
-    # Retrieve the CRN version from header `server`.
-    try:
-        async with async_timeout.timeout(
-            settings.HTTP_REQUEST_TIMEOUT
-            + settings.HTTP_REQUEST_TIMEOUT * 0.3 * random(),
-        ):
-            async with session.get(node_url) as resp:
-                resp.raise_for_status()
-                if "Server" not in resp.headers:
-                    return None
-                for server in resp.headers.getall("Server"):
-                    version: List[str] = re.findall(r"^aleph-vm/(.*)$", server)
-                    if version and version[0]:
-                        return version[0]
-                    else:
-                        return None
-
-    except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
-        logger.debug(f"Error when fetching version from {node_url}")
-        return None
-    except asyncio.TimeoutError:
-        logger.debug(f"Timeout error when fetching version from {node_url}")
-        return None
-
-
 def get_url_domain(url: str) -> str:
     domain = urlparse(url).netloc
     return domain.split(":")[0]  # Remove port
@@ -537,15 +450,6 @@ async def collect_all_crn_metrics(node_data: Dict[str, Any]) -> Sequence[CrnMetr
     )
 
 
-async def get_aleph_nodes() -> Dict:
-    async with AlephClient(api_server=settings.NODE_DATA_HOST) as client:
-        return await client.fetch_aggregate(
-            address=settings.NODE_DATA_ADDR,
-            key="corechannel",
-            limit=50,
-        )
-
-
 async def collect_server_metadata(asn_db: pyasn.pyasn) -> Tuple[str, int, str]:
     def is_valid_ip4(ip: str) -> bool:
         return bool(re.match(r"\d+\.\d+\.\d+\.\d+", ip))
diff --git a/aleph_scoring/utils.py b/aleph_scoring/utils.py
index 23fa413..bdd627b 100644
--- a/aleph_scoring/utils.py
+++ b/aleph_scoring/utils.py
@@ -1,15 +1,29 @@
+import asyncio
+import logging
+import re
 from datetime import datetime
 from enum import Enum
 from functools import partial
-from typing import Optional, Tuple, List
+from random import random
+from typing import Any, Callable, Dict, Generator, List, NewType, Optional, Tuple
 
+import aiohttp
+import async_timeout
 import asyncpg
 import requests
+from aleph.sdk import AlephClient
 from cachetools import TTLCache, cached
-from pydantic import BaseModel
+from pydantic import BaseModel, validator
+from urllib3.util import Url, parse_url
+
+from aleph_scoring.config import settings
 
 from .config import Settings
 
+logger = logging.getLogger(__name__)
+
+TimeoutGenerator = NewType("TimeoutGenerator", Callable[[], aiohttp.ClientTimeout])
+
 
 class Period(BaseModel):
     from_date: datetime
@@ -109,3 +123,100 @@ async def database_connection(settings: Settings):
         host=settings.DATABASE_HOST,
         port=settings.DATABASE_PORT,
     )
+
+
+class NodeInfo(BaseModel):
+    url: Url
+    hash: str
+
+    @validator("hash")
+    def hash_format(cls, v) -> str:
+        if len(v) != 64:
+            raise ValueError("must have a length of 64")
+        try:
+            # Parse as hexadecimal using int()
+            int(v, 16)
+        except ValueError:
+            raise ValueError("must be hexadecimal")
+        return v
+
+
+def get_api_node_urls(raw_data: Dict[str, Any]) -> Generator[NodeInfo, None, None]:
+    """Extract CCN urls from node data."""
+    for node in raw_data["nodes"]:
+        multiaddress = node["multiaddress"]
+        match = re.findall(r"/ip4/([\d\\.]+)/.*", multiaddress)
+        if match:
+            ip = match[0]
+            yield NodeInfo(
+                url=parse_url(f"http://{ip}:4024/"),
+                hash=node["hash"],
+            )
+
+
+async def get_crn_version(
+    session: aiohttp.ClientSession, node_url: str
+) -> Optional[str]:
+    # Retrieve the CRN version from header `server`.
+    try:
+        async with async_timeout.timeout(
+            settings.HTTP_REQUEST_TIMEOUT
+            + settings.HTTP_REQUEST_TIMEOUT * 0.3 * random(),
+        ):
+            async with session.get(node_url) as resp:
+                resp.raise_for_status()
+                if "Server" not in resp.headers:
+                    return None
+                for server in resp.headers.getall("Server"):
+                    version: List[str] = re.findall(r"^aleph-vm/(.*)$", server)
+                    if version and version[0]:
+                        return version[0]
+                    else:
+                        return None
+
+    except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
+        logger.debug(f"Error when fetching version from {node_url}")
+        return None
+    except asyncio.TimeoutError:
+        logger.debug(f"Timeout error when fetching version from {node_url}")
+        return None
+
+
+def get_compute_resource_node_urls(
+    raw_data: Dict[str, Any]
+) -> Generator[NodeInfo, None, None]:
+    """Extract CRN node urls the node data."""
+    for node in raw_data["resource_nodes"]:
+        addr = node["address"].strip("/")
+        if addr:
+            if not addr.startswith("https://"):
+                addr = "https://" + addr
+            url: Url = parse_url(addr + "/")
+            if url.query:
+                logger.warning("Unsupported url for node %s", node["hash"])
+            yield NodeInfo(
+                url=url,
+                hash=node["hash"],
+            )
+
+
+async def get_aleph_nodes() -> Dict:
+    async with AlephClient(api_server=settings.NODE_DATA_HOST) as client:
+        return await client.fetch_aggregate(
+            address=settings.NODE_DATA_ADDR,
+            key="corechannel",
+            limit=50,
+        )
+
+def timeout_generator(
+    total: float, connect: float, sock_connect: float, sock_read: float
+) -> TimeoutGenerator:
+    def randomize(value: float) -> float:
+        return value + value * 0.3 * random()
+
+    return lambda: aiohttp.ClientTimeout(
+        total=randomize(total),
+        connect=randomize(connect),
+        sock_connect=randomize(sock_connect),
+        sock_read=randomize(sock_read),
+    )