
Commit

Change from Network Latency to Step Processing Latency. Closes #239 (#240)

* fix(latency of step): Changed the profiler's logic to focus on the outgoing data chunks and their latency.

* fix(registered method): Handle the extra output that safe_exec now returns (added for safe_step).

* fix(B->KB): Make payload_size(KB) actually use KB.

* fix(B->KB): Make payload_size(KB) use KB, dividing by 1024.
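
In short, the profiler no longer derives latency from network timestamps (received minus transmitted); it now reads the step-processing time (`delta`) that the processor measures around the user's `step` call. A minimal sketch of the two measurements, condensed from the diff below:

    import datetime
    import time

    # Before: network latency, from transmission timestamps (ms)
    transmitted = datetime.datetime.now()
    # ... chunk travels from producer to consumer ...
    received = datetime.datetime.now()
    network_latency_ms = (received - transmitted).total_seconds() * 1000

    # After: step-processing latency, measured around the step itself (ms)
    tic = time.perf_counter()
    # ... user-defined step() runs ...
    toc = time.perf_counter()
    delta_ms = (toc - tic) * 1000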
edavalosanaya authored Aug 28, 2023
1 parent 73b6076 commit 0ab869c
Showing 5 changed files with 46 additions and 118 deletions.
1 change: 1 addition & 0 deletions chimerapy/engine/networking/data_chunk.py
@@ -41,6 +41,7 @@ def __init__(self):
"value": {
"ownership": [],
"created": datetime.datetime.now(),
"delta": 0, # ms
"transmitted": None,
"received": None,
},
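The new `delta` field is the hand-off point between services: `safe_exec` measures it, `safe_step` writes it into the outgoing chunk's meta, and the profiler's `post_step` reads it back out (see the hunks below). Reading it from a chunk follows the same accessor pattern the diff uses:

    meta = data_chunk.get("meta")["value"]
    step_latency_ms = meta["delta"]  # ms, 0 until the processor stamps it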
22 changes: 15 additions & 7 deletions chimerapy/engine/node/processor_service.py
@@ -3,6 +3,7 @@
import datetime
import logging
import asyncio
import time
from typing import Dict, List, Optional, Callable, Coroutine, Any, Literal

from chimerapy.engine import _logger
@@ -204,17 +205,17 @@ async def execute_registered_method(
success = False
if style == "concurrent":
# output = await function(**params) # type: ignore[call-arg]
output = await self.safe_exec(function, kwargs=params)
output, _ = await self.safe_exec(function, kwargs=params)
success = True

elif style == "blocking":
with self.step_lock:
output = await self.safe_exec(function, kwargs=params)
output, _ = await self.safe_exec(function, kwargs=params)
success = True

elif style == "reset":
with self.step_lock:
output = await self.safe_exec(function, kwargs=params)
output, _ = await self.safe_exec(function, kwargs=params)
await self.eventbus.asend(Event("reset"))
success = True

@@ -245,30 +246,36 @@ async def safe_exec(
output = None

try:
tic = time.perf_counter()
if asyncio.iscoroutinefunction(func):
output = await func(*args, **kwargs)
else:
await asyncio.sleep(1 / 1000) # Allow other functions to run as well
output = func(*args, **kwargs)
toc = time.perf_counter()
except Exception:
traceback_info = traceback.format_exc()
self.logger.error(traceback_info)

return output
# Compute delta
delta = (toc - tic) * 1000

return output, delta

async def safe_step(self, data_chunks: Dict[str, DataChunk] = {}):

# Default value
output = None
delta = 0

if self.main_fn:
if self.main_fn: # If user-defined ``step``
with self.step_lock:
if self.in_bound_data:
output = await self.safe_exec(
output, delta = await self.safe_exec(
self.main_fn, kwargs={"data_chunks": data_chunks}
)
else:
output = await self.safe_exec(self.main_fn)
output, delta = await self.safe_exec(self.main_fn)

# If output generated, send it!
if output:
@@ -286,6 +293,7 @@ async def safe_step(self, data_chunks: Dict[str, DataChunk] = {}):
# Add timestamp and step id to the DataChunk
meta = output_data_chunk.get("meta")
meta["value"]["transmitted"] = datetime.datetime.now()
meta["value"]["delta"] = delta
output_data_chunk.update("meta", meta)

# Send out the output to the OutputsHandler
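One caveat with the timing pattern in `safe_exec` above: `toc` is only assigned inside the `try`, so if `func` raises, the later `delta = (toc - tic) * 1000` would itself fail with a NameError. A self-contained sketch of the same idea that keeps `delta` defined on the error path (an illustration, not the committed code):

    import asyncio
    import time
    import traceback
    from typing import Any, Callable, Tuple

    async def timed_exec(func: Callable, *args, **kwargs) -> Tuple[Any, float]:
        """Run a sync or async callable and return (output, elapsed_ms)."""
        output = None
        tic = time.perf_counter()
        try:
            if asyncio.iscoroutinefunction(func):
                output = await func(*args, **kwargs)
            else:
                await asyncio.sleep(1 / 1000)  # yield so other tasks can run
                output = func(*args, **kwargs)
        except Exception:
            traceback.print_exc()
        # Taken after the try block, so the measurement survives exceptions
        delta = (time.perf_counter() - tic) * 1000
        return output, delta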
56 changes: 19 additions & 37 deletions chimerapy/engine/node/profiler_service.py
@@ -15,7 +15,7 @@
from ..service import Service
from ..eventbus import EventBus, TypedObserver, Event
from ..states import NodeState
from .events import NewInBoundDataEvent, DiagnosticsReportEvent
from .events import NewOutBoundDataEvent, DiagnosticsReportEvent


class ProfilerService(Service):
@@ -54,10 +54,10 @@ def __init__(
# Add observers to profile
self.observers: Dict[str, TypedObserver] = {
"setup": TypedObserver("setup", on_asend=self.setup, handle_event="drop"),
"in_step": TypedObserver(
"in_step",
NewInBoundDataEvent,
on_asend=self.pre_step,
"out_step": TypedObserver(
"out_step",
NewOutBoundDataEvent,
on_asend=self.post_step,
handle_event="unpack",
),
"teardown": TypedObserver(
@@ -113,7 +113,7 @@ async def diagnostics_report(self):

# Send the information to the Worker and ultimately the Manager
event_data = DiagnosticsReportEvent(diag)
# self.logger.debug(f"{self}: data = {diag}")
self.logger.debug(f"{self}: data = {diag}")
await self.eventbus.asend(Event("diagnostics_report", event_data))

# Write to a csv, if diagnostics enabled
@@ -138,54 +138,36 @@ async def diagnostics_report(self):
index=False,
)

async def pre_step(self, data_chunks: Dict[str, DataChunk]):
async def post_step(self, data_chunk: DataChunk):
# assert self.process
if not self.process:
return None

# Computing the diagnostics metrics per step (multiple datachunks)
mean_latency = 0.0
# Computing the diagnostics metrics per step
payload_size = 0.0

# Containers to compute metrics
latencies: List[float] = []
payload_sizes: List[float] = []

# First compute latency
for name, chunk in data_chunks.items():
# Obtain the meta data of the data chunk
meta = data_chunk.get("meta")["value"]

# Only process unseen data_chunks, as in the "in_step" event,
# it is possible for the same data chunks to be re-used in the step
if chunk.uuid in self.seen_uuids:
continue
else:
self.seen_uuids.append(chunk.uuid)
# Get the payload size (of all keys)
total_size = 0.0
for key in data_chunk.contains():
payload = data_chunk.get(key)["value"]
total_size += self.get_object_kilobytes(payload)
payload_sizes.append(total_size)

# Obtain the meta data of the data chunk
meta = chunk.get("meta")["value"]

# Get latency
latency = (meta["received"] - meta["transmitted"]).total_seconds() * 1000
latencies.append(latency)

# Get the payload size (of all keys)
total_size = 0.0
for key in chunk.contains():
payload = chunk.get(key)["value"]
total_size += self.get_object_kilobytes(payload)
payload_sizes.append(total_size)

# After processing all data_chunks, get latency and payload means
mean_latency = sum(latencies) / len(latencies)
# After processing all data_chunk keys, get payload total
payload_size = sum(payload_sizes)

# Store results
self.deques["latency(ms)"].append(mean_latency)
self.deques["latency(ms)"].append(meta["delta"])
self.deques["payload_size(KB)"].append(payload_size)

def get_object_kilobytes(self, payload: Any) -> float:
# return sys.getsizeof(payload) / 1024
return len(pickle.dumps(payload))
return len(pickle.dumps(payload)) / 1024

async def teardown(self):
await self.async_timer.stop()
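
On the B->KB fix in `get_object_kilobytes`: `pickle.dumps` returns bytes, so the old `len(pickle.dumps(payload))` logged raw byte counts under a column labeled `payload_size(KB)`. Dividing by 1024 makes the value match the header. A quick standalone check of the conversion (numbers approximate; pickle adds a small framing overhead):

    import pickle
    import numpy as np

    def get_object_kilobytes(payload) -> float:
        # len(...) is in bytes; divide by 1024 to report kilobytes
        return len(pickle.dumps(payload)) / 1024

    arr = np.random.rand(1000, 1000, 3)  # 3M float64 values = 24,000,000 bytes
    print(get_object_kilobytes(arr))     # ~23437.7 KB, i.e. ~22.9 MB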
13 changes: 5 additions & 8 deletions test/manager/test_manager.py
@@ -4,6 +4,7 @@

import pytest

from chimerapy.engine import config
import chimerapy.engine as cpe
from ..conftest import GenNode, ConsumeNode, TEST_DATA_DIR

@@ -72,6 +73,9 @@ def test_sending_package(self, manager, _worker, config_graph):
def test_manager_lifecycle(self, manager_with_worker, context):
manager, worker = manager_with_worker

# Enable diagnostics
config.set("diagnostics.logging-enabled", True)

# Define graph
gen_node = GenNode(name="Gen1")
con_node = ConsumeNode(name="Con1")
@@ -84,16 +88,9 @@
manager.commit_graph(graph, mapping, context=context).result(timeout=30)

assert manager.start().result()

# time.sleep(3)

assert manager.record().result()

# for i in range(50):
# logger.debug(manager.state)
# time.sleep(1)

time.sleep(20)
time.sleep(5)

assert manager.stop().result()
assert manager.collect().result()
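Note that the lifecycle test now turns diagnostics logging on through the engine config, presumably before committing the graph so the nodes start their profilers with logging enabled. The key name comes straight from the hunk above:

    from chimerapy.engine import config

    config.set("diagnostics.logging-enabled", True)
    # ... then build the graph, commit, start, record, stop, collect ...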
72 changes: 6 additions & 66 deletions test/node/test_profiling_service.py
@@ -2,14 +2,15 @@
import time
import pathlib
import tempfile
import random

import numpy as np
import pytest

import chimerapy.engine as cpe
from chimerapy.engine import config
from chimerapy.engine.node.profiler_service import ProfilerService
from chimerapy.engine.node.events import NewInBoundDataEvent
from chimerapy.engine.node.events import NewOutBoundDataEvent
from chimerapy.engine.states import NodeState
from chimerapy.engine.eventbus import EventBus, Event
from chimerapy.engine.networking.async_loop_thread import AsyncLoopThread
@@ -34,7 +35,6 @@ def profiler_setup():
eventbus = EventBus(thread=thread)

# Create sample state
# state = NodeState(logdir=TEST_DATA_DIR)
state = NodeState(logdir=pathlib.Path(tempfile.mkdtemp()))

# Create the profiler
@@ -64,20 +64,10 @@ def test_single_data_chunk(profiler_setup):
# Mock how the processor marks the time when it got the datachunk
# and transmitted it
meta = example_data_chunk.get("meta")
meta["value"]["transmitted"] = datetime.datetime.now()
meta["value"]["delta"] = random.randrange(500, 1500, 1) # ms
example_data_chunk.update("meta", meta)

# Transmission time
time.sleep(0.1)

# Modify the received timestamp to mock the Poller
meta = example_data_chunk.get("meta")
meta["value"]["received"] = datetime.datetime.now()
example_data_chunk.update("meta", meta)

dcs = {"test": example_data_chunk}

eventbus.send(Event("in_step", NewInBoundDataEvent(dcs))).result()
eventbus.send(Event("out_step", NewOutBoundDataEvent(example_data_chunk))).result()

assert profiler.log_file.exists()

@@ -95,59 +85,9 @@ def test_single_data_chunk_with_multiple_payloads(profiler_setup):
# Mock how the processor marks the time when it got the datachunk
# and transmitted it
meta = example_data_chunk.get("meta")
meta["value"]["transmitted"] = datetime.datetime.now()
example_data_chunk.update("meta", meta)

# Transmission time
time.sleep(0.1)

# Modify the received timestamp to mock the Poller
meta = example_data_chunk.get("meta")
meta["value"]["received"] = datetime.datetime.now()
example_data_chunk.update("meta", meta)

dcs = {"test": example_data_chunk}

eventbus.send(Event("in_step", NewInBoundDataEvent(dcs))).result()

assert profiler.log_file.exists()


def test_multiple_data_chunk(profiler_setup):
profiler, eventbus = profiler_setup

for i in range(50):

# Run the step multiple times
example_data_chunk = DataChunk()
example_data_chunk2 = DataChunk()
example_data_chunk.add("random", np.random.rand(1000, 1000, 3))
example_data_chunk2.add("random", np.random.rand(1000, 1000, 3))

# Mock how the processor marks the time when it got the datachunk
# and transmitted it
meta = example_data_chunk.get("meta")
meta["value"]["transmitted"] = datetime.datetime.now()
example_data_chunk.update("meta", meta)

meta = example_data_chunk2.get("meta")
meta["value"]["transmitted"] = datetime.datetime.now()
example_data_chunk2.update("meta", meta)

# Transmission time
time.sleep(0.1)

# Modify the received timestamp to mock the Poller
meta = example_data_chunk.get("meta")
meta["value"]["received"] = datetime.datetime.now()
meta["value"]["delta"] = random.randrange(500, 1500, 1)
example_data_chunk.update("meta", meta)

meta = example_data_chunk2.get("meta")
meta["value"]["received"] = datetime.datetime.now()
example_data_chunk2.update("meta", meta)

dcs = {"test": example_data_chunk, "test2": example_data_chunk2}

eventbus.send(Event("in_step", NewInBoundDataEvent(dcs))).result()
eventbus.send(Event("out_step", NewOutBoundDataEvent(example_data_chunk))).result()

assert profiler.log_file.exists()
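
Taken together, the tests now exercise the profiler the same way the processor drives it: the observer listens for `out_step` rather than `in_step`, and `handle_event="unpack"` passes the event's fields to `post_step` as keyword arguments. A sketch of the flow, assuming `NewOutBoundDataEvent` carries a single `data_chunk` field (consistent with the `post_step(self, data_chunk: DataChunk)` signature above):

    import datetime
    import numpy as np
    from chimerapy.engine.networking.data_chunk import DataChunk
    from chimerapy.engine.node.events import NewOutBoundDataEvent
    from chimerapy.engine.eventbus import Event

    chunk = DataChunk()
    chunk.add("random", np.random.rand(10, 10))

    # The processor stamps the chunk before it goes out
    meta = chunk.get("meta")
    meta["value"]["transmitted"] = datetime.datetime.now()
    meta["value"]["delta"] = 12.5  # ms, as measured by safe_exec
    chunk.update("meta", meta)

    # The profiler's "out_step" observer unpacks this into post_step(data_chunk=chunk)
    # eventbus.send(Event("out_step", NewOutBoundDataEvent(chunk))).result()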
