Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New observer abstraction for future backpressure support and decluttered stack traces #799

Merged
merged 3 commits into from
Jan 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ PlaygroundUtility.remap
# SwiftPM
.build
Packages
.swiftpm

# Carthage
Carthage/Build
Expand Down
82 changes: 72 additions & 10 deletions ReactiveSwift.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

110 changes: 32 additions & 78 deletions Sources/Event.swift
Original file line number Diff line number Diff line change
Expand Up @@ -205,75 +205,29 @@ extension Signal.Event: EventProtocol {
// This operator performs side effect upon interruption.

extension Signal.Event {
internal typealias Transformation<U, E: Swift.Error> = (@escaping Signal<U, E>.Observer.Action, Lifetime) -> Signal<Value, Error>.Observer.Action
internal typealias Transformation<U, E: Swift.Error> = (ReactiveSwift.Observer<U, E>, Lifetime) -> ReactiveSwift.Observer<Value, Error>

internal static func filter(_ isIncluded: @escaping (Value) -> Bool) -> Transformation<Value, Error> {
return { action, _ in
return { event in
switch event {
case let .value(value):
if isIncluded(value) {
action(.value(value))
}

case .completed:
action(.completed)

case let .failed(error):
action(.failed(error))

case .interrupted:
action(.interrupted)
}
}
return { downstream, _ in
Operators.Filter(downstream: downstream, predicate: isIncluded)
}
}

internal static func compactMap<U>(_ transform: @escaping (Value) -> U?) -> Transformation<U, Error> {
return { action, _ in
return { event in
switch event {
case let .value(value):
if let newValue = transform(value) {
action(.value(newValue))
}

case .completed:
action(.completed)

case let .failed(error):
action(.failed(error))

case .interrupted:
action(.interrupted)
}
}
return { downstream, _ in
Operators.CompactMap(downstream: downstream, transform: transform)
}
}

internal static func map<U>(_ transform: @escaping (Value) -> U) -> Transformation<U, Error> {
return { action, _ in
return { event in
switch event {
case let .value(value):
action(.value(transform(value)))

case .completed:
action(.completed)

case let .failed(error):
action(.failed(error))

case .interrupted:
action(.interrupted)
}
}
Operators.Map(downstream: action, transform: transform)
}
}

internal static func mapError<E>(_ transform: @escaping (Error) -> E) -> Transformation<Value, E> {
return { action, _ in
return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
action(.value(value))
Expand All @@ -293,7 +247,7 @@ extension Signal.Event {

internal static var materialize: Transformation<Signal<Value, Error>.Event, Never> {
return { action, _ in
return { event in
return Signal.Observer { event in
action(.value(event))

switch event {
Expand All @@ -312,7 +266,7 @@ extension Signal.Event {

internal static var materializeResults: Transformation<Result<Value, Error>, Never> {
return { action, _ in
return { event in
return Signal.Observer { event in
switch event {
case .value(let value):
action(.value(Result(success: value)))
Expand All @@ -333,7 +287,7 @@ extension Signal.Event {

internal static func attemptMap<U>(_ transform: @escaping (Value) -> Result<U, Error>) -> Transformation<U, Error> {
return { action, _ in
return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
switch transform(value) {
Expand Down Expand Up @@ -382,7 +336,7 @@ extension Signal.Event {
return { action, _ in
var taken = 0

return { event in
return Signal.Observer { event in
guard let value = event.value else {
action(event)
return
Expand All @@ -405,7 +359,7 @@ extension Signal.Event {
var buffer: [Value] = []
buffer.reserveCapacity(count)

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
// To avoid exceeding the reserved capacity of the buffer,
Expand All @@ -430,7 +384,7 @@ extension Signal.Event {

internal static func take(while shouldContinue: @escaping (Value) -> Bool) -> Transformation<Value, Error> {
return { action, _ in
return { event in
return Signal.Observer { event in
if let value = event.value, !shouldContinue(value) {
action(.completed)
} else {
Expand All @@ -446,7 +400,7 @@ extension Signal.Event {
return { action, _ in
var skipped = 0

return { event in
return Signal.Observer { event in
if case .value = event, skipped < count {
skipped += 1
} else {
Expand All @@ -460,7 +414,7 @@ extension Signal.Event {
return { action, _ in
var isSkipping = true

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
isSkipping = isSkipping && shouldContinue(value)
Expand All @@ -479,7 +433,7 @@ extension Signal.Event {
extension Signal.Event where Value: EventProtocol, Error == Never {
internal static var dematerialize: Transformation<Value.Value, Value.Error> {
return { action, _ in
return { event in
return Signal.Observer { event in
switch event {
case let .value(innerEvent):
action(innerEvent.event)
Expand All @@ -501,7 +455,7 @@ extension Signal.Event where Value: EventProtocol, Error == Never {
extension Signal.Event where Value: ResultProtocol, Error == Never {
internal static var dematerializeResults: Transformation<Value.Success, Value.Failure> {
return { action, _ in
return { event in
return Signal.Observer { event in
let event = event.map { $0.result }

switch event {
Expand Down Expand Up @@ -577,7 +531,7 @@ extension Signal.Event {
return { action, _ in
let state = CollectState<Value>()

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
state.append(value)
Expand All @@ -603,7 +557,7 @@ extension Signal.Event {
return { action, _ in
let state = CollectState<Value>()

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
if shouldEmit(state.values, value) {
Expand Down Expand Up @@ -633,7 +587,7 @@ extension Signal.Event {
return { action, _ in
var previous = initial

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
if let previous = previous {
Expand All @@ -655,7 +609,7 @@ extension Signal.Event {
return { action, _ in
var previous: Value?

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
if let previous = previous, isEquivalent(previous, value) {
Expand All @@ -674,7 +628,7 @@ extension Signal.Event {
return { action, _ in
var seenValues: Set<Identity> = []

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
let identity = transform(value)
Expand All @@ -694,7 +648,7 @@ extension Signal.Event {
return { action, _ in
var accumulator = initialResult

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
nextPartialResult(&accumulator, value)
Expand Down Expand Up @@ -729,7 +683,7 @@ extension Signal.Event {
return { action, _ in
var accumulator = initialState

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
let output = next(&accumulator, value)
Expand Down Expand Up @@ -761,7 +715,7 @@ extension Signal.Event {
}
}

return { event in
return Signal.Observer { event in
scheduler.schedule {
if !lifetime.hasEnded {
action(event)
Expand All @@ -786,7 +740,7 @@ extension Signal.Event {
}
}

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
// Schedule only when there is no prior outstanding value.
Expand Down Expand Up @@ -828,7 +782,7 @@ extension Signal.Event {
}
}

return { event in
return Signal.Observer { event in
switch event {
case .failed, .interrupted:
scheduler.schedule {
Expand Down Expand Up @@ -859,7 +813,7 @@ extension Signal.Event {
scheduler.schedule { action(.interrupted) }
}

return { event in
return Signal.Observer { event in
guard let value = event.value else {
schedulerDisposable.inner = scheduler.schedule {
action(event)
Expand Down Expand Up @@ -901,7 +855,7 @@ extension Signal.Event {
scheduler.schedule { action(.interrupted) }
}

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
state.modify { state in
Expand Down Expand Up @@ -960,7 +914,7 @@ extension Signal.Event {
scheduler.schedule { action(.interrupted) }
}

return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
state.modify { $0.values.append(value) }
Expand Down Expand Up @@ -1016,7 +970,7 @@ private struct ThrottleState<Value> {
extension Signal.Event where Error == Never {
internal static func promoteError<F>(_: F.Type) -> Transformation<Value, F> {
return { action, _ in
return { event in
return Signal.Observer { event in
switch event {
case let .value(value):
action(.value(value))
Expand All @@ -1035,7 +989,7 @@ extension Signal.Event where Error == Never {
extension Signal.Event where Value == Never {
internal static func promoteValue<U>(_: U.Type) -> Transformation<U, Error> {
return { action, _ in
return { event in
return Signal.Observer { event in
action(event.promoteValue())
}
}
Expand Down
21 changes: 21 additions & 0 deletions Sources/Observers/CompactMap.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
extension Operators {
internal final class CompactMap<InputValue, OutputValue, Error: Swift.Error>: Observer<InputValue, Error> {
let downstream: Observer<OutputValue, Error>
let transform: (InputValue) -> OutputValue?

init(downstream: Observer<OutputValue, Error>, transform: @escaping (InputValue) -> OutputValue?) {
self.downstream = downstream
self.transform = transform
}

override func receive(_ value: InputValue) {
if let output = transform(value) {
downstream.receive(output)
}
}

override func terminate(_ termination: Termination<Error>) {
downstream.terminate(termination)
}
}
}
21 changes: 21 additions & 0 deletions Sources/Observers/Filter.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
extension Operators {
internal final class Filter<Value, Error: Swift.Error>: Observer<Value, Error> {
let downstream: Observer<Value, Error>
let predicate: (Value) -> Bool

init(downstream: Observer<Value, Error>, predicate: @escaping (Value) -> Bool) {
self.downstream = downstream
self.predicate = predicate
}

override func receive(_ value: Value) {
if predicate(value) {
downstream.receive(value)
}
}

override func terminate(_ termination: Termination<Error>) {
downstream.terminate(termination)
}
}
}
19 changes: 19 additions & 0 deletions Sources/Observers/Map.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
extension Operators {
internal final class Map<InputValue, OutputValue, Error: Swift.Error>: Observer<InputValue, Error> {
let downstream: Observer<OutputValue, Error>
let transform: (InputValue) -> OutputValue

init(downstream: Observer<OutputValue, Error>, transform: @escaping (InputValue) -> OutputValue) {
self.downstream = downstream
self.transform = transform
}

override func receive(_ value: InputValue) {
downstream.receive(transform(value))
}

override func terminate(_ termination: Termination<Error>) {
downstream.terminate(termination)
}
}
}
Loading