Skip to content

Commit

Permalink
Implement (some of) Room Lifecycle Monitoring spec
Browse files Browse the repository at this point in the history
Implements points CHA-RL4* from same spec as referenced in e70ee44.

In addition to the TODOs added in the code (all of which refer either to
existing GitHub issues or questions on the spec, for which we have #66
as a catch-all issue), I’ve also not done:

- CHA-RL4a2 — I don’t understand the meaning of “has not yet
  successfully managed to attach its Realtime Channel”, asked about it
  in [1]

- CHA-RL4b2 — seems redundant, asked about it in [2]

- CHA-RL4b3, CHA-RL4b4 — seem redundant, asked about it in [3]

- CHA-RL4b5, CHA-RL4b6, CHA-RL4b7 — these relate to transient disconnect
  timeouts, so will do them in #48

Something which I didn’t think about in 25e5052, and which I haven’t
thought about here, is how actor reentrancy (i.e. the fact that when an
actor-isolated method suspends via `await`, another actor-isolated
method can be scheduled instead, which can potentially cause issues like
state updates being interleaved in unexpected ways) might affect the
room lifecycle manager. I would like to first of all implement the whole
thing, specifically all of the spec points that might provide us with a
mutual exclusion mechanism (i.e. the ones that tell us to wait until the
current operation is complete), before assessing the situation. Have
created #75 for this.

Created [4] to address the `@preconcurrency import Ably` introduced by
this commit.

Aside: I have not been consistent with the way that I’ve named the
tests; the existing lifecycle manager test names have a part that
describes the expected side effects. But I haven’t done that here
because some of the spec points tested here have multiple side effects
and the test names would become really long and hard to read. So for
those ones I’ve only described the expected side effects inside the
tests. I think we can live with the inconsistency for now.

Part of #53.

[1] https://github.com/ably/specification/pull/200/files#r1775552624
[2] https://github.com/ably/specification/pull/200/files#r1777212960
[3] https://github.com/ably/specification/pull/200/files#r1777365677
[4] ably/ably-cocoa#1987
  • Loading branch information
lawrence-forooghian committed Oct 8, 2024
1 parent cba991d commit b6c3ddb
Show file tree
Hide file tree
Showing 5 changed files with 496 additions and 5 deletions.
4 changes: 4 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ let package = Package(
name: "Ably",
package: "ably-cocoa"
),
.product(
name: "AsyncAlgorithms",
package: "swift-async-algorithms"
),
]
),
.testTarget(
Expand Down
217 changes: 214 additions & 3 deletions Sources/AblyChat/RoomLifecycleManager.swift
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import Ably
@preconcurrency import Ably
import AsyncAlgorithms

/// The interface that the lifecycle manager expects its contributing realtime channels to conform to.
///
/// We use this instead of the ``RealtimeChannelProtocol`` interface as its ``attach`` and ``detach`` methods are `async` instead of using callbacks. This makes it easier to write mocks for (since ``RealtimeChannelProtocol`` doesn’t express to the type system that the callbacks it receives need to be `Sendable`, it’s hard to, for example, create a mock that creates a `Task` and then calls the callback from inside this task).
/// We use this instead of the ``RealtimeChannelProtocol`` interface as:
///
/// - its ``attach`` and ``detach`` methods are `async` instead of using callbacks
/// - it uses `AsyncSequence` to emit state changes instead of using callbacks
///
/// This makes it easier to write mocks for (since ``RealtimeChannelProtocol`` doesn’t express to the type system that the callbacks it receives need to be `Sendable`, it’s hard to, for example, create a mock that creates a `Task` and then calls the callback from inside this task).
///
/// We choose to also mark the channel’s mutable state as `async`. This is a way of highlighting at the call site of accessing this state that, since `ARTRealtimeChannel` mutates this state on a separate thread, it’s possible for this state to have changed since the last time you checked it, or since the last time you performed an operation that might have mutated it, or since the last time you recieved an event informing you that it changed. To be clear, marking these as `async` doesn’t _solve_ these issues; it just makes them a bit more visible. We’ll decide how to address them in https://github.com/ably-labs/ably-chat-swift/issues/49.
internal protocol RoomLifecycleContributorChannel: Sendable {
Expand All @@ -11,24 +17,80 @@ internal protocol RoomLifecycleContributorChannel: Sendable {

var state: ARTRealtimeChannelState { get async }
var errorReason: ARTErrorInfo? { get async }

/// Equivalent to subscribing to a `RealtimeChannelProtocol` object’s state changes via its `on(_:)` method. The subscription should use the ``BufferingPolicy.unbounded`` buffering policy.
///
/// It is marked as `async` purely to make it easier to write mocks for this method (i.e. to use an actor as a mock).
func subscribeToState() async -> Subscription<ARTChannelStateChange>
}

/// A realtime channel that contributes to the room lifecycle.
internal protocol RoomLifecycleContributor: Sendable {
///
/// The identity implied by the `Identifiable` conformance must distinguish each of the contributors passed to a given ``RoomLifecycleManager`` instance.
internal protocol RoomLifecycleContributor: Identifiable, Sendable {
associatedtype Channel: RoomLifecycleContributorChannel

/// The room feature that this contributor corresponds to. Used only for choosing which error to throw when a contributor operation fails.
var feature: RoomFeature { get }
var channel: Channel { get }

/// Informs the contributor that there has been a break in channel continuity, which it should inform library users about.
///
/// It is marked as `async` purely to make it easier to write mocks for this method (i.e. to use an actor as a mock).
func emitDiscontinuity(_ error: ARTErrorInfo) async
}

internal actor RoomLifecycleManager<Contributor: RoomLifecycleContributor> {
/// Stores manager state relating to a given contributor.
private struct ContributorAnnotation {
// TODO: Not clear whether there can be multiple or just one (asked in https://github.com/ably/specification/pull/200/files#r1781927850)
var pendingDiscontinuityEvents: [ARTErrorInfo] = []
}

internal private(set) var current: RoomLifecycle
internal private(set) var error: ARTErrorInfo?
// TODO: This currently allows the the tests to inject a value in order to test the spec points that are predicated on whether “a channel lifecycle operation is in progress”. In https://github.com/ably-labs/ably-chat-swift/issues/52 we’ll set this property based on whether there actually is a lifecycle operation in progress.
private let hasOperationInProgress: Bool
/// Manager state that relates to individual contributors, keyed by contributors’ ``Contributor.id``. Stored separately from ``contributors`` so that the latter can be a `let`, to make it clear that the contributors remain fixed for the lifetime of the manager.
private var contributorAnnotations: ContributorAnnotations

/// Provides a `Dictionary`-like interface for storing manager state about individual contributors.
private struct ContributorAnnotations {
private var storage: [Contributor.ID: ContributorAnnotation]

init(contributors: [Contributor]) {
storage = contributors.reduce(into: [:]) { result, contributor in
result[contributor.id] = .init()
}
}

/// It is a programmer error to call this subscript getter with a contributor that was not one of those passed to ``init(contributors:pendingDiscontinuityEvents)``.
subscript(_ contributor: Contributor) -> ContributorAnnotation {
get {
guard let annotation = storage[contributor.id] else {
preconditionFailure("Expected annotation for \(contributor)")
}
return annotation
}

set {
storage[contributor.id] = newValue
}
}

mutating func clearPendingDiscontinuityEvents() {
storage = storage.mapValues { annotation in
var newAnnotation = annotation
newAnnotation.pendingDiscontinuityEvents = []
return newAnnotation
}
}
}

private let logger: InternalLogger
private let clock: SimpleClock
private let contributors: [Contributor]
private var listenForStateChangesTask: Task<Void, Never>!

internal init(
contributors: [Contributor],
Expand All @@ -37,6 +99,7 @@ internal actor RoomLifecycleManager<Contributor: RoomLifecycleContributor> {
) async {
await self.init(
current: nil,
hasOperationInProgress: nil,
contributors: contributors,
logger: logger,
clock: clock
Expand All @@ -46,12 +109,14 @@ internal actor RoomLifecycleManager<Contributor: RoomLifecycleContributor> {
#if DEBUG
internal init(
testsOnly_current current: RoomLifecycle? = nil,
testsOnly_hasOperationInProgress hasOperationInProgress: Bool? = nil,
contributors: [Contributor],
logger: InternalLogger,
clock: SimpleClock
) async {
await self.init(
current: current,
hasOperationInProgress: hasOperationInProgress,
contributors: contributors,
logger: logger,
clock: clock
Expand All @@ -61,16 +126,55 @@ internal actor RoomLifecycleManager<Contributor: RoomLifecycleContributor> {

private init(
current: RoomLifecycle?,
hasOperationInProgress: Bool?,
contributors: [Contributor],
logger: InternalLogger,
clock: SimpleClock
) async {
self.current = current ?? .initialized
self.hasOperationInProgress = hasOperationInProgress ?? false
self.contributors = contributors
contributorAnnotations = .init(contributors: contributors)
self.logger = logger
self.clock = clock

// The idea here is to make sure that, before the initializer completes, we are already listening for state changes, so that e.g. tests don’t miss a state change.
let subscriptions = await withTaskGroup(of: (contributor: Contributor, subscription: Subscription<ARTChannelStateChange>).self) { group in
for contributor in contributors {
group.addTask {
await (contributor: contributor, subscription: contributor.channel.subscribeToState())
}
}

return await Array(group)
}

// CHA-RL4: listen for state changes from our contributors
// TODO: Understand what happens when this task gets cancelled by `deinit`; I’m not convinced that the for-await loops will exit (https://github.com/ably-labs/ably-chat-swift/issues/29)
listenForStateChangesTask = Task {
await withTaskGroup(of: Void.self) { group in
for (contributor, subscription) in subscriptions {
// This `@Sendable` is to make the compiler error "'self'-isolated value of type '() async -> Void' passed as a strongly transferred parameter; later accesses could race" go away. I don’t hugely understand what it means, but given the "'self'-isolated value" I guessed it was something vaguely to do with the fact that `async` actor initializers are actor-isolated and thought that marking it as `@Sendable` would sever this isolation and make the error go away, which it did 🤷. But there are almost certainly consequences that I am incapable of reasoning about with my current level of Swift concurrency knowledge.
group.addTask { @Sendable [weak self] in
for await stateChange in subscription {
await self?.didReceiveStateChange(stateChange, forContributor: contributor)
}
}
}
}
}
}

deinit {
listenForStateChangesTask.cancel()
}

#if DEBUG
internal func testsOnly_pendingDiscontinuityEvents(for contributor: Contributor) -> [ARTErrorInfo] {
contributorAnnotations[contributor].pendingDiscontinuityEvents
}
#endif

// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
private var subscriptions: [Subscription<RoomStatusChange>] = []

Expand All @@ -80,6 +184,113 @@ internal actor RoomLifecycleManager<Contributor: RoomLifecycleContributor> {
return subscription
}

#if DEBUG
// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
/// Supports the ``testsOnly_subscribeToHandledContributorStateChanges()`` method.
private var stateChangeHandledSubscriptions: [Subscription<ARTChannelStateChange>] = []

/// Returns a subscription which emits the contributor state changes that have been handled by the manager.
///
/// A contributor state change is considered handled once the manager has performed all of the side effects that it will perform as a result of receiving this state change. Specifically, once:
///
/// - the manager has recorded all pending discontinuity events provoked by the state change (you can retrieve these using ``testsOnly_pendingDiscontinuityEventsForContributor(at:)``)
/// - the manager has performed all status changes provoked by the state change
/// - the manager has performed all contributor actions provoked by the state change, namely calls to ``RoomLifecycleContributorChannel.detach()`` or ``RoomLifecycleContributor.emitDiscontinuity(_:)``
internal func testsOnly_subscribeToHandledContributorStateChanges() -> Subscription<ARTChannelStateChange> {
let subscription = Subscription<ARTChannelStateChange>(bufferingPolicy: .unbounded)
stateChangeHandledSubscriptions.append(subscription)
return subscription
}
#endif

/// Implements CHA-RL4b’s contributor state change handling.
private func didReceiveStateChange(_ stateChange: ARTChannelStateChange, forContributor contributor: Contributor) async {
logger.log(message: "Got state change \(stateChange) for contributor \(contributor)", level: .info)

// TODO: The spec, which is written for a single-threaded environment, is presumably operating on the assumption that the channel is currently in the state given by `stateChange.current` (https://github.com/ably-labs/ably-chat-swift/issues/49)
switch stateChange.event {
case .update:
// CHA-RL4a1 — if RESUMED then no-op
guard !stateChange.resumed else {
break
}

guard let reason = stateChange.reason else {
// TODO: Decide the right thing to do here (https://github.com/ably-labs/ably-chat-swift/issues/74)
preconditionFailure("State change event with resumed == false should have a reason")
}

if hasOperationInProgress {
// CHA-RL4a3
logger.log(message: "Recording pending discontinuity event for contributor \(contributor)", level: .info)

contributorAnnotations[contributor].pendingDiscontinuityEvents.append(reason)
} else {
// CHA-RL4a4
logger.log(message: "Emitting discontinuity event for contributor \(contributor)", level: .info)

await contributor.emitDiscontinuity(reason)
}
case .attached:
if hasOperationInProgress {
if !stateChange.resumed {
// CHA-RL4b1
logger.log(message: "Recording pending discontinuity event for contributor \(contributor)", level: .info)

guard let reason = stateChange.reason else {
// TODO: Decide the right thing to do here (https://github.com/ably-labs/ably-chat-swift/issues/74)
preconditionFailure("State change event with resumed == false should have a reason")
}

contributorAnnotations[contributor].pendingDiscontinuityEvents.append(reason)
}
} else if current != .attached {
if await (contributors.async.map { await $0.channel.state }.allSatisfy { $0 == .attached }) {
// CHA-RL4b8
logger.log(message: "Now that all contributors are ATTACHED, transitioning room to ATTACHED", level: .info)
changeStatus(to: .attached)
}
}
case .failed:
if !hasOperationInProgress {
// CHA-RL4b5
guard let reason = stateChange.reason else {
// TODO: Decide the right thing to do here (https://github.com/ably-labs/ably-chat-swift/issues/74)
preconditionFailure("FAILED state change event should have a reason")
}

changeStatus(to: .failed, error: reason)

// TODO: CHA-RL4b5 is a bit unclear about how to handle failure, and whether they can be detached concurrently (asked in https://github.com/ably/specification/pull/200/files#r1777471810)
for contributor in contributors {
do {
try await contributor.channel.detach()
} catch {
logger.log(message: "Failed to detach contributor \(contributor), error \(error)", level: .info)
}
}
}
case .suspended:
if !hasOperationInProgress {
// CHA-RL4b9
guard let reason = stateChange.reason else {
// TODO: Decide the right thing to do here (https://github.com/ably-labs/ably-chat-swift/issues/74)
preconditionFailure("SUSPENDED state change event should have a reason")
}

changeStatus(to: .suspended, error: reason)
}
default:
break
}

#if DEBUG
for subscription in stateChangeHandledSubscriptions {
subscription.emit(stateChange)
}
#endif
}

/// Updates ``current`` and ``error`` and emits a status change event.
private func changeStatus(to new: RoomLifecycle, error: ARTErrorInfo? = nil) {
logger.log(message: "Transitioning from \(current) to \(new), error \(String(describing: error))", level: .info)
Expand Down
7 changes: 7 additions & 0 deletions Tests/AblyChatTests/Mocks/MockRoomLifecycleContributor.swift
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import Ably
@testable import AblyChat

actor MockRoomLifecycleContributor: RoomLifecycleContributor {
nonisolated let feature: RoomFeature
nonisolated let channel: MockRoomLifecycleContributorChannel

private(set) var emitDiscontinuityArguments: [ARTErrorInfo] = []

init(feature: RoomFeature, channel: MockRoomLifecycleContributorChannel) {
self.feature = feature
self.channel = channel
}

func emitDiscontinuity(_ error: ARTErrorInfo) async {
emitDiscontinuityArguments.append(error)
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Ably
@preconcurrency import Ably
@testable import AblyChat

final actor MockRoomLifecycleContributorChannel: RoomLifecycleContributorChannel {
Expand All @@ -7,6 +7,8 @@ final actor MockRoomLifecycleContributorChannel: RoomLifecycleContributorChannel

var state: ARTRealtimeChannelState
var errorReason: ARTErrorInfo?
// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
private var subscriptions: [Subscription<ARTChannelStateChange>] = []

private(set) var attachCallCount = 0
private(set) var detachCallCount = 0
Expand Down Expand Up @@ -92,4 +94,16 @@ final actor MockRoomLifecycleContributorChannel: RoomLifecycleContributorChannel
throw error
}
}

func subscribeToState() -> Subscription<ARTChannelStateChange> {
let subscription = Subscription<ARTChannelStateChange>(bufferingPolicy: .unbounded)
subscriptions.append(subscription)
return subscription
}

func emitStateChange(_ stateChange: ARTChannelStateChange) {
for subscription in subscriptions {
subscription.emit(stateChange)
}
}
}
Loading

0 comments on commit b6c3ddb

Please sign in to comment.