Skip to content

Commit

Permalink
- updated SharedCoroutineDispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Belozierov committed Apr 1, 2020
1 parent 2bc9a0f commit 42a0257
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 34 deletions.
12 changes: 12 additions & 0 deletions Sources/CCoroutine/CCoroutine.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#import <stdatomic.h>
#include <setjmp.h>

// MARK: - context

int __start(void* ret, const void* stack, const void* param, const void (*block)(const void*)) {
int n = _setjmp(ret);
if (n) return n;
Expand Down Expand Up @@ -41,10 +43,20 @@ void __longjmp(void* env, int retVal) {
_longjmp(env, retVal);
}

// MARK: - atomic

long __atomicExchange(_Atomic long* value, long desired) {
return atomic_exchange(value, desired);
}

void __atomicStore(_Atomic long* value, long desired) {
atomic_store(value, desired);
}

void __atomicFetchAdd(_Atomic long* value, long operand) {
atomic_fetch_add(value, operand);
}

int __atomicCompareExchange(_Atomic long* value, long* expected, long desired) {
return atomic_compare_exchange_weak(value, expected, desired);
}
6 changes: 6 additions & 0 deletions Sources/CCoroutine/include/CCoroutine.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,18 @@
#ifndef CCoroutine_h
#define CCoroutine_h

// MARK: - context

int __start(void* ret, const void* stack, const void* param, const void (*block)(const void*));
void __suspend(void* env, void** sp, void* ret, int retVal);
int __save(void* env, void* ret, int retVal);
void __longjmp(void* env, int retVal);

// MARK: - atomic

long __atomicExchange(_Atomic long* value, long desired);
void __atomicStore(_Atomic long* value, long desired);
void __atomicFetchAdd(_Atomic long* value, long operand);
int __atomicCompareExchange(_Atomic long* value, long* expected, long desired);

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,38 @@ internal final class SharedCoroutineDispatcher: _CoroutineTaskExecutor {
private var freeQueues = [SharedCoroutineQueue]()
private var suspendedQueues = Set<SharedCoroutineQueue>()
private var tasks = FifoQueue<Task>()
private var freeCount: AtomicInt

internal init(contextsCount: Int, stackSize: Int) {
self.stackSize = stackSize
self.contextsCount = contextsCount
freeCount = AtomicInt(value: contextsCount)
freeQueues.reserveCapacity(contextsCount)
suspendedQueues.reserveCapacity(contextsCount)
startDispatchSource()
}

internal func execute(on scheduler: CoroutineScheduler, task: @escaping () -> Void) {
mutex.lock()
if let queue = freeQueue {
mutex.unlock()
scheduler.scheduleTask {
self.start(task: .init(scheduler: scheduler, task: task), on: queue)
self.performNext(for: queue)
func perform() {
freeCount.update { max(0, $0 - 1) }
mutex.lock()
if let queue = freeQueue {
mutex.unlock()
start(task: .init(scheduler: scheduler, task: task), on: queue)
performNext(for: queue)
} else {
tasks.push(.init(scheduler: scheduler, task: task))
mutex.unlock()
}
}
if freeCount.value == 0 {
mutex.lock()
defer { mutex.unlock() }
if freeCount.value == 0 {
return tasks.push(.init(scheduler: scheduler, task: task))
}
} else {
tasks.push(.init(scheduler: scheduler, task: task))
mutex.unlock()
}
scheduler.scheduleTask(perform)
}

private var freeQueue: SharedCoroutineQueue? {
Expand All @@ -64,11 +75,33 @@ internal final class SharedCoroutineDispatcher: _CoroutineTaskExecutor {
}

internal func resume(_ coroutine: SharedCoroutine) {
// func perform() {
// mutex.lock()
// if suspendedQueues.remove(coroutine.queue) == nil {
// coroutine.queue.push(coroutine)
// mutex.unlock()
// } else {
// freeCount.decrease()
// mutex.unlock()
// coroutine.resume()
// performNext(for: coroutine.queue)
// }
// }
// mutex.lock()
// if suspendedQueues.contains(coroutine.queue) {
// mutex.unlock()
// coroutine.scheduler.scheduleTask(perform)
// } else {
// coroutine.queue.push(coroutine)
// mutex.unlock()
// }

mutex.lock()
if suspendedQueues.remove(coroutine.queue) == nil {
coroutine.queue.push(coroutine)
mutex.unlock()
} else {
freeCount.decrease()
mutex.unlock()
coroutine.scheduler.scheduleTask {
coroutine.resume()
Expand Down Expand Up @@ -103,9 +136,11 @@ internal final class SharedCoroutineDispatcher: _CoroutineTaskExecutor {
}
} else if queue.started == 0 {
freeQueues.append(queue)
freeCount.increase()
return mutex.unlock()
} else {
suspendedQueues.insert(queue)
freeCount.increase()
return mutex.unlock()
}
if state.update(.none) == .running { return }
Expand Down
26 changes: 8 additions & 18 deletions Sources/SwiftCoroutine/Helpers/AtomicEnum.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,21 @@
// Copyright © 2020 Alex Belozierov. All rights reserved.
//

#if SWIFT_PACKAGE
import CCoroutine
#endif

internal struct AtomicEnum<T: RawRepresentable> where T.RawValue == Int {

private var _value: Int
private var atomic: AtomicInt

init(value: T) {
_value = value.rawValue
@inlinable init(value: T) {
atomic = AtomicInt(value: value.rawValue)
}

var value: T {
get { T(rawValue: _value)! }
set {
withUnsafeMutablePointer(to: &_value) {
__atomicStore(OpaquePointer($0), newValue.rawValue)
}
}
@inlinable var value: T {
get { T(rawValue: atomic.value)! }
set { atomic.value = newValue.rawValue }
}

mutating func update(_ newValue: T) -> T {
withUnsafeMutablePointer(to: &_value) {
T(rawValue: __atomicExchange(OpaquePointer($0), newValue.rawValue))!
}
@inlinable mutating func update(_ newValue: T) -> T {
T(rawValue: atomic.update(newValue.rawValue))!
}

}
55 changes: 55 additions & 0 deletions Sources/SwiftCoroutine/Helpers/AtomicInt.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//
// AtomicInt.swift
// SwiftCoroutine
//
// Created by Alex Belozierov on 01.04.2020.
// Copyright © 2020 Alex Belozierov. All rights reserved.
//

#if SWIFT_PACKAGE
import CCoroutine
#endif

internal struct AtomicInt {

private var _value: Int

@inlinable init(value: Int) {
_value = value
}

@inlinable var value: Int {
get { _value }
set {
withUnsafeMutablePointer(to: &_value) {
__atomicStore(OpaquePointer($0), newValue)
}
}
}

@inlinable mutating func add(_ value: Int) {
withUnsafeMutablePointer(to: &_value) {
__atomicFetchAdd(OpaquePointer($0), value)
}
}

@inlinable mutating func increase() { add(1) }
@inlinable mutating func decrease() { add(-1) }

@discardableResult @inlinable
mutating func update(_ transform: (Int) -> Int) -> (old: Int, new: Int) {
withUnsafeMutablePointer(to: &_value) {
var oldValue = $0.pointee, newValue: Int
repeat { newValue = transform(oldValue) }
while __atomicCompareExchange(OpaquePointer($0), &oldValue, newValue) == 0
return (oldValue, newValue)
}
}

@inlinable mutating func update(_ newValue: Int) -> Int {
withUnsafeMutablePointer(to: &_value) {
__atomicExchange(OpaquePointer($0), newValue)
}
}

}
6 changes: 6 additions & 0 deletions SwiftCoroutine.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
1A595BF723E872D5009B62E7 /* CoroutineContextPool.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A595BF323E87298009B62E7 /* CoroutineContextPool.swift */; };
1A595BFB23E8A31F009B62E7 /* CoPromise.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A595BF823E8A2C7009B62E7 /* CoPromise.swift */; };
1A595BFC23E8A320009B62E7 /* CoPromise.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A595BF823E8A2C7009B62E7 /* CoPromise.swift */; };
1A64013E2433F0B900E4CECD /* AtomicInt.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A64013D2433F0B900E4CECD /* AtomicInt.swift */; };
1A64013F2433F0B900E4CECD /* AtomicInt.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A64013D2433F0B900E4CECD /* AtomicInt.swift */; };
1A65BABB239CE05D004C1716 /* CoroutineContext.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A65BABA239CE05C004C1716 /* CoroutineContext.swift */; };
1A65BAC2239CEFED004C1716 /* CoroutineContext.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A65BABA239CE05C004C1716 /* CoroutineContext.swift */; };
1A6910C32421612400C546ED /* CoFutureCombineTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A6910C22421612400C546ED /* CoFutureCombineTests.swift */; };
Expand Down Expand Up @@ -140,6 +142,7 @@
1A57BE6423CC867600639FDD /* CoroutineContextTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CoroutineContextTests.swift; sourceTree = "<group>"; };
1A595BF323E87298009B62E7 /* CoroutineContextPool.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CoroutineContextPool.swift; sourceTree = "<group>"; };
1A595BF823E8A2C7009B62E7 /* CoPromise.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CoPromise.swift; sourceTree = "<group>"; };
1A64013D2433F0B900E4CECD /* AtomicInt.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AtomicInt.swift; sourceTree = "<group>"; };
1A65BABA239CE05C004C1716 /* CoroutineContext.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CoroutineContext.swift; sourceTree = "<group>"; };
1A6910C22421612400C546ED /* CoFutureCombineTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CoFutureCombineTests.swift; sourceTree = "<group>"; };
1A71364C24215BBB00DC78FD /* CoFutureAwaitTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CoFutureAwaitTests.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -226,6 +229,7 @@
1A4C6DAC2416AB2C00EF2974 /* FifoQueue.swift */,
1A8129A72417D56700AE061C /* Constants.swift */,
1ABD0672242A368F00A22764 /* AtomicEnum.swift */,
1A64013D2433F0B900E4CECD /* AtomicInt.swift */,
);
path = Helpers;
sourceTree = "<group>";
Expand Down Expand Up @@ -676,6 +680,7 @@
1A739AD023B1555A00165280 /* Coroutine+StackSize.swift in Sources */,
1A595BFB23E8A31F009B62E7 /* CoPromise.swift in Sources */,
1A993208242022D100B3BF32 /* CoFuture3+hashable.swift in Sources */,
1A64013E2433F0B900E4CECD /* AtomicInt.swift in Sources */,
1A2D8D7C241EBD1A00ECC849 /* CoFuture+Combine.swift in Sources */,
1A2D8D82241EBD5700ECC849 /* CoFuturePublisher.swift in Sources */,
1A3583FF23DD796A0086A6E6 /* CoFuture1+whenComplete.swift in Sources */,
Expand Down Expand Up @@ -735,6 +740,7 @@
1A739AD123B1555A00165280 /* Coroutine+StackSize.swift in Sources */,
1A595BFC23E8A320009B62E7 /* CoPromise.swift in Sources */,
1A993209242022D100B3BF32 /* CoFuture3+hashable.swift in Sources */,
1A64013F2433F0B900E4CECD /* AtomicInt.swift in Sources */,
1A2D8D7D241EBD1A00ECC849 /* CoFuture+Combine.swift in Sources */,
1A2D8D83241EBD5700ECC849 /* CoFuturePublisher.swift in Sources */,
1A35840023DD796A0086A6E6 /* CoFuture1+whenComplete.swift in Sources */,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@
<string>Local Baseline</string>
</dict>
</dict>
<key>testNestetAwaits()</key>
<dict>
<key>com.apple.XCTPerformanceMetric_WallClockTime</key>
<dict>
<key>baselineAverage</key>
<real>0.030667</real>
<key>baselineIntegrationDisplayName</key>
<string>Local Baseline</string>
</dict>
</dict>
<key>testSuspendResume()</key>
<dict>
<key>com.apple.XCTPerformanceMetric_WallClockTime</key>
Expand Down
22 changes: 18 additions & 4 deletions Tests/SwiftCoroutineTests/CoFutureTests/CoFutureAwaitTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,16 @@ class CoFutureAwaitTests: XCTestCase {
}

func testNestetAwaits() {
let queue = DispatchQueue.global()
let queue = DispatchQueue.global(qos: .userInteractive)
let queue2 = DispatchQueue.global(qos: .utility)
let group = DispatchGroup()
measure {
group.enter()
queue.coFuture {
try (0..<100).map { _ in
queue.coFuture {
try (0..<100)
try (0..<100).map { i -> CoFuture<Void> in
let queue = i % 2 == 0 ? queue : queue2
return queue.coFuture {
try (0..<1000)
.map { _ in CoFuture(value: ()) }
.forEach { try $0.await() }
}
Expand All @@ -79,5 +81,17 @@ class CoFutureAwaitTests: XCTestCase {
group.wait()
}
}

func testOnBlockedSerial() {
let exp = expectation(description: "testAbc")
exp.expectedFulfillmentCount = 1000
let serial = DispatchQueue(label: "sdadad")
serial.async { sleep(5) }
for _ in 0..<1000 { serial.startCoroutine { } }
for _ in 0..<1000 {
DispatchQueue.global().startCoroutine { exp.fulfill() }
}
wait(for: [exp], timeout: 3)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import Foundation
class CoFutureCombineTests: XCTestCase {

struct TestError: Error {}
var cancellables = Set<AnyCancellable>()

func testSubscribe() {
let exp = expectation(description: "testSubscribe")
Expand All @@ -36,7 +37,6 @@ class CoFutureCombineTests: XCTestCase {

func testSubscription() {
let exp = expectation(description: "testSubscription")
var cancellables = Set<AnyCancellable>()
let promise = CoPromise<Int>()
promise.publisher()
.map { $0 + 1 }
Expand All @@ -54,7 +54,6 @@ class CoFutureCombineTests: XCTestCase {

func testSubscriptionFail() {
let exp = expectation(description: "testSubscriptionFail")
var cancellables = Set<AnyCancellable>()
let promise = CoPromise<Int>()
promise.publisher()
.sink(receiveCompletion: {
Expand Down
18 changes: 17 additions & 1 deletion Tests/SwiftCoroutineTests/Helpers/TestAtomic.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class TestAtomic: XCTestCase {
case free, blocked
}

func testUpdate() {
func testEnumUpdate() {
var atomic = AtomicEnum(value: State.free), counter = 0
DispatchQueue.concurrentPerform(iterations: 100_000) { _ in
while true {
Expand All @@ -30,4 +30,20 @@ class TestAtomic: XCTestCase {
XCTAssertEqual(counter, 100_000)
}

func testIntUpdate() {
var atomic = AtomicInt(value: 0)
DispatchQueue.concurrentPerform(iterations: 100_000) { _ in
atomic.update { $0 + 1 }
}
XCTAssertEqual(atomic.value, 100_000)
}

func testIntIncrease() {
var atomic = AtomicInt(value: 0)
DispatchQueue.concurrentPerform(iterations: 100_000) { _ in
atomic.increase()
}
XCTAssertEqual(atomic.value, 100_000)
}

}

0 comments on commit 42a0257

Please sign in to comment.