From d30efd383976879ccbe928e2f4c59d0995c9d44d Mon Sep 17 00:00:00 2001 From: Kabir Oberai Date: Tue, 15 Oct 2024 19:44:09 -0400 Subject: [PATCH 1/8] Test tweak test tweaks --- .../CurrentValueRelayTests.swift | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/Tests/ComposableArchitectureTests/CurrentValueRelayTests.swift b/Tests/ComposableArchitectureTests/CurrentValueRelayTests.swift index 4bf45d8a2ea8..a5f0d40a6479 100644 --- a/Tests/ComposableArchitectureTests/CurrentValueRelayTests.swift +++ b/Tests/ComposableArchitectureTests/CurrentValueRelayTests.swift @@ -25,5 +25,34 @@ _ = cancellable } + + func testConcurrentSendAndReceive() async { + nonisolated(unsafe) let subject = CurrentValueRelay(0) + let values = LockIsolated>([]) + let cancellable = subject.sink { (value: Int) in + values.withValue { + _ = $0.insert(value) + } + } + + let receives = Task.detached { @Sendable in + for await _ in subject.values {} + } + + await withTaskGroup(of: Void.self) { group in + for index in 1...1_000 { + group.addTask { @Sendable in + subject.send(index) + } + } + } + + receives.cancel() + _ = await receives.value + + XCTAssertEqual(values.value, Set(Array(0...1_000))) + + _ = cancellable + } } #endif From ce6b0e7faae73f9693313523daab867971c517d2 Mon Sep 17 00:00:00 2001 From: Kabir Oberai Date: Tue, 15 Oct 2024 20:13:29 -0400 Subject: [PATCH 2/8] Slow fix --- .../Internal/CurrentValueRelay.swift | 63 +++++++++---------- 1 file changed, 29 insertions(+), 34 deletions(-) diff --git a/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift b/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift index 98e22980850c..fccd40e94336 100644 --- a/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift +++ b/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift @@ -5,7 +5,7 @@ final class CurrentValueRelay: Publisher { typealias Failure = Never private var currentValue: Output - private let lock: os_unfair_lock_t + private let lock: NSRecursiveLock private var subscriptions = ContiguousArray() var value: Output { @@ -15,13 +15,7 @@ final class CurrentValueRelay: Publisher { init(_ value: Output) { self.currentValue = value - self.lock = os_unfair_lock_t.allocate(capacity: 1) - self.lock.initialize(to: os_unfair_lock()) - } - - deinit { - self.lock.deinitialize(count: 1) - self.lock.deallocate() + self.lock = NSRecursiveLock() } func receive(subscriber: some Subscriber) { @@ -36,7 +30,7 @@ final class CurrentValueRelay: Publisher { self.lock.sync { self.currentValue = value } - for subscription in self.lock.sync({ self.subscriptions }) { + for subscription in self.lock.sync(work: { self.subscriptions }) { subscription.receive(value) } } @@ -52,27 +46,28 @@ final class CurrentValueRelay: Publisher { extension CurrentValueRelay { fileprivate final class Subscription: Combine.Subscription, Equatable { - private var demand = Subscribers.Demand.none - private var downstream: (any Subscriber)? - private let lock: os_unfair_lock_t + private var _demand = Subscribers.Demand.none + + private var _downstream: (any Subscriber)? + var downstream: (any Subscriber)? { + var downstream: (any Subscriber)? + self.lock.sync { downstream = _downstream } + return downstream + } + + private let lock: NSRecursiveLock private var receivedLastValue = false private var upstream: CurrentValueRelay? init(upstream: CurrentValueRelay, downstream: any Subscriber) { self.upstream = upstream - self.downstream = downstream - self.lock = os_unfair_lock_t.allocate(capacity: 1) - self.lock.initialize(to: os_unfair_lock()) - } - - deinit { - self.lock.deinitialize(count: 1) - self.lock.deallocate() + self._downstream = downstream + self.lock = upstream.lock } func cancel() { self.lock.sync { - self.downstream = nil + self._downstream = nil self.upstream?.remove(self) self.upstream = nil } @@ -81,24 +76,24 @@ extension CurrentValueRelay { func receive(_ value: Output) { guard let downstream else { return } - switch self.demand { + self.lock.lock() + switch self._demand { case .unlimited: + self.lock.unlock() // NB: Adding to unlimited demand has no effect and can be ignored. _ = downstream.receive(value) case .none: - self.lock.sync { - self.receivedLastValue = false - } + self.receivedLastValue = false + self.lock.unlock() default: - self.lock.sync { - self.receivedLastValue = true - self.demand -= 1 - } + self.receivedLastValue = true + self._demand -= 1 + self.lock.unlock() let moreDemand = downstream.receive(value) self.lock.sync { - self.demand += moreDemand + self._demand += moreDemand } } } @@ -109,7 +104,7 @@ extension CurrentValueRelay { guard let downstream else { return } self.lock.lock() - self.demand += demand + self._demand += demand guard !self.receivedLastValue, @@ -121,18 +116,18 @@ extension CurrentValueRelay { self.receivedLastValue = true - switch self.demand { + switch self._demand { case .unlimited: self.lock.unlock() // NB: Adding to unlimited demand has no effect and can be ignored. _ = downstream.receive(value) default: - self.demand -= 1 + self._demand -= 1 self.lock.unlock() let moreDemand = downstream.receive(value) self.lock.lock() - self.demand += moreDemand + self._demand += moreDemand self.lock.unlock() } } From ebec38bcdce3d08cdc6476ea8e5744281184c705 Mon Sep 17 00:00:00 2001 From: Kabir Oberai Date: Thu, 17 Oct 2024 20:39:24 -0400 Subject: [PATCH 3/8] Fix test compilation --- Tests/ComposableArchitectureTests/CurrentValueRelayTests.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Tests/ComposableArchitectureTests/CurrentValueRelayTests.swift b/Tests/ComposableArchitectureTests/CurrentValueRelayTests.swift index a5f0d40a6479..a1dcfa7d085b 100644 --- a/Tests/ComposableArchitectureTests/CurrentValueRelayTests.swift +++ b/Tests/ComposableArchitectureTests/CurrentValueRelayTests.swift @@ -26,6 +26,7 @@ _ = cancellable } + @available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) func testConcurrentSendAndReceive() async { nonisolated(unsafe) let subject = CurrentValueRelay(0) let values = LockIsolated>([]) From 3cc0488fdbc3cd0e17a9abdbb703143024659bba Mon Sep 17 00:00:00 2001 From: Kabir Oberai Date: Thu, 17 Oct 2024 20:44:54 -0400 Subject: [PATCH 4/8] nonrecursive lock --- .../Internal/CurrentValueRelay.swift | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift b/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift index fccd40e94336..e3b68f3b5e66 100644 --- a/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift +++ b/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift @@ -5,41 +5,45 @@ final class CurrentValueRelay: Publisher { typealias Failure = Never private var currentValue: Output - private let lock: NSRecursiveLock + private let lock: NSLock private var subscriptions = ContiguousArray() var value: Output { - get { self.lock.sync { self.currentValue } } + get { self.lock.withLock { self.currentValue } } set { self.send(newValue) } } init(_ value: Output) { self.currentValue = value - self.lock = NSRecursiveLock() + self.lock = NSLock() } func receive(subscriber: some Subscriber) { let subscription = Subscription(upstream: self, downstream: subscriber) - self.lock.sync { + self.lock.withLock { self.subscriptions.append(subscription) } subscriber.receive(subscription: subscription) } func send(_ value: Output) { - self.lock.sync { + self.lock.withLock { self.currentValue = value } - for subscription in self.lock.sync(work: { self.subscriptions }) { + for subscription in self.lock.withLock({ self.subscriptions }) { subscription.receive(value) } } + private func _remove(_ subscription: Subscription) { + guard let index = self.subscriptions.firstIndex(of: subscription) + else { return } + self.subscriptions.remove(at: index) + } + private func remove(_ subscription: Subscription) { - self.lock.sync { - guard let index = self.subscriptions.firstIndex(of: subscription) - else { return } - self.subscriptions.remove(at: index) + self.lock.withLock { + self._remove(subscription) } } } @@ -51,11 +55,11 @@ extension CurrentValueRelay { private var _downstream: (any Subscriber)? var downstream: (any Subscriber)? { var downstream: (any Subscriber)? - self.lock.sync { downstream = _downstream } + self.lock.withLock { downstream = _downstream } return downstream } - private let lock: NSRecursiveLock + private let lock: NSLock private var receivedLastValue = false private var upstream: CurrentValueRelay? @@ -66,9 +70,9 @@ extension CurrentValueRelay { } func cancel() { - self.lock.sync { + self.lock.withLock { self._downstream = nil - self.upstream?.remove(self) + self.upstream?._remove(self) self.upstream = nil } } @@ -92,7 +96,7 @@ extension CurrentValueRelay { self._demand -= 1 self.lock.unlock() let moreDemand = downstream.receive(value) - self.lock.sync { + self.lock.withLock { self._demand += moreDemand } } From 396798253ee8a5614dae2ad63ed2ff02a64ec557 Mon Sep 17 00:00:00 2001 From: Kabir Oberai Date: Thu, 17 Oct 2024 20:49:41 -0400 Subject: [PATCH 5/8] back to os_lock --- .../Internal/CurrentValueRelay.swift | 56 +++++++++++-------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift b/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift index e3b68f3b5e66..4cc1376479cb 100644 --- a/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift +++ b/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift @@ -4,46 +4,48 @@ import Foundation final class CurrentValueRelay: Publisher { typealias Failure = Never - private var currentValue: Output - private let lock: NSLock + private var _value: Output + private let lock: os_unfair_lock_t private var subscriptions = ContiguousArray() var value: Output { - get { self.lock.withLock { self.currentValue } } + get { self.lock.sync { self._value } } set { self.send(newValue) } } init(_ value: Output) { - self.currentValue = value - self.lock = NSLock() + self._value = value + self.lock = os_unfair_lock_t.allocate(capacity: 1) + self.lock.initialize(to: os_unfair_lock()) + } + + deinit { + self.lock.deinitialize(count: 1) + self.lock.deallocate() } func receive(subscriber: some Subscriber) { let subscription = Subscription(upstream: self, downstream: subscriber) - self.lock.withLock { + self.lock.sync { self.subscriptions.append(subscription) } subscriber.receive(subscription: subscription) } func send(_ value: Output) { - self.lock.withLock { - self.currentValue = value + self.lock.sync { + self._value = value } - for subscription in self.lock.withLock({ self.subscriptions }) { + for subscription in self.lock.sync({ self.subscriptions }) { subscription.receive(value) } } - private func _remove(_ subscription: Subscription) { - guard let index = self.subscriptions.firstIndex(of: subscription) - else { return } - self.subscriptions.remove(at: index) - } - private func remove(_ subscription: Subscription) { - self.lock.withLock { - self._remove(subscription) + self.lock.sync { + guard let index = self.subscriptions.firstIndex(of: subscription) + else { return } + self.subscriptions.remove(at: index) } } } @@ -55,24 +57,30 @@ extension CurrentValueRelay { private var _downstream: (any Subscriber)? var downstream: (any Subscriber)? { var downstream: (any Subscriber)? - self.lock.withLock { downstream = _downstream } + self.lock.sync { downstream = _downstream } return downstream } - private let lock: NSLock + private let lock: os_unfair_lock_t private var receivedLastValue = false private var upstream: CurrentValueRelay? init(upstream: CurrentValueRelay, downstream: any Subscriber) { self.upstream = upstream self._downstream = downstream - self.lock = upstream.lock + self.lock = os_unfair_lock_t.allocate(capacity: 1) + self.lock.initialize(to: os_unfair_lock()) + } + + deinit { + self.lock.deinitialize(count: 1) + self.lock.deallocate() } func cancel() { - self.lock.withLock { + self.lock.sync { self._downstream = nil - self.upstream?._remove(self) + self.upstream?.remove(self) self.upstream = nil } } @@ -96,7 +104,7 @@ extension CurrentValueRelay { self._demand -= 1 self.lock.unlock() let moreDemand = downstream.receive(value) - self.lock.withLock { + self.lock.sync { self._demand += moreDemand } } @@ -112,7 +120,7 @@ extension CurrentValueRelay { guard !self.receivedLastValue, - let value = self.upstream?.currentValue + let value = self.upstream?.value else { self.lock.unlock() return From da0413e6a46a66e5f4b2e66fe19c963f7bd3cc60 Mon Sep 17 00:00:00 2001 From: Kabir Oberai Date: Thu, 17 Oct 2024 20:50:32 -0400 Subject: [PATCH 6/8] undo renaming --- .../Internal/CurrentValueRelay.swift | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift b/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift index 4cc1376479cb..e25b5e10eeeb 100644 --- a/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift +++ b/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift @@ -4,17 +4,17 @@ import Foundation final class CurrentValueRelay: Publisher { typealias Failure = Never - private var _value: Output + private var currentValue: Output private let lock: os_unfair_lock_t private var subscriptions = ContiguousArray() var value: Output { - get { self.lock.sync { self._value } } + get { self.lock.sync { self.currentValue } } set { self.send(newValue) } } init(_ value: Output) { - self._value = value + self.currentValue = value self.lock = os_unfair_lock_t.allocate(capacity: 1) self.lock.initialize(to: os_unfair_lock()) } @@ -34,7 +34,7 @@ final class CurrentValueRelay: Publisher { func send(_ value: Output) { self.lock.sync { - self._value = value + self.currentValue = value } for subscription in self.lock.sync({ self.subscriptions }) { subscription.receive(value) @@ -52,7 +52,7 @@ final class CurrentValueRelay: Publisher { extension CurrentValueRelay { fileprivate final class Subscription: Combine.Subscription, Equatable { - private var _demand = Subscribers.Demand.none + private var demand = Subscribers.Demand.none private var _downstream: (any Subscriber)? var downstream: (any Subscriber)? { @@ -89,7 +89,7 @@ extension CurrentValueRelay { guard let downstream else { return } self.lock.lock() - switch self._demand { + switch self.demand { case .unlimited: self.lock.unlock() // NB: Adding to unlimited demand has no effect and can be ignored. @@ -101,11 +101,11 @@ extension CurrentValueRelay { default: self.receivedLastValue = true - self._demand -= 1 + self.demand -= 1 self.lock.unlock() let moreDemand = downstream.receive(value) self.lock.sync { - self._demand += moreDemand + self.demand += moreDemand } } } @@ -116,7 +116,7 @@ extension CurrentValueRelay { guard let downstream else { return } self.lock.lock() - self._demand += demand + self.demand += demand guard !self.receivedLastValue, @@ -128,18 +128,18 @@ extension CurrentValueRelay { self.receivedLastValue = true - switch self._demand { + switch self.demand { case .unlimited: self.lock.unlock() // NB: Adding to unlimited demand has no effect and can be ignored. _ = downstream.receive(value) default: - self._demand -= 1 + self.demand -= 1 self.lock.unlock() let moreDemand = downstream.receive(value) self.lock.lock() - self._demand += moreDemand + self.demand += moreDemand self.lock.unlock() } } From daaef0865d94d5fc207e05d03c75a25c647ca3e0 Mon Sep 17 00:00:00 2001 From: Kabir Oberai Date: Thu, 17 Oct 2024 21:16:02 -0400 Subject: [PATCH 7/8] visibility --- Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift b/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift index e25b5e10eeeb..35d02e294401 100644 --- a/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift +++ b/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift @@ -55,7 +55,7 @@ extension CurrentValueRelay { private var demand = Subscribers.Demand.none private var _downstream: (any Subscriber)? - var downstream: (any Subscriber)? { + private var downstream: (any Subscriber)? { var downstream: (any Subscriber)? self.lock.sync { downstream = _downstream } return downstream From 9d6ffd326e7631ec3da8b350f96fd7b1f4e610eb Mon Sep 17 00:00:00 2001 From: Kabir Oberai Date: Mon, 21 Oct 2024 16:28:44 -0400 Subject: [PATCH 8/8] Feedback --- .../Internal/CurrentValueRelay.swift | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift b/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift index 35d02e294401..6f8a9eb61c4d 100644 --- a/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift +++ b/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift @@ -53,21 +53,14 @@ final class CurrentValueRelay: Publisher { extension CurrentValueRelay { fileprivate final class Subscription: Combine.Subscription, Equatable { private var demand = Subscribers.Demand.none - - private var _downstream: (any Subscriber)? - private var downstream: (any Subscriber)? { - var downstream: (any Subscriber)? - self.lock.sync { downstream = _downstream } - return downstream - } - + private var downstream: (any Subscriber)? private let lock: os_unfair_lock_t private var receivedLastValue = false private var upstream: CurrentValueRelay? init(upstream: CurrentValueRelay, downstream: any Subscriber) { self.upstream = upstream - self._downstream = downstream + self.downstream = downstream self.lock = os_unfair_lock_t.allocate(capacity: 1) self.lock.initialize(to: os_unfair_lock()) } @@ -79,16 +72,20 @@ extension CurrentValueRelay { func cancel() { self.lock.sync { - self._downstream = nil + self.downstream = nil self.upstream?.remove(self) self.upstream = nil } } func receive(_ value: Output) { - guard let downstream else { return } - self.lock.lock() + + guard let downstream else { + self.lock.unlock() + return + } + switch self.demand { case .unlimited: self.lock.unlock() @@ -113,9 +110,13 @@ extension CurrentValueRelay { func request(_ demand: Subscribers.Demand) { precondition(demand > 0, "Demand must be greater than zero") - guard let downstream else { return } - self.lock.lock() + + guard let downstream else { + self.lock.unlock() + return + } + self.demand += demand guard