Skip to content

Commit

Permalink
Refactor dependency handling to be async and non-blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
ffried committed Sep 30, 2020
1 parent 212e0f7 commit 8e6ed34
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 32 deletions.
92 changes: 60 additions & 32 deletions Sources/GCDCoreOperations/Base Classes/Operation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ import Dispatch
/// Finally, an operation can be observed using `OperationObserver`.
///
/// Be aware, that once an operation has been enqueued, it should not be modified directly in terms of adding dependencies, conditions or observers.
open class Operation {
private final lazy var startItem: DispatchWorkItem! = DispatchWorkItem(block: self.run)
private final let finishSignal = DispatchSemaphore(value: 0)

open class Operation: CustomStringConvertible, CustomDebugStringConvertible {
@Synchronized
internal final var state: State = .created

Expand All @@ -37,6 +34,10 @@ open class Operation {
/// The list of dependencies of this operation.
public final var dependencies: ContiguousArray<Operation> { _dependencies }

@Synchronized
private final var waiters: Array<DispatchGroup> = []
private let dependenciesGroup = DispatchGroup()

/// The list of errors this operation encountered.
public private(set) final var errors: [Error] = []

Expand All @@ -45,6 +46,16 @@ open class Operation {
public final var isCancelled: Bool { state.isCancelled }
/// Whether or not this operation has finished. This will also return `true` if the operation has been cancelled.
public final var isFinished: Bool { state.isFinished }

/// inherited
public var description: String {
"\(Self.self)(state: \(state))"
}

/// inherited
public var debugDescription: String {
"\(Self.self)(state: \(state), no. dependencies: \(dependencies.count), no. waiters: \(waiters.count))"
}

// MARK: - Init
/// Creates a new operation.
Expand All @@ -55,6 +66,7 @@ open class Operation {
/// - Parameter dep: The dependency to add.
/// - Precondition: This must not be called after the operation has been enqueued.
public final func addDependency(_ dep: Operation) {
assert(dep !== self, "Adding itself as dependency will lead to deadlocks!")
__dependencies.coordinated(with: _state) { deps, state in
assert(state.isCancelled || state < .waitingForDependencies, "Can't modify dependencies after execution has begun!")
deps.append(dep)
Expand Down Expand Up @@ -121,7 +133,7 @@ open class Operation {
/// - SeeAlso: `aggregate(errors:)`
@inlinable
public final func aggregate(error: Error) {
aggregate(errors: error)
aggregate(errors: CollectionOfOne(error))
}

// MARK: - Produce Operation
Expand All @@ -142,37 +154,53 @@ open class Operation {
}
guard !isCancelled else { return }
if let group = group {
queue.async(group: group, execute: startItem)
queue.async(group: group, execute: run)
} else {
queue.async(execute: startItem)
queue.async(execute: run)
}
}

private final func run() {
guard !_state.setUnlessCancelled(to: .waitingForDependencies,
assertion: { assert($0 == .enqueued, "\(#function) called without the Operation being enqueued!") })
else { return }
waitForDependencies()

guard !_state.setUnlessCancelled(to: .evaluatingConditions) else { return }
evaluateConditions {
guard !self.isCancelled else { return }
self._state.withValue { $0 = .running }
self.observers.operationDidStart(self)
self.execute()
waitForDependencies {
guard !self._state.setUnlessCancelled(to: .evaluatingConditions) else { return }
self.evaluateConditions {
guard !self._state.setUnlessCancelled(to: .running) else { return }
self.observers.operationDidStart(self)
self.execute()
}
}
}

private final func addWaiter(_ group: DispatchGroup) {
group.enter()
let hasFinished: Bool = _state.coordinated(with: _waiters) {
guard !$0.isFinished else { return true }
$1.append(group)
return false
}
if hasFinished { group.leave() }
}

private final func removeWaiter(_ group: DispatchGroup) {
let hasFinished: Bool = _state.coordinated(with: _waiters) {
guard !$0.isFinished else { return true }
$1.removeAll { $0 === group }
return false
}
if !hasFinished { group.leave() }
}

private final func waitForDependencies() {
private final func waitForDependencies(completion: @escaping () -> ()) {
assert(_state.isCancelled(or: .waitingForDependencies), "Incorrect state for \(#function)!")
for dependency in dependencies {
while case .timedOut = dependency.finishSignal.wait(timeout: .now()), !isCancelled {
continue
}
if isCancelled {
break
}
__dependencies.withValue {
// close the lock so that if we're cancelled,
// we first add all waiters before removing them.
$0.forEach { $0.addWaiter(dependenciesGroup) }
}
dependenciesGroup.notify(queue: queue ?? .global()) { completion() }
}

private final func evaluateConditions(completion: @escaping () -> ()) {
Expand Down Expand Up @@ -220,9 +248,9 @@ open class Operation {

internal func cleanup() {
// Cleanup to prevent any retain cycles
startItem = nil
queue = nil
observers.removeAll()
_waiters.withValue { $0.removeAll() }
}

private final func finish<Errors>(cancelled: Bool, errors errs: Errors)
Expand All @@ -231,26 +259,26 @@ open class Operation {
assert(cancelled || state > .enqueued, "Finishing Operation that was never enqueued!")
guard !state.isFinished else { return }

_state.withValue { $0 = .finishing(cancelled: cancelled) }

if _state.exchange(with: .finishing(cancelled: cancelled)) == .waitingForDependencies {
__dependencies.withValue {
$0.forEach { $0.removeWaiter(dependenciesGroup) }
}
}
if cancelled {
handleCancellation()
} else {
handleFinishing()
}
aggregate(errors: errs)

guard !state.isFinished else { return }

aggregate(errors: errs)
_state.withValue { $0 = .finished(cancelled: cancelled) }
_state.exchange(with:.finished(cancelled: cancelled))

didFinish(wasCancelled: cancelled, errors: errors)
observers.operationDidFinish(self, wasCancelled: cancelled, errors: errors)

if cancelled {
startItem.cancel()
}
finishSignal.signal()
waiters.forEach { $0.leave() }

cleanup()
}
Expand Down
8 changes: 8 additions & 0 deletions Sources/GCDCoreOperations/Helpers/Synchronized.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ final class Synchronized<Value> {
return try accessQueue.sync(flags: .barrier) { try work(&_wrappedValue) }
}

@discardableResult
func exchange(with newValue: Value) -> Value {
withValue { currentVal in
defer { currentVal = newValue }
return currentVal
}
}

func coordinated<OtherValue, T>(with other: Synchronized<OtherValue>, do work: (inout Value, inout OtherValue) throws -> T) rethrows -> T {
dispatchPrecondition(condition: .notOnQueue(accessQueue))
return try accessQueue.sync(flags: .barrier) {
Expand Down
13 changes: 13 additions & 0 deletions Tests/GCDCoreOperationsTests/OperationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,19 @@ final class OperationTests: XCTestCase {
// Put teardown code here. This method is called after the invocation of each test method in the class.
super.tearDown()
}

func testDescription() {
final class TestOp: GCDOperation {}

let op1 = GCDOperation()
let op2 = TestOp()
op1.addDependency(op2)

XCTAssertEqual(String(describing: op1), "\(GCDOperation.self)(state: \(op1.state))")
XCTAssertEqual(String(describing: op2), "\(TestOp.self)(state: \(op2.state))")
XCTAssertEqual(String(reflecting: op1), "\(GCDOperation.self)(state: \(op1.state), no. dependencies: 1, no. waiters: 0)")
XCTAssertEqual(String(reflecting: op2), "\(TestOp.self)(state: \(op2.state), no. dependencies: 0, no. waiters: 0)")
}

func testSimpleExecution() {
let expectation = self.expectation(description: "Waiting for Operation to execute...")
Expand Down

0 comments on commit 8e6ed34

Please sign in to comment.