diff --git a/Sources/CCoroutine/CCoroutine.c b/Sources/CCoroutine/CCoroutine.c index 741cfff..baf81b6 100644 --- a/Sources/CCoroutine/CCoroutine.c +++ b/Sources/CCoroutine/CCoroutine.c @@ -10,6 +10,8 @@ #import #include +// MARK: - context + int __start(void* ret, const void* stack, const void* param, const void (*block)(const void*)) { int n = _setjmp(ret); if (n) return n; @@ -41,6 +43,8 @@ void __longjmp(void* env, int retVal) { _longjmp(env, retVal); } +// MARK: - atomic + long __atomicExchange(_Atomic long* value, long desired) { return atomic_exchange(value, desired); } @@ -48,3 +52,11 @@ long __atomicExchange(_Atomic long* value, long desired) { void __atomicStore(_Atomic long* value, long desired) { atomic_store(value, desired); } + +void __atomicFetchAdd(_Atomic long* value, long operand) { + atomic_fetch_add(value, operand); +} + +int __atomicCompareExchange(_Atomic long* value, long* expected, long desired) { + return atomic_compare_exchange_weak(value, expected, desired); +} diff --git a/Sources/CCoroutine/include/CCoroutine.h b/Sources/CCoroutine/include/CCoroutine.h index e29d4ae..ef59636 100644 --- a/Sources/CCoroutine/include/CCoroutine.h +++ b/Sources/CCoroutine/include/CCoroutine.h @@ -9,12 +9,18 @@ #ifndef CCoroutine_h #define CCoroutine_h +// MARK: - context + int __start(void* ret, const void* stack, const void* param, const void (*block)(const void*)); void __suspend(void* env, void** sp, void* ret, int retVal); int __save(void* env, void* ret, int retVal); void __longjmp(void* env, int retVal); +// MARK: - atomic + long __atomicExchange(_Atomic long* value, long desired); void __atomicStore(_Atomic long* value, long desired); +void __atomicFetchAdd(_Atomic long* value, long operand); +int __atomicCompareExchange(_Atomic long* value, long* expected, long desired); #endif diff --git a/Sources/SwiftCoroutine/Coroutine/Shared/SharedCoroutineDispatcher.swift b/Sources/SwiftCoroutine/Coroutine/Shared/SharedCoroutineDispatcher.swift index 857dbb7..25db33a 100644 --- a/Sources/SwiftCoroutine/Coroutine/Shared/SharedCoroutineDispatcher.swift +++ b/Sources/SwiftCoroutine/Coroutine/Shared/SharedCoroutineDispatcher.swift @@ -20,27 +20,38 @@ internal final class SharedCoroutineDispatcher: _CoroutineTaskExecutor { private var freeQueues = [SharedCoroutineQueue]() private var suspendedQueues = Set() private var tasks = FifoQueue() + private var freeCount: AtomicInt internal init(contextsCount: Int, stackSize: Int) { self.stackSize = stackSize self.contextsCount = contextsCount + freeCount = AtomicInt(value: contextsCount) freeQueues.reserveCapacity(contextsCount) suspendedQueues.reserveCapacity(contextsCount) startDispatchSource() } internal func execute(on scheduler: CoroutineScheduler, task: @escaping () -> Void) { - mutex.lock() - if let queue = freeQueue { - mutex.unlock() - scheduler.scheduleTask { - self.start(task: .init(scheduler: scheduler, task: task), on: queue) - self.performNext(for: queue) + func perform() { + freeCount.update { max(0, $0 - 1) } + mutex.lock() + if let queue = freeQueue { + mutex.unlock() + start(task: .init(scheduler: scheduler, task: task), on: queue) + performNext(for: queue) + } else { + tasks.push(.init(scheduler: scheduler, task: task)) + mutex.unlock() + } + } + if freeCount.value == 0 { + mutex.lock() + defer { mutex.unlock() } + if freeCount.value == 0 { + return tasks.push(.init(scheduler: scheduler, task: task)) } - } else { - tasks.push(.init(scheduler: scheduler, task: task)) - mutex.unlock() } + scheduler.scheduleTask(perform) } private var freeQueue: SharedCoroutineQueue? { @@ -64,11 +75,33 @@ internal final class SharedCoroutineDispatcher: _CoroutineTaskExecutor { } internal func resume(_ coroutine: SharedCoroutine) { +// func perform() { +// mutex.lock() +// if suspendedQueues.remove(coroutine.queue) == nil { +// coroutine.queue.push(coroutine) +// mutex.unlock() +// } else { +// freeCount.decrease() +// mutex.unlock() +// coroutine.resume() +// performNext(for: coroutine.queue) +// } +// } +// mutex.lock() +// if suspendedQueues.contains(coroutine.queue) { +// mutex.unlock() +// coroutine.scheduler.scheduleTask(perform) +// } else { +// coroutine.queue.push(coroutine) +// mutex.unlock() +// } + mutex.lock() if suspendedQueues.remove(coroutine.queue) == nil { coroutine.queue.push(coroutine) mutex.unlock() } else { + freeCount.decrease() mutex.unlock() coroutine.scheduler.scheduleTask { coroutine.resume() @@ -103,9 +136,11 @@ internal final class SharedCoroutineDispatcher: _CoroutineTaskExecutor { } } else if queue.started == 0 { freeQueues.append(queue) + freeCount.increase() return mutex.unlock() } else { suspendedQueues.insert(queue) + freeCount.increase() return mutex.unlock() } if state.update(.none) == .running { return } diff --git a/Sources/SwiftCoroutine/Helpers/AtomicEnum.swift b/Sources/SwiftCoroutine/Helpers/AtomicEnum.swift index 444a178..f182398 100644 --- a/Sources/SwiftCoroutine/Helpers/AtomicEnum.swift +++ b/Sources/SwiftCoroutine/Helpers/AtomicEnum.swift @@ -6,31 +6,21 @@ // Copyright © 2020 Alex Belozierov. All rights reserved. // -#if SWIFT_PACKAGE -import CCoroutine -#endif - internal struct AtomicEnum where T.RawValue == Int { - private var _value: Int + private var atomic: AtomicInt - init(value: T) { - _value = value.rawValue + @inlinable init(value: T) { + atomic = AtomicInt(value: value.rawValue) } - var value: T { - get { T(rawValue: _value)! } - set { - withUnsafeMutablePointer(to: &_value) { - __atomicStore(OpaquePointer($0), newValue.rawValue) - } - } + @inlinable var value: T { + get { T(rawValue: atomic.value)! } + set { atomic.value = newValue.rawValue } } - mutating func update(_ newValue: T) -> T { - withUnsafeMutablePointer(to: &_value) { - T(rawValue: __atomicExchange(OpaquePointer($0), newValue.rawValue))! - } + @inlinable mutating func update(_ newValue: T) -> T { + T(rawValue: atomic.update(newValue.rawValue))! } } diff --git a/Sources/SwiftCoroutine/Helpers/AtomicInt.swift b/Sources/SwiftCoroutine/Helpers/AtomicInt.swift new file mode 100644 index 0000000..b5328bb --- /dev/null +++ b/Sources/SwiftCoroutine/Helpers/AtomicInt.swift @@ -0,0 +1,55 @@ +// +// AtomicInt.swift +// SwiftCoroutine +// +// Created by Alex Belozierov on 01.04.2020. +// Copyright © 2020 Alex Belozierov. All rights reserved. +// + +#if SWIFT_PACKAGE +import CCoroutine +#endif + +internal struct AtomicInt { + + private var _value: Int + + @inlinable init(value: Int) { + _value = value + } + + @inlinable var value: Int { + get { _value } + set { + withUnsafeMutablePointer(to: &_value) { + __atomicStore(OpaquePointer($0), newValue) + } + } + } + + @inlinable mutating func add(_ value: Int) { + withUnsafeMutablePointer(to: &_value) { + __atomicFetchAdd(OpaquePointer($0), value) + } + } + + @inlinable mutating func increase() { add(1) } + @inlinable mutating func decrease() { add(-1) } + + @discardableResult @inlinable + mutating func update(_ transform: (Int) -> Int) -> (old: Int, new: Int) { + withUnsafeMutablePointer(to: &_value) { + var oldValue = $0.pointee, newValue: Int + repeat { newValue = transform(oldValue) } + while __atomicCompareExchange(OpaquePointer($0), &oldValue, newValue) == 0 + return (oldValue, newValue) + } + } + + @inlinable mutating func update(_ newValue: Int) -> Int { + withUnsafeMutablePointer(to: &_value) { + __atomicExchange(OpaquePointer($0), newValue) + } + } + +} diff --git a/SwiftCoroutine.xcodeproj/project.pbxproj b/SwiftCoroutine.xcodeproj/project.pbxproj index 34852cb..2341a98 100644 --- a/SwiftCoroutine.xcodeproj/project.pbxproj +++ b/SwiftCoroutine.xcodeproj/project.pbxproj @@ -41,6 +41,8 @@ 1A595BF723E872D5009B62E7 /* CoroutineContextPool.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A595BF323E87298009B62E7 /* CoroutineContextPool.swift */; }; 1A595BFB23E8A31F009B62E7 /* CoPromise.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A595BF823E8A2C7009B62E7 /* CoPromise.swift */; }; 1A595BFC23E8A320009B62E7 /* CoPromise.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A595BF823E8A2C7009B62E7 /* CoPromise.swift */; }; + 1A64013E2433F0B900E4CECD /* AtomicInt.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A64013D2433F0B900E4CECD /* AtomicInt.swift */; }; + 1A64013F2433F0B900E4CECD /* AtomicInt.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A64013D2433F0B900E4CECD /* AtomicInt.swift */; }; 1A65BABB239CE05D004C1716 /* CoroutineContext.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A65BABA239CE05C004C1716 /* CoroutineContext.swift */; }; 1A65BAC2239CEFED004C1716 /* CoroutineContext.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A65BABA239CE05C004C1716 /* CoroutineContext.swift */; }; 1A6910C32421612400C546ED /* CoFutureCombineTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A6910C22421612400C546ED /* CoFutureCombineTests.swift */; }; @@ -140,6 +142,7 @@ 1A57BE6423CC867600639FDD /* CoroutineContextTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CoroutineContextTests.swift; sourceTree = ""; }; 1A595BF323E87298009B62E7 /* CoroutineContextPool.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CoroutineContextPool.swift; sourceTree = ""; }; 1A595BF823E8A2C7009B62E7 /* CoPromise.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CoPromise.swift; sourceTree = ""; }; + 1A64013D2433F0B900E4CECD /* AtomicInt.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AtomicInt.swift; sourceTree = ""; }; 1A65BABA239CE05C004C1716 /* CoroutineContext.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CoroutineContext.swift; sourceTree = ""; }; 1A6910C22421612400C546ED /* CoFutureCombineTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CoFutureCombineTests.swift; sourceTree = ""; }; 1A71364C24215BBB00DC78FD /* CoFutureAwaitTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CoFutureAwaitTests.swift; sourceTree = ""; }; @@ -226,6 +229,7 @@ 1A4C6DAC2416AB2C00EF2974 /* FifoQueue.swift */, 1A8129A72417D56700AE061C /* Constants.swift */, 1ABD0672242A368F00A22764 /* AtomicEnum.swift */, + 1A64013D2433F0B900E4CECD /* AtomicInt.swift */, ); path = Helpers; sourceTree = ""; @@ -676,6 +680,7 @@ 1A739AD023B1555A00165280 /* Coroutine+StackSize.swift in Sources */, 1A595BFB23E8A31F009B62E7 /* CoPromise.swift in Sources */, 1A993208242022D100B3BF32 /* CoFuture3+hashable.swift in Sources */, + 1A64013E2433F0B900E4CECD /* AtomicInt.swift in Sources */, 1A2D8D7C241EBD1A00ECC849 /* CoFuture+Combine.swift in Sources */, 1A2D8D82241EBD5700ECC849 /* CoFuturePublisher.swift in Sources */, 1A3583FF23DD796A0086A6E6 /* CoFuture1+whenComplete.swift in Sources */, @@ -735,6 +740,7 @@ 1A739AD123B1555A00165280 /* Coroutine+StackSize.swift in Sources */, 1A595BFC23E8A320009B62E7 /* CoPromise.swift in Sources */, 1A993209242022D100B3BF32 /* CoFuture3+hashable.swift in Sources */, + 1A64013F2433F0B900E4CECD /* AtomicInt.swift in Sources */, 1A2D8D7D241EBD1A00ECC849 /* CoFuture+Combine.swift in Sources */, 1A2D8D83241EBD5700ECC849 /* CoFuturePublisher.swift in Sources */, 1A35840023DD796A0086A6E6 /* CoFuture1+whenComplete.swift in Sources */, diff --git a/SwiftCoroutine.xcodeproj/xcshareddata/xcbaselines/F8CD25A920199D1000952299.xcbaseline/DEB0EA3A-457C-4029-95F2-69E53CE6D25D.plist b/SwiftCoroutine.xcodeproj/xcshareddata/xcbaselines/F8CD25A920199D1000952299.xcbaseline/DEB0EA3A-457C-4029-95F2-69E53CE6D25D.plist index 2816d5e..c0b1e4d 100644 --- a/SwiftCoroutine.xcodeproj/xcshareddata/xcbaselines/F8CD25A920199D1000952299.xcbaseline/DEB0EA3A-457C-4029-95F2-69E53CE6D25D.plist +++ b/SwiftCoroutine.xcodeproj/xcshareddata/xcbaselines/F8CD25A920199D1000952299.xcbaseline/DEB0EA3A-457C-4029-95F2-69E53CE6D25D.plist @@ -16,6 +16,16 @@ Local Baseline + testNestetAwaits() + + com.apple.XCTPerformanceMetric_WallClockTime + + baselineAverage + 0.030667 + baselineIntegrationDisplayName + Local Baseline + + testSuspendResume() com.apple.XCTPerformanceMetric_WallClockTime diff --git a/Tests/SwiftCoroutineTests/CoFutureTests/CoFutureAwaitTests.swift b/Tests/SwiftCoroutineTests/CoFutureTests/CoFutureAwaitTests.swift index a23dc91..0721782 100644 --- a/Tests/SwiftCoroutineTests/CoFutureTests/CoFutureAwaitTests.swift +++ b/Tests/SwiftCoroutineTests/CoFutureTests/CoFutureAwaitTests.swift @@ -59,14 +59,16 @@ class CoFutureAwaitTests: XCTestCase { } func testNestetAwaits() { - let queue = DispatchQueue.global() + let queue = DispatchQueue.global(qos: .userInteractive) + let queue2 = DispatchQueue.global(qos: .utility) let group = DispatchGroup() measure { group.enter() queue.coFuture { - try (0..<100).map { _ in - queue.coFuture { - try (0..<100) + try (0..<100).map { i -> CoFuture in + let queue = i % 2 == 0 ? queue : queue2 + return queue.coFuture { + try (0..<1000) .map { _ in CoFuture(value: ()) } .forEach { try $0.await() } } @@ -79,5 +81,17 @@ class CoFutureAwaitTests: XCTestCase { group.wait() } } + + func testOnBlockedSerial() { + let exp = expectation(description: "testAbc") + exp.expectedFulfillmentCount = 1000 + let serial = DispatchQueue(label: "sdadad") + serial.async { sleep(5) } + for _ in 0..<1000 { serial.startCoroutine { } } + for _ in 0..<1000 { + DispatchQueue.global().startCoroutine { exp.fulfill() } + } + wait(for: [exp], timeout: 3) + } } diff --git a/Tests/SwiftCoroutineTests/CoFutureTests/CoFutureCombineTests.swift b/Tests/SwiftCoroutineTests/CoFutureTests/CoFutureCombineTests.swift index 34d453a..2c8d50b 100644 --- a/Tests/SwiftCoroutineTests/CoFutureTests/CoFutureCombineTests.swift +++ b/Tests/SwiftCoroutineTests/CoFutureTests/CoFutureCombineTests.swift @@ -16,6 +16,7 @@ import Foundation class CoFutureCombineTests: XCTestCase { struct TestError: Error {} + var cancellables = Set() func testSubscribe() { let exp = expectation(description: "testSubscribe") @@ -36,7 +37,6 @@ class CoFutureCombineTests: XCTestCase { func testSubscription() { let exp = expectation(description: "testSubscription") - var cancellables = Set() let promise = CoPromise() promise.publisher() .map { $0 + 1 } @@ -54,7 +54,6 @@ class CoFutureCombineTests: XCTestCase { func testSubscriptionFail() { let exp = expectation(description: "testSubscriptionFail") - var cancellables = Set() let promise = CoPromise() promise.publisher() .sink(receiveCompletion: { diff --git a/Tests/SwiftCoroutineTests/Helpers/TestAtomic.swift b/Tests/SwiftCoroutineTests/Helpers/TestAtomic.swift index 61c98c6..a98b725 100644 --- a/Tests/SwiftCoroutineTests/Helpers/TestAtomic.swift +++ b/Tests/SwiftCoroutineTests/Helpers/TestAtomic.swift @@ -16,7 +16,7 @@ class TestAtomic: XCTestCase { case free, blocked } - func testUpdate() { + func testEnumUpdate() { var atomic = AtomicEnum(value: State.free), counter = 0 DispatchQueue.concurrentPerform(iterations: 100_000) { _ in while true { @@ -30,4 +30,20 @@ class TestAtomic: XCTestCase { XCTAssertEqual(counter, 100_000) } + func testIntUpdate() { + var atomic = AtomicInt(value: 0) + DispatchQueue.concurrentPerform(iterations: 100_000) { _ in + atomic.update { $0 + 1 } + } + XCTAssertEqual(atomic.value, 100_000) + } + + func testIntIncrease() { + var atomic = AtomicInt(value: 0) + DispatchQueue.concurrentPerform(iterations: 100_000) { _ in + atomic.increase() + } + XCTAssertEqual(atomic.value, 100_000) + } + }