From 0ab869c229cc2d0aed4081da342e6f57710c6604 Mon Sep 17 00:00:00 2001 From: Eduardo Davalos Anaya <40870026+edavalosanaya@users.noreply.github.com> Date: Mon, 28 Aug 2023 10:44:28 -0500 Subject: [PATCH] Change from Network Latency to Step Processing Latency. Closes #239 (#240) * fix(latency of step): Changed the logic of the profiler to instead focus on the out-going data chunk's and their latency. * fix(registered method): Handling the added output to safe_step. * fix(B->KB): Making the payload_size(KB) use KB. * fix(B->KB): Making the payload_size(KB) use KB, with 1024 --- chimerapy/engine/networking/data_chunk.py | 1 + chimerapy/engine/node/processor_service.py | 22 ++++--- chimerapy/engine/node/profiler_service.py | 56 ++++++----------- test/manager/test_manager.py | 13 ++-- test/node/test_profiling_service.py | 72 ++-------------------- 5 files changed, 46 insertions(+), 118 deletions(-) diff --git a/chimerapy/engine/networking/data_chunk.py b/chimerapy/engine/networking/data_chunk.py index a4b4d363..6afcfedd 100644 --- a/chimerapy/engine/networking/data_chunk.py +++ b/chimerapy/engine/networking/data_chunk.py @@ -41,6 +41,7 @@ def __init__(self): "value": { "ownership": [], "created": datetime.datetime.now(), + "delta": 0, # ms "transmitted": None, "received": None, }, diff --git a/chimerapy/engine/node/processor_service.py b/chimerapy/engine/node/processor_service.py index 56dcc208..8b390063 100644 --- a/chimerapy/engine/node/processor_service.py +++ b/chimerapy/engine/node/processor_service.py @@ -3,6 +3,7 @@ import datetime import logging import asyncio +import time from typing import Dict, List, Optional, Callable, Coroutine, Any, Literal from chimerapy.engine import _logger @@ -204,17 +205,17 @@ async def execute_registered_method( success = False if style == "concurrent": # output = await function(**params) # type: ignore[call-arg] - output = await self.safe_exec(function, kwargs=params) + output, _ = await self.safe_exec(function, kwargs=params) success = True elif style == "blocking": with self.step_lock: - output = await self.safe_exec(function, kwargs=params) + output, _ = await self.safe_exec(function, kwargs=params) success = True elif style == "reset": with self.step_lock: - output = await self.safe_exec(function, kwargs=params) + output, _ = await self.safe_exec(function, kwargs=params) await self.eventbus.asend(Event("reset")) success = True @@ -245,30 +246,36 @@ async def safe_exec( output = None try: + tic = time.perf_counter() if asyncio.iscoroutinefunction(func): output = await func(*args, **kwargs) else: await asyncio.sleep(1 / 1000) # Allow other functions to run as well output = func(*args, **kwargs) + toc = time.perf_counter() except Exception: traceback_info = traceback.format_exc() self.logger.error(traceback_info) - return output + # Compute delta + delta = (toc - tic) * 1000 + + return output, delta async def safe_step(self, data_chunks: Dict[str, DataChunk] = {}): # Default value output = None + delta = 0 - if self.main_fn: + if self.main_fn: # If user-defined ``step`` with self.step_lock: if self.in_bound_data: - output = await self.safe_exec( + output, delta = await self.safe_exec( self.main_fn, kwargs={"data_chunks": data_chunks} ) else: - output = await self.safe_exec(self.main_fn) + output, delta = await self.safe_exec(self.main_fn) # If output generated, send it! if output: @@ -286,6 +293,7 @@ async def safe_step(self, data_chunks: Dict[str, DataChunk] = {}): # Add timestamp and step id to the DataChunk meta = output_data_chunk.get("meta") meta["value"]["transmitted"] = datetime.datetime.now() + meta["value"]["delta"] = delta output_data_chunk.update("meta", meta) # Send out the output to the OutputsHandler diff --git a/chimerapy/engine/node/profiler_service.py b/chimerapy/engine/node/profiler_service.py index 3e1804e8..c063e821 100644 --- a/chimerapy/engine/node/profiler_service.py +++ b/chimerapy/engine/node/profiler_service.py @@ -15,7 +15,7 @@ from ..service import Service from ..eventbus import EventBus, TypedObserver, Event from ..states import NodeState -from .events import NewInBoundDataEvent, DiagnosticsReportEvent +from .events import NewOutBoundDataEvent, DiagnosticsReportEvent class ProfilerService(Service): @@ -54,10 +54,10 @@ def __init__( # Add observers to profile self.observers: Dict[str, TypedObserver] = { "setup": TypedObserver("setup", on_asend=self.setup, handle_event="drop"), - "in_step": TypedObserver( - "in_step", - NewInBoundDataEvent, - on_asend=self.pre_step, + "out_step": TypedObserver( + "out_step", + NewOutBoundDataEvent, + on_asend=self.post_step, handle_event="unpack", ), "teardown": TypedObserver( @@ -113,7 +113,7 @@ async def diagnostics_report(self): # Send the information to the Worker and ultimately the Manager event_data = DiagnosticsReportEvent(diag) - # self.logger.debug(f"{self}: data = {diag}") + self.logger.debug(f"{self}: data = {diag}") await self.eventbus.asend(Event("diagnostics_report", event_data)) # Write to a csv, if diagnostics enabled @@ -138,54 +138,36 @@ async def diagnostics_report(self): index=False, ) - async def pre_step(self, data_chunks: Dict[str, DataChunk]): + async def post_step(self, data_chunk: DataChunk): # assert self.process if not self.process: return None - # Computing the diagnostics metrics per step (multiple datachunks) - mean_latency = 0.0 + # Computing the diagnostics metrics per step payload_size = 0.0 # Containers to compute metrics - latencies: List[float] = [] payload_sizes: List[float] = [] - # First compute latency - for name, chunk in data_chunks.items(): + # Obtain the meta data of the data chunk + meta = data_chunk.get("meta")["value"] - # Only process unseen data_chunks, as in the "in_step" event, - # it is possible for the same data chunks to be re-used in the step - if chunk.uuid in self.seen_uuids: - continue - else: - self.seen_uuids.append(chunk.uuid) + # Get the payload size (of all keys) + total_size = 0.0 + for key in data_chunk.contains(): + payload = data_chunk.get(key)["value"] + total_size += self.get_object_kilobytes(payload) + payload_sizes.append(total_size) - # Obtain the meta data of the data chunk - meta = chunk.get("meta")["value"] - - # Get latency - latency = (meta["received"] - meta["transmitted"]).total_seconds() * 1000 - latencies.append(latency) - - # Get the payload size (of all keys) - total_size = 0.0 - for key in chunk.contains(): - payload = chunk.get(key)["value"] - total_size += self.get_object_kilobytes(payload) - payload_sizes.append(total_size) - - # After processing all data_chunks, get latency and payload means - mean_latency = sum(latencies) / len(latencies) + # After processing all data_chunk keys, get payload total payload_size = sum(payload_sizes) # Store results - self.deques["latency(ms)"].append(mean_latency) + self.deques["latency(ms)"].append(meta["delta"]) self.deques["payload_size(KB)"].append(payload_size) def get_object_kilobytes(self, payload: Any) -> float: - # return sys.getsizeof(payload) / 1024 - return len(pickle.dumps(payload)) + return len(pickle.dumps(payload)) / 1024 async def teardown(self): await self.async_timer.stop() diff --git a/test/manager/test_manager.py b/test/manager/test_manager.py index f3136bf8..383a50bb 100644 --- a/test/manager/test_manager.py +++ b/test/manager/test_manager.py @@ -4,6 +4,7 @@ import pytest +from chimerapy.engine import config import chimerapy.engine as cpe from ..conftest import GenNode, ConsumeNode, TEST_DATA_DIR @@ -72,6 +73,9 @@ def test_sending_package(self, manager, _worker, config_graph): def test_manager_lifecycle(self, manager_with_worker, context): manager, worker = manager_with_worker + # Enable diagnostics + config.set("diagnostics.logging-enabled", True) + # Define graph gen_node = GenNode(name="Gen1") con_node = ConsumeNode(name="Con1") @@ -84,16 +88,9 @@ def test_manager_lifecycle(self, manager_with_worker, context): manager.commit_graph(graph, mapping, context=context).result(timeout=30) assert manager.start().result() - - # time.sleep(3) - assert manager.record().result() - # for i in range(50): - # logger.debug(manager.state) - # time.sleep(1) - - time.sleep(20) + time.sleep(5) assert manager.stop().result() assert manager.collect().result() diff --git a/test/node/test_profiling_service.py b/test/node/test_profiling_service.py index d6c76406..a73b419f 100644 --- a/test/node/test_profiling_service.py +++ b/test/node/test_profiling_service.py @@ -2,6 +2,7 @@ import time import pathlib import tempfile +import random import numpy as np import pytest @@ -9,7 +10,7 @@ import chimerapy.engine as cpe from chimerapy.engine import config from chimerapy.engine.node.profiler_service import ProfilerService -from chimerapy.engine.node.events import NewInBoundDataEvent +from chimerapy.engine.node.events import NewOutBoundDataEvent from chimerapy.engine.states import NodeState from chimerapy.engine.eventbus import EventBus, Event from chimerapy.engine.networking.async_loop_thread import AsyncLoopThread @@ -34,7 +35,6 @@ def profiler_setup(): eventbus = EventBus(thread=thread) # Create sample state - # state = NodeState(logdir=TEST_DATA_DIR) state = NodeState(logdir=pathlib.Path(tempfile.mkdtemp())) # Create the profiler @@ -64,20 +64,10 @@ def test_single_data_chunk(profiler_setup): # Mock how the processor marks the time when it got the datachunk # and transmitted it meta = example_data_chunk.get("meta") - meta["value"]["transmitted"] = datetime.datetime.now() + meta["value"]["delta"] = random.randrange(500, 1500, 1) # ms example_data_chunk.update("meta", meta) - # Transmission time - time.sleep(0.1) - - # Modify the received timestamp to mock the Poller - meta = example_data_chunk.get("meta") - meta["value"]["received"] = datetime.datetime.now() - example_data_chunk.update("meta", meta) - - dcs = {"test": example_data_chunk} - - eventbus.send(Event("in_step", NewInBoundDataEvent(dcs))).result() + eventbus.send(Event("out_step", NewOutBoundDataEvent(example_data_chunk))).result() assert profiler.log_file.exists() @@ -95,59 +85,9 @@ def test_single_data_chunk_with_multiple_payloads(profiler_setup): # Mock how the processor marks the time when it got the datachunk # and transmitted it meta = example_data_chunk.get("meta") - meta["value"]["transmitted"] = datetime.datetime.now() - example_data_chunk.update("meta", meta) - - # Transmission time - time.sleep(0.1) - - # Modify the received timestamp to mock the Poller - meta = example_data_chunk.get("meta") - meta["value"]["received"] = datetime.datetime.now() - example_data_chunk.update("meta", meta) - - dcs = {"test": example_data_chunk} - - eventbus.send(Event("in_step", NewInBoundDataEvent(dcs))).result() - - assert profiler.log_file.exists() - - -def test_multiple_data_chunk(profiler_setup): - profiler, eventbus = profiler_setup - - for i in range(50): - - # Run the step multiple times - example_data_chunk = DataChunk() - example_data_chunk2 = DataChunk() - example_data_chunk.add("random", np.random.rand(1000, 1000, 3)) - example_data_chunk2.add("random", np.random.rand(1000, 1000, 3)) - - # Mock how the processor marks the time when it got the datachunk - # and transmitted it - meta = example_data_chunk.get("meta") - meta["value"]["transmitted"] = datetime.datetime.now() - example_data_chunk.update("meta", meta) - - meta = example_data_chunk2.get("meta") - meta["value"]["transmitted"] = datetime.datetime.now() - example_data_chunk2.update("meta", meta) - - # Transmission time - time.sleep(0.1) - - # Modify the received timestamp to mock the Poller - meta = example_data_chunk.get("meta") - meta["value"]["received"] = datetime.datetime.now() + meta["value"]["delta"] = random.randrange(500, 1500, 1) example_data_chunk.update("meta", meta) - meta = example_data_chunk2.get("meta") - meta["value"]["received"] = datetime.datetime.now() - example_data_chunk2.update("meta", meta) - - dcs = {"test": example_data_chunk, "test2": example_data_chunk2} - - eventbus.send(Event("in_step", NewInBoundDataEvent(dcs))).result() + eventbus.send(Event("out_step", NewOutBoundDataEvent(example_data_chunk))).result() assert profiler.log_file.exists()