diff --git a/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift b/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift index 98e22980850c..6f8a9eb61c4d 100644 --- a/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift +++ b/Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift @@ -79,23 +79,27 @@ extension CurrentValueRelay { } func receive(_ value: Output) { - guard let downstream else { return } + self.lock.lock() + + guard let downstream else { + self.lock.unlock() + return + } switch self.demand { case .unlimited: + self.lock.unlock() // NB: Adding to unlimited demand has no effect and can be ignored. _ = downstream.receive(value) case .none: - self.lock.sync { - self.receivedLastValue = false - } + self.receivedLastValue = false + self.lock.unlock() default: - self.lock.sync { - self.receivedLastValue = true - self.demand -= 1 - } + self.receivedLastValue = true + self.demand -= 1 + self.lock.unlock() let moreDemand = downstream.receive(value) self.lock.sync { self.demand += moreDemand @@ -106,14 +110,18 @@ extension CurrentValueRelay { func request(_ demand: Subscribers.Demand) { precondition(demand > 0, "Demand must be greater than zero") - guard let downstream else { return } - self.lock.lock() + + guard let downstream else { + self.lock.unlock() + return + } + self.demand += demand guard !self.receivedLastValue, - let value = self.upstream?.currentValue + let value = self.upstream?.value else { self.lock.unlock() return diff --git a/Tests/ComposableArchitectureTests/CurrentValueRelayTests.swift b/Tests/ComposableArchitectureTests/CurrentValueRelayTests.swift index 4bf45d8a2ea8..a1dcfa7d085b 100644 --- a/Tests/ComposableArchitectureTests/CurrentValueRelayTests.swift +++ b/Tests/ComposableArchitectureTests/CurrentValueRelayTests.swift @@ -25,5 +25,35 @@ _ = cancellable } + + @available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) + func testConcurrentSendAndReceive() async { + nonisolated(unsafe) let subject = CurrentValueRelay(0) + let values = LockIsolated>([]) + let cancellable = subject.sink { (value: Int) in + values.withValue { + _ = $0.insert(value) + } + } + + let receives = Task.detached { @Sendable in + for await _ in subject.values {} + } + + await withTaskGroup(of: Void.self) { group in + for index in 1...1_000 { + group.addTask { @Sendable in + subject.send(index) + } + } + } + + receives.cancel() + _ = await receives.value + + XCTAssertEqual(values.value, Set(Array(0...1_000))) + + _ = cancellable + } } #endif