diff --git a/ReactiveSwift.xcodeproj/project.pbxproj b/ReactiveSwift.xcodeproj/project.pbxproj index c9e1c49e6..77760658e 100644 --- a/ReactiveSwift.xcodeproj/project.pbxproj +++ b/ReactiveSwift.xcodeproj/project.pbxproj @@ -85,6 +85,18 @@ 9A2D5D3B259F985B005682ED /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D39259F985B005682ED /* Delay.swift */; }; 9A2D5D3C259F985B005682ED /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D39259F985B005682ED /* Delay.swift */; }; 9A2D5D3D259F985B005682ED /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D39259F985B005682ED /* Delay.swift */; }; + 9A2D5D53259FA000005682ED /* Throttle.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D52259FA000005682ED /* Throttle.swift */; }; + 9A2D5D54259FA000005682ED /* Throttle.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D52259FA000005682ED /* Throttle.swift */; }; + 9A2D5D55259FA000005682ED /* Throttle.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D52259FA000005682ED /* Throttle.swift */; }; + 9A2D5D56259FA000005682ED /* Throttle.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D52259FA000005682ED /* Throttle.swift */; }; + 9A2D5D5D259FA0DD005682ED /* Debounce.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D5C259FA0DD005682ED /* Debounce.swift */; }; + 9A2D5D5E259FA0DD005682ED /* Debounce.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D5C259FA0DD005682ED /* Debounce.swift */; }; + 9A2D5D5F259FA0DD005682ED /* Debounce.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D5C259FA0DD005682ED /* Debounce.swift */; }; + 9A2D5D60259FA0DD005682ED /* Debounce.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D5C259FA0DD005682ED /* Debounce.swift */; }; + 9A2D5D67259FA59E005682ED /* CollectEvery.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D66259FA59E005682ED /* CollectEvery.swift */; }; + 9A2D5D68259FA59E005682ED /* CollectEvery.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D66259FA59E005682ED /* CollectEvery.swift */; }; + 9A2D5D69259FA59E005682ED /* CollectEvery.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D66259FA59E005682ED /* CollectEvery.swift */; }; + 9A2D5D6A259FA59E005682ED /* CollectEvery.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D66259FA59E005682ED /* CollectEvery.swift */; }; 9A67963B1F6056B90058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; }; 9A67963C1F6059420058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; }; 9A67963D1F6059430058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; }; @@ -284,6 +296,9 @@ 9A2D5D25259F9373005682ED /* ObserveOn.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ObserveOn.swift; sourceTree = ""; }; 9A2D5D2F259F942B005682ED /* LazyMap.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = LazyMap.swift; sourceTree = ""; }; 9A2D5D39259F985B005682ED /* Delay.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Delay.swift; sourceTree = ""; }; + 9A2D5D52259FA000005682ED /* Throttle.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Throttle.swift; sourceTree = ""; }; + 9A2D5D5C259FA0DD005682ED /* Debounce.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Debounce.swift; sourceTree = ""; }; + 9A2D5D66259FA59E005682ED /* CollectEvery.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CollectEvery.swift; sourceTree = ""; }; 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = UninhabitedTypeGuards.swift; sourceTree = ""; }; 9A681A9D1E5A241B00B097CF /* DeprecationSpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = DeprecationSpec.swift; sourceTree = ""; }; 9A9100DE1E0E6E620093E346 /* ValidatingProperty.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ValidatingProperty.swift; sourceTree = ""; }; @@ -441,6 +456,9 @@ 9A2D5D25259F9373005682ED /* ObserveOn.swift */, 9A2D5D2F259F942B005682ED /* LazyMap.swift */, 9A2D5D39259F985B005682ED /* Delay.swift */, + 9A2D5D52259FA000005682ED /* Throttle.swift */, + 9A2D5D5C259FA0DD005682ED /* Debounce.swift */, + 9A2D5D66259FA59E005682ED /* CollectEvery.swift */, ); path = Observers; sourceTree = ""; @@ -934,6 +952,7 @@ 57A4D1B11BA13D7A00F7D4B1 /* Optional.swift in Sources */, 57A4D1B41BA13D7A00F7D4B1 /* Disposable.swift in Sources */, 57A4D1B61BA13D7A00F7D4B1 /* Event.swift in Sources */, + 9A2D5D6A259FA59E005682ED /* CollectEvery.swift in Sources */, 57A4D1B81BA13D7A00F7D4B1 /* Scheduler.swift in Sources */, 9A9100E21E0E6E680093E346 /* ValidatingProperty.swift in Sources */, 57A4D1B91BA13D7A00F7D4B1 /* Action.swift in Sources */, @@ -943,7 +962,9 @@ 9A2D5D3D259F985B005682ED /* Delay.swift in Sources */, 57A4D1BB1BA13D7A00F7D4B1 /* Signal.swift in Sources */, 9AFA492824E9B15C003D263C /* Operators.swift in Sources */, + 9A2D5D56259FA000005682ED /* Throttle.swift in Sources */, 9A67963E1F6059440058C5B4 /* UninhabitedTypeGuards.swift in Sources */, + 9A2D5D60259FA0DD005682ED /* Debounce.swift in Sources */, 9AFA491424E9A196003D263C /* Map.swift in Sources */, 9A2D5D33259F942B005682ED /* LazyMap.swift in Sources */, 9AFA491E24E9A925003D263C /* Filter.swift in Sources */, @@ -999,6 +1020,7 @@ A9F793341B60D0140026BCBA /* Optional.swift in Sources */, A9B315BC1B3940810001CB9C /* Disposable.swift in Sources */, A9B315BE1B3940810001CB9C /* Event.swift in Sources */, + 9A2D5D69259FA59E005682ED /* CollectEvery.swift in Sources */, A9B315C01B3940810001CB9C /* Scheduler.swift in Sources */, 9A9100E11E0E6E680093E346 /* ValidatingProperty.swift in Sources */, A9B315C11B3940810001CB9C /* Action.swift in Sources */, @@ -1008,7 +1030,9 @@ 9A2D5D3C259F985B005682ED /* Delay.swift in Sources */, A9B315C31B3940810001CB9C /* Signal.swift in Sources */, 9AFA492724E9B15C003D263C /* Operators.swift in Sources */, + 9A2D5D55259FA000005682ED /* Throttle.swift in Sources */, 9A67963D1F6059430058C5B4 /* UninhabitedTypeGuards.swift in Sources */, + 9A2D5D5F259FA0DD005682ED /* Debounce.swift in Sources */, 9AFA491324E9A196003D263C /* Map.swift in Sources */, 9A2D5D32259F942B005682ED /* LazyMap.swift in Sources */, 9AFA491D24E9A925003D263C /* Filter.swift in Sources */, @@ -1036,6 +1060,7 @@ D871D69F1B3B29A40070F16C /* Optional.swift in Sources */, D08C54B61A69A3DB00AD8286 /* Event.swift in Sources */, D0C312D319EF2A5800984962 /* Disposable.swift in Sources */, + 9A2D5D67259FA59E005682ED /* CollectEvery.swift in Sources */, 9A9100DF1E0E6E620093E346 /* ValidatingProperty.swift in Sources */, EBCC7DBC1BBF010C00A2AE92 /* Signal.Observer.swift in Sources */, D03B4A3D19F4C39A009E02AC /* FoundationExtensions.swift in Sources */, @@ -1045,7 +1070,9 @@ 9A2D5D3A259F985B005682ED /* Delay.swift in Sources */, D85C652A1C0D84C7005A77AD /* Flatten.swift in Sources */, 9AFA492524E9B15C003D263C /* Operators.swift in Sources */, + 9A2D5D53259FA000005682ED /* Throttle.swift in Sources */, 9A67963B1F6056B90058C5B4 /* UninhabitedTypeGuards.swift in Sources */, + 9A2D5D5D259FA0DD005682ED /* Debounce.swift in Sources */, 9AFA491124E9A196003D263C /* Map.swift in Sources */, 9A2D5D30259F942B005682ED /* LazyMap.swift in Sources */, 9AFA491B24E9A925003D263C /* Filter.swift in Sources */, @@ -1101,6 +1128,7 @@ D08C54B41A69A2AF00AD8286 /* Signal.swift in Sources */, D8E84A671B3B32FB00C3E831 /* Optional.swift in Sources */, D0C312D419EF2A5800984962 /* Disposable.swift in Sources */, + 9A2D5D68259FA59E005682ED /* CollectEvery.swift in Sources */, D08C54B91A69A9D100AD8286 /* SignalProducer.swift in Sources */, 9A9100E01E0E6E670093E346 /* ValidatingProperty.swift in Sources */, 9ABCB1861D2A5B5A00BCA243 /* Deprecations+Removals.swift in Sources */, @@ -1110,7 +1138,9 @@ 9A2D5D3B259F985B005682ED /* Delay.swift in Sources */, D85C652B1C0E70E3005A77AD /* Flatten.swift in Sources */, 9AFA492624E9B15C003D263C /* Operators.swift in Sources */, + 9A2D5D54259FA000005682ED /* Throttle.swift in Sources */, 9A67963C1F6059420058C5B4 /* UninhabitedTypeGuards.swift in Sources */, + 9A2D5D5E259FA0DD005682ED /* Debounce.swift in Sources */, 9AFA491224E9A196003D263C /* Map.swift in Sources */, 9A2D5D31259F942B005682ED /* LazyMap.swift in Sources */, 9AFA491C24E9A925003D263C /* Filter.swift in Sources */, diff --git a/Sources/Event.swift b/Sources/Event.swift index 93449538f..eb998bbc0 100644 --- a/Sources/Event.swift +++ b/Sources/Event.swift @@ -726,170 +726,37 @@ extension Signal.Event { } internal static func throttle(_ interval: TimeInterval, on scheduler: DateScheduler) -> Transformation { - precondition(interval >= 0) - - return { action, lifetime in - let state: Atomic> = Atomic(ThrottleState()) - let schedulerDisposable = SerialDisposable() - - lifetime.observeEnded { - schedulerDisposable.dispose() - scheduler.schedule { action(.interrupted) } - } - - return Signal.Observer { event in - guard let value = event.value else { - schedulerDisposable.inner = scheduler.schedule { - action(event) - } - return - } - - let scheduleDate: Date = state.modify { state in - state.pendingValue = value - - let proposedScheduleDate: Date - if let previousDate = state.previousDate, previousDate <= scheduler.currentDate { - proposedScheduleDate = previousDate.addingTimeInterval(interval) - } else { - proposedScheduleDate = scheduler.currentDate - } - - return proposedScheduleDate < scheduler.currentDate ? scheduler.currentDate : proposedScheduleDate - } - - schedulerDisposable.inner = scheduler.schedule(after: scheduleDate) { - if let pendingValue = state.modify({ $0.retrieveValue(date: scheduleDate) }) { - action(.value(pendingValue)) - } - } - } + return { downstream, lifetime in + Operators.Throttle(downstream: downstream, downstreamLifetime: lifetime, target: scheduler, interval: interval) } } internal static func debounce(_ interval: TimeInterval, on scheduler: DateScheduler, discardWhenCompleted: Bool) -> Transformation { - precondition(interval >= 0) - - return { action, lifetime in - let state: Atomic> = Atomic(ThrottleState(previousDate: scheduler.currentDate, pendingValue: nil)) - let d = SerialDisposable() - - lifetime.observeEnded { - d.dispose() - scheduler.schedule { action(.interrupted) } - } - - return Signal.Observer { event in - switch event { - case let .value(value): - state.modify { state in - state.pendingValue = value - } - let date = scheduler.currentDate.addingTimeInterval(interval) - d.inner = scheduler.schedule(after: date) { - if let pendingValue = state.modify({ $0.retrieveValue(date: date) }) { - action(.value(pendingValue)) - } - } - - case .completed: - d.inner = scheduler.schedule { - let pending: (value: Value, previousDate: Date)? = state.modify { state in - defer { state.pendingValue = nil } - guard let pendingValue = state.pendingValue, let previousDate = state.previousDate else { return nil } - return (pendingValue, previousDate) - } - if !discardWhenCompleted, let (pendingValue, previousDate) = pending { - scheduler.schedule(after: previousDate.addingTimeInterval(interval)) { - action(.value(pendingValue)) - action(.completed) - } - } else { - action(.completed) - } - } - - case .failed, .interrupted: - d.inner = scheduler.schedule { - action(event) - } - } - } + return { downstream, lifetime in + Operators.Debounce( + downstream: downstream, + downstreamLifetime: lifetime, + target: scheduler, + interval: interval, + discardWhenCompleted: discardWhenCompleted + ) } } internal static func collect(every interval: DispatchTimeInterval, on scheduler: DateScheduler, skipEmpty: Bool, discardWhenCompleted: Bool) -> Transformation<[Value], Error> { - return { action, lifetime in - let state = Atomic>(.init(skipEmpty: skipEmpty)) - let d = SerialDisposable() - - d.inner = scheduler.schedule(after: scheduler.currentDate.addingTimeInterval(interval), interval: interval, leeway: interval * 0.1) { - let (currentValues, isCompleted) = state.modify { ($0.collect(), $0.isCompleted) } - if let currentValues = currentValues { - action(.value(currentValues)) - } - if isCompleted { - action(.completed) - } - } - - lifetime.observeEnded { - d.dispose() - scheduler.schedule { action(.interrupted) } - } - - return Signal.Observer { event in - switch event { - case let .value(value): - state.modify { $0.values.append(value) } - case let .failed(error): - d.inner = scheduler.schedule { action(.failed(error)) } - case .completed where !discardWhenCompleted: - state.modify { $0.isCompleted = true } - case .completed: - d.inner = scheduler.schedule { action(.completed) } - case .interrupted: - d.inner = scheduler.schedule { action(.interrupted) } - } - } + return { downstream, lifetime in + Operators.CollectEvery( + downstream: downstream, + downstreamLifetime: lifetime, + target: scheduler, + interval: interval, + skipEmpty: skipEmpty, + discardWhenCompleted: discardWhenCompleted + ) } } } -private struct CollectEveryState { - let skipEmpty: Bool - var values: [Value] = [] - var isCompleted: Bool = false - - init(skipEmpty: Bool) { - self.skipEmpty = skipEmpty - } - - var hasValues: Bool { - return !values.isEmpty || !skipEmpty - } - - mutating func collect() -> [Value]? { - guard hasValues else { return nil } - defer { values.removeAll() } - return values - } -} - -private struct ThrottleState { - var previousDate: Date? - var pendingValue: Value? - - mutating func retrieveValue(date: Date) -> Value? { - defer { - if pendingValue != nil { - pendingValue = nil - previousDate = date - } - } - return pendingValue - } -} extension Signal.Event where Error == Never { internal static func promoteError(_: F.Type) -> Transformation { diff --git a/Sources/Observers/CollectEvery.swift b/Sources/Observers/CollectEvery.swift new file mode 100644 index 000000000..7a8d5ccbc --- /dev/null +++ b/Sources/Observers/CollectEvery.swift @@ -0,0 +1,76 @@ +extension Operators { + internal final class CollectEvery: UnaryAsyncOperator { + let interval: DispatchTimeInterval + let discardWhenCompleted: Bool + let targetWithClock: DateScheduler + + private let state: Atomic> + private let timerDisposable = SerialDisposable() + + init( + downstream: Observer<[Value], Error>, + downstreamLifetime: Lifetime, + target: DateScheduler, + interval: DispatchTimeInterval, + skipEmpty: Bool, + discardWhenCompleted: Bool + ) { + self.interval = interval + self.discardWhenCompleted = discardWhenCompleted + self.targetWithClock = target + self.state = Atomic(CollectEveryState(skipEmpty: skipEmpty)) + + super.init(downstream: downstream, downstreamLifetime: downstreamLifetime, target: target) + + downstreamLifetime += timerDisposable + + let initialDate = targetWithClock.currentDate.addingTimeInterval(interval) + timerDisposable.inner = targetWithClock.schedule(after: initialDate, interval: interval, leeway: interval * 0.1) { + let (currentValues, isCompleted) = self.state.modify { ($0.collect(), $0.isCompleted) } + + if let currentValues = currentValues { + self.unscheduledSend(currentValues) + } + + if isCompleted { + self.unscheduledTerminate(.completed) + } + } + } + + override func receive(_ value: Value) { + state.modify { $0.values.append(value) } + } + + override func terminate(_ termination: Termination) { + guard isActive else { return } + + if case .completed = termination, !discardWhenCompleted { + state.modify { $0.isCompleted = true } + } else { + timerDisposable.dispose() + super.terminate(termination) + } + } + } +} + +private struct CollectEveryState { + let skipEmpty: Bool + var values: [Value] = [] + var isCompleted: Bool = false + + init(skipEmpty: Bool) { + self.skipEmpty = skipEmpty + } + + var hasValues: Bool { + return !values.isEmpty || !skipEmpty + } + + mutating func collect() -> [Value]? { + guard hasValues else { return nil } + defer { values.removeAll() } + return values + } +} diff --git a/Sources/Observers/Debounce.swift b/Sources/Observers/Debounce.swift new file mode 100644 index 000000000..39bbf9a68 --- /dev/null +++ b/Sources/Observers/Debounce.swift @@ -0,0 +1,75 @@ +extension Operators { + internal final class Debounce: UnaryAsyncOperator { + let interval: TimeInterval + let discardWhenCompleted: Bool + let targetWithClock: DateScheduler + + private let state: Atomic> = Atomic(DebounceState()) + private let schedulerDisposable = SerialDisposable() + + init( + downstream: Observer, + downstreamLifetime: Lifetime, + target: DateScheduler, + interval: TimeInterval, + discardWhenCompleted: Bool + ) { + precondition(interval >= 0) + + self.interval = interval + self.discardWhenCompleted = discardWhenCompleted + self.targetWithClock = target + + super.init(downstream: downstream, downstreamLifetime: downstreamLifetime, target: target) + + downstreamLifetime += schedulerDisposable + } + + override func receive(_ value: Value) { + let now = targetWithClock.currentDate + + state.modify { state in + state.lastUpdated = now + state.pendingValue = value + } + let targetDate = now.addingTimeInterval(interval) + schedulerDisposable.inner = targetWithClock.schedule(after: targetDate) { + if let pendingValue = self.state.modify({ $0.retrieve() }) { + self.unscheduledSend(pendingValue) + } + } + } + + override func terminate(_ termination: Termination) { + guard isActive else { return } + schedulerDisposable.dispose() + + if case .completed = termination { + let pending: (value: Value?, lastUpdated: Date) = state.modify { state in + return (state.retrieve(), state.lastUpdated) + } + + if !discardWhenCompleted, let pendingValue = pending.value { + targetWithClock.schedule(after: pending.lastUpdated.addingTimeInterval(interval)) { + self.unscheduledSend(pendingValue) + super.terminate(.completed) + } + } else { + super.terminate(.completed) + } + } else { + super.terminate(termination) + } + } + } +} + +private struct DebounceState { + var lastUpdated: Date = .distantPast + var pendingValue: Value? + + mutating func retrieve() -> Value? { + defer { pendingValue = nil } + return pendingValue + } +} diff --git a/Sources/Observers/Throttle.swift b/Sources/Observers/Throttle.swift new file mode 100644 index 000000000..dfd1cabe8 --- /dev/null +++ b/Sources/Observers/Throttle.swift @@ -0,0 +1,62 @@ +extension Operators { + internal final class Throttle: UnaryAsyncOperator { + let interval: TimeInterval + let targetWithClock: DateScheduler + + private let state: Atomic> = Atomic(ThrottleState()) + private let schedulerDisposable = SerialDisposable() + + init( + downstream: Observer, + downstreamLifetime: Lifetime, + target: DateScheduler, + interval: TimeInterval + ) { + precondition(interval >= 0) + + self.interval = interval + self.targetWithClock = target + super.init(downstream: downstream, downstreamLifetime: downstreamLifetime, target: target) + + downstreamLifetime += schedulerDisposable + } + + override func receive(_ value: Value) { + let scheduleDate: Date = state.modify { state in + state.pendingValue = value + + let proposedScheduleDate: Date + if let previousDate = state.previousDate, previousDate <= targetWithClock.currentDate { + proposedScheduleDate = previousDate.addingTimeInterval(interval) + } else { + proposedScheduleDate = targetWithClock.currentDate + } + + return proposedScheduleDate < targetWithClock.currentDate ? targetWithClock.currentDate : proposedScheduleDate + } + + schedulerDisposable.inner = targetWithClock.schedule(after: scheduleDate) { + guard self.isActive else { return } + + if let pendingValue = self.state.modify({ $0.retrieveValue(date: scheduleDate) }) { + self.unscheduledSend(pendingValue) + } + } + } + } +} + +private struct ThrottleState { + var previousDate: Date? + var pendingValue: Value? + + mutating func retrieveValue(date: Date) -> Value? { + defer { + if pendingValue != nil { + pendingValue = nil + previousDate = date + } + } + return pendingValue + } +} diff --git a/Sources/Observers/UnaryAsyncOperator.swift b/Sources/Observers/UnaryAsyncOperator.swift index 4f9616b76..6821e5f22 100644 --- a/Sources/Observers/UnaryAsyncOperator.swift +++ b/Sources/Observers/UnaryAsyncOperator.swift @@ -46,16 +46,29 @@ internal class UnaryAsyncOperator: downstream.receive(value) } + /// Signal termination to the downstream without any implicit scheduling on `target`. + /// + /// - important: Subclasses must invoke this only after having hopped onto the target scheduler. + final func unscheduledTerminate(_ termination: Termination) { + if self.state.tryTransition(from: .active, to: .terminated) { + if case .completed = termination { + self.onCompleted() + } + + self.downstream.terminate(termination) + } + } + open override func terminate(_ termination: Termination) { // The atomic transition here must happen **after** we hop onto the target scheduler. This is to preserve the timing // behaviour observed in previous versions of ReactiveSwift. target.schedule { - if self.state.tryTransition(from: .active, to: .terminated) { - self.downstream.terminate(termination) - } + self.unscheduledTerminate(termination) } } + + open func onCompleted() {} } private enum AsyncOperatorState: Int32 {