Skip to content

Commit

Permalink
tech(observation): Enqueue modifications in ObservationRegistry (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
pjechris authored Aug 8, 2023
1 parent 5a87c9e commit 8779ade
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 142 deletions.
5 changes: 3 additions & 2 deletions Sources/CohesionKit/Combine/EntityObserver+Publisher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import Combine
extension Observer {
/// A `Publisher` emitting the observer current value and subscribing to any subsequents new values
public var asPublisher: AnyPublisher<T, Never> {
let subject = CurrentValueSubject<T, Never>(value)
let subject = CurrentValueSubject<T?, Never>(nil)
let subscription = observe(onChange: subject.send)

return subject
.compactMap { $0 }
.handleEvents(receiveCancel: { subscription.unsubscribe() })
.eraseToAnyPublisher()
}
Expand Down
63 changes: 28 additions & 35 deletions Sources/CohesionKit/Identity/IdentityStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ public class IdentityMap {
private(set) var refAliases: AliasStorage = [:]
private lazy var storeVisitor = IdentityMapStoreVisitor(identityMap: self)


/// Create a new IdentityMap instance optionally with a queue and a logger
/// - Parameter queue: the queue on which to receive updates. If not defined it default to main
/// - Parameter logger: a logger to follow/debug identity internal state
public init(queue: DispatchQueue = .main, logger: Logger? = nil) {
public convenience init(queue: DispatchQueue = .main, logger: Logger? = nil) {
self.init(registry: ObserverRegistry(queue: queue), logger: logger)
}

init(registry: ObserverRegistry, logger: Logger? = nil) {
self.logger = logger
self.registry = ObserverRegistry(queue: queue)
self.registry = registry
}

/// Store an entity in the storage. Entity will be stored only if stamp (`modifiedAt`) is higher than in previous
Expand All @@ -36,7 +39,7 @@ public class IdentityMap {
modifiedAt: Stamp? = nil,
ifPresent update: Update<T>? = nil
) -> EntityObserver<T> {
identityQueue.sync(flags: .barrier) {
transaction {
var entity = entity

if storage[entity] != nil {
Expand All @@ -50,8 +53,6 @@ public class IdentityMap {
logger?.didRegisterAlias(alias)
}

self.registry.postNotifications()

return EntityObserver(node: node, registry: registry)
}
}
Expand All @@ -70,7 +71,7 @@ public class IdentityMap {
modifiedAt: Stamp? = nil,
ifPresent update: Update<T>? = nil
) -> EntityObserver<T> {
identityQueue.sync(flags: .barrier) {
transaction {
var entity = entity

if storage[entity] != nil {
Expand All @@ -84,42 +85,36 @@ public class IdentityMap {
logger?.didRegisterAlias(alias)
}

self.registry.postNotifications()

return EntityObserver(node: node, registry: registry)
}
}

/// Store multiple entities at once
public func store<C: Collection>(entities: C, named: AliasKey<C>? = nil, modifiedAt: Stamp? = nil)
-> [EntityObserver<C.Element>] where C.Element: Identifiable {
identityQueue.sync(flags: .barrier) {
transaction {
let nodes = entities.map { nodeStore(entity: $0, modifiedAt: modifiedAt) }

if let alias = named {
refAliases.insert(nodes, key: alias)
logger?.didRegisterAlias(alias)
}

self.registry.postNotifications()

return nodes.map { EntityObserver(node: $0, registry: registry) }
}
}

/// store multiple aggregates at once
public func store<C: Collection>(entities: C, named: AliasKey<C>? = nil, modifiedAt: Stamp? = nil)
-> [EntityObserver<C.Element>] where C.Element: Aggregate {
identityQueue.sync(flags: .barrier) {
transaction {
let nodes = entities.map { nodeStore(entity: $0, modifiedAt: modifiedAt) }

if let alias = named {
refAliases.insert(nodes, key: alias)
logger?.didRegisterAlias(alias)
}

self.registry.postNotifications()

return nodes.map { EntityObserver(node: $0, registry: registry) }
}
}
Expand Down Expand Up @@ -156,7 +151,7 @@ public class IdentityMap {

func nodeStore<T: Identifiable>(entity: T, modifiedAt: Stamp?) -> EntityNode<T> {
let node = storage[entity, new: EntityNode(entity, modifiedAt: nil) { [registry] in
registry.postNotification(for: $0)
registry.enqueueChange(for: $0)
}]

do {
Expand All @@ -172,7 +167,7 @@ public class IdentityMap {

func nodeStore<T: Aggregate>(entity: T, modifiedAt: Stamp?) -> EntityNode<T> {
let node = storage[entity, new: EntityNode(entity, modifiedAt: nil) { [registry] in
registry.postNotification(for: $0)
registry.enqueueChange(for: $0)
}]

// disable changes while doing the entity update
Expand All @@ -198,6 +193,16 @@ public class IdentityMap {
return node
}

private func transaction<T>(_ body: () -> T) -> T {
identityQueue.sync(flags: .barrier) {
let returnValue = body()

self.registry.postChanges()

return returnValue
}
}

}

// MARK: Update
Expand All @@ -211,7 +216,7 @@ extension IdentityMap {
/// - Returns: true if entity exists and might be updated, false otherwise. The update might **not** be applied if modifiedAt is too old
@discardableResult
public func update<T: Identifiable>(_ type: T.Type, id: T.ID, modifiedAt: Stamp? = nil, update: Update<T>) -> Bool {
identityQueue.sync(flags: .barrier) {
transaction {
guard var entity = storage[T.self, id: id]?.ref.value else {
return false
}
Expand All @@ -220,8 +225,6 @@ extension IdentityMap {

_ = nodeStore(entity: entity, modifiedAt: modifiedAt)

self.registry.postNotifications()

return true
}
}
Expand All @@ -234,7 +237,7 @@ extension IdentityMap {
/// - Returns: true if entity exists and might be updated, false otherwise. The update might **not** be applied if modifiedAt is too old
@discardableResult
public func update<T: Aggregate>(_ type: T.Type, id: T.ID, modifiedAt: Stamp? = nil, _ update: Update<T>) -> Bool {
identityQueue.sync(flags: .barrier) {
transaction {
guard var entity = storage[T.self, id: id]?.ref.value else {
return false
}
Expand All @@ -243,8 +246,6 @@ extension IdentityMap {

_ = nodeStore(entity: entity, modifiedAt: modifiedAt)

self.registry.postNotifications()

return true
}
}
Expand All @@ -255,7 +256,7 @@ extension IdentityMap {
/// - Returns: true if entity exists and might be updated, false otherwise. The update might **not** be applied if modifiedAt is too old
@discardableResult
public func update<T: Identifiable>(named: AliasKey<T>, modifiedAt: Stamp? = nil, update: Update<T>) -> Bool {
identityQueue.sync(flags: .barrier) {
transaction {
guard let entity = refAliases[named].value else {
return false
}
Expand All @@ -267,8 +268,6 @@ extension IdentityMap {
// ref might have changed
refAliases.insert(node, key: named)

self.registry.postNotifications()

return true
}
}
Expand All @@ -279,7 +278,7 @@ extension IdentityMap {
/// - Returns: true if entity exists and might be updated, false otherwise. The update might **not** be applied if modifiedAt is too old
@discardableResult
public func update<T: Aggregate>(named: AliasKey<T>, modifiedAt: Stamp? = nil, update: Update<T>) -> Bool {
identityQueue.sync(flags: .barrier) {
transaction {
guard let entity = refAliases[named].value else {
return false
}
Expand All @@ -291,8 +290,6 @@ extension IdentityMap {
// ref might have changed
refAliases.insert(node, key: named)

self.registry.postNotifications()

return true
}
}
Expand All @@ -304,7 +301,7 @@ extension IdentityMap {
@discardableResult
public func update<C: Collection>(named: AliasKey<C>, modifiedAt: Stamp? = nil, update: Update<[C.Element]>)
-> Bool where C.Element: Identifiable {
identityQueue.sync(flags: .barrier) {
transaction {
guard let entities = refAliases[named].value else {
return false
}
Expand All @@ -317,8 +314,6 @@ extension IdentityMap {
// update alias because `update` may have added/removed entities
refAliases.insert(nodes, key: named)

self.registry.postNotifications()

return true
}
}
Expand All @@ -330,7 +325,7 @@ extension IdentityMap {
@discardableResult
public func update<C: Collection>(named: AliasKey<C>, modifiedAt: Stamp? = nil, update: Update<[C.Element]>)
-> Bool where C.Element: Aggregate {
identityQueue.sync(flags: .barrier) {
transaction {
guard let entities = refAliases[named].value else {
return false
}
Expand All @@ -343,8 +338,6 @@ extension IdentityMap {
// update alias because `update` may have added/removed entities
refAliases.insert(nodes, key: named)

self.registry.postNotifications()

return true
}
}
Expand Down
51 changes: 25 additions & 26 deletions Sources/CohesionKit/Observer/ObserverRegistry.swift
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import Foundation

/// Registers observers associated to an ``EntityNode``.
/// The registry will handle notifying observers when a node is marked as changed
class ObserverRegistry {
typealias Observer = (Any) -> Void
private typealias ObserverID = Int
private typealias EntityNodeKey = Int
private typealias Hash = Int

let queue: DispatchQueue
/// registered observers per node
private var observers: [EntityNodeKey: [ObserverID: Observer]] = [:]
/// registered observers
private var observers: [Hash: [ObserverID: Observer]] = [:]
/// next available id for an observer
private var nextObserverID: ObserverID = 0
/// nodes waiting for notifiying their observes about changes
private var pendingChangedNodes: Set<AnyHashable> = []
private var pendingChanges: [Hash: AnyWeak] = [:]

init(queue: DispatchQueue) {
self.queue = queue
Expand All @@ -23,11 +25,11 @@ class ObserverRegistry {
let observerID = generateID()

observers[node.hashValue, default: [:]][observerID] = {
guard let newValue = $0 as? T else {
guard let newValue = $0 as? EntityNode<T> else {
return
}

onChange(newValue)
onChange(newValue.ref.value)
}

// subscription keeps a strong ref to node, avoiding it from being released somehow while suscription is running
Expand All @@ -36,34 +38,31 @@ class ObserverRegistry {
}
}

func postNotification<T>(for node: EntityNode<T>) {
self.observers[node.hashValue]?.forEach { (_, observer) in
observer(node.value)
}
/// Mark a node as changed. Observers won't be notified of the change until ``postChanges`` is called
func enqueueChange<T>(for node: EntityNode<T>) {
pendingChanges[node.hashValue] = Weak(value: node)
}

/// Queue a notification for given node. Notification won't be sent until ``postNotifications`` is called
func enqueueNotification<T>(for node: EntityNode<T>) {
pendingChangedNodes.insert(AnyHashable(node))
func hasPendingChange<T>(for node: EntityNode<T>) -> Bool {
pendingChanges[node.hashValue] != nil
}

/// Notify observers of all queued changes. Once notified pending changes are cleared out.
func postNotifications() {
/// keep notifications as-is when queue was triggered
queue.async { [weak self] in
guard let self else {
return
}
func postChanges() {
let changes = pendingChanges
// let observers = self.observers

let changes = self.pendingChangedNodes
self.pendingChanges = [:]

self.pendingChangedNodes = []

for hash in changes {
let node = hash.base as! AnyEntityNode
queue.async { [unowned self] in
for (hashKey, weakNode) in changes {
// node was released: no one to notify
guard let node = weakNode.unwrap() else {
continue
}

self.observers[hash.hashValue]?.forEach { (_, observer) in
observer(node.value)
self.observers[hashKey]?.forEach { (_, observer) in
observer(node)
}
}
}
Expand Down
18 changes: 8 additions & 10 deletions Sources/CohesionKit/Storage/EntityNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,22 @@ class EntityNode<T>: AnyEntityNode {

/// An observable entity reference
let ref: Observable<T>
private var subscription: Subscription?

private let onChange: ((EntityNode<T>) -> Void)?
/// last time the ref.value was changed. Any subsequent change must have a higher value to be applied
/// if nil ref has no stamp and any change will be accepted
private var modifiedAt: Stamp?
/// entity children
private(set) var children: [PartialKeyPath<T>: SubscribedChild] = [:]

init(ref: Observable<T>, modifiedAt: Stamp?, onRefChange: ((EntityNode<T>) -> Void)? = nil) {
init(ref: Observable<T>, modifiedAt: Stamp?, onChange: ((EntityNode<T>) -> Void)? = nil) {
self.ref = ref
self.modifiedAt = modifiedAt

if let onRefChange {
self.subscription = ref.addObserver { [unowned self] _ in
onRefChange(self)
}
}
self.onChange = onChange
}

convenience init(_ entity: T, modifiedAt: Stamp?, onRefChange: ((EntityNode<T>) -> Void)? = nil) {
self.init(ref: Observable(value: entity), modifiedAt: modifiedAt, onRefChange: onRefChange)
convenience init(_ entity: T, modifiedAt: Stamp?, onChange: ((EntityNode<T>) -> Void)? = nil) {
self.init(ref: Observable(value: entity), modifiedAt: modifiedAt, onChange: onChange)
}

/// change the entity to a new value. If modifiedAt is nil or > to previous date update the value will be changed
Expand All @@ -54,6 +50,7 @@ class EntityNode<T>: AnyEntityNode {

modifiedAt = newModifiedAt ?? modifiedAt
ref.value = newEntity
onChange?(self)
}

func removeAllChildren() {
Expand Down Expand Up @@ -93,6 +90,7 @@ class EntityNode<T>: AnyEntityNode {
}

update(&self.ref.value, newValue)
self.onChange?(self)
}

children[keyPath] = SubscribedChild(subscription: subscription, node: childNode)
Expand Down
Loading

0 comments on commit 8779ade

Please sign in to comment.