Skip to content

Commit

Permalink
Use Any in reworker tests to simplify eventstream typing
Browse files Browse the repository at this point in the history
  • Loading branch information
tpoliaw committed May 24, 2023
1 parent 40e513b commit 89bb6b3
Showing 1 changed file with 21 additions and 20 deletions.
41 changes: 21 additions & 20 deletions tests/worker/test_reworker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import itertools
from concurrent.futures import Future
from typing import Callable, Iterable, List, Optional, TypeVar, Union
from typing import Any, Callable, Iterable, List, Optional, TypeVar, Union

import pytest

Expand Down Expand Up @@ -154,13 +154,14 @@ def assert_running_count_plan_produces_ordered_worker_and_data_events(
) -> None:
worker.start()

event_streams: List[
Union[EventStream[WorkerEvent, int], EventStream[DataEvent, int]]
] = [worker.data_events, worker.worker_events]
event_streams: List[EventStream[Any, int]] = [
worker.data_events,
worker.worker_events,
]

count = itertools.count()
events: "Future[List[Union[WorkerEvent, DataEvent]]]" = take_events_from_streams(
event_streams, # type: ignore
events: "Future[List[Any]]" = take_events_from_streams(
event_streams,
lambda _: next(count) >= len(expected_events) - 1,
)

Expand All @@ -178,19 +179,18 @@ def assert_running_count_plan_produces_ordered_worker_and_data_events(


E = TypeVar("E")
S = TypeVar("S")


def take_n_events(
stream: EventStream[E, S],
stream: EventStream[E, Any],
num: int,
) -> "Future[List[E]]":
count = itertools.count()
return take_events(stream, lambda _: next(count) >= num)


def take_events(
stream: EventStream[E, S],
stream: EventStream[E, Any],
cutoff_predicate: Callable[[E], bool],
) -> "Future[List[E]]":
events: List[E] = []
Expand All @@ -207,17 +207,18 @@ def on_event(event: E, event_id: Optional[str]) -> None:


def take_events_from_streams(
streams: List[EventStream[E, S]], # This isn't quite the type.
cutoff_predicate: Callable[[E], bool],
) -> "Future[List[E]]":
streams: List[EventStream[Any, int]],
cutoff_predicate: Callable[[Any], bool],
) -> "Future[List[Any]]":
"""Returns a collated list of futures for events in numerous event streams.
The support for generic and algebraic types doesn't appear to extend to taking an
arbirtarty list of concrete types with single but differing generic arguments while
also maintaining the generality of the argument types.
The support for generic and algebraic types doesn't appear to extend to
taking an arbitrary list of concrete types with single but differing
generic arguments while also maintaining the generality of the argument
types.
The type for streams will be any combination of event streams each of a given event
type, where the event type is generic:
The type for streams will be any combination of event streams each of a
given event type, where the event type is generic:
List[
Union[
Expand All @@ -228,10 +229,10 @@ def take_events_from_streams(
]
"""
events: List[E] = []
future: "Future[List[E]]" = Future()
events: List[Any] = []
future: "Future[List[Any]]" = Future()

def on_event(event: E, event_id: Optional[str]) -> None:
def on_event(event: Any, event_id: Optional[str]) -> None:
print(event)
events.append(event)
if cutoff_predicate(event):
Expand Down

0 comments on commit 89bb6b3

Please sign in to comment.