Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some operators revamped with curry flip #689

Draft
wants to merge 10 commits into
base: curry-flip
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion reactivex/observable/observable.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def pipe(self, *operators: Callable[[Any], Any]) -> Any:

return pipe_(self, *operators)

def run(self) -> Any:
def run(self) -> _T_out:
"""Run source synchronously.

Subscribes to the observable source. Then blocks and waits for the
Expand Down
44 changes: 36 additions & 8 deletions reactivex/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,14 @@ def all(predicate: Predicate[_T]) -> Callable[[Observable[_T]], Observable[bool]
.. marble::
:alt: all

--1--2--3--4--5-|
[ all(i: i<10) ]
----------------true-|
--1--2--3--4--5--6----|
[ all(i: i<8) ]
------------------true|


--1--2--3--4--5--6----|
[ all(i: i<4) ]
------false|

Example:
>>> op = all(lambda value: value.length > 3)
Expand All @@ -78,6 +83,13 @@ def all(predicate: Predicate[_T]) -> Callable[[Observable[_T]], Observable[bool]
returns an observable sequence containing a single element
determining whether all elements in the source sequence pass
the test in the specified predicate.

If a predicate returns false, the result sequence emits false
and completes immediately, regardless of the state of the
source sequence.

If all items pass the predicate test, the emission of true
will only happen as the source completes.
"""
from ._all import all_

Expand All @@ -90,9 +102,8 @@ def amb(right_source: Observable[_T]) -> Callable[[Observable[_T]], Observable[_
.. marble::
:alt: amb

---8--6--9-----------|
---8--6--9---------|
--1--2--3---5--------|
----------10-20-30---|
[ amb() ]
--1--2--3---5--------|

Expand Down Expand Up @@ -2611,7 +2622,7 @@ def scan(
Applies an accumulator function over an observable sequence and
returns each intermediate result. The optional seed value is used
as the initial accumulator value. For aggregation behavior with no
intermediate results, see `aggregate()` or `Observable()`.
intermediate results, see `reduce()` or `Observable()`.

.. marble::
:alt: scan
Expand Down Expand Up @@ -2705,12 +2716,29 @@ def single(
the condition in the optional predicate, and reports an exception
if there is not exactly one element in the observable sequence.

If the predicates does not match any item, the resulting sequence
errors once the source completes.

If the predicate matches more than one item, the resulting sequence
errors immediately, without waiting for the source to complete.

If the source never completes, the resulting sequence does not
emit anything, nor completes.

.. marble::
:alt: single

----1--2--3--4-----|
[ single(3) ]
----------3--------|
[ single(x==3) ]
-----------------3-|

----1--3--3--4-----|
[ single(x==3) ]
----------x

----1--1--1--1-----|
[ single(x==3) ]
-------------------x

Example:
>>> res = single()
Expand Down
147 changes: 73 additions & 74 deletions reactivex/operators/_amb.py
Original file line number Diff line number Diff line change
@@ -1,92 +1,91 @@
from asyncio import Future
from typing import Callable, List, Optional, TypeVar, Union
from typing import List, Optional, TypeVar, Union

from reactivex import Observable, abc, from_future
from reactivex.curry import curry_flip
from reactivex.disposable import CompositeDisposable, SingleAssignmentDisposable

_T = TypeVar("_T")


@curry_flip(1)
def amb_(
right_source: Union[Observable[_T], "Future[_T]"]
) -> Callable[[Observable[_T]], Observable[_T]]:
left_source: Observable[_T], right_source: Union[Observable[_T], "Future[_T]"]
) -> Observable[_T]:

if isinstance(right_source, Future):
obs: Observable[_T] = from_future(right_source)
else:
obs = right_source

def amb(left_source: Observable[_T]) -> Observable[_T]:
def subscribe(
observer: abc.ObserverBase[_T],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
choice: List[Optional[str]] = [None]
left_choice = "L"
right_choice = "R"
left_subscription = SingleAssignmentDisposable()
right_subscription = SingleAssignmentDisposable()

def choice_left():
if not choice[0]:
choice[0] = left_choice
right_subscription.dispose()

def choice_right():
if not choice[0]:
choice[0] = right_choice
left_subscription.dispose()

def on_next_left(value: _T) -> None:
with left_source.lock:
choice_left()
if choice[0] == left_choice:
observer.on_next(value)

def on_error_left(err: Exception) -> None:
with left_source.lock:
choice_left()
if choice[0] == left_choice:
observer.on_error(err)

def on_completed_left() -> None:
with left_source.lock:
choice_left()
if choice[0] == left_choice:
observer.on_completed()

left_d = left_source.subscribe(
on_next_left, on_error_left, on_completed_left, scheduler=scheduler
)
left_subscription.disposable = left_d

def send_right(value: _T) -> None:
with left_source.lock:
choice_right()
if choice[0] == right_choice:
observer.on_next(value)

def on_error_right(err: Exception) -> None:
with left_source.lock:
choice_right()
if choice[0] == right_choice:
observer.on_error(err)

def on_completed_right() -> None:
with left_source.lock:
choice_right()
if choice[0] == right_choice:
observer.on_completed()

right_d = obs.subscribe(
send_right, on_error_right, on_completed_right, scheduler=scheduler
)
right_subscription.disposable = right_d
return CompositeDisposable(left_subscription, right_subscription)

return Observable(subscribe)

return amb
def subscribe(
observer: abc.ObserverBase[_T],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
choice: List[Optional[str]] = [None]
left_choice = "L"
right_choice = "R"
left_subscription = SingleAssignmentDisposable()
right_subscription = SingleAssignmentDisposable()

def choice_left():
if not choice[0]:
choice[0] = left_choice
right_subscription.dispose()

def choice_right():
if not choice[0]:
choice[0] = right_choice
left_subscription.dispose()

def on_next_left(value: _T) -> None:
with left_source.lock:
choice_left()
if choice[0] == left_choice:
observer.on_next(value)

def on_error_left(err: Exception) -> None:
with left_source.lock:
choice_left()
if choice[0] == left_choice:
observer.on_error(err)

def on_completed_left() -> None:
with left_source.lock:
choice_left()
if choice[0] == left_choice:
observer.on_completed()

left_d = left_source.subscribe(
on_next_left, on_error_left, on_completed_left, scheduler=scheduler
)
left_subscription.disposable = left_d

def send_right(value: _T) -> None:
with left_source.lock:
choice_right()
if choice[0] == right_choice:
observer.on_next(value)

def on_error_right(err: Exception) -> None:
with left_source.lock:
choice_right()
if choice[0] == right_choice:
observer.on_error(err)

def on_completed_right() -> None:
with left_source.lock:
choice_right()
if choice[0] == right_choice:
observer.on_completed()

right_d = obs.subscribe(
send_right, on_error_right, on_completed_right, scheduler=scheduler
)
right_subscription.disposable = right_d
return CompositeDisposable(left_subscription, right_subscription)

return Observable(subscribe)


__all__ = ["amb_"]
67 changes: 32 additions & 35 deletions reactivex/operators/_average.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from dataclasses import dataclass
from typing import Any, Callable, Optional, TypeVar, cast
from typing import Any, Optional, TypeVar, cast

from reactivex import Observable, operators, typing
from reactivex.curry import curry_flip

_T = TypeVar("_T")

Expand All @@ -12,51 +13,47 @@ class AverageValue:
count: int


@curry_flip(1)
def average_(
source: Observable[Any],
key_mapper: Optional[typing.Mapper[_T, float]] = None,
) -> Callable[[Observable[_T]], Observable[float]]:
def average(source: Observable[Any]) -> Observable[float]:
"""Partially applied average operator.
) -> Observable[float]:
"""Partially applied average operator.

Computes the average of an observable sequence of values that
are in the sequence or obtained by invoking a transform
function on each element of the input sequence if present.
Computes the average of an observable sequence of values that
are in the sequence or obtained by invoking a transform
function on each element of the input sequence if present.

Examples:
>>> res = average(source)
Examples:
>>> res = average(source)

Args:
source: Source observable to average.
Args:
source: Source observable to average.

Returns:
An observable sequence containing a single element with the
average of the sequence of values.
"""
Returns:
An observable sequence containing a single element with the
average of the sequence of values.
"""

key_mapper_: typing.Mapper[_T, float] = key_mapper or (
lambda x: float(cast(Any, x))
)
key_mapper_: typing.Mapper[_T, float] = key_mapper or (
lambda x: float(cast(Any, x))
)

def accumulator(prev: AverageValue, cur: float) -> AverageValue:
return AverageValue(sum=prev.sum + cur, count=prev.count + 1)
def accumulator(prev: AverageValue, cur: float) -> AverageValue:
return AverageValue(sum=prev.sum + cur, count=prev.count + 1)

def mapper(s: AverageValue) -> float:
if s.count == 0:
raise Exception("The input sequence was empty")
def mapper(s: AverageValue) -> float:
return s.sum / float(s.count)

return s.sum / float(s.count)
seed = AverageValue(sum=0, count=0)

seed = AverageValue(sum=0, count=0)

ret = source.pipe(
operators.map(key_mapper_),
operators.scan(accumulator, seed),
operators.last(),
operators.map(mapper),
)
return ret

return average
ret = source.pipe(
operators.map(key_mapper_),
operators.scan(accumulator, seed),
operators.last(),
operators.map(mapper),
)
return ret


__all__ = ["average_"]
Loading