From 45a31b6f1cf505bbd6fd8b9b125d04a239a23786 Mon Sep 17 00:00:00 2001 From: Gustavo Cairo Date: Wed, 13 Sep 2023 14:37:09 +0100 Subject: [PATCH] Add async gracefulShutdown (#158) --- .../AsyncGracefulShutdownSequence.swift | 18 ++----- .../ServiceLifecycle/CancellationWaiter.swift | 50 +++++++++++++++++++ .../ServiceLifecycle/GracefulShutdown.swift | 20 +++++++- Sources/ServiceLifecycle/ServiceGroup.swift | 10 ++-- .../GracefulShutdownTests.swift | 50 +++++++++++++++++++ 5 files changed, 129 insertions(+), 19 deletions(-) create mode 100644 Sources/ServiceLifecycle/CancellationWaiter.swift diff --git a/Sources/ServiceLifecycle/AsyncGracefulShutdownSequence.swift b/Sources/ServiceLifecycle/AsyncGracefulShutdownSequence.swift index 4c398fe..653aad1 100644 --- a/Sources/ServiceLifecycle/AsyncGracefulShutdownSequence.swift +++ b/Sources/ServiceLifecycle/AsyncGracefulShutdownSequence.swift @@ -15,6 +15,8 @@ /// An async sequence that emits an element once graceful shutdown has been triggered. /// /// This sequence is a broadcast async sequence and will only produce one value and then finish. +/// +/// - Note: This sequence respects cancellation and thus is `throwing`. @usableFromInline struct AsyncGracefulShutdownSequence: AsyncSequence, Sendable { @usableFromInline @@ -34,19 +36,9 @@ struct AsyncGracefulShutdownSequence: AsyncSequence, Sendable { init() {} @inlinable - func next() async -> Element? { - var cont: AsyncStream.Continuation! - let stream = AsyncStream { cont = $0 } - let continuation = cont! - - return await withTaskGroup(of: Void.self) { _ in - await withGracefulShutdownHandler { - await stream.first { _ in true } - } onGracefulShutdown: { - continuation.yield(()) - continuation.finish() - } - } + func next() async throws -> Element? { + try await CancellationWaiter().wait() + return () } } } diff --git a/Sources/ServiceLifecycle/CancellationWaiter.swift b/Sources/ServiceLifecycle/CancellationWaiter.swift new file mode 100644 index 0000000..d09a215 --- /dev/null +++ b/Sources/ServiceLifecycle/CancellationWaiter.swift @@ -0,0 +1,50 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftServiceLifecycle open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftServiceLifecycle project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftServiceLifecycle project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// An actor that provides a function to wait on cancellation/graceful shutdown. +@usableFromInline +actor CancellationWaiter { + private var taskContinuation: CheckedContinuation? + + @usableFromInline + init() {} + + @usableFromInline + func wait() async throws { + try await withTaskCancellationHandler { + try await withGracefulShutdownHandler { + try await withCheckedThrowingContinuation { continuation in + self.taskContinuation = continuation + } + } onGracefulShutdown: { + Task { + await self.finish() + } + } + } onCancel: { + Task { + await self.finish(throwing: CancellationError()) + } + } + } + + private func finish(throwing error: Error? = nil) { + if let error { + self.taskContinuation?.resume(throwing: error) + } else { + self.taskContinuation?.resume() + } + self.taskContinuation = nil + } +} diff --git a/Sources/ServiceLifecycle/GracefulShutdown.swift b/Sources/ServiceLifecycle/GracefulShutdown.swift index d8b9671..76b8838 100644 --- a/Sources/ServiceLifecycle/GracefulShutdown.swift +++ b/Sources/ServiceLifecycle/GracefulShutdown.swift @@ -55,6 +55,16 @@ public func withGracefulShutdownHandler( return try await operation() } +/// Waits until graceful shutdown is triggered. +/// +/// This method suspends the caller until graceful shutdown is triggered. If the calling task is cancelled before +/// graceful shutdown is triggered then this method will throw a `CancellationError`. +/// +/// - Throws: `CancellationError` if the task is cancelled. +public func gracefulShutdown() async throws { + try await AsyncGracefulShutdownSequence().first { _ in true } +} + /// This is just a helper type for the result of our task group. enum ValueOrGracefulShutdown: Sendable { case value(T) @@ -72,7 +82,7 @@ public func cancelOnGracefulShutdown(_ operation: @Sendable @escapi } group.addTask { - for await _ in AsyncGracefulShutdownSequence() { + for try await _ in AsyncGracefulShutdownSequence() { return .gracefulShutdown } @@ -138,6 +148,8 @@ public final class GracefulShutdownManager: @unchecked Sendable { fileprivate var handlerCounter: UInt64 = 0 /// A boolean indicating if we have been shutdown already. fileprivate var isShuttingDown = false + /// Continuations to resume after all of the handlers have been executed. + fileprivate var gracefulShutdownFinishedContinuations = [CheckedContinuation]() } private let state = LockedValueBox(State()) @@ -191,6 +203,12 @@ public final class GracefulShutdownManager: @unchecked Sendable { } state.handlers.removeAll() + + for continuation in state.gracefulShutdownFinishedContinuations { + continuation.resume() + } + + state.gracefulShutdownFinishedContinuations.removeAll() } } } diff --git a/Sources/ServiceLifecycle/ServiceGroup.swift b/Sources/ServiceLifecycle/ServiceGroup.swift index 238bbe3..0dc0227 100644 --- a/Sources/ServiceLifecycle/ServiceGroup.swift +++ b/Sources/ServiceLifecycle/ServiceGroup.swift @@ -193,7 +193,7 @@ public actor ServiceGroup: Sendable { // Using a result here since we want a task group that has non-throwing child tasks // but the body itself is throwing - let result = await withTaskGroup(of: ChildTaskResult.self, returning: Result.self) { group in + let result = try await withThrowingTaskGroup(of: ChildTaskResult.self, returning: Result.self) { group in // First we have to register our signals. let gracefulShutdownSignals = await UnixSignalsSequence(trapping: self.gracefulShutdownSignals) let cancellationSignals = await UnixSignalsSequence(trapping: self.cancellationSignals) @@ -228,7 +228,7 @@ public actor ServiceGroup: Sendable { // This is an optional task that listens to graceful shutdowns from the parent task if let _ = TaskLocals.gracefulShutdownManager { group.addTask { - for await _ in AsyncGracefulShutdownSequence() { + for try await _ in AsyncGracefulShutdownSequence() { return .gracefulShutdownCaught } @@ -276,7 +276,7 @@ public actor ServiceGroup: Sendable { // We are going to wait for any of the services to finish or // the signal sequence to throw an error. while !group.isEmpty { - let result: ChildTaskResult? = await group.next() + let result: ChildTaskResult? = try await group.next() switch result { case .serviceFinished(let service, let index): @@ -452,7 +452,7 @@ public actor ServiceGroup: Sendable { private func shutdownGracefully( services: [ServiceGroupConfiguration.ServiceConfiguration?], - group: inout TaskGroup, + group: inout ThrowingTaskGroup, gracefulShutdownManagers: [GracefulShutdownManager] ) async throws { guard case .running = self.state else { @@ -481,7 +481,7 @@ public actor ServiceGroup: Sendable { gracefulShutdownManager.shutdownGracefully() - let result = await group.next() + let result = try await group.next() switch result { case .serviceFinished(let service, let index): diff --git a/Tests/ServiceLifecycleTests/GracefulShutdownTests.swift b/Tests/ServiceLifecycleTests/GracefulShutdownTests.swift index 53d7731..13e082d 100644 --- a/Tests/ServiceLifecycleTests/GracefulShutdownTests.swift +++ b/Tests/ServiceLifecycleTests/GracefulShutdownTests.swift @@ -253,4 +253,54 @@ final class GracefulShutdownTests: XCTestCase { XCTAssertTrue(Task.isShuttingDownGracefully) } } + + func testWaitForGracefulShutdown() async throws { + try await testGracefulShutdown { gracefulShutdownTestTrigger in + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await Task.sleep(for: .milliseconds(10)) + gracefulShutdownTestTrigger.triggerGracefulShutdown() + } + + try await withGracefulShutdownHandler { + try await gracefulShutdown() + } onGracefulShutdown: { + // No-op + } + + try await group.waitForAll() + } + } + } + + func testWaitForGracefulShutdown_WhenAlreadyShutdown() async throws { + try await testGracefulShutdown { gracefulShutdownTestTrigger in + gracefulShutdownTestTrigger.triggerGracefulShutdown() + + try await withGracefulShutdownHandler { + try await Task.sleep(for: .milliseconds(10)) + try await gracefulShutdown() + } onGracefulShutdown: { + // No-op + } + } + } + + func testWaitForGracefulShutdown_Cancellation() async throws { + do { + try await testGracefulShutdown { _ in + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await gracefulShutdown() + } + + group.cancelAll() + try await group.waitForAll() + } + } + XCTFail("Expected CancellationError to be thrown") + } catch { + XCTAssertTrue(error is CancellationError) + } + } }