Add Sysbench metrics #9

Open · wants to merge 1 commit into base: dev
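This pull request adds a `benchmark` command to the scoring tool: it queries the sysbench CPU, memory, and disk endpoints exposed by a benchmark VM on each compute resource node (CRN), collects the results into new pydantic models, and can optionally publish them on Aleph under the `test-aleph-scoring-benchmarks` post type.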
62 changes: 61 additions & 1 deletion aleph_scoring/__main__.py
@@ -15,12 +15,14 @@
from aleph.sdk.types import Account
from hexbytes import HexBytes

from aleph_scoring.benchmarks import benchmark_node_performance_sync
from aleph_scoring.benchmarks.models import BenchmarksPost, NodeBenchmarks
from aleph_scoring.config import settings
from aleph_scoring.metrics import measure_node_performance_sync
from aleph_scoring.metrics.models import MetricsPost, NodeMetrics
from aleph_scoring.scoring import compute_ccn_scores, compute_crn_scores
from aleph_scoring.scoring.models import NodeScores, NodeScoresPost
from aleph_scoring.utils import LogLevel, Period  # the single deletion: get_latest_github_releases is no longer imported here

logger = logging.getLogger(__name__)
aleph_account: Optional[ETHAccount] = None
@@ -90,6 +92,23 @@ async def publish_scores_on_aleph(
"Published scores on Aleph with status %s: %s", status, scores_post.item_hash
)

async def publish_benchmarks_on_aleph(account: Account, node_benchmarks: NodeBenchmarks):
channel = settings.ALEPH_POST_TYPE_CHANNEL
aleph_api_server = settings.NODE_DATA_HOST

benchmarks_post_data = BenchmarksPost(tags=["devnet"], benchmarks=node_benchmarks)
async with AuthenticatedAlephClient(
account=account, api_server=aleph_api_server
) as client:
metrics_post, status = await client.create_post(
post_content=benchmarks_post_data,
post_type=settings.ALEPH_POST_TYPE_BENCHMARKS,
channel=channel,
)
logger.info(
"Published benchmarks on Aleph with status %s: %s", status, metrics_post.item_hash
)


def run_measurements(
    output: Optional[Path] = typer.Option(
@@ -114,6 +133,30 @@ def run_measurements(
    )


def run_benchmarks(
    output: Optional[Path] = typer.Option(
        default=None, help="Path where to save the result in JSON format."
    ),
    stdout: bool = typer.Option(default=False, help="Print the result on stdout"),
    publish: bool = typer.Option(
        default=False,
        help="Publish the results on Aleph.",
    ),
):
    node_benchmarks = benchmark_node_performance_sync()

    if output:
        save_as_json(node_metrics=node_benchmarks, file=output)
    if stdout:
        print(node_benchmarks.json(indent=4))
    if publish:
        account = get_aleph_account()
        asyncio.run(
            publish_benchmarks_on_aleph(account=account, node_benchmarks=node_benchmarks)
        )



@app.command()
def measure(
    output: Optional[Path] = typer.Option(
@@ -296,6 +339,23 @@ def compute_on_schedule(
def export_as_html(input_file: Optional[Path]):
    os.system("jupyter nbconvert --execute Node\\ Score\\ Analysis.ipynb --to html")


@app.command()
def benchmark(
    output: Optional[Path] = typer.Option(
        default=None, help="Path where to save the result in JSON format."
    ),
    publish: bool = typer.Option(
        default=False,
        help="Publish the results on Aleph.",
    ),
    log_level: str = typer.Option(
        default=LogLevel.INFO.name,
        help="Logging level",
    ),
):
    logging.basicConfig(level=LogLevel[log_level])
    run_benchmarks(output=output, publish=publish)


def main():
    if settings.SENTRY_DSN:
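The new command plugs into the existing Typer app, so it can be exercised the same way as `measure`. A minimal sketch using Typer's test runner, assuming the `app` object in `aleph_scoring/__main__.py` is importable:

```python
# Sketch: run the new `benchmark` command in-process via Typer's CliRunner.
# Assumes `app` is the Typer instance defined in aleph_scoring/__main__.py.
from typer.testing import CliRunner

from aleph_scoring.__main__ import app

runner = CliRunner()
result = runner.invoke(app, ["benchmark", "--log-level", "INFO"])
print(result.output)
```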
135 changes: 135 additions & 0 deletions aleph_scoring/benchmarks/__init__.py
@@ -0,0 +1,135 @@
import asyncio
import logging
import socket
from datetime import datetime
from random import random, shuffle
from typing import (
    Any,
    Awaitable,
    Callable,
    Dict,
    Iterable,
    NewType,
    Sequence,
    TypeVar,
    Union,
)

import aiohttp
from aleph_scoring.config import settings
from aleph_scoring.utils import (
    NodeInfo,
    get_aleph_nodes,
    get_compute_resource_node_urls,
    get_crn_version,
    timeout_generator,
)

from .models import CrnBenchmarks, NodeBenchmarks

logger = logging.getLogger(__name__)

TimeoutGenerator = NewType("TimeoutGenerator", Callable[[], aiohttp.ClientTimeout])

B = TypeVar("B")


async def get_crn_benchmarks(
    timeout_generator: TimeoutGenerator, node_info: NodeInfo
) -> CrnBenchmarks:
    # Avoid doing all the calls at the same time
    await asyncio.sleep(random() * 30)

    url = node_info.url.url
    measured_at = datetime.utcnow()

    # Get the version over IPv4 or IPv6, retrying up to three times
    version = None
    async with aiohttp.ClientSession(timeout=timeout_generator()) as session_any_ip:
        for _ in range(3):
            version = await get_crn_version(session=session_any_ip, node_url=url)
            if version:
                break

    async with aiohttp.ClientSession(
        timeout=timeout_generator(),
        connector=aiohttp.TCPConnector(
            family=socket.AF_INET,
            keepalive_timeout=300,
            limit=1000,
            limit_per_host=20,
        ),
    ) as session_ipv4:
        try:
            async with session_ipv4.get(
                f"{url}vm/{settings.BENCHMARK_VM_HASH}/sysbench/cpu"
            ) as resp:
                cpu_bench = await resp.json() if resp.status == 200 else None

            async with session_ipv4.get(
                f"{url}vm/{settings.BENCHMARK_VM_HASH}/sysbench/memory"
            ) as resp:
                ram_bench = await resp.json() if resp.status == 200 else None

            async with session_ipv4.get(
                f"{url}vm/{settings.BENCHMARK_VM_HASH}/sysbench/disk"
            ) as resp:
                disk_bench = await resp.json() if resp.status == 200 else None
        except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
            logger.debug(f"Error when fetching {url}")
            cpu_bench = ram_bench = disk_bench = None
        except asyncio.TimeoutError:
            logger.debug(f"Timeout error when fetching {url}")
            cpu_bench = ram_bench = disk_bench = None

    return CrnBenchmarks(
        measured_at=measured_at.timestamp(),
        node_id=node_info.hash,
        version=version,
        cpu=cpu_bench,
        ram=ram_bench,
        disk=disk_bench,
    )


async def collect_all_crn_benchmarks(
    node_data: Dict[str, Any]
) -> Sequence[CrnBenchmarks]:
    node_infos = list(get_compute_resource_node_urls(node_data))
    shuffle(node_infos)  # Avoid artifacts from the order in the list
    return await collect_node_benchmarks(
        node_infos=node_infos, metrics_function=get_crn_benchmarks
    )


async def collect_node_benchmarks(
    node_infos: Iterable[NodeInfo],
    metrics_function: Callable[[TimeoutGenerator, NodeInfo], Awaitable[B]],
) -> Sequence[Union[B, BaseException]]:
    timeout = timeout_generator(
        total=60.0, connect=10.0, sock_connect=10.0, sock_read=60.0
    )
    return await asyncio.gather(
        *[metrics_function(timeout, node_info) for node_info in node_infos]
    )


async def benchmark_node_performance() -> NodeBenchmarks:
    logger.debug("Benchmark node performance")
    aleph_nodes = await get_aleph_nodes()
    crn_benchmarks = await collect_all_crn_benchmarks(aleph_nodes)
    return NodeBenchmarks(crn=crn_benchmarks)


def benchmark_node_performance_sync() -> NodeBenchmarks:
    return asyncio.run(benchmark_node_performance())
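Driving the collector directly yields the same JSON that `run_benchmarks --stdout` prints. A short sketch, assuming the CRNs and their benchmark VMs are reachable from the current machine:

```python
# Sketch: collect sysbench results from all CRNs and dump them as JSON.
from aleph_scoring.benchmarks import benchmark_node_performance_sync

benchmarks = benchmark_node_performance_sync()
print(benchmarks.json(indent=4))  # same pydantic serialization used in __main__.py
```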
22 changes: 22 additions & 0 deletions aleph_scoring/benchmarks/models.py
@@ -0,0 +1,22 @@
from typing import List, Optional

from pydantic import BaseModel


class CrnBenchmarks(BaseModel):
    measured_at: float
    node_id: str
    version: Optional[str]
    cpu: Optional[dict]
    ram: Optional[dict]
    disk: Optional[dict]


class NodeBenchmarks(BaseModel):
    crn: List[CrnBenchmarks]


class BenchmarksPost(BaseModel):
    version: str = "1.0"
    tags: List[str]
    benchmarks: NodeBenchmarks
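A hypothetical round trip through these models with placeholder values; the sysbench payloads are treated as opaque dicts, matching the `Optional[dict]` fields above:

```python
from aleph_scoring.benchmarks.models import (
    BenchmarksPost,
    CrnBenchmarks,
    NodeBenchmarks,
)

crn = CrnBenchmarks(
    measured_at=1700000000.0,           # placeholder UNIX timestamp
    node_id="example-node-hash",        # placeholder CRN hash
    version="0.4.0",                    # placeholder CRN version
    cpu={"events_per_second": 1234.5},  # assumed sysbench payload shape
    ram=None,                           # endpoints that failed stay None
    disk=None,
)
post = BenchmarksPost(tags=["devnet"], benchmarks=NodeBenchmarks(crn=[crn]))
print(post.json(indent=2))
```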
3 changes: 3 additions & 0 deletions aleph_scoring/config.py
@@ -21,6 +21,7 @@ class Settings(BaseSettings):
    ALEPH_POST_TYPE_CHANNEL: Optional[str] = "aleph-scoring"
    ALEPH_POST_TYPE_METRICS: str = "test-aleph-network-metrics"
    ALEPH_POST_TYPE_SCORES: str = "test-aleph-scoring-scores"
    ALEPH_POST_TYPE_BENCHMARKS: str = "test-aleph-scoring-benchmarks"
    ASN_DB_DIRECTORY: Path = "/srv/asn"
    ASN_DB_PATH: str = "/tmp/asn_db.bz2"
    ASN_DB_REFRESH_PERIOD_DAYS: int = 1
@@ -36,6 +37,8 @@ class Settings(BaseSettings):
    VERSION_GRACE_PERIOD: timedelta = timedelta(weeks=2)
    SCORE_METRICS_PERIOD: timedelta = timedelta(days=1)  # TODO: bring back to 2 weeks

    BENCHMARK_VM_HASH: str = "80b48e93995b3f31ee624c085cc6fa4cee4ced0174ff33e95b32e5992d68f755"

    class Config:
        env_file = ".env"
        env_prefix = "ALEPH_SCORING_"
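Because `Settings` reads from the environment with the `ALEPH_SCORING_` prefix, the benchmark VM hash can be overridden without touching the code. A sketch, assuming pydantic's `BaseSettings` behavior:

```python
import os

# Hypothetical override: set the variable before Settings() is instantiated.
os.environ["ALEPH_SCORING_BENCHMARK_VM_HASH"] = "your-benchmark-vm-hash"

from aleph_scoring.config import Settings

print(Settings().BENCHMARK_VM_HASH)
```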