Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address Effect.throttle sendability #3325

Merged
merged 1 commit into from
Aug 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 35 additions & 35 deletions Sources/ComposableArchitecture/Effects/Throttle.swift
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import Combine
@preconcurrency import Combine
import Dispatch
import Foundation

extension Effect {
extension Effect where Action: Sendable {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the action flows through Combine across potential concurrency boundaries, I think we're forced to require a sendable action here. It's possible we could weaken this with a pure Swift concurrency throttle.

/// Throttles an effect so that it only publishes one output per given interval.
///
/// The throttling of an effect is with respect to actions being sent into the store. So, if
Expand All @@ -23,12 +23,13 @@ extension Effect {
/// `false`, the publisher emits the first element received during the interval.
/// - Returns: An effect that emits either the most-recent or first element received during the
/// specified interval.
public func throttle<S: Scheduler>(
public func throttle<S: Scheduler & Sendable>(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scheduler also travels across concurrency boundaries, so it needs to be sendable. Dispatch queues are sendable, but run loops are not. Combine's ImmediateScheduler is also not sendable, but our .immediate scheduler is, as is our TestScheduler, so I think this is safe to do, it'll just mean folks shouldn't throttle with a run loop.

id: some Hashable & Sendable,
for interval: S.SchedulerTimeType.Stride,
scheduler: S,
latest: Bool
) -> Self {
) -> Self
where S.SchedulerTimeType.Stride: Sendable {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another requirement. Dispatch queue's stride type is sendable.

switch self.operation {
case .none:
return .none
Expand All @@ -42,45 +43,44 @@ extension Effect {
publisher
.receive(on: scheduler)
.flatMap { value -> AnyPublisher<Action, Never> in
throttleLock.lock()
defer { throttleLock.unlock() }
throttleState.withValue {
guard let throttleTime = $0.times[id] as! S.SchedulerTimeType? else {
$0.times[id] = scheduler.now
$0.values[id] = nil
return Just(value).eraseToAnyPublisher()
}

guard let throttleTime = throttleTimes[id] as! S.SchedulerTimeType? else {
throttleTimes[id] = scheduler.now
throttleValues[id] = nil
return Just(value).eraseToAnyPublisher()
}

let value = latest ? value : (throttleValues[id] as! Action? ?? value)
throttleValues[id] = value
let value = latest ? value : ($0.values[id] as! Action? ?? value)
$0.values[id] = value

guard throttleTime.distance(to: scheduler.now) < interval else {
throttleTimes[id] = scheduler.now
throttleValues[id] = nil
return Just(value).eraseToAnyPublisher()
}
guard throttleTime.distance(to: scheduler.now) < interval else {
$0.times[id] = scheduler.now
$0.values[id] = nil
return Just(value).eraseToAnyPublisher()
}

return Just(value)
.delay(
for: scheduler.now.distance(to: throttleTime.advanced(by: interval)),
scheduler: scheduler
)
.handleEvents(
receiveOutput: { _ in
throttleLock.sync {
throttleTimes[id] = scheduler.now
throttleValues[id] = nil
return Just(value)
.delay(
for: scheduler.now.distance(to: throttleTime.advanced(by: interval)),
scheduler: scheduler
)
.handleEvents(
receiveOutput: { _ in
throttleState.withValue {
$0.times[id] = scheduler.now
$0.values[id] = nil
}
}
}
)
.eraseToAnyPublisher()
)
.eraseToAnyPublisher()
}
}
}
.cancellable(id: id, cancelInFlight: true)
}
}
}

var throttleTimes: [AnyHashable: Any] = [:]
var throttleValues: [AnyHashable: Any] = [:]
let throttleLock = NSRecursiveLock()
private let throttleState = LockIsolated<(times: [AnyHashable: Any], values: [AnyHashable: Any])>(
(times: [:], values: [:])
)