Skip to content

Commit

Permalink
Address Effect.throttle sendability (#3325)
Browse files Browse the repository at this point in the history
  • Loading branch information
stephencelis authored and mbrandonw committed Aug 30, 2024
1 parent d277743 commit 53d5389
Showing 1 changed file with 35 additions and 35 deletions.
70 changes: 35 additions & 35 deletions Sources/ComposableArchitecture/Effects/Throttle.swift
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import Combine
@preconcurrency import Combine
import Dispatch
import Foundation

extension Effect {
extension Effect where Action: Sendable {
/// Throttles an effect so that it only publishes one output per given interval.
///
/// The throttling of an effect is with respect to actions being sent into the store. So, if
Expand All @@ -23,12 +23,13 @@ extension Effect {
/// `false`, the publisher emits the first element received during the interval.
/// - Returns: An effect that emits either the most-recent or first element received during the
/// specified interval.
public func throttle<S: Scheduler>(
public func throttle<S: Scheduler & Sendable>(
id: some Hashable & Sendable,
for interval: S.SchedulerTimeType.Stride,
scheduler: S,
latest: Bool
) -> Self {
) -> Self
where S.SchedulerTimeType.Stride: Sendable {
switch self.operation {
case .none:
return .none
Expand All @@ -42,45 +43,44 @@ extension Effect {
publisher
.receive(on: scheduler)
.flatMap { value -> AnyPublisher<Action, Never> in
throttleLock.lock()
defer { throttleLock.unlock() }
throttleState.withValue {
guard let throttleTime = $0.times[id] as! S.SchedulerTimeType? else {
$0.times[id] = scheduler.now
$0.values[id] = nil
return Just(value).eraseToAnyPublisher()
}

guard let throttleTime = throttleTimes[id] as! S.SchedulerTimeType? else {
throttleTimes[id] = scheduler.now
throttleValues[id] = nil
return Just(value).eraseToAnyPublisher()
}

let value = latest ? value : (throttleValues[id] as! Action? ?? value)
throttleValues[id] = value
let value = latest ? value : ($0.values[id] as! Action? ?? value)
$0.values[id] = value

guard throttleTime.distance(to: scheduler.now) < interval else {
throttleTimes[id] = scheduler.now
throttleValues[id] = nil
return Just(value).eraseToAnyPublisher()
}
guard throttleTime.distance(to: scheduler.now) < interval else {
$0.times[id] = scheduler.now
$0.values[id] = nil
return Just(value).eraseToAnyPublisher()
}

return Just(value)
.delay(
for: scheduler.now.distance(to: throttleTime.advanced(by: interval)),
scheduler: scheduler
)
.handleEvents(
receiveOutput: { _ in
throttleLock.sync {
throttleTimes[id] = scheduler.now
throttleValues[id] = nil
return Just(value)
.delay(
for: scheduler.now.distance(to: throttleTime.advanced(by: interval)),
scheduler: scheduler
)
.handleEvents(
receiveOutput: { _ in
throttleState.withValue {
$0.times[id] = scheduler.now
$0.values[id] = nil
}
}
}
)
.eraseToAnyPublisher()
)
.eraseToAnyPublisher()
}
}
}
.cancellable(id: id, cancelInFlight: true)
}
}
}

var throttleTimes: [AnyHashable: Any] = [:]
var throttleValues: [AnyHashable: Any] = [:]
let throttleLock = NSRecursiveLock()
private let throttleState = LockIsolated<(times: [AnyHashable: Any], values: [AnyHashable: Any])>(
(times: [:], values: [:])
)

0 comments on commit 53d5389

Please sign in to comment.