From 3c5beed49127247ee04073b4d71ed6a67405f51e Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Mon, 26 Feb 2024 13:21:49 -0600 Subject: [PATCH 01/23] EPD-612: Evaluators in tracer POC --- autoblocks/_impl/testing/models.py | 27 +++++++++++ autoblocks/_impl/tracer.py | 78 +++++++++++++++++++++++++++--- autoblocks/_impl/util.py | 2 +- tests/autoblocks/test_tracer.py | 55 +++++++++++++++++++++ 4 files changed, 155 insertions(+), 7 deletions(-) diff --git a/autoblocks/_impl/testing/models.py b/autoblocks/_impl/testing/models.py index a5f5d8a4..36415ee1 100644 --- a/autoblocks/_impl/testing/models.py +++ b/autoblocks/_impl/testing/models.py @@ -1,6 +1,7 @@ import abc import dataclasses import functools +import uuid from typing import Any from typing import Optional @@ -13,6 +14,16 @@ class Threshold: gte: Optional[float] = None +@dataclasses.dataclass() +class EventEvaluation: + evaluator_external_id: str + score: float + id: Optional[str] = dataclasses.field(default_factory=lambda: str(uuid.uuid4())) + metadata: Optional[dict] = None + threshold: Optional[Threshold] = None + + +# TODO: Rename TestEvaluation? @dataclasses.dataclass() class Evaluation: score: float @@ -43,3 +54,19 @@ def id(self) -> str: @abc.abstractmethod def evaluate_test_case(self, test_case: BaseTestCase, output: Any) -> Evaluation: pass + + +class BaseEventEvaluator(abc.ABC): + """ + An abstract base class for implementing an evaluator that runs on events + in an online testing scenario. + """ + + @property + @abc.abstractmethod + def id(self) -> str: + pass + + @abc.abstractmethod + def evaluate_event(self, event: Any) -> EventEvaluation: + pass diff --git a/autoblocks/_impl/tracer.py b/autoblocks/_impl/tracer.py index 03ead02d..b373a8e2 100644 --- a/autoblocks/_impl/tracer.py +++ b/autoblocks/_impl/tracer.py @@ -1,4 +1,6 @@ import asyncio +import contextvars +import inspect import logging import uuid from contextlib import contextmanager @@ -6,12 +8,17 @@ from datetime import datetime from datetime import timedelta from datetime import timezone +from typing import Any from typing import Dict +from typing import List from typing import Optional from autoblocks._impl import global_state from autoblocks._impl.config.constants import INGESTION_ENDPOINT +from autoblocks._impl.testing.models import BaseEventEvaluator +from autoblocks._impl.testing.models import EventEvaluation from autoblocks._impl.util import AutoblocksEnvVar +from autoblocks._impl.util import gather_with_max_concurrency log = logging.getLogger(__name__) @@ -101,6 +108,31 @@ def start_span(self): props["span_id"] = prev_span_id self.set_properties(props) + async def evaluate_event(self, event: Any, evaluator: BaseEventEvaluator) -> None: + """ + Evaluates an event using a provided evaluator. + """ + if inspect.iscoroutinefunction(evaluator.evaluate_event): + try: + evaluation = await evaluator.evaluate_event(event=event) + except Exception: + print("NEED TO LOG ERROR HERE") + else: + try: + ctx = contextvars.copy_context() + evaluation = await global_state.event_loop().run_in_executor( + None, + ctx.run, + evaluator.evaluate_event, + event, + ) + except Exception: + print("NEED TO LOG ERROR HERE") + + if evaluation is None: + return + return evaluation + async def _send_event_unsafe( self, # Require all arguments to be specified via key=value @@ -112,6 +144,8 @@ async def _send_event_unsafe( timestamp: Optional[str] = None, properties: Optional[Dict] = None, prompt_tracking: Optional[Dict] = None, + evaluators: List[BaseEventEvaluator] = [], + max_evaluator_concurrency: int, ) -> SendEventResponse: merged_properties = dict(self._properties) merged_properties.update(properties or {}) @@ -125,14 +159,42 @@ async def _send_event_unsafe( trace_id = trace_id or self._trace_id timestamp = timestamp or datetime.now(timezone.utc).isoformat() + event_json = { + "message": message, + "traceId": trace_id, + "timestamp": timestamp, + "properties": merged_properties, + } + try: + evaluations: List[EventEvaluation] = await gather_with_max_concurrency( + max_evaluator_concurrency, + [ + self.evaluate_event( + event=event_json, + evaluator=evaluator, + ) + for evaluator in evaluators + ], + ) + if evaluations and len(evaluations) > 0: + # loop through each evaluations and build a dict containing externalEvaluationId as the key + formatted_eval_json = [ + dict( + externalEvaluationId=evaluation.evaluator_external_id, + id=str(evaluation.id), + score=evaluation.score, + metadata=dict(evaluation.metadata) if evaluation.metadata else None, + threshold=dict(evaluation.threshold) if evaluation.threshold else None, + ) + for evaluation in evaluations + ] + event_json["properties"]["evaluations"] = formatted_eval_json + except Exception: + print("NEED TO LOG ERROR HERE") + req = await global_state.http_client().post( url=INGESTION_ENDPOINT, - json={ - "message": message, - "traceId": trace_id, - "timestamp": timestamp, - "properties": merged_properties, - }, + json=event_json, headers=self._client_headers, timeout=self._timeout_seconds, ) @@ -150,6 +212,8 @@ def send_event( parent_span_id: Optional[str] = None, timestamp: Optional[str] = None, properties: Optional[Dict] = None, + evaluators: List[BaseEventEvaluator] = [], + max_evaluator_concurrency: int = 5, ) -> SendEventResponse: """ Sends an event to the Autoblocks ingestion API. @@ -166,6 +230,8 @@ def send_event( parent_span_id=parent_span_id, timestamp=timestamp, properties=properties, + evaluators=evaluators, + max_evaluator_concurrency=max_evaluator_concurrency, ), global_state.event_loop(), ) diff --git a/autoblocks/_impl/util.py b/autoblocks/_impl/util.py index 10d6d93c..aa772f28 100644 --- a/autoblocks/_impl/util.py +++ b/autoblocks/_impl/util.py @@ -50,4 +50,4 @@ async def sem_coro(coro: Coroutine): # return_exceptions=True causes exceptions to be returned as values instead # of propagating them to the caller. this is similar in behavior to Promise.allSettled - await asyncio.gather(*(sem_coro(c) for c in coroutines), return_exceptions=True) + return await asyncio.gather(*(sem_coro(c) for c in coroutines), return_exceptions=True) diff --git a/tests/autoblocks/test_tracer.py b/tests/autoblocks/test_tracer.py index b28c9c80..f5636e5d 100644 --- a/tests/autoblocks/test_tracer.py +++ b/tests/autoblocks/test_tracer.py @@ -1,12 +1,15 @@ import os import uuid from datetime import datetime +from typing import Any from unittest import mock import freezegun import pytest from autoblocks._impl.config.constants import INGESTION_ENDPOINT +from autoblocks._impl.testing.models import BaseEventEvaluator +from autoblocks._impl.testing.models import EventEvaluation from autoblocks.tracer import AutoblocksTracer from tests.autoblocks.util import make_expected_body @@ -448,3 +451,55 @@ def test_tracer_start_span(*args, **kwargs): assert tracer._properties.get("span_id") is None assert tracer._properties.get("parent_span_id") is None + + +def test_tracer_prod_evaluations(httpx_mock): + test_event_id = uuid.uuid4() + + class MyEvaluator(BaseEventEvaluator): + id = "my-evaluator" + + def evaluate_event(self, event: Any) -> EventEvaluation: + return EventEvaluation( + evaluator_external_id=self.id, + id=test_event_id, + score=0.9, + threshold={"gt": 0.5}, + ) + + mock_input = { + "trace_id": "my-trace-id", + "timestamp": timestamp, + "properties": {}, + "evaluators": [ + MyEvaluator(), + ], + } + httpx_mock.add_response( + url=INGESTION_ENDPOINT, + method="POST", + status_code=200, + json={"traceId": "my-trace-id"}, + match_headers={"Authorization": "Bearer mock-ingestion-key"}, + match_content=make_expected_body( + dict( + message="my-message", + traceId="my-trace-id", + timestamp=timestamp, + properties={ + "evaluations": [ + { + "externalEvaluationId": "my-evaluator", + "id": str(test_event_id), + "score": 0.9, + "metadata": None, + "threshold": {"gt": 0.5}, + } + ] + }, + ) + ), + ) + tracer = AutoblocksTracer("mock-ingestion-key") + resp = tracer.send_event("my-message", **mock_input) + assert resp.trace_id == "my-trace-id" From 4264ec1d384f9d08d7eca523378c5d274060ae04 Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Mon, 26 Feb 2024 15:50:12 -0600 Subject: [PATCH 02/23] EPD-612: Updates --- autoblocks/_impl/testing/models.py | 30 +++++++++++- autoblocks/_impl/tracer.py | 78 ++++++++++++++++-------------- tests/autoblocks/test_tracer.py | 9 ++-- 3 files changed, 74 insertions(+), 43 deletions(-) diff --git a/autoblocks/_impl/testing/models.py b/autoblocks/_impl/testing/models.py index 36415ee1..f1913bbe 100644 --- a/autoblocks/_impl/testing/models.py +++ b/autoblocks/_impl/testing/models.py @@ -6,6 +6,23 @@ from typing import Optional +@dataclasses.dataclass() +class TracerEvent: + message: str + trace_id: str + timestamp: str + properties: dict + + @classmethod + def to_json(cls, event): + return { + "message": event.message, + "traceId": event.trace_id, + "timestamp": event.timestamp, + "properties": event.properties, + } + + @dataclasses.dataclass() class Threshold: lt: Optional[float] = None @@ -22,6 +39,17 @@ class EventEvaluation: metadata: Optional[dict] = None threshold: Optional[Threshold] = None + @classmethod + def to_json(cls, event_evaluation): + print(event_evaluation) + return dict( + evaluatorExternalId=event_evaluation.evaluator_external_id, + id=str(event_evaluation.id), + score=event_evaluation.score, + metadata=dict(event_evaluation.metadata) if event_evaluation.metadata else None, + threshold=dict(event_evaluation.threshold) if event_evaluation.threshold else None, + ) + # TODO: Rename TestEvaluation? @dataclasses.dataclass() @@ -68,5 +96,5 @@ def id(self) -> str: pass @abc.abstractmethod - def evaluate_event(self, event: Any) -> EventEvaluation: + def evaluate_event(self, event: TracerEvent) -> EventEvaluation: pass diff --git a/autoblocks/_impl/tracer.py b/autoblocks/_impl/tracer.py index b075063a..6cf9918b 100644 --- a/autoblocks/_impl/tracer.py +++ b/autoblocks/_impl/tracer.py @@ -8,7 +8,6 @@ from datetime import datetime from datetime import timedelta from datetime import timezone -from typing import Any from typing import Dict from typing import List from typing import Optional @@ -17,17 +16,19 @@ from autoblocks._impl.config.constants import INGESTION_ENDPOINT from autoblocks._impl.testing.models import BaseEventEvaluator from autoblocks._impl.testing.models import EventEvaluation +from autoblocks._impl.testing.models import TracerEvent from autoblocks._impl.util import AutoblocksEnvVar from autoblocks._impl.util import gather_with_max_concurrency -log = logging.getLogger(__name__) - @dataclass class SendEventResponse: trace_id: Optional[str] +log = logging.getLogger(__name__) + + class AutoblocksTracer: def __init__( self, @@ -108,7 +109,7 @@ def start_span(self): props["span_id"] = prev_span_id self.set_properties(props) - async def evaluate_event(self, event: Any, evaluator: BaseEventEvaluator) -> None: + async def evaluate_event(self, event: TracerEvent, evaluator: BaseEventEvaluator) -> None: """ Evaluates an event using a provided evaluator. """ @@ -133,6 +134,32 @@ async def evaluate_event(self, event: Any, evaluator: BaseEventEvaluator) -> Non return return evaluation + async def _run_and_build_evals_properties( + self, evaluators: BaseEventEvaluator, event: TracerEvent, max_evaluator_concurrency: int + ) -> List[EventEvaluation]: + event_dict = TracerEvent.to_json(event) + if len(evaluators) == 0: + return event_dict + try: + evaluations: List[EventEvaluation] = await gather_with_max_concurrency( + max_evaluator_concurrency, + [ + self.evaluate_event( + event=event, + evaluator=evaluator, + ) + for evaluator in evaluators + ], + ) + if evaluations and len(evaluations) > 0: + # loop through each evaluations and build a dict containing externalEvaluationId as the key + formatted_eval_json = [EventEvaluation.to_json(evaluation) for evaluation in evaluations] + event_dict["properties"]["evaluations"] = formatted_eval_json + return event_dict + except Exception: + print("NEED TO LOG ERROR HERE") + return event_dict + async def _send_event_unsafe( self, # Require all arguments to be specified via key=value @@ -159,42 +186,19 @@ async def _send_event_unsafe( trace_id = trace_id or self._trace_id timestamp = timestamp or datetime.now(timezone.utc).isoformat() - event_json = { - "message": message, - "traceId": trace_id, - "timestamp": timestamp, - "properties": merged_properties, - } - try: - evaluations: List[EventEvaluation] = await gather_with_max_concurrency( - max_evaluator_concurrency, - [ - self.evaluate_event( - event=event_json, - evaluator=evaluator, - ) - for evaluator in evaluators - ], - ) - if evaluations and len(evaluations) > 0: - # loop through each evaluations and build a dict containing externalEvaluationId as the key - formatted_eval_json = [ - dict( - externalEvaluationId=evaluation.evaluator_external_id, - id=str(evaluation.id), - score=evaluation.score, - metadata=dict(evaluation.metadata) if evaluation.metadata else None, - threshold=dict(evaluation.threshold) if evaluation.threshold else None, - ) - for evaluation in evaluations - ] - event_json["properties"]["evaluations"] = formatted_eval_json - except Exception: - print("NEED TO LOG ERROR HERE") + event = TracerEvent( + message=message, + trace_id=trace_id, + timestamp=timestamp, + properties=merged_properties, + ) + transformed_event_json = await self._run_and_build_evals_properties( + evaluators, event, max_evaluator_concurrency + ) req = await global_state.http_client().post( url=INGESTION_ENDPOINT, - json=event_json, + json=transformed_event_json, headers=self._client_headers, timeout=self._timeout_seconds, ) diff --git a/tests/autoblocks/test_tracer.py b/tests/autoblocks/test_tracer.py index f5636e5d..3faa5aab 100644 --- a/tests/autoblocks/test_tracer.py +++ b/tests/autoblocks/test_tracer.py @@ -1,7 +1,6 @@ import os import uuid from datetime import datetime -from typing import Any from unittest import mock import freezegun @@ -10,6 +9,7 @@ from autoblocks._impl.config.constants import INGESTION_ENDPOINT from autoblocks._impl.testing.models import BaseEventEvaluator from autoblocks._impl.testing.models import EventEvaluation +from autoblocks._impl.testing.models import TracerEvent from autoblocks.tracer import AutoblocksTracer from tests.autoblocks.util import make_expected_body @@ -459,12 +459,11 @@ def test_tracer_prod_evaluations(httpx_mock): class MyEvaluator(BaseEventEvaluator): id = "my-evaluator" - def evaluate_event(self, event: Any) -> EventEvaluation: + def evaluate_event(self, event: TracerEvent) -> EventEvaluation: return EventEvaluation( evaluator_external_id=self.id, id=test_event_id, score=0.9, - threshold={"gt": 0.5}, ) mock_input = { @@ -489,11 +488,11 @@ def evaluate_event(self, event: Any) -> EventEvaluation: properties={ "evaluations": [ { - "externalEvaluationId": "my-evaluator", + "evaluatorExternalId": "my-evaluator", "id": str(test_event_id), "score": 0.9, "metadata": None, - "threshold": {"gt": 0.5}, + "threshold": None, } ] }, From e0b4e447ee9e5e6984cb8113d8ecd3f1e8775ca6 Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Mon, 26 Feb 2024 16:08:45 -0600 Subject: [PATCH 03/23] EPD-612: Handling failing evaluator --- autoblocks/_impl/testing/models.py | 1 - autoblocks/_impl/tracer.py | 21 ++++++++------ tests/autoblocks/test_tracer.py | 44 +++++++++++++++++++++++++++--- 3 files changed, 52 insertions(+), 14 deletions(-) diff --git a/autoblocks/_impl/testing/models.py b/autoblocks/_impl/testing/models.py index f1913bbe..68156ebe 100644 --- a/autoblocks/_impl/testing/models.py +++ b/autoblocks/_impl/testing/models.py @@ -41,7 +41,6 @@ class EventEvaluation: @classmethod def to_json(cls, event_evaluation): - print(event_evaluation) return dict( evaluatorExternalId=event_evaluation.evaluator_external_id, id=str(event_evaluation.id), diff --git a/autoblocks/_impl/tracer.py b/autoblocks/_impl/tracer.py index 6cf9918b..8ad28042 100644 --- a/autoblocks/_impl/tracer.py +++ b/autoblocks/_impl/tracer.py @@ -116,8 +116,8 @@ async def evaluate_event(self, event: TracerEvent, evaluator: BaseEventEvaluator if inspect.iscoroutinefunction(evaluator.evaluate_event): try: evaluation = await evaluator.evaluate_event(event=event) - except Exception: - print("NEED TO LOG ERROR HERE") + except Exception as err: + log.error("Event evaluation through an exception", err) else: try: ctx = contextvars.copy_context() @@ -127,8 +127,8 @@ async def evaluate_event(self, event: TracerEvent, evaluator: BaseEventEvaluator evaluator.evaluate_event, event, ) - except Exception: - print("NEED TO LOG ERROR HERE") + except Exception as err: + log.error("Event evaluation through an exception", err) if evaluation is None: return @@ -152,12 +152,15 @@ async def _run_and_build_evals_properties( ], ) if evaluations and len(evaluations) > 0: - # loop through each evaluations and build a dict containing externalEvaluationId as the key - formatted_eval_json = [EventEvaluation.to_json(evaluation) for evaluation in evaluations] - event_dict["properties"]["evaluations"] = formatted_eval_json + evaluations_json = [ + EventEvaluation.to_json(evaluation) + for evaluation in filter(lambda x: isinstance(x, EventEvaluation), evaluations) + ] + if len(evaluations_json) > 0: + event_dict["properties"]["evaluations"] = evaluations_json return event_dict - except Exception: - print("NEED TO LOG ERROR HERE") + except Exception as err: + log.error("Unable to complete evaluating event", err) return event_dict async def _send_event_unsafe( diff --git a/tests/autoblocks/test_tracer.py b/tests/autoblocks/test_tracer.py index 3faa5aab..1ab72c8c 100644 --- a/tests/autoblocks/test_tracer.py +++ b/tests/autoblocks/test_tracer.py @@ -454,7 +454,7 @@ def test_tracer_start_span(*args, **kwargs): def test_tracer_prod_evaluations(httpx_mock): - test_event_id = uuid.uuid4() + test_evaluation_id = uuid.uuid4() class MyEvaluator(BaseEventEvaluator): id = "my-evaluator" @@ -462,8 +462,9 @@ class MyEvaluator(BaseEventEvaluator): def evaluate_event(self, event: TracerEvent) -> EventEvaluation: return EventEvaluation( evaluator_external_id=self.id, - id=test_event_id, + id=test_evaluation_id, score=0.9, + threshold={"gte": 0.5}, ) mock_input = { @@ -489,10 +490,10 @@ def evaluate_event(self, event: TracerEvent) -> EventEvaluation: "evaluations": [ { "evaluatorExternalId": "my-evaluator", - "id": str(test_event_id), + "id": str(test_evaluation_id), "score": 0.9, "metadata": None, - "threshold": None, + "threshold": {"gte": 0.5}, } ] }, @@ -502,3 +503,38 @@ def evaluate_event(self, event: TracerEvent) -> EventEvaluation: tracer = AutoblocksTracer("mock-ingestion-key") resp = tracer.send_event("my-message", **mock_input) assert resp.trace_id == "my-trace-id" + + +def test_tracer_failing_evaluation(httpx_mock): + class MyEvaluator(BaseEventEvaluator): + id = "my-evaluator" + + def evaluate_event(self, event: TracerEvent) -> EventEvaluation: + raise Exception("Something terrible went wrong") + + mock_input = { + "trace_id": "my-trace-id", + "timestamp": timestamp, + "properties": {}, + "evaluators": [ + MyEvaluator(), + ], + } + httpx_mock.add_response( + url=INGESTION_ENDPOINT, + method="POST", + status_code=200, + json={"traceId": "my-trace-id"}, + match_headers={"Authorization": "Bearer mock-ingestion-key"}, + match_content=make_expected_body( + dict( + message="my-message", + traceId="my-trace-id", + timestamp=timestamp, + properties={}, + ) + ), + ) + tracer = AutoblocksTracer("mock-ingestion-key") + resp = tracer.send_event("my-message", **mock_input) + assert resp.trace_id == "my-trace-id" From 86245e271191e7c9f39f07a109f2fc9ff00f1b87 Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Mon, 26 Feb 2024 16:29:36 -0600 Subject: [PATCH 04/23] Update autoblocks/_impl/tracer.py Co-authored-by: Nicole White --- autoblocks/_impl/tracer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autoblocks/_impl/tracer.py b/autoblocks/_impl/tracer.py index 8ad28042..feceb1b9 100644 --- a/autoblocks/_impl/tracer.py +++ b/autoblocks/_impl/tracer.py @@ -117,7 +117,7 @@ async def evaluate_event(self, event: TracerEvent, evaluator: BaseEventEvaluator try: evaluation = await evaluator.evaluate_event(event=event) except Exception as err: - log.error("Event evaluation through an exception", err) + log.error("Event evaluation threw an exception", err) else: try: ctx = contextvars.copy_context() From a4f7d92954db2f962bf30c1ed308e714a796ecd9 Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Mon, 26 Feb 2024 16:29:44 -0600 Subject: [PATCH 05/23] Update autoblocks/_impl/tracer.py Co-authored-by: Nicole White --- autoblocks/_impl/tracer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autoblocks/_impl/tracer.py b/autoblocks/_impl/tracer.py index feceb1b9..0c08396e 100644 --- a/autoblocks/_impl/tracer.py +++ b/autoblocks/_impl/tracer.py @@ -128,7 +128,7 @@ async def evaluate_event(self, event: TracerEvent, evaluator: BaseEventEvaluator event, ) except Exception as err: - log.error("Event evaluation through an exception", err) + log.error("Event evaluation threw an exception", err) if evaluation is None: return From 96de3d3da40b853f7324213c97338152a7f8d18b Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Mon, 26 Feb 2024 16:30:17 -0600 Subject: [PATCH 06/23] Update autoblocks/_impl/tracer.py Co-authored-by: Nicole White --- autoblocks/_impl/tracer.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/autoblocks/_impl/tracer.py b/autoblocks/_impl/tracer.py index 0c08396e..b3b51209 100644 --- a/autoblocks/_impl/tracer.py +++ b/autoblocks/_impl/tracer.py @@ -130,8 +130,6 @@ async def evaluate_event(self, event: TracerEvent, evaluator: BaseEventEvaluator except Exception as err: log.error("Event evaluation threw an exception", err) - if evaluation is None: - return return evaluation async def _run_and_build_evals_properties( From 08f764802371dcca1b9b009ca9ee1a160b808c13 Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Mon, 26 Feb 2024 16:51:38 -0600 Subject: [PATCH 07/23] EPD-612: Initial feedback updates --- autoblocks/_impl/testing/models.py | 9 ++++---- autoblocks/_impl/tracer.py | 10 +++++---- autoblocks/_impl/util.py | 2 +- tests/autoblocks/test_tracer.py | 33 ++++++++++++------------------ 4 files changed, 25 insertions(+), 29 deletions(-) diff --git a/autoblocks/_impl/testing/models.py b/autoblocks/_impl/testing/models.py index 68156ebe..44134bcb 100644 --- a/autoblocks/_impl/testing/models.py +++ b/autoblocks/_impl/testing/models.py @@ -3,6 +3,7 @@ import functools import uuid from typing import Any +from typing import Dict from typing import Optional @@ -14,7 +15,7 @@ class TracerEvent: properties: dict @classmethod - def to_json(cls, event): + def to_json(cls, event: Dict[str, any]): return { "message": event.message, "traceId": event.trace_id, @@ -40,13 +41,13 @@ class EventEvaluation: threshold: Optional[Threshold] = None @classmethod - def to_json(cls, event_evaluation): + def to_json(cls, event_evaluation: Dict[str, any]): return dict( evaluatorExternalId=event_evaluation.evaluator_external_id, id=str(event_evaluation.id), score=event_evaluation.score, - metadata=dict(event_evaluation.metadata) if event_evaluation.metadata else None, - threshold=dict(event_evaluation.threshold) if event_evaluation.threshold else None, + metadata=event_evaluation.metadata, + threshold=dataclasses.asdict(event_evaluation.threshold) if event_evaluation.threshold else None, ) diff --git a/autoblocks/_impl/tracer.py b/autoblocks/_impl/tracer.py index b3b51209..5f4aa82e 100644 --- a/autoblocks/_impl/tracer.py +++ b/autoblocks/_impl/tracer.py @@ -109,10 +109,11 @@ def start_span(self): props["span_id"] = prev_span_id self.set_properties(props) - async def evaluate_event(self, event: TracerEvent, evaluator: BaseEventEvaluator) -> None: + async def _evaluate_event(self, event: TracerEvent, evaluator: BaseEventEvaluator) -> Optional[EventEvaluation]: """ Evaluates an event using a provided evaluator. """ + evaluation = None if inspect.iscoroutinefunction(evaluator.evaluate_event): try: evaluation = await evaluator.evaluate_event(event=event) @@ -142,7 +143,7 @@ async def _run_and_build_evals_properties( evaluations: List[EventEvaluation] = await gather_with_max_concurrency( max_evaluator_concurrency, [ - self.evaluate_event( + self._evaluate_event( event=event, evaluator=evaluator, ) @@ -152,7 +153,8 @@ async def _run_and_build_evals_properties( if evaluations and len(evaluations) > 0: evaluations_json = [ EventEvaluation.to_json(evaluation) - for evaluation in filter(lambda x: isinstance(x, EventEvaluation), evaluations) + for evaluation in evaluations + if isinstance(evaluation, EventEvaluation) ] if len(evaluations_json) > 0: event_dict["properties"]["evaluations"] = evaluations_json @@ -195,7 +197,7 @@ async def _send_event_unsafe( ) transformed_event_json = await self._run_and_build_evals_properties( - evaluators, event, max_evaluator_concurrency + evaluators=evaluators, event=event, max_evaluator_concurrency=max_evaluator_concurrency ) req = await global_state.http_client().post( url=INGESTION_ENDPOINT, diff --git a/autoblocks/_impl/util.py b/autoblocks/_impl/util.py index aa772f28..2de5b464 100644 --- a/autoblocks/_impl/util.py +++ b/autoblocks/_impl/util.py @@ -38,7 +38,7 @@ def encode_uri_component(s: str) -> str: async def gather_with_max_concurrency( max_concurrency: int, coroutines: List[Coroutine], -) -> None: +) -> asyncio.Future[list]: """ Borrowed from https://stackoverflow.com/a/61478547 """ diff --git a/tests/autoblocks/test_tracer.py b/tests/autoblocks/test_tracer.py index 1ab72c8c..1090fddb 100644 --- a/tests/autoblocks/test_tracer.py +++ b/tests/autoblocks/test_tracer.py @@ -9,6 +9,7 @@ from autoblocks._impl.config.constants import INGESTION_ENDPOINT from autoblocks._impl.testing.models import BaseEventEvaluator from autoblocks._impl.testing.models import EventEvaluation +from autoblocks._impl.testing.models import Threshold from autoblocks._impl.testing.models import TracerEvent from autoblocks.tracer import AutoblocksTracer from tests.autoblocks.util import make_expected_body @@ -464,17 +465,9 @@ def evaluate_event(self, event: TracerEvent) -> EventEvaluation: evaluator_external_id=self.id, id=test_evaluation_id, score=0.9, - threshold={"gte": 0.5}, + threshold=Threshold(gte=0.5), ) - mock_input = { - "trace_id": "my-trace-id", - "timestamp": timestamp, - "properties": {}, - "evaluators": [ - MyEvaluator(), - ], - } httpx_mock.add_response( url=INGESTION_ENDPOINT, method="POST", @@ -493,7 +486,7 @@ def evaluate_event(self, event: TracerEvent) -> EventEvaluation: "id": str(test_evaluation_id), "score": 0.9, "metadata": None, - "threshold": {"gte": 0.5}, + "threshold": {"lt": None, "lte": None, "gt": None, "gte": 0.5}, } ] }, @@ -501,7 +494,13 @@ def evaluate_event(self, event: TracerEvent) -> EventEvaluation: ), ) tracer = AutoblocksTracer("mock-ingestion-key") - resp = tracer.send_event("my-message", **mock_input) + resp = tracer.send_event( + "my-message", + trace_id="my-trace-id", + timestamp=timestamp, + properties={}, + evaluators=[MyEvaluator()], + ) assert resp.trace_id == "my-trace-id" @@ -512,14 +511,6 @@ class MyEvaluator(BaseEventEvaluator): def evaluate_event(self, event: TracerEvent) -> EventEvaluation: raise Exception("Something terrible went wrong") - mock_input = { - "trace_id": "my-trace-id", - "timestamp": timestamp, - "properties": {}, - "evaluators": [ - MyEvaluator(), - ], - } httpx_mock.add_response( url=INGESTION_ENDPOINT, method="POST", @@ -536,5 +527,7 @@ def evaluate_event(self, event: TracerEvent) -> EventEvaluation: ), ) tracer = AutoblocksTracer("mock-ingestion-key") - resp = tracer.send_event("my-message", **mock_input) + resp = tracer.send_event( + "my-message", trace_id="my-trace-id", timestamp=timestamp, properties={}, evaluators=[MyEvaluator()] + ) assert resp.trace_id == "my-trace-id" From ac3136043c4ff55cc7d9d934d555d21d7c411c6b Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Mon, 26 Feb 2024 17:18:24 -0600 Subject: [PATCH 08/23] EPD-612: Updates --- autoblocks/_impl/testing/models.py | 35 +--- autoblocks/_impl/tracer.py | 106 +++++++----- autoblocks/_impl/util.py | 3 +- tests/autoblocks/test_tracer.py | 253 ++++++++++++++++++++++++----- 4 files changed, 282 insertions(+), 115 deletions(-) diff --git a/autoblocks/_impl/testing/models.py b/autoblocks/_impl/testing/models.py index 44134bcb..81c5993e 100644 --- a/autoblocks/_impl/testing/models.py +++ b/autoblocks/_impl/testing/models.py @@ -1,7 +1,6 @@ import abc import dataclasses import functools -import uuid from typing import Any from typing import Dict from typing import Optional @@ -14,13 +13,12 @@ class TracerEvent: timestamp: str properties: dict - @classmethod - def to_json(cls, event: Dict[str, any]): + def to_json(self) -> Dict[str, Any]: return { - "message": event.message, - "traceId": event.trace_id, - "timestamp": event.timestamp, - "properties": event.properties, + "message": self.message, + "traceId": self.trace_id, + "timestamp": self.timestamp, + "properties": self.properties, } @@ -32,30 +30,11 @@ class Threshold: gte: Optional[float] = None -@dataclasses.dataclass() -class EventEvaluation: - evaluator_external_id: str - score: float - id: Optional[str] = dataclasses.field(default_factory=lambda: str(uuid.uuid4())) - metadata: Optional[dict] = None - threshold: Optional[Threshold] = None - - @classmethod - def to_json(cls, event_evaluation: Dict[str, any]): - return dict( - evaluatorExternalId=event_evaluation.evaluator_external_id, - id=str(event_evaluation.id), - score=event_evaluation.score, - metadata=event_evaluation.metadata, - threshold=dataclasses.asdict(event_evaluation.threshold) if event_evaluation.threshold else None, - ) - - -# TODO: Rename TestEvaluation? @dataclasses.dataclass() class Evaluation: score: float threshold: Optional[Threshold] = None + metadata: Optional[dict] = None class BaseTestCase(abc.ABC): @@ -96,5 +75,5 @@ def id(self) -> str: pass @abc.abstractmethod - def evaluate_event(self, event: TracerEvent) -> EventEvaluation: + def evaluate_event(self, event: TracerEvent) -> Evaluation: pass diff --git a/autoblocks/_impl/tracer.py b/autoblocks/_impl/tracer.py index 5f4aa82e..1dca9a99 100644 --- a/autoblocks/_impl/tracer.py +++ b/autoblocks/_impl/tracer.py @@ -1,34 +1,35 @@ import asyncio import contextvars +import dataclasses import inspect import logging import uuid from contextlib import contextmanager -from dataclasses import dataclass from datetime import datetime from datetime import timedelta from datetime import timezone +from typing import Any from typing import Dict from typing import List from typing import Optional +from typing import Union from autoblocks._impl import global_state from autoblocks._impl.config.constants import INGESTION_ENDPOINT from autoblocks._impl.testing.models import BaseEventEvaluator -from autoblocks._impl.testing.models import EventEvaluation +from autoblocks._impl.testing.models import Evaluation from autoblocks._impl.testing.models import TracerEvent from autoblocks._impl.util import AutoblocksEnvVar from autoblocks._impl.util import gather_with_max_concurrency +log = logging.getLogger(__name__) + -@dataclass +@dataclasses.dataclass() class SendEventResponse: trace_id: Optional[str] -log = logging.getLogger(__name__) - - class AutoblocksTracer: def __init__( self, @@ -109,7 +110,18 @@ def start_span(self): props["span_id"] = prev_span_id self.set_properties(props) - async def _evaluate_event(self, event: TracerEvent, evaluator: BaseEventEvaluator) -> Optional[EventEvaluation]: + def _evaluation_to_json(evaluation: Evaluation, evaluator_external_id: str) -> Dict[str, Any]: + return dict( + id=str(uuid.uuid4()), + score=evaluation.score, + threshold=dataclasses.asdict(evaluation.threshold) if evaluation.threshold else None, + metadata=evaluation.metadata, + evaluatorExternalId=evaluator_external_id, + ) + + async def _evaluate_event( + self, event: TracerEvent, evaluator: BaseEventEvaluator + ) -> Optional[Union[Evaluation, None]]: """ Evaluates an event using a provided evaluator. """ @@ -118,7 +130,7 @@ async def _evaluate_event(self, event: TracerEvent, evaluator: BaseEventEvaluato try: evaluation = await evaluator.evaluate_event(event=event) except Exception as err: - log.error("Event evaluation threw an exception", err) + log.error("Unable to execute evaluation. Error: %s", repr(err), exc_info=True) else: try: ctx = contextvars.copy_context() @@ -129,18 +141,17 @@ async def _evaluate_event(self, event: TracerEvent, evaluator: BaseEventEvaluato event, ) except Exception as err: - log.error("Event evaluation threw an exception", err) + log.error("Unable to execute evaluation. Error: %s", repr(err), exc_info=True) return evaluation - async def _run_and_build_evals_properties( - self, evaluators: BaseEventEvaluator, event: TracerEvent, max_evaluator_concurrency: int - ) -> List[EventEvaluation]: - event_dict = TracerEvent.to_json(event) + async def _run_evaluators( + self, evaluators: List[BaseEventEvaluator], event: TracerEvent, max_evaluator_concurrency: int + ) -> List[Dict[str, Any]]: if len(evaluators) == 0: - return event_dict + return [] try: - evaluations: List[EventEvaluation] = await gather_with_max_concurrency( + evaluations: List[Union[Evaluation, None]] = await gather_with_max_concurrency( max_evaluator_concurrency, [ self._evaluate_event( @@ -150,58 +161,68 @@ async def _run_and_build_evals_properties( for evaluator in evaluators ], ) - if evaluations and len(evaluations) > 0: - evaluations_json = [ - EventEvaluation.to_json(evaluation) - for evaluation in evaluations - if isinstance(evaluation, EventEvaluation) - ] - if len(evaluations_json) > 0: - event_dict["properties"]["evaluations"] = evaluations_json - return event_dict + return [ + self._evaluation_to_json(event_evaluation=item, evaluator_external_id=evaluator.id) + for evaluator, item in zip(evaluators, evaluations) + if item is not None + ] + except Exception as err: - log.error("Unable to complete evaluating event", err) - return event_dict + log.error("Unable to complete evaluating events. Error: %s", str(err), exc_info=True) + return [] async def _send_event_unsafe( self, # Require all arguments to be specified via key=value *, message: str, - trace_id: Optional[str] = None, - span_id: Optional[str] = None, - parent_span_id: Optional[str] = None, - timestamp: Optional[str] = None, - properties: Optional[Dict] = None, - prompt_tracking: Optional[Dict] = None, - evaluators: List[BaseEventEvaluator] = [], + trace_id: Optional[str], + span_id: Optional[str], + parent_span_id: Optional[str], + timestamp: Optional[str], + properties: Optional[Dict], + prompt_tracking: Optional[Dict], + evaluators: Optional[List[BaseEventEvaluator]], max_evaluator_concurrency: int, ) -> SendEventResponse: merged_properties = dict(self._properties) merged_properties.update(properties or {}) + + # Overwrite properties with any provided as top level arguments if span_id: merged_properties["span_id"] = span_id if parent_span_id: merged_properties["parent_span_id"] = parent_span_id if prompt_tracking: merged_properties["promptTracking"] = prompt_tracking - trace_id = trace_id or self._trace_id timestamp = timestamp or datetime.now(timezone.utc).isoformat() - event = TracerEvent( + # If there are evaluators, run them and compute the evaluations property + if evaluators: + evaluations = await self._run_evaluators( + event=TracerEvent( + message=message, + trace_id=trace_id, + timestamp=timestamp, + properties=merged_properties, + ), + evaluators=evaluators, + max_evaluator_concurrency=max_evaluator_concurrency, + ) + if evaluations: + # Update merged properties with computed evaluations property + merged_properties["evaluations"] = evaluations + + traced_event = TracerEvent( message=message, trace_id=trace_id, timestamp=timestamp, properties=merged_properties, ) - - transformed_event_json = await self._run_and_build_evals_properties( - evaluators=evaluators, event=event, max_evaluator_concurrency=max_evaluator_concurrency - ) req = await global_state.http_client().post( url=INGESTION_ENDPOINT, - json=transformed_event_json, + json=traced_event.to_json(), headers=self._client_headers, timeout=self._timeout_seconds, ) @@ -219,7 +240,7 @@ def send_event( parent_span_id: Optional[str] = None, timestamp: Optional[str] = None, properties: Optional[Dict] = None, - evaluators: List[BaseEventEvaluator] = [], + evaluators: Optional[List[BaseEventEvaluator]] = None, max_evaluator_concurrency: int = 5, prompt_tracking: Optional[Dict] = None, ) -> SendEventResponse: @@ -244,7 +265,8 @@ def send_event( ), global_state.event_loop(), ) - return future.result() + if AutoblocksEnvVar.TRACER_BLOCK_ON_SEND_EVENT.get() == "1": + future.result() except Exception as err: log.error(f"Failed to send event to Autoblocks: {err}", exc_info=True) if AutoblocksEnvVar.TRACER_THROW_ON_ERROR.get() == "1": diff --git a/autoblocks/_impl/util.py b/autoblocks/_impl/util.py index 2de5b464..50f4af19 100644 --- a/autoblocks/_impl/util.py +++ b/autoblocks/_impl/util.py @@ -21,6 +21,7 @@ class AutoblocksEnvVar(StrEnum): CLI_SERVER_ADDRESS = "AUTOBLOCKS_CLI_SERVER_ADDRESS" INGESTION_KEY = "AUTOBLOCKS_INGESTION_KEY" TRACER_THROW_ON_ERROR = "AUTOBLOCKS_TRACER_THROW_ON_ERROR" + TRACER_BLOCK_ON_SEND_EVENT = "TRACER_BLOCK_ON_SEND_EVENT" def get(self) -> Optional[str]: return os.environ.get(self.value) @@ -38,7 +39,7 @@ def encode_uri_component(s: str) -> str: async def gather_with_max_concurrency( max_concurrency: int, coroutines: List[Coroutine], -) -> asyncio.Future[list]: +): """ Borrowed from https://stackoverflow.com/a/61478547 """ diff --git a/tests/autoblocks/test_tracer.py b/tests/autoblocks/test_tracer.py index 1090fddb..aa1156bb 100644 --- a/tests/autoblocks/test_tracer.py +++ b/tests/autoblocks/test_tracer.py @@ -8,7 +8,7 @@ from autoblocks._impl.config.constants import INGESTION_ENDPOINT from autoblocks._impl.testing.models import BaseEventEvaluator -from autoblocks._impl.testing.models import EventEvaluation +from autoblocks._impl.testing.models import Evaluation from autoblocks._impl.testing.models import Threshold from autoblocks._impl.testing.models import TracerEvent from autoblocks.tracer import AutoblocksTracer @@ -45,6 +45,10 @@ def test_client_init_headers_with_env_var(): assert tracer._client_headers["Authorization"] == "Bearer mock-ingestion-key" +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_prod(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -62,11 +66,13 @@ def test_tracer_prod(httpx_mock): ), ) tracer = AutoblocksTracer("mock-ingestion-key") - resp = tracer.send_event("my-message") - - assert resp.trace_id == "my-trace-id" + tracer.send_event("my-message") +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_prod_no_trace_id_in_response(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -84,11 +90,13 @@ def test_tracer_prod_no_trace_id_in_response(httpx_mock): ), ) tracer = AutoblocksTracer("mock-ingestion-key") - resp = tracer.send_event("my-message") - - assert resp.trace_id is None + tracer.send_event("my-message") +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_prod_with_trace_id_in_send_event(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -110,6 +118,10 @@ def test_tracer_prod_with_trace_id_in_send_event(httpx_mock): tracer.send_event("my-message", trace_id="my-trace-id") +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_prod_with_trace_id_in_init(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -130,6 +142,10 @@ def test_tracer_prod_with_trace_id_in_init(httpx_mock): tracer.send_event("my-message") +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_prod_with_trace_id_override(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -151,6 +167,10 @@ def test_tracer_prod_with_trace_id_override(httpx_mock): tracer.send_event("my-message", trace_id="override-trace-id") +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_prod_with_set_trace_id(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -173,6 +193,10 @@ def test_tracer_prod_with_set_trace_id(httpx_mock): tracer.send_event("my-message") +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_prod_with_properties(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -194,6 +218,10 @@ def test_tracer_prod_with_properties(httpx_mock): tracer.send_event("my-message") +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_prod_with_set_properties(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -216,6 +244,10 @@ def test_tracer_prod_with_set_properties(httpx_mock): tracer.send_event("my-message") +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_prod_with_update_properties(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -238,6 +270,10 @@ def test_tracer_prod_with_update_properties(httpx_mock): tracer.send_event("my-message") +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_prod_with_update_properties_and_send_event_properties(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -260,6 +296,10 @@ def test_tracer_prod_with_update_properties_and_send_event_properties(httpx_mock tracer.send_event("my-message", properties=dict(z=3)) +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_prod_with_properties_with_conflicting_keys(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -282,6 +322,10 @@ def test_tracer_prod_with_properties_with_conflicting_keys(httpx_mock): tracer.send_event("my-message", properties=dict(x=3, z=3)) +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_prod_with_timestamp(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -303,14 +347,21 @@ def test_tracer_prod_with_timestamp(httpx_mock): tracer.send_event("my-message", timestamp="2023-07-24T21:52:52.742Z") +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_prod_swallows_errors(httpx_mock): httpx_mock.add_exception(Exception()) tracer = AutoblocksTracer("mock-ingestion-key") - resp = tracer.send_event("my-message") - assert resp.trace_id is None + tracer.send_event("my-message") +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) @mock.patch.dict( os.environ, dict(AUTOBLOCKS_TRACER_THROW_ON_ERROR="1"), @@ -326,6 +377,10 @@ class MyCustomException(Exception): tracer.send_event("my-message") +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_prod_handles_non_200(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -344,10 +399,13 @@ def test_tracer_prod_handles_non_200(httpx_mock): ) tracer = AutoblocksTracer("mock-ingestion-key") - resp = tracer.send_event("my-message") - assert resp.trace_id is None + tracer.send_event("my-message") +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_sends_span_id_as_property(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -365,11 +423,13 @@ def test_tracer_sends_span_id_as_property(httpx_mock): ), ) tracer = AutoblocksTracer("mock-ingestion-key") - resp = tracer.send_event("my-message", span_id="my-span-id") - - assert resp.trace_id == "my-trace-id" + tracer.send_event("my-message", span_id="my-span-id") +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_sends_parent_span_id_as_property(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -387,11 +447,13 @@ def test_tracer_sends_parent_span_id_as_property(httpx_mock): ), ) tracer = AutoblocksTracer("mock-ingestion-key") - resp = tracer.send_event("my-message", parent_span_id="my-parent-span-id") - - assert resp.trace_id == "my-trace-id" + tracer.send_event("my-message", parent_span_id="my-parent-span-id") +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_sends_span_id_and_parent_span_id_as_property(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -409,21 +471,17 @@ def test_tracer_sends_span_id_and_parent_span_id_as_property(httpx_mock): ), ) tracer = AutoblocksTracer("mock-ingestion-key") - resp = tracer.send_event("my-message", span_id="my-span-id", parent_span_id="my-parent-span-id") - - assert resp.trace_id == "my-trace-id" + tracer.send_event("my-message", span_id="my-span-id", parent_span_id="my-parent-span-id") -@mock.patch.dict( - os.environ, - { - "AUTOBLOCKS_INGESTION_KEY": "key", - }, -) @mock.patch.object( uuid, "uuid4", - side_effect=[f"mock-uuid-{i}" for i in range(10)], + side_effect=[f"mock-uuid-{i}" for i in range(4)], +) +@mock.patch.dict( + os.environ, + {"AUTOBLOCKS_INGESTION_KEY": "key", "TRACER_BLOCK_ON_SEND_EVENT": "1"}, ) def test_tracer_start_span(*args, **kwargs): tracer = AutoblocksTracer() @@ -454,16 +512,21 @@ def test_tracer_start_span(*args, **kwargs): assert tracer._properties.get("parent_span_id") is None +@mock.patch.object( + uuid, + "uuid4", + side_effect=["mock-uuid-1"], +) +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_prod_evaluations(httpx_mock): - test_evaluation_id = uuid.uuid4() - class MyEvaluator(BaseEventEvaluator): id = "my-evaluator" - def evaluate_event(self, event: TracerEvent) -> EventEvaluation: - return EventEvaluation( - evaluator_external_id=self.id, - id=test_evaluation_id, + def evaluate_event(self, event: TracerEvent) -> Evaluation: + return Evaluation( score=0.9, threshold=Threshold(gte=0.5), ) @@ -482,11 +545,11 @@ def evaluate_event(self, event: TracerEvent) -> EventEvaluation: properties={ "evaluations": [ { - "evaluatorExternalId": "my-evaluator", - "id": str(test_evaluation_id), + "id": "mock-uuid-1", "score": 0.9, "metadata": None, "threshold": {"lt": None, "lte": None, "gt": None, "gte": 0.5}, + "evaluatorExternalId": "my-evaluator", } ] }, @@ -494,21 +557,110 @@ def evaluate_event(self, event: TracerEvent) -> EventEvaluation: ), ) tracer = AutoblocksTracer("mock-ingestion-key") - resp = tracer.send_event( + tracer.send_event( "my-message", trace_id="my-trace-id", timestamp=timestamp, properties={}, evaluators=[MyEvaluator()], ) - assert resp.trace_id == "my-trace-id" +@mock.patch.object( + uuid, + "uuid4", + side_effect=["mock-uuid" for _ in range(2)], +) +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) +def test_tracer_prod_async_evaluations(httpx_mock): + test_evaluation_id = uuid.uuid4() + + class MyEvaluator1(BaseEventEvaluator): + id = "my-evaluator-1" + + async def evaluate_event(self, event: TracerEvent) -> Evaluation: + return Evaluation( + evaluator_external_id=self.id, + id=test_evaluation_id, + score=0.9, + threshold=Threshold(gte=0.5), + ) + + class MyEvaluator2(BaseEventEvaluator): + id = "my-evaluator-2" + + async def evaluate_event(self, event: TracerEvent) -> Evaluation: + return Evaluation( + score=0.3, + ) + + httpx_mock.add_response( + url=INGESTION_ENDPOINT, + method="POST", + status_code=200, + json={"traceId": "my-trace-id"}, + match_headers={"Authorization": "Bearer mock-ingestion-key"}, + match_content=make_expected_body( + dict( + message="my-message", + traceId="my-trace-id", + timestamp=timestamp, + properties={ + "evaluations": [ + { + "evaluatorExternalId": "my-evaluator-1", + "id": "mock-uuid", + "score": 0.9, + "metadata": None, + "threshold": {"lt": None, "lte": None, "gt": None, "gte": 0.5}, + }, + { + "evaluatorExternalId": "my-evaluator-2", + "id": "mock-uuid", + "score": 0.3, + "metadata": None, + "threshold": None, + }, + ] + }, + ) + ), + ) + tracer = AutoblocksTracer("mock-ingestion-key") + tracer.send_event( + "my-message", + trace_id="my-trace-id", + timestamp=timestamp, + properties={}, + evaluators=[MyEvaluator1(), MyEvaluator2()], + ) + + +@mock.patch.object( + uuid, + "uuid4", + side_effect=["mock-uuid" for i in range(2)], +) +@mock.patch.dict( + os.environ, + dict(TRACER_BLOCK_ON_SEND_EVENT="1"), +) def test_tracer_failing_evaluation(httpx_mock): - class MyEvaluator(BaseEventEvaluator): - id = "my-evaluator" + class MyValidEvaluator(BaseEventEvaluator): + id = "my-valid-evaluator" + + def evaluate_event(self, event: TracerEvent) -> Evaluation: + return Evaluation( + score=0.3, + ) - def evaluate_event(self, event: TracerEvent) -> EventEvaluation: + class MyFailingEvaluator(BaseEventEvaluator): + id = "my-failing-evaluator" + + def evaluate_event(self, event: TracerEvent) -> Evaluation: raise Exception("Something terrible went wrong") httpx_mock.add_response( @@ -522,12 +674,25 @@ def evaluate_event(self, event: TracerEvent) -> EventEvaluation: message="my-message", traceId="my-trace-id", timestamp=timestamp, - properties={}, + properties={ + "evaluations": [ + { + "evaluatorExternalId": "my-valid-evaluator", + "id": "mock-uuid", + "score": 0.3, + "metadata": None, + "threshold": None, + }, + ] + }, ) ), ) tracer = AutoblocksTracer("mock-ingestion-key") - resp = tracer.send_event( - "my-message", trace_id="my-trace-id", timestamp=timestamp, properties={}, evaluators=[MyEvaluator()] + tracer.send_event( + "my-message", + trace_id="my-trace-id", + timestamp=timestamp, + properties={}, + evaluators=[MyFailingEvaluator(), MyValidEvaluator()], ) - assert resp.trace_id == "my-trace-id" From 1dcb95c9d2ec608feaf0cb7da0ba89c4841bb318 Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Wed, 28 Feb 2024 10:17:58 -0600 Subject: [PATCH 09/23] epd=612: PR feedback --- autoblocks/_impl/tracer.py | 19 +++--- tests/autoblocks/test_tracer.py | 106 ++++---------------------------- 2 files changed, 20 insertions(+), 105 deletions(-) diff --git a/autoblocks/_impl/tracer.py b/autoblocks/_impl/tracer.py index 1dca9a99..4d64f2a9 100644 --- a/autoblocks/_impl/tracer.py +++ b/autoblocks/_impl/tracer.py @@ -12,7 +12,6 @@ from typing import Dict from typing import List from typing import Optional -from typing import Union from autoblocks._impl import global_state from autoblocks._impl.config.constants import INGESTION_ENDPOINT @@ -119,9 +118,7 @@ def _evaluation_to_json(evaluation: Evaluation, evaluator_external_id: str) -> D evaluatorExternalId=evaluator_external_id, ) - async def _evaluate_event( - self, event: TracerEvent, evaluator: BaseEventEvaluator - ) -> Optional[Union[Evaluation, None]]: + async def _evaluate_event(self, event: TracerEvent, evaluator: BaseEventEvaluator) -> Optional[Evaluation]: """ Evaluates an event using a provided evaluator. """ @@ -130,7 +127,7 @@ async def _evaluate_event( try: evaluation = await evaluator.evaluate_event(event=event) except Exception as err: - log.error("Unable to execute evaluation. Error: %s", repr(err), exc_info=True) + log.error(f"Unable to execute evaluator with id: {evaluator.id}. Error: %s", str(err), exc_info=True) else: try: ctx = contextvars.copy_context() @@ -141,7 +138,7 @@ async def _evaluate_event( event, ) except Exception as err: - log.error("Unable to execute evaluation. Error: %s", repr(err), exc_info=True) + log.error(f"Unable to execute evaluator with id: {evaluator.id}. Error: %s", str(err), exc_info=True) return evaluation @@ -151,7 +148,7 @@ async def _run_evaluators( if len(evaluators) == 0: return [] try: - evaluations: List[Union[Evaluation, None]] = await gather_with_max_concurrency( + evaluations: List[Optional[Evaluation]] = await gather_with_max_concurrency( max_evaluator_concurrency, [ self._evaluate_event( @@ -162,9 +159,9 @@ async def _run_evaluators( ], ) return [ - self._evaluation_to_json(event_evaluation=item, evaluator_external_id=evaluator.id) - for evaluator, item in zip(evaluators, evaluations) - if item is not None + self._evaluation_to_json(event_evaluation=evaluation, evaluator_external_id=evaluator.id) + for evaluator, evaluation in zip(evaluators, evaluations) + if evaluation is not None ] except Exception as err: @@ -243,7 +240,7 @@ def send_event( evaluators: Optional[List[BaseEventEvaluator]] = None, max_evaluator_concurrency: int = 5, prompt_tracking: Optional[Dict] = None, - ) -> SendEventResponse: + ) -> None: """ Sends an event to the Autoblocks ingestion API. diff --git a/tests/autoblocks/test_tracer.py b/tests/autoblocks/test_tracer.py index aa1156bb..d4e310db 100644 --- a/tests/autoblocks/test_tracer.py +++ b/tests/autoblocks/test_tracer.py @@ -34,21 +34,23 @@ def test_client_headers_init_with_key(): assert tracer._client_headers["Authorization"] == "Bearer mock-ingestion-key" -@mock.patch.dict( - os.environ, - { - "AUTOBLOCKS_INGESTION_KEY": "mock-ingestion-key", - }, -) +@pytest.fixture(autouse=True) +def mock_evn_vars(): + with mock.patch.dict( + os.environ, + { + "AUTOBLOCKS_INGESTION_KEY": "mock-ingestion-key", + "TRACER_BLOCK_ON_SEND_EVENT": "1", + }, + ): + yield + + def test_client_init_headers_with_env_var(): tracer = AutoblocksTracer() assert tracer._client_headers["Authorization"] == "Bearer mock-ingestion-key" -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_prod(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -69,10 +71,6 @@ def test_tracer_prod(httpx_mock): tracer.send_event("my-message") -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_prod_no_trace_id_in_response(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -93,10 +91,6 @@ def test_tracer_prod_no_trace_id_in_response(httpx_mock): tracer.send_event("my-message") -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_prod_with_trace_id_in_send_event(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -118,10 +112,6 @@ def test_tracer_prod_with_trace_id_in_send_event(httpx_mock): tracer.send_event("my-message", trace_id="my-trace-id") -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_prod_with_trace_id_in_init(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -142,10 +132,6 @@ def test_tracer_prod_with_trace_id_in_init(httpx_mock): tracer.send_event("my-message") -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_prod_with_trace_id_override(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -167,10 +153,6 @@ def test_tracer_prod_with_trace_id_override(httpx_mock): tracer.send_event("my-message", trace_id="override-trace-id") -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_prod_with_set_trace_id(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -193,10 +175,6 @@ def test_tracer_prod_with_set_trace_id(httpx_mock): tracer.send_event("my-message") -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_prod_with_properties(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -218,10 +196,6 @@ def test_tracer_prod_with_properties(httpx_mock): tracer.send_event("my-message") -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_prod_with_set_properties(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -244,10 +218,6 @@ def test_tracer_prod_with_set_properties(httpx_mock): tracer.send_event("my-message") -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_prod_with_update_properties(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -270,10 +240,6 @@ def test_tracer_prod_with_update_properties(httpx_mock): tracer.send_event("my-message") -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_prod_with_update_properties_and_send_event_properties(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -296,10 +262,6 @@ def test_tracer_prod_with_update_properties_and_send_event_properties(httpx_mock tracer.send_event("my-message", properties=dict(z=3)) -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_prod_with_properties_with_conflicting_keys(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -322,10 +284,6 @@ def test_tracer_prod_with_properties_with_conflicting_keys(httpx_mock): tracer.send_event("my-message", properties=dict(x=3, z=3)) -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_prod_with_timestamp(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -347,10 +305,6 @@ def test_tracer_prod_with_timestamp(httpx_mock): tracer.send_event("my-message", timestamp="2023-07-24T21:52:52.742Z") -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_prod_swallows_errors(httpx_mock): httpx_mock.add_exception(Exception()) @@ -358,10 +312,6 @@ def test_tracer_prod_swallows_errors(httpx_mock): tracer.send_event("my-message") -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) @mock.patch.dict( os.environ, dict(AUTOBLOCKS_TRACER_THROW_ON_ERROR="1"), @@ -377,10 +327,6 @@ class MyCustomException(Exception): tracer.send_event("my-message") -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_prod_handles_non_200(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -402,10 +348,6 @@ def test_tracer_prod_handles_non_200(httpx_mock): tracer.send_event("my-message") -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_sends_span_id_as_property(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -426,10 +368,6 @@ def test_tracer_sends_span_id_as_property(httpx_mock): tracer.send_event("my-message", span_id="my-span-id") -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_sends_parent_span_id_as_property(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -450,10 +388,6 @@ def test_tracer_sends_parent_span_id_as_property(httpx_mock): tracer.send_event("my-message", parent_span_id="my-parent-span-id") -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_sends_span_id_and_parent_span_id_as_property(httpx_mock): httpx_mock.add_response( url=INGESTION_ENDPOINT, @@ -517,10 +451,6 @@ def test_tracer_start_span(*args, **kwargs): "uuid4", side_effect=["mock-uuid-1"], ) -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_prod_evaluations(httpx_mock): class MyEvaluator(BaseEventEvaluator): id = "my-evaluator" @@ -571,20 +501,12 @@ def evaluate_event(self, event: TracerEvent) -> Evaluation: "uuid4", side_effect=["mock-uuid" for _ in range(2)], ) -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_prod_async_evaluations(httpx_mock): - test_evaluation_id = uuid.uuid4() - class MyEvaluator1(BaseEventEvaluator): id = "my-evaluator-1" async def evaluate_event(self, event: TracerEvent) -> Evaluation: return Evaluation( - evaluator_external_id=self.id, - id=test_evaluation_id, score=0.9, threshold=Threshold(gte=0.5), ) @@ -644,10 +566,6 @@ async def evaluate_event(self, event: TracerEvent) -> Evaluation: "uuid4", side_effect=["mock-uuid" for i in range(2)], ) -@mock.patch.dict( - os.environ, - dict(TRACER_BLOCK_ON_SEND_EVENT="1"), -) def test_tracer_failing_evaluation(httpx_mock): class MyValidEvaluator(BaseEventEvaluator): id = "my-valid-evaluator" From 7a8211fd008d423d3ea30dee407a478b579e3772 Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Wed, 28 Feb 2024 11:17:42 -0600 Subject: [PATCH 10/23] epd-612: Attemp graceful exit --- autoblocks/_impl/global_state.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/autoblocks/_impl/global_state.py b/autoblocks/_impl/global_state.py index b94e0b57..3b6051c6 100644 --- a/autoblocks/_impl/global_state.py +++ b/autoblocks/_impl/global_state.py @@ -1,9 +1,13 @@ import asyncio +import logging +import signal import threading from typing import Optional import httpx +log = logging.getLogger(__name__) + _client: Optional[httpx.AsyncClient] = None _loop: Optional[asyncio.AbstractEventLoop] = None _started: bool = False @@ -25,10 +29,19 @@ def init() -> None: daemon=True, ) background_thread.start() - + signal.signal(signal.SIGTERM, _on_unexpected_exit) + signal.signal(signal.SIGINT, _on_unexpected_exit) _started = True +def _on_unexpected_exit(signum, frame): + if _loop and _loop.is_running(): + log.info(f"Attempting to gracefully stop event loop. Received signal {signum}") + _loop.stop() + _loop.close() + exit(0) + + def _run_event_loop(_event_loop: asyncio.AbstractEventLoop) -> None: asyncio.set_event_loop(_event_loop) _event_loop.run_forever() From 57d80703e1ac84fb238b3372e3a7a669fc63fa50 Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Wed, 28 Feb 2024 11:19:09 -0600 Subject: [PATCH 11/23] epd-612: Cleanup --- autoblocks/_impl/tracer.py | 6 +++--- tests/autoblocks/test_tracer.py | 4 ---- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/autoblocks/_impl/tracer.py b/autoblocks/_impl/tracer.py index 4d64f2a9..f09d7bba 100644 --- a/autoblocks/_impl/tracer.py +++ b/autoblocks/_impl/tracer.py @@ -127,7 +127,7 @@ async def _evaluate_event(self, event: TracerEvent, evaluator: BaseEventEvaluato try: evaluation = await evaluator.evaluate_event(event=event) except Exception as err: - log.error(f"Unable to execute evaluator with id: {evaluator.id}. Error: %s", str(err), exc_info=True) + log.error(f"Unable to execute evaluator with id: {evaluator.id}. Error: %s", err, exc_info=True) else: try: ctx = contextvars.copy_context() @@ -138,7 +138,7 @@ async def _evaluate_event(self, event: TracerEvent, evaluator: BaseEventEvaluato event, ) except Exception as err: - log.error(f"Unable to execute evaluator with id: {evaluator.id}. Error: %s", str(err), exc_info=True) + log.error(f"Unable to execute evaluator with id: {evaluator.id}. Error: %s", err, exc_info=True) return evaluation @@ -165,7 +165,7 @@ async def _run_evaluators( ] except Exception as err: - log.error("Unable to complete evaluating events. Error: %s", str(err), exc_info=True) + log.error("Unable to complete evaluating events. Error: %s", err, exc_info=True) return [] async def _send_event_unsafe( diff --git a/tests/autoblocks/test_tracer.py b/tests/autoblocks/test_tracer.py index d4e310db..5c9bc9ab 100644 --- a/tests/autoblocks/test_tracer.py +++ b/tests/autoblocks/test_tracer.py @@ -413,10 +413,6 @@ def test_tracer_sends_span_id_and_parent_span_id_as_property(httpx_mock): "uuid4", side_effect=[f"mock-uuid-{i}" for i in range(4)], ) -@mock.patch.dict( - os.environ, - {"AUTOBLOCKS_INGESTION_KEY": "key", "TRACER_BLOCK_ON_SEND_EVENT": "1"}, -) def test_tracer_start_span(*args, **kwargs): tracer = AutoblocksTracer() From f1c82d979e1604f2637f19f6a3b9129036ee3de1 Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Wed, 28 Feb 2024 11:33:13 -0600 Subject: [PATCH 12/23] epd-612: Updated tracer to try catch running evaluations and added unit test for unexpected failure --- autoblocks/_impl/tracer.py | 29 ++++++++++++++----------- tests/autoblocks/test_tracer.py | 38 +++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 13 deletions(-) diff --git a/autoblocks/_impl/tracer.py b/autoblocks/_impl/tracer.py index f09d7bba..67f30995 100644 --- a/autoblocks/_impl/tracer.py +++ b/autoblocks/_impl/tracer.py @@ -197,19 +197,22 @@ async def _send_event_unsafe( # If there are evaluators, run them and compute the evaluations property if evaluators: - evaluations = await self._run_evaluators( - event=TracerEvent( - message=message, - trace_id=trace_id, - timestamp=timestamp, - properties=merged_properties, - ), - evaluators=evaluators, - max_evaluator_concurrency=max_evaluator_concurrency, - ) - if evaluations: - # Update merged properties with computed evaluations property - merged_properties["evaluations"] = evaluations + try: + evaluations = await self._run_evaluators( + event=TracerEvent( + message=message, + trace_id=trace_id, + timestamp=timestamp, + properties=merged_properties, + ), + evaluators=evaluators, + max_evaluator_concurrency=max_evaluator_concurrency, + ) + if evaluations: + # Update merged properties with computed evaluations property + merged_properties["evaluations"] = evaluations + except Exception as err: + log.error("Unable to evaluate events. Error: %s", err, exc_info=True) traced_event = TracerEvent( message=message, diff --git a/tests/autoblocks/test_tracer.py b/tests/autoblocks/test_tracer.py index 5c9bc9ab..393d4bf1 100644 --- a/tests/autoblocks/test_tracer.py +++ b/tests/autoblocks/test_tracer.py @@ -610,3 +610,41 @@ def evaluate_event(self, event: TracerEvent) -> Evaluation: properties={}, evaluators=[MyFailingEvaluator(), MyValidEvaluator()], ) + + +@mock.patch.object( + AutoblocksTracer, + "_run_evaluators", + side_effect=Exception("Something went wrong with our code when evaluating the event"), +) +def test_tracer_evaluation_unexpected_error(httpx_mock): + class MyEvaluator: + id = "my-evaluator" + + def evaluate_event(self, event: TracerEvent) -> Evaluation: + return Evaluation(score=1) + + httpx_mock.add_response( + url=INGESTION_ENDPOINT, + method="POST", + status_code=200, + json={"traceId": "my-trace-id"}, + match_headers={"Authorization": "Bearer mock-ingestion-key"}, + match_content=make_expected_body( + dict( + message="my-message", + traceId="my-trace-id", + timestamp=timestamp, + properties={}, + ) + ), + ) + + tracer = AutoblocksTracer("mock-ingestion-key") + tracer.send_event( + "my-message", + trace_id="my-trace-id", + timestamp=timestamp, + properties={}, + evaluators=[MyEvaluator()], + ) From 511680793b8c106e6033807c52ae2a599b8f8710 Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Wed, 28 Feb 2024 11:49:33 -0600 Subject: [PATCH 13/23] epd-612: on_exit adjustment --- autoblocks/_impl/global_state.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/autoblocks/_impl/global_state.py b/autoblocks/_impl/global_state.py index 3b6051c6..9bce4967 100644 --- a/autoblocks/_impl/global_state.py +++ b/autoblocks/_impl/global_state.py @@ -29,17 +29,17 @@ def init() -> None: daemon=True, ) background_thread.start() - signal.signal(signal.SIGTERM, _on_unexpected_exit) - signal.signal(signal.SIGINT, _on_unexpected_exit) + for sig in [signal.SIGINT, signal.SIGTERM]: + _loop.add_signal_handler(sig, lambda: asyncio.ensure_future(_on_exist_signal())) _started = True -def _on_unexpected_exit(signum, frame): - if _loop and _loop.is_running(): - log.info(f"Attempting to gracefully stop event loop. Received signal {signum}") - _loop.stop() - _loop.close() - exit(0) +async def _on_exist_signal() -> None: + tasks = [x for x in asyncio.all_tasks(loop=_loop) if x.get_coro().__name__ == "_send_event_unsafe" and not x.done()] + logging.info(f"Attempting to flush f{len(tasks)} outstanding send event tasks") + await asyncio.gather(*tasks) + log.info("Stopping event loop") + _loop.stop() def _run_event_loop(_event_loop: asyncio.AbstractEventLoop) -> None: From 8476f3112c800c286491b475eeff2ead53aca216 Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Wed, 28 Feb 2024 15:05:20 -0600 Subject: [PATCH 14/23] updates --- autoblocks/_impl/global_state.py | 18 ++++++++++--- autoblocks/_impl/testing/models.py | 4 +-- autoblocks/_impl/tracer.py | 43 +++++++++++++----------------- autoblocks/_impl/util.py | 5 ++++ tests/autoblocks/test_tracer.py | 14 ++++++---- 5 files changed, 47 insertions(+), 37 deletions(-) diff --git a/autoblocks/_impl/global_state.py b/autoblocks/_impl/global_state.py index 3bcb560e..7105fdc6 100644 --- a/autoblocks/_impl/global_state.py +++ b/autoblocks/_impl/global_state.py @@ -6,6 +6,8 @@ import httpx +from autoblocks._impl.util import SEND_EVENT_CORO_NAME + log = logging.getLogger(__name__) _client: Optional[httpx.AsyncClient] = None @@ -30,18 +32,26 @@ def init() -> None: ) background_thread.start() for sig in [signal.SIGINT, signal.SIGTERM]: - _loop.add_signal_handler(sig, lambda: asyncio.ensure_future(_on_exist_signal())) + _loop.add_signal_handler(sig, lambda: asyncio.ensure_future(_on_exit_signal())) _started = True -async def _on_exist_signal() -> None: +async def _on_exit_signal() -> None: + """ + This function handles if the event loop recieves + a SIGINT or SIGTERM signal. It will attempt to finish + all outstanding "send event" tasks and then stop. + Taken from blog: https://www.roguelynn.com/words/asyncio-graceful-shutdowns/ + """ if _loop: # Ignore type below - accessing __name__ on coro which mypy doesn't like tasks = [ - x for x in asyncio.all_tasks(loop=_loop) if x.get_coro().__name__ == "_send_event_unsafe" and not x.done() # type: ignore + x + for x in asyncio.all_tasks(loop=_loop) + if x.get_coro().__name__ == SEND_EVENT_CORO_NAME and not x.done() # type: ignore ] logging.info(f"Attempting to flush f{len(tasks)} outstanding send event tasks") - await asyncio.gather(*tasks) + await asyncio.gather(*tasks, return_exceptions=True) log.info("Stopping event loop") _loop.stop() diff --git a/autoblocks/_impl/testing/models.py b/autoblocks/_impl/testing/models.py index 2361558d..b3a3fe4d 100644 --- a/autoblocks/_impl/testing/models.py +++ b/autoblocks/_impl/testing/models.py @@ -1,11 +1,9 @@ import abc -import asyncio import dataclasses import functools from typing import Any from typing import Dict from typing import Optional -from typing import Union @dataclasses.dataclass() @@ -77,5 +75,5 @@ def id(self) -> str: pass @abc.abstractmethod - def evaluate_event(self, event: TracerEvent) -> Union[Evaluation, asyncio.Future[Evaluation]]: + def evaluate_event(self, event: TracerEvent) -> Evaluation: pass diff --git a/autoblocks/_impl/tracer.py b/autoblocks/_impl/tracer.py index 9977bd18..0621d173 100644 --- a/autoblocks/_impl/tracer.py +++ b/autoblocks/_impl/tracer.py @@ -143,31 +143,26 @@ async def _evaluate_event(self, event: TracerEvent, evaluator: BaseEventEvaluato return evaluation - async def _run_evaluators( + async def _run_evaluators_unsafe( self, evaluators: List[BaseEventEvaluator], event: TracerEvent, max_evaluator_concurrency: int ) -> List[Dict[str, Any]]: if len(evaluators) == 0: return [] - try: - evaluations: List[Evaluation] = await gather_with_max_concurrency( - max_evaluator_concurrency, - [ - self._evaluate_event( - event=event, - evaluator=evaluator, - ) - for evaluator in evaluators - ], - ) - return [ - self._evaluation_to_json(evaluation=evaluation, evaluator_external_id=evaluator.id) - for evaluator, evaluation in zip(evaluators, evaluations) - if evaluation is not None - ] - - except Exception as err: - log.error("Unable to complete evaluating events. Error: %s", err, exc_info=True) - return [] + evaluations: List[Evaluation] = await gather_with_max_concurrency( + max_evaluator_concurrency, + [ + self._evaluate_event( + event=event, + evaluator=evaluator, + ) + for evaluator in evaluators + ], + ) + return [ + self._evaluation_to_json(evaluation=evaluation, evaluator_external_id=evaluator.id) + for evaluator, evaluation in zip(evaluators, evaluations) + if evaluation is not None + ] async def _send_event_unsafe( self, @@ -182,7 +177,7 @@ async def _send_event_unsafe( properties: Optional[Dict[Any, Any]], prompt_tracking: Optional[Dict[str, Any]], evaluators: Optional[List[BaseEventEvaluator]], - ) -> SendEventResponse: + ) -> None: merged_properties = dict(self._properties) merged_properties.update(properties or {}) @@ -199,7 +194,7 @@ async def _send_event_unsafe( # If there are evaluators, run them and compute the evaluations property if evaluators: try: - evaluations = await self._run_evaluators( + evaluations = await self._run_evaluators_unsafe( event=TracerEvent( message=message, trace_id=trace_id, @@ -228,8 +223,6 @@ async def _send_event_unsafe( timeout=self._timeout_seconds, ) req.raise_for_status() - resp = req.json() - return SendEventResponse(trace_id=resp.get("traceId")) def send_event( self, diff --git a/autoblocks/_impl/util.py b/autoblocks/_impl/util.py index 35838ec5..410c2a88 100644 --- a/autoblocks/_impl/util.py +++ b/autoblocks/_impl/util.py @@ -10,6 +10,11 @@ log = logging.getLogger(__name__) +# This constant is used during the shutdown process to find all outstanding +# send event tasks and attempt to resolved them before stopping the event loop. +# We have a test asserting that this function name does not change. +SEND_EVENT_CORO_NAME = "_send_event_unsafe" + class StrEnum(str, Enum): def __str__(self) -> str: diff --git a/tests/autoblocks/test_tracer.py b/tests/autoblocks/test_tracer.py index 0963b165..bd4cd21b 100644 --- a/tests/autoblocks/test_tracer.py +++ b/tests/autoblocks/test_tracer.py @@ -1,4 +1,3 @@ -import asyncio import os import uuid from datetime import datetime @@ -12,6 +11,7 @@ from autoblocks._impl.testing.models import Evaluation from autoblocks._impl.testing.models import Threshold from autoblocks._impl.testing.models import TracerEvent +from autoblocks._impl.util import SEND_EVENT_CORO_NAME from autoblocks.tracer import AutoblocksTracer from tests.autoblocks.util import make_expected_body @@ -36,7 +36,7 @@ def test_client_headers_init_with_key(): @pytest.fixture(autouse=True) -def mock_evn_vars(): +def mock_env_vars(): with mock.patch.dict( os.environ, { @@ -502,7 +502,7 @@ def test_tracer_prod_async_evaluations(httpx_mock): class MyEvaluator1(BaseEventEvaluator): id = "my-evaluator-1" - async def evaluate_event(self, event: TracerEvent) -> asyncio.Future[Evaluation]: + async def evaluate_event(self, event: TracerEvent): return Evaluation( score=0.9, threshold=Threshold(gte=0.5), @@ -511,7 +511,7 @@ async def evaluate_event(self, event: TracerEvent) -> asyncio.Future[Evaluation] class MyEvaluator2(BaseEventEvaluator): id = "my-evaluator-2" - async def evaluate_event(self, event: TracerEvent) -> asyncio.Future[Evaluation]: + async def evaluate_event(self, event: TracerEvent): return Evaluation( score=0.3, ) @@ -615,7 +615,7 @@ def evaluate_event(self, event: TracerEvent) -> Evaluation: @mock.patch.object( AutoblocksTracer, - "_run_evaluators", + "_run_evaluators_unsafe", side_effect=Exception("Something went wrong with our code when evaluating the event"), ) def test_tracer_evaluation_unexpected_error(httpx_mock): @@ -649,3 +649,7 @@ def evaluate_event(self, event: TracerEvent) -> Evaluation: properties={}, evaluators=[MyEvaluator()], ) + + +def test_tracer_has_send_event_unsafe(): + assert AutoblocksTracer._send_event_unsafe.__name__ == SEND_EVENT_CORO_NAME From 37995e92657b4d37db4ad577b8998136f3f92e11 Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Thu, 29 Feb 2024 11:17:51 -0600 Subject: [PATCH 15/23] EPD-612: Shutdown test --- tests/e2e/tracer_shutdown/main.py | 34 +++++++++++++++++++ .../tracer_shutdown/tracer_shutdown_test.py | 32 +++++++++++++++++ 2 files changed, 66 insertions(+) create mode 100644 tests/e2e/tracer_shutdown/main.py create mode 100644 tests/e2e/tracer_shutdown/tracer_shutdown_test.py diff --git a/tests/e2e/tracer_shutdown/main.py b/tests/e2e/tracer_shutdown/main.py new file mode 100644 index 00000000..f760d7c2 --- /dev/null +++ b/tests/e2e/tracer_shutdown/main.py @@ -0,0 +1,34 @@ +# Write a main function that sends an event using the autoblocks tracer +import asyncio +import sys +import time + +from autoblocks._impl.testing.models import BaseEventEvaluator +from autoblocks._impl.testing.models import Evaluation +from autoblocks._impl.testing.models import Threshold +from autoblocks._impl.testing.models import TracerEvent +from autoblocks.tracer import AutoblocksTracer + +tracer = AutoblocksTracer() + + +# Send an event using the autoblocks tracer +class MyEvaluator(BaseEventEvaluator): + id = "e2e-test-evaluator" + + async def evaluate_event(self, event: TracerEvent) -> Evaluation: + await asyncio.sleep(1) + return Evaluation( + score=0.9, + threshold=Threshold(gte=0.5), + ) + + +def main(): + trace = sys.argv[1] + tracer.send_event("tracer_shutdown", trace_id=str(trace), evaluators=[MyEvaluator()]) + time.sleep(10) # Sleep for 10 seconds to allow time for signal to be sent + + +if __name__ == "__main__": + main() diff --git a/tests/e2e/tracer_shutdown/tracer_shutdown_test.py b/tests/e2e/tracer_shutdown/tracer_shutdown_test.py new file mode 100644 index 00000000..571d063c --- /dev/null +++ b/tests/e2e/tracer_shutdown/tracer_shutdown_test.py @@ -0,0 +1,32 @@ +import os +import signal +import subprocess +import time +import unittest +import uuid + +from autoblocks._impl.api.client import AutoblocksAPIClient + +client = AutoblocksAPIClient() + + +class TestMainScript(unittest.TestCase): + def test_tracer_received_sigint_or_sigterm(self): + test_trace_id = uuid.uuid4() + # Start the main.py script as a subprocess + process = subprocess.Popen(["python", "tests/e2e/tracer_shutdown/main.py", str(test_trace_id)]) + time.sleep(1) + + # Send SIGTERM to terminate the process + os.kill(process.pid, signal.SIGTERM) + + # Wait for the process to terminate + process.wait(timeout=15) + + test = client.get_trace(str(test_trace_id)) + # Assert that the process has terminated successfully + self.assertIsNotNone(test) + + +if __name__ == "__main__": + unittest.main() From 4547ca2ef9741f1a8c421d62c8c58e580f061fb8 Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Fri, 1 Mar 2024 18:01:37 -0600 Subject: [PATCH 16/23] EPD-612: Improved cleanup --- autoblocks/_impl/global_state.py | 23 ++++++++++--- autoblocks/_impl/tracer.py | 10 +++++- .../tracer_shutdown/test_tracer_shutdown.py | 27 ++++++++++++++++ .../tracer_shutdown/tracer_shutdown_test.py | 32 ------------------- ...ain.py => tracer_shutdown_test_process.py} | 19 ++++++++--- 5 files changed, 70 insertions(+), 41 deletions(-) create mode 100644 tests/e2e/tracer_shutdown/test_tracer_shutdown.py delete mode 100644 tests/e2e/tracer_shutdown/tracer_shutdown_test.py rename tests/e2e/tracer_shutdown/{main.py => tracer_shutdown_test_process.py} (76%) diff --git a/autoblocks/_impl/global_state.py b/autoblocks/_impl/global_state.py index 7105fdc6..91f7ed56 100644 --- a/autoblocks/_impl/global_state.py +++ b/autoblocks/_impl/global_state.py @@ -11,28 +11,31 @@ log = logging.getLogger(__name__) _client: Optional[httpx.AsyncClient] = None +_sync_client: Optional[httpx.Client] = None _loop: Optional[asyncio.AbstractEventLoop] = None _started: bool = False +_background_thread: Optional[threading.Thread] = None def init() -> None: - global _client, _loop, _started + global _client, _loop, _started, _background_thread, _sync_client if _started: return _client = httpx.AsyncClient() + _sync_client = httpx.Client() _loop = asyncio.new_event_loop() - background_thread = threading.Thread( + _background_thread = threading.Thread( target=_run_event_loop, args=(_loop,), daemon=True, ) - background_thread.start() + _background_thread.start() for sig in [signal.SIGINT, signal.SIGTERM]: - _loop.add_signal_handler(sig, lambda: asyncio.ensure_future(_on_exit_signal())) + _loop.add_signal_handler(sig, lambda: asyncio.create_task(_on_exit_signal())) _started = True @@ -71,3 +74,15 @@ def http_client() -> httpx.AsyncClient: if not _client: raise Exception("HTTP client not initialized") return _client + + +def sync_http_client() -> httpx.Client: + if not _sync_client: + raise Exception("HTTP client not initialized") + return _sync_client + + +def background_thread() -> threading.Thread: + if not _background_thread: + raise Exception("Background thread not initialized") + return _background_thread diff --git a/autoblocks/_impl/tracer.py b/autoblocks/_impl/tracer.py index 0621d173..c7fa7191 100644 --- a/autoblocks/_impl/tracer.py +++ b/autoblocks/_impl/tracer.py @@ -1,4 +1,5 @@ import asyncio +import atexit import contextvars import dataclasses import inspect @@ -25,6 +26,13 @@ log = logging.getLogger(__name__) +@atexit.register +def _cleanup_tracer() -> None: + bg_thread = global_state.background_thread() + if bg_thread: + bg_thread.join() + + @dataclasses.dataclass() class SendEventResponse: trace_id: Optional[str] @@ -216,7 +224,7 @@ async def _send_event_unsafe( timestamp=timestamp, properties=merged_properties, ) - req = await global_state.http_client().post( + req = global_state.sync_http_client().post( url=INGESTION_ENDPOINT, json=traced_event.to_json(), headers=self._client_headers, diff --git a/tests/e2e/tracer_shutdown/test_tracer_shutdown.py b/tests/e2e/tracer_shutdown/test_tracer_shutdown.py new file mode 100644 index 00000000..db331c59 --- /dev/null +++ b/tests/e2e/tracer_shutdown/test_tracer_shutdown.py @@ -0,0 +1,27 @@ +import os +import signal +import subprocess +import time +import uuid + +from autoblocks._impl.api.client import AutoblocksAPIClient + +client = AutoblocksAPIClient() + + +def test_tracer_received_sigint_or_sigterm(): + test_trace_id = uuid.uuid4() + # Start the main.py script as a subprocess + process = subprocess.Popen( + ["python", "tests/e2e/tracer_shutdown/tracer_shutdown_test_process.py", str(test_trace_id)] + ) + time.sleep(1) + + os.kill(process.pid, signal.SIGINT) + + # Wait for the process to terminate + process.wait() + time.sleep(4) # give a moment for autoblocks to + test = client.get_trace(str(test_trace_id)) + # Assert that the process has terminated successfully + assert test is not None diff --git a/tests/e2e/tracer_shutdown/tracer_shutdown_test.py b/tests/e2e/tracer_shutdown/tracer_shutdown_test.py deleted file mode 100644 index 571d063c..00000000 --- a/tests/e2e/tracer_shutdown/tracer_shutdown_test.py +++ /dev/null @@ -1,32 +0,0 @@ -import os -import signal -import subprocess -import time -import unittest -import uuid - -from autoblocks._impl.api.client import AutoblocksAPIClient - -client = AutoblocksAPIClient() - - -class TestMainScript(unittest.TestCase): - def test_tracer_received_sigint_or_sigterm(self): - test_trace_id = uuid.uuid4() - # Start the main.py script as a subprocess - process = subprocess.Popen(["python", "tests/e2e/tracer_shutdown/main.py", str(test_trace_id)]) - time.sleep(1) - - # Send SIGTERM to terminate the process - os.kill(process.pid, signal.SIGTERM) - - # Wait for the process to terminate - process.wait(timeout=15) - - test = client.get_trace(str(test_trace_id)) - # Assert that the process has terminated successfully - self.assertIsNotNone(test) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/e2e/tracer_shutdown/main.py b/tests/e2e/tracer_shutdown/tracer_shutdown_test_process.py similarity index 76% rename from tests/e2e/tracer_shutdown/main.py rename to tests/e2e/tracer_shutdown/tracer_shutdown_test_process.py index f760d7c2..75bd4dac 100644 --- a/tests/e2e/tracer_shutdown/main.py +++ b/tests/e2e/tracer_shutdown/tracer_shutdown_test_process.py @@ -1,7 +1,7 @@ # Write a main function that sends an event using the autoblocks tracer import asyncio +import signal import sys -import time from autoblocks._impl.testing.models import BaseEventEvaluator from autoblocks._impl.testing.models import Evaluation @@ -16,18 +16,29 @@ class MyEvaluator(BaseEventEvaluator): id = "e2e-test-evaluator" - async def evaluate_event(self, event: TracerEvent) -> Evaluation: - await asyncio.sleep(1) + async def evaluate_event(self, event: TracerEvent) -> Evaluation: # type: ignore + await asyncio.sleep(15) return Evaluation( score=0.9, threshold=Threshold(gte=0.5), ) +_running = True + + +def signal_handler(sig, frame): + global _running + _running = False + + def main(): + signal.signal(signal.SIGINT, signal_handler) trace = sys.argv[1] tracer.send_event("tracer_shutdown", trace_id=str(trace), evaluators=[MyEvaluator()]) - time.sleep(10) # Sleep for 10 seconds to allow time for signal to be sent + while True: + if not _running: + break if __name__ == "__main__": From 2541b5dbd5bb49f826c162d8d20ca91deddcaceb Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Fri, 1 Mar 2024 19:32:42 -0600 Subject: [PATCH 17/23] EPD-612: More cleanup --- autoblocks/_impl/tracer.py | 12 +++++++++--- tests/e2e/tracer_shutdown/test_tracer_shutdown.py | 2 +- .../tracer_shutdown/tracer_shutdown_test_process.py | 3 ++- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/autoblocks/_impl/tracer.py b/autoblocks/_impl/tracer.py index c7fa7191..aefccf83 100644 --- a/autoblocks/_impl/tracer.py +++ b/autoblocks/_impl/tracer.py @@ -4,6 +4,7 @@ import dataclasses import inspect import logging +import time import uuid from contextlib import contextmanager from datetime import datetime @@ -28,9 +29,14 @@ @atexit.register def _cleanup_tracer() -> None: - bg_thread = global_state.background_thread() - if bg_thread: - bg_thread.join() + print("Waiting for tasks to finish") + num_tries = 0 + done = [x for x in asyncio.all_tasks(loop=global_state.event_loop())] + while len(done) > 0 and num_tries < 10: + done = [x for x in asyncio.all_tasks(loop=global_state.event_loop())] + time.sleep(1) + num_tries += 1 + print("All tasks finished") @dataclasses.dataclass() diff --git a/tests/e2e/tracer_shutdown/test_tracer_shutdown.py b/tests/e2e/tracer_shutdown/test_tracer_shutdown.py index db331c59..74431328 100644 --- a/tests/e2e/tracer_shutdown/test_tracer_shutdown.py +++ b/tests/e2e/tracer_shutdown/test_tracer_shutdown.py @@ -21,7 +21,7 @@ def test_tracer_received_sigint_or_sigterm(): # Wait for the process to terminate process.wait() - time.sleep(4) # give a moment for autoblocks to + time.sleep(10) # give a moment for autoblocks to test = client.get_trace(str(test_trace_id)) # Assert that the process has terminated successfully assert test is not None diff --git a/tests/e2e/tracer_shutdown/tracer_shutdown_test_process.py b/tests/e2e/tracer_shutdown/tracer_shutdown_test_process.py index 75bd4dac..45ead12d 100644 --- a/tests/e2e/tracer_shutdown/tracer_shutdown_test_process.py +++ b/tests/e2e/tracer_shutdown/tracer_shutdown_test_process.py @@ -17,7 +17,7 @@ class MyEvaluator(BaseEventEvaluator): id = "e2e-test-evaluator" async def evaluate_event(self, event: TracerEvent) -> Evaluation: # type: ignore - await asyncio.sleep(15) + await asyncio.sleep(10) return Evaluation( score=0.9, threshold=Threshold(gte=0.5), @@ -39,6 +39,7 @@ def main(): while True: if not _running: break + print("Exiting") if __name__ == "__main__": From 8c60a5ca28bb10efc40995733de66e036e94449c Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Mon, 4 Mar 2024 08:50:50 -0600 Subject: [PATCH 18/23] epd-612: Move tests around --- autoblocks/_impl/tracer.py | 20 ++++++++++---- tests/e2e/test_e2e.py | 19 +++++++++++++ .../tracer_shutdown/test_tracer_shutdown.py | 27 ------------------- .../tracer_shutdown_test_process.py | 6 +++-- 4 files changed, 38 insertions(+), 34 deletions(-) delete mode 100644 tests/e2e/tracer_shutdown/test_tracer_shutdown.py rename tests/e2e/{tracer_shutdown => }/tracer_shutdown_test_process.py (89%) diff --git a/autoblocks/_impl/tracer.py b/autoblocks/_impl/tracer.py index aefccf83..863ed09f 100644 --- a/autoblocks/_impl/tracer.py +++ b/autoblocks/_impl/tracer.py @@ -21,6 +21,7 @@ from autoblocks._impl.testing.models import BaseEventEvaluator from autoblocks._impl.testing.models import Evaluation from autoblocks._impl.testing.models import TracerEvent +from autoblocks._impl.util import SEND_EVENT_CORO_NAME from autoblocks._impl.util import AutoblocksEnvVar from autoblocks._impl.util import gather_with_max_concurrency @@ -29,14 +30,23 @@ @atexit.register def _cleanup_tracer() -> None: - print("Waiting for tasks to finish") + """ + On program exit, attempt to flush any outstanding send event tasks. + Since the background thread is a daemon, we have to actively + check task counts and sleep to measure progress. + """ num_tries = 0 - done = [x for x in asyncio.all_tasks(loop=global_state.event_loop())] - while len(done) > 0 and num_tries < 10: - done = [x for x in asyncio.all_tasks(loop=global_state.event_loop())] + + def get_pending() -> List[asyncio.Task[Any]]: + return [ + task + for task in asyncio.all_tasks(loop=global_state.event_loop()) + if task.get_coro().__name__ == SEND_EVENT_CORO_NAME and not task.done() # type: ignore + ] + + while len(get_pending()) > 0 and num_tries < 10: time.sleep(1) num_tries += 1 - print("All tasks finished") @dataclasses.dataclass() diff --git a/tests/e2e/test_e2e.py b/tests/e2e/test_e2e.py index f6d833e8..49cc1a7e 100644 --- a/tests/e2e/test_e2e.py +++ b/tests/e2e/test_e2e.py @@ -1,3 +1,6 @@ +import os +import signal +import subprocess import time import uuid from datetime import timedelta @@ -208,3 +211,19 @@ def test_prompt_manager_no_model_params_undeployed(): with mgr.exec() as prompt: assert prompt.params is None + + +def test_tracer_received_sigint_or_sigterm_cleanup(): + test_trace_id = str(uuid.uuid4()) + # Start the main.py script as a subprocess + process = subprocess.Popen(["python", "tests/e2e/tracer_shutdown_test_process.py", test_trace_id]) + time.sleep(1) + + os.kill(process.pid, signal.SIGINT) + + # Wait for the process to terminate + process.wait() + time.sleep(10) # give a moment for autoblocks to + test = client.get_trace(test_trace_id) + # Assert that the process has terminated successfully + assert test is not None diff --git a/tests/e2e/tracer_shutdown/test_tracer_shutdown.py b/tests/e2e/tracer_shutdown/test_tracer_shutdown.py deleted file mode 100644 index 74431328..00000000 --- a/tests/e2e/tracer_shutdown/test_tracer_shutdown.py +++ /dev/null @@ -1,27 +0,0 @@ -import os -import signal -import subprocess -import time -import uuid - -from autoblocks._impl.api.client import AutoblocksAPIClient - -client = AutoblocksAPIClient() - - -def test_tracer_received_sigint_or_sigterm(): - test_trace_id = uuid.uuid4() - # Start the main.py script as a subprocess - process = subprocess.Popen( - ["python", "tests/e2e/tracer_shutdown/tracer_shutdown_test_process.py", str(test_trace_id)] - ) - time.sleep(1) - - os.kill(process.pid, signal.SIGINT) - - # Wait for the process to terminate - process.wait() - time.sleep(10) # give a moment for autoblocks to - test = client.get_trace(str(test_trace_id)) - # Assert that the process has terminated successfully - assert test is not None diff --git a/tests/e2e/tracer_shutdown/tracer_shutdown_test_process.py b/tests/e2e/tracer_shutdown_test_process.py similarity index 89% rename from tests/e2e/tracer_shutdown/tracer_shutdown_test_process.py rename to tests/e2e/tracer_shutdown_test_process.py index 45ead12d..18c64bec 100644 --- a/tests/e2e/tracer_shutdown/tracer_shutdown_test_process.py +++ b/tests/e2e/tracer_shutdown_test_process.py @@ -17,7 +17,7 @@ class MyEvaluator(BaseEventEvaluator): id = "e2e-test-evaluator" async def evaluate_event(self, event: TracerEvent) -> Evaluation: # type: ignore - await asyncio.sleep(10) + await asyncio.sleep(8) return Evaluation( score=0.9, threshold=Threshold(gte=0.5), @@ -36,10 +36,12 @@ def main(): signal.signal(signal.SIGINT, signal_handler) trace = sys.argv[1] tracer.send_event("tracer_shutdown", trace_id=str(trace), evaluators=[MyEvaluator()]) + + # While a signal is not received, keep the this process + # running on the main thread while True: if not _running: break - print("Exiting") if __name__ == "__main__": From 134368493b0821515bac172f9f1fff7b7aa76cf0 Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Mon, 4 Mar 2024 09:50:28 -0600 Subject: [PATCH 19/23] epd-612: Removed background_thread helpers in main --- autoblocks/_impl/global_state.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/autoblocks/_impl/global_state.py b/autoblocks/_impl/global_state.py index 91f7ed56..72f9d5c4 100644 --- a/autoblocks/_impl/global_state.py +++ b/autoblocks/_impl/global_state.py @@ -14,11 +14,10 @@ _sync_client: Optional[httpx.Client] = None _loop: Optional[asyncio.AbstractEventLoop] = None _started: bool = False -_background_thread: Optional[threading.Thread] = None def init() -> None: - global _client, _loop, _started, _background_thread, _sync_client + global _client, _loop, _started, _sync_client if _started: return @@ -28,12 +27,12 @@ def init() -> None: _loop = asyncio.new_event_loop() - _background_thread = threading.Thread( + background_thread = threading.Thread( target=_run_event_loop, args=(_loop,), daemon=True, ) - _background_thread.start() + background_thread.start() for sig in [signal.SIGINT, signal.SIGTERM]: _loop.add_signal_handler(sig, lambda: asyncio.create_task(_on_exit_signal())) _started = True @@ -80,9 +79,3 @@ def sync_http_client() -> httpx.Client: if not _sync_client: raise Exception("HTTP client not initialized") return _sync_client - - -def background_thread() -> threading.Thread: - if not _background_thread: - raise Exception("Background thread not initialized") - return _background_thread From 680f9119c4435c19875ccd02674597b854776008 Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Mon, 4 Mar 2024 10:27:33 -0600 Subject: [PATCH 20/23] EPD-612: Parametrize shutdown test --- tests/e2e/test_e2e.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/e2e/test_e2e.py b/tests/e2e/test_e2e.py index 49cc1a7e..1ea7173c 100644 --- a/tests/e2e/test_e2e.py +++ b/tests/e2e/test_e2e.py @@ -5,6 +5,8 @@ import uuid from datetime import timedelta +import pytest + from autoblocks.api.client import AutoblocksAPIClient from autoblocks.api.models import EventFilter from autoblocks.api.models import EventFilterOperator @@ -213,13 +215,14 @@ def test_prompt_manager_no_model_params_undeployed(): assert prompt.params is None -def test_tracer_received_sigint_or_sigterm_cleanup(): +@pytest.mark.parametrize("signal", [signal.SIGINT, signal.SIGTERM]) +def test_tracer_received_sigint_or_sigterm_cleanup(signal: int): test_trace_id = str(uuid.uuid4()) # Start the main.py script as a subprocess process = subprocess.Popen(["python", "tests/e2e/tracer_shutdown_test_process.py", test_trace_id]) time.sleep(1) - os.kill(process.pid, signal.SIGINT) + os.kill(process.pid, signal) # Wait for the process to terminate process.wait() From 506b2580905a7d6f7e7a44b0c3abf30dad488d94 Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Tue, 5 Mar 2024 09:28:55 -0600 Subject: [PATCH 21/23] EPD-612: Remove set and forget --- autoblocks/_impl/global_state.py | 25 ------------ autoblocks/_impl/tracer.py | 27 +------------ tests/e2e/test_e2e.py | 22 ----------- tests/e2e/tracer_shutdown_test_process.py | 48 ----------------------- 4 files changed, 1 insertion(+), 121 deletions(-) delete mode 100644 tests/e2e/tracer_shutdown_test_process.py diff --git a/autoblocks/_impl/global_state.py b/autoblocks/_impl/global_state.py index 72f9d5c4..5b3c0654 100644 --- a/autoblocks/_impl/global_state.py +++ b/autoblocks/_impl/global_state.py @@ -1,13 +1,10 @@ import asyncio import logging -import signal import threading from typing import Optional import httpx -from autoblocks._impl.util import SEND_EVENT_CORO_NAME - log = logging.getLogger(__name__) _client: Optional[httpx.AsyncClient] = None @@ -33,31 +30,9 @@ def init() -> None: daemon=True, ) background_thread.start() - for sig in [signal.SIGINT, signal.SIGTERM]: - _loop.add_signal_handler(sig, lambda: asyncio.create_task(_on_exit_signal())) _started = True -async def _on_exit_signal() -> None: - """ - This function handles if the event loop recieves - a SIGINT or SIGTERM signal. It will attempt to finish - all outstanding "send event" tasks and then stop. - Taken from blog: https://www.roguelynn.com/words/asyncio-graceful-shutdowns/ - """ - if _loop: - # Ignore type below - accessing __name__ on coro which mypy doesn't like - tasks = [ - x - for x in asyncio.all_tasks(loop=_loop) - if x.get_coro().__name__ == SEND_EVENT_CORO_NAME and not x.done() # type: ignore - ] - logging.info(f"Attempting to flush f{len(tasks)} outstanding send event tasks") - await asyncio.gather(*tasks, return_exceptions=True) - log.info("Stopping event loop") - _loop.stop() - - def _run_event_loop(_event_loop: asyncio.AbstractEventLoop) -> None: asyncio.set_event_loop(_event_loop) _event_loop.run_forever() diff --git a/autoblocks/_impl/tracer.py b/autoblocks/_impl/tracer.py index 863ed09f..53f214d6 100644 --- a/autoblocks/_impl/tracer.py +++ b/autoblocks/_impl/tracer.py @@ -1,10 +1,8 @@ import asyncio -import atexit import contextvars import dataclasses import inspect import logging -import time import uuid from contextlib import contextmanager from datetime import datetime @@ -21,34 +19,12 @@ from autoblocks._impl.testing.models import BaseEventEvaluator from autoblocks._impl.testing.models import Evaluation from autoblocks._impl.testing.models import TracerEvent -from autoblocks._impl.util import SEND_EVENT_CORO_NAME from autoblocks._impl.util import AutoblocksEnvVar from autoblocks._impl.util import gather_with_max_concurrency log = logging.getLogger(__name__) -@atexit.register -def _cleanup_tracer() -> None: - """ - On program exit, attempt to flush any outstanding send event tasks. - Since the background thread is a daemon, we have to actively - check task counts and sleep to measure progress. - """ - num_tries = 0 - - def get_pending() -> List[asyncio.Task[Any]]: - return [ - task - for task in asyncio.all_tasks(loop=global_state.event_loop()) - if task.get_coro().__name__ == SEND_EVENT_CORO_NAME and not task.done() # type: ignore - ] - - while len(get_pending()) > 0 and num_tries < 10: - time.sleep(1) - num_tries += 1 - - @dataclasses.dataclass() class SendEventResponse: trace_id: Optional[str] @@ -283,8 +259,7 @@ def send_event( ), global_state.event_loop(), ) - if AutoblocksEnvVar.TRACER_BLOCK_ON_SEND_EVENT.get() == "1": - future.result() + future.result() # Block on result: Todo: make this not blocking except Exception as err: log.error(f"Failed to send event to Autoblocks: {err}", exc_info=True) if AutoblocksEnvVar.TRACER_THROW_ON_ERROR.get() == "1": diff --git a/tests/e2e/test_e2e.py b/tests/e2e/test_e2e.py index 1ea7173c..f6d833e8 100644 --- a/tests/e2e/test_e2e.py +++ b/tests/e2e/test_e2e.py @@ -1,12 +1,7 @@ -import os -import signal -import subprocess import time import uuid from datetime import timedelta -import pytest - from autoblocks.api.client import AutoblocksAPIClient from autoblocks.api.models import EventFilter from autoblocks.api.models import EventFilterOperator @@ -213,20 +208,3 @@ def test_prompt_manager_no_model_params_undeployed(): with mgr.exec() as prompt: assert prompt.params is None - - -@pytest.mark.parametrize("signal", [signal.SIGINT, signal.SIGTERM]) -def test_tracer_received_sigint_or_sigterm_cleanup(signal: int): - test_trace_id = str(uuid.uuid4()) - # Start the main.py script as a subprocess - process = subprocess.Popen(["python", "tests/e2e/tracer_shutdown_test_process.py", test_trace_id]) - time.sleep(1) - - os.kill(process.pid, signal) - - # Wait for the process to terminate - process.wait() - time.sleep(10) # give a moment for autoblocks to - test = client.get_trace(test_trace_id) - # Assert that the process has terminated successfully - assert test is not None diff --git a/tests/e2e/tracer_shutdown_test_process.py b/tests/e2e/tracer_shutdown_test_process.py deleted file mode 100644 index 18c64bec..00000000 --- a/tests/e2e/tracer_shutdown_test_process.py +++ /dev/null @@ -1,48 +0,0 @@ -# Write a main function that sends an event using the autoblocks tracer -import asyncio -import signal -import sys - -from autoblocks._impl.testing.models import BaseEventEvaluator -from autoblocks._impl.testing.models import Evaluation -from autoblocks._impl.testing.models import Threshold -from autoblocks._impl.testing.models import TracerEvent -from autoblocks.tracer import AutoblocksTracer - -tracer = AutoblocksTracer() - - -# Send an event using the autoblocks tracer -class MyEvaluator(BaseEventEvaluator): - id = "e2e-test-evaluator" - - async def evaluate_event(self, event: TracerEvent) -> Evaluation: # type: ignore - await asyncio.sleep(8) - return Evaluation( - score=0.9, - threshold=Threshold(gte=0.5), - ) - - -_running = True - - -def signal_handler(sig, frame): - global _running - _running = False - - -def main(): - signal.signal(signal.SIGINT, signal_handler) - trace = sys.argv[1] - tracer.send_event("tracer_shutdown", trace_id=str(trace), evaluators=[MyEvaluator()]) - - # While a signal is not received, keep the this process - # running on the main thread - while True: - if not _running: - break - - -if __name__ == "__main__": - main() From 4f138765d87185cd8d9aba3555c3d259ac1b1837 Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Tue, 5 Mar 2024 09:32:31 -0600 Subject: [PATCH 22/23] EPD-612: Feedback --- autoblocks/_impl/tracer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/autoblocks/_impl/tracer.py b/autoblocks/_impl/tracer.py index 53f214d6..63f27f4d 100644 --- a/autoblocks/_impl/tracer.py +++ b/autoblocks/_impl/tracer.py @@ -161,7 +161,7 @@ async def _run_evaluators_unsafe( return [ self._evaluation_to_json(evaluation=evaluation, evaluator_external_id=evaluator.id) for evaluator, evaluation in zip(evaluators, evaluations) - if evaluation is not None + if isinstance(evaluation, Evaluation) ] async def _send_event_unsafe( @@ -169,14 +169,14 @@ async def _send_event_unsafe( # Require all arguments to be specified via key=value *, message: str, - max_evaluator_concurrency: int, trace_id: Optional[str], span_id: Optional[str], parent_span_id: Optional[str], timestamp: Optional[str], properties: Optional[Dict[Any, Any]], - prompt_tracking: Optional[Dict[str, Any]], evaluators: Optional[List[BaseEventEvaluator]], + max_evaluator_concurrency: int, + prompt_tracking: Optional[Dict[str, Any]], ) -> None: merged_properties = dict(self._properties) merged_properties.update(properties or {}) From f954a9981f737effeb2abf7d636d74c061abed32 Mon Sep 17 00:00:00 2001 From: Dan Morton Date: Tue, 5 Mar 2024 09:47:50 -0600 Subject: [PATCH 23/23] EPD-612: Feedback --- autoblocks/_impl/tracer.py | 2 +- autoblocks/_impl/util.py | 6 ------ tests/autoblocks/test_tracer.py | 6 ------ 3 files changed, 1 insertion(+), 13 deletions(-) diff --git a/autoblocks/_impl/tracer.py b/autoblocks/_impl/tracer.py index 63f27f4d..722dbb3a 100644 --- a/autoblocks/_impl/tracer.py +++ b/autoblocks/_impl/tracer.py @@ -216,7 +216,7 @@ async def _send_event_unsafe( timestamp=timestamp, properties=merged_properties, ) - req = global_state.sync_http_client().post( + req = await global_state.http_client().post( url=INGESTION_ENDPOINT, json=traced_event.to_json(), headers=self._client_headers, diff --git a/autoblocks/_impl/util.py b/autoblocks/_impl/util.py index 410c2a88..07ef9c3d 100644 --- a/autoblocks/_impl/util.py +++ b/autoblocks/_impl/util.py @@ -10,11 +10,6 @@ log = logging.getLogger(__name__) -# This constant is used during the shutdown process to find all outstanding -# send event tasks and attempt to resolved them before stopping the event loop. -# We have a test asserting that this function name does not change. -SEND_EVENT_CORO_NAME = "_send_event_unsafe" - class StrEnum(str, Enum): def __str__(self) -> str: @@ -27,7 +22,6 @@ class AutoblocksEnvVar(StrEnum): CLI_SERVER_ADDRESS = "AUTOBLOCKS_CLI_SERVER_ADDRESS" INGESTION_KEY = "AUTOBLOCKS_INGESTION_KEY" TRACER_THROW_ON_ERROR = "AUTOBLOCKS_TRACER_THROW_ON_ERROR" - TRACER_BLOCK_ON_SEND_EVENT = "TRACER_BLOCK_ON_SEND_EVENT" def get(self) -> Optional[str]: return os.environ.get(self.value) diff --git a/tests/autoblocks/test_tracer.py b/tests/autoblocks/test_tracer.py index bd4cd21b..1c4096d6 100644 --- a/tests/autoblocks/test_tracer.py +++ b/tests/autoblocks/test_tracer.py @@ -11,7 +11,6 @@ from autoblocks._impl.testing.models import Evaluation from autoblocks._impl.testing.models import Threshold from autoblocks._impl.testing.models import TracerEvent -from autoblocks._impl.util import SEND_EVENT_CORO_NAME from autoblocks.tracer import AutoblocksTracer from tests.autoblocks.util import make_expected_body @@ -41,7 +40,6 @@ def mock_env_vars(): os.environ, { "AUTOBLOCKS_INGESTION_KEY": "mock-ingestion-key", - "TRACER_BLOCK_ON_SEND_EVENT": "1", }, ): yield @@ -649,7 +647,3 @@ def evaluate_event(self, event: TracerEvent) -> Evaluation: properties={}, evaluators=[MyEvaluator()], ) - - -def test_tracer_has_send_event_unsafe(): - assert AutoblocksTracer._send_event_unsafe.__name__ == SEND_EVENT_CORO_NAME