Skip to content

Commit

Permalink
New observer abstraction for future backpressure support and declutte…
Browse files Browse the repository at this point in the history
…red stack traces (#799)

* Subscriber

* /Subscriber/Observer/g; Create the `Operators` namespace.
  • Loading branch information
andersio authored Jan 1, 2021
1 parent 04d406a commit d7f60d9
Show file tree
Hide file tree
Showing 11 changed files with 222 additions and 93 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ PlaygroundUtility.remap
# SwiftPM
.build
Packages
.swiftpm

# Carthage
Carthage/Build
Expand Down
82 changes: 72 additions & 10 deletions ReactiveSwift.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

110 changes: 32 additions & 78 deletions Sources/Event.swift
Original file line number Diff line number Diff line change
Expand Up @@ -205,75 +205,29 @@ extension Signal.Event: EventProtocol {
// This operator performs side effect upon interruption.

extension Signal.Event {
internal typealias Transformation<U, E: Swift.Error> = (@escaping Signal<U, E>.Observer.Action, Lifetime) -> Signal<Value, Error>.Observer.Action
internal typealias Transformation<U, E: Swift.Error> = (ReactiveSwift.Observer<U, E>, Lifetime) -> ReactiveSwift.Observer<Value, Error>

internal static func filter(_ isIncluded: @escaping (Value) -> Bool) -> Transformation<Value, Error> {
return { action, _ in
return { event in
switch event {
case let .value(value):
if isIncluded(value) {
action(.value(value))
}

case .completed:
action(.completed)

case let .failed(error):
action(.failed(error))

case .interrupted:
action(.interrupted)
}
}
return { downstream, _ in
Operators.Filter(downstream: downstream, predicate: isIncluded)
}
}

internal static func compactMap<U>(_ transform: @escaping (Value) -> U?) -> Transformation<U, Error> {
return { action, _ in
return { event in
switch event {
case let .value(value):
if let newValue = transform(value) {
action(.value(newValue))
}

case .completed:
action(.completed)

case let .failed(error):
action(.failed(error))

case .interrupted:
action(.interrupted)
}
}
return { downstream, _ in
Operators.CompactMap(downstream: downstream, transform: transform)
}
}

internal static func map<U>(_ transform: @escaping (Value) -> U) -> Transformation<U, Error> {
return { action, _ in
return { event in
switch event {
case let .value(value):
action(.value(transform(value)))

case .completed:
action(.completed)

case let .failed(error):
action(.failed(error))

case .interrupted:
action(.interrupted)
}
}
Operators.Map(downstream: action, transform: transform)
}
}

internal static func mapError<E>(_ transform: @escaping (Error) -> E) -> Transformation<Value, E> {
return { action, _ in
return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
action(.value(value))
Expand All @@ -293,7 +247,7 @@ extension Signal.Event {

internal static var materialize: Transformation<Signal<Value, Error>.Event, Never> {
return { action, _ in
return { event in
return Signal.Observer { event in
action(.value(event))

switch event {
Expand All @@ -312,7 +266,7 @@ extension Signal.Event {

internal static var materializeResults: Transformation<Result<Value, Error>, Never> {
return { action, _ in
return { event in
return Signal.Observer { event in
switch event {
case .value(let value):
action(.value(Result(success: value)))
Expand All @@ -333,7 +287,7 @@ extension Signal.Event {

internal static func attemptMap<U>(_ transform: @escaping (Value) -> Result<U, Error>) -> Transformation<U, Error> {
return { action, _ in
return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
switch transform(value) {
Expand Down Expand Up @@ -382,7 +336,7 @@ extension Signal.Event {
return { action, _ in
var taken = 0

return { event in
return Signal.Observer { event in
guard let value = event.value else {
action(event)
return
Expand All @@ -405,7 +359,7 @@ extension Signal.Event {
var buffer: [Value] = []
buffer.reserveCapacity(count)

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
// To avoid exceeding the reserved capacity of the buffer,
Expand All @@ -430,7 +384,7 @@ extension Signal.Event {

internal static func take(while shouldContinue: @escaping (Value) -> Bool) -> Transformation<Value, Error> {
return { action, _ in
return { event in
return Signal.Observer { event in
if let value = event.value, !shouldContinue(value) {
action(.completed)
} else {
Expand All @@ -446,7 +400,7 @@ extension Signal.Event {
return { action, _ in
var skipped = 0

return { event in
return Signal.Observer { event in
if case .value = event, skipped < count {
skipped += 1
} else {
Expand All @@ -460,7 +414,7 @@ extension Signal.Event {
return { action, _ in
var isSkipping = true

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
isSkipping = isSkipping && shouldContinue(value)
Expand All @@ -479,7 +433,7 @@ extension Signal.Event {
extension Signal.Event where Value: EventProtocol, Error == Never {
internal static var dematerialize: Transformation<Value.Value, Value.Error> {
return { action, _ in
return { event in
return Signal.Observer { event in
switch event {
case let .value(innerEvent):
action(innerEvent.event)
Expand All @@ -501,7 +455,7 @@ extension Signal.Event where Value: EventProtocol, Error == Never {
extension Signal.Event where Value: ResultProtocol, Error == Never {
internal static var dematerializeResults: Transformation<Value.Success, Value.Failure> {
return { action, _ in
return { event in
return Signal.Observer { event in
let event = event.map { $0.result }

switch event {
Expand Down Expand Up @@ -577,7 +531,7 @@ extension Signal.Event {
return { action, _ in
let state = CollectState<Value>()

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
state.append(value)
Expand All @@ -603,7 +557,7 @@ extension Signal.Event {
return { action, _ in
let state = CollectState<Value>()

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
if shouldEmit(state.values, value) {
Expand Down Expand Up @@ -633,7 +587,7 @@ extension Signal.Event {
return { action, _ in
var previous = initial

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
if let previous = previous {
Expand All @@ -655,7 +609,7 @@ extension Signal.Event {
return { action, _ in
var previous: Value?

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
if let previous = previous, isEquivalent(previous, value) {
Expand All @@ -674,7 +628,7 @@ extension Signal.Event {
return { action, _ in
var seenValues: Set<Identity> = []

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
let identity = transform(value)
Expand All @@ -694,7 +648,7 @@ extension Signal.Event {
return { action, _ in
var accumulator = initialResult

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
nextPartialResult(&accumulator, value)
Expand Down Expand Up @@ -729,7 +683,7 @@ extension Signal.Event {
return { action, _ in
var accumulator = initialState

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
let output = next(&accumulator, value)
Expand Down Expand Up @@ -761,7 +715,7 @@ extension Signal.Event {
}
}

return { event in
return Signal.Observer { event in
scheduler.schedule {
if !lifetime.hasEnded {
action(event)
Expand All @@ -786,7 +740,7 @@ extension Signal.Event {
}
}

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
// Schedule only when there is no prior outstanding value.
Expand Down Expand Up @@ -828,7 +782,7 @@ extension Signal.Event {
}
}

return { event in
return Signal.Observer { event in
switch event {
case .failed, .interrupted:
scheduler.schedule {
Expand Down Expand Up @@ -859,7 +813,7 @@ extension Signal.Event {
scheduler.schedule { action(.interrupted) }
}

return { event in
return Signal.Observer { event in
guard let value = event.value else {
schedulerDisposable.inner = scheduler.schedule {
action(event)
Expand Down Expand Up @@ -901,7 +855,7 @@ extension Signal.Event {
scheduler.schedule { action(.interrupted) }
}

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
state.modify { state in
Expand Down Expand Up @@ -960,7 +914,7 @@ extension Signal.Event {
scheduler.schedule { action(.interrupted) }
}

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
state.modify { $0.values.append(value) }
Expand Down Expand Up @@ -1016,7 +970,7 @@ private struct ThrottleState<Value> {
extension Signal.Event where Error == Never {
internal static func promoteError<F>(_: F.Type) -> Transformation<Value, F> {
return { action, _ in
return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
action(.value(value))
Expand All @@ -1035,7 +989,7 @@ extension Signal.Event where Error == Never {
extension Signal.Event where Value == Never {
internal static func promoteValue<U>(_: U.Type) -> Transformation<U, Error> {
return { action, _ in
return { event in
return Signal.Observer { event in
action(event.promoteValue())
}
}
Expand Down
21 changes: 21 additions & 0 deletions Sources/Observers/CompactMap.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
extension Operators {
internal final class CompactMap<InputValue, OutputValue, Error: Swift.Error>: Observer<InputValue, Error> {
let downstream: Observer<OutputValue, Error>
let transform: (InputValue) -> OutputValue?

init(downstream: Observer<OutputValue, Error>, transform: @escaping (InputValue) -> OutputValue?) {
self.downstream = downstream
self.transform = transform
}

override func receive(_ value: InputValue) {
if let output = transform(value) {
downstream.receive(output)
}
}

override func terminate(_ termination: Termination<Error>) {
downstream.terminate(termination)
}
}
}
21 changes: 21 additions & 0 deletions Sources/Observers/Filter.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
extension Operators {
internal final class Filter<Value, Error: Swift.Error>: Observer<Value, Error> {
let downstream: Observer<Value, Error>
let predicate: (Value) -> Bool

init(downstream: Observer<Value, Error>, predicate: @escaping (Value) -> Bool) {
self.downstream = downstream
self.predicate = predicate
}

override func receive(_ value: Value) {
if predicate(value) {
downstream.receive(value)
}
}

override func terminate(_ termination: Termination<Error>) {
downstream.terminate(termination)
}
}
}
19 changes: 19 additions & 0 deletions Sources/Observers/Map.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
extension Operators {
internal final class Map<InputValue, OutputValue, Error: Swift.Error>: Observer<InputValue, Error> {
let downstream: Observer<OutputValue, Error>
let transform: (InputValue) -> OutputValue

init(downstream: Observer<OutputValue, Error>, transform: @escaping (InputValue) -> OutputValue) {
self.downstream = downstream
self.transform = transform
}

override func receive(_ value: InputValue) {
downstream.receive(transform(value))
}

override func terminate(_ termination: Termination<Error>) {
downstream.terminate(termination)
}
}
}
Loading

0 comments on commit d7f60d9

Please sign in to comment.