Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tech(observer): Use a registry to register observers #53

Merged
merged 14 commits into from
Jul 28, 2023
Merged
55 changes: 40 additions & 15 deletions Sources/CohesionKit/Identity/IdentityStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,22 @@ import Foundation
public class IdentityMap {
public typealias Update<T> = (inout T) -> Void

private(set) var storage: EntitiesStorage = EntitiesStorage()
private(set) var refAliases: AliasStorage = [:]
private lazy var storeVisitor = IdentityMapStoreVisitor(identityMap: self)
/// the queue on which identity map do its heavy work
private let identityQueue = DispatchQueue(label: "com.cohesionkit.identitymap", attributes: .concurrent)
/// dispatch queue to return the results
private let observeQueue: DispatchQueue
private let logger: Logger?
private let registry: ObserverRegistry

private(set) var storage: EntitiesStorage = EntitiesStorage()
private(set) var refAliases: AliasStorage = [:]
private lazy var storeVisitor = IdentityMapStoreVisitor(identityMap: self)


/// Create a new IdentityMap instance optionally with a queue and a logger
/// - Parameter queue: the queue on which to receive updates. If not defined it default to main
/// - Parameter logger: a logger to follow/debug identity internal state
public init(queue: DispatchQueue = .main, logger: Logger? = nil) {
self.logger = logger
self.observeQueue = queue
self.registry = ObserverRegistry(queue: queue)
}

/// Store an entity in the storage. Entity will be stored only if stamp (`modifiedAt`) is higher than in previous
Expand Down Expand Up @@ -49,7 +50,9 @@ public class IdentityMap {
logger?.didRegisterAlias(alias)
}

return EntityObserver(node: node, queue: observeQueue)
self.registry.postNotifications()

return EntityObserver(node: node, registry: registry)
}
}

Expand Down Expand Up @@ -81,7 +84,9 @@ public class IdentityMap {
logger?.didRegisterAlias(alias)
}

return EntityObserver(node: node, queue: observeQueue)
self.registry.postNotifications()

return EntityObserver(node: node, registry: registry)
}
}

Expand All @@ -96,7 +101,9 @@ public class IdentityMap {
logger?.didRegisterAlias(alias)
}

return nodes.map { EntityObserver(node: $0, queue: observeQueue) }
self.registry.postNotifications()

return nodes.map { EntityObserver(node: $0, registry: registry) }
}
}

Expand All @@ -111,7 +118,9 @@ public class IdentityMap {
logger?.didRegisterAlias(alias)
}

return nodes.map { EntityObserver(node: $0, queue: observeQueue) }
self.registry.postNotifications()

return nodes.map { EntityObserver(node: $0, registry: registry) }
}
}

Expand All @@ -122,7 +131,7 @@ public class IdentityMap {
public func find<T: Identifiable>(_ type: T.Type, id: T.ID) -> EntityObserver<T>? {
identityQueue.sync {
if let node = storage[T.self, id: id] {
return EntityObserver(node: node, queue: observeQueue)
return EntityObserver(node: node, registry: registry)
}

return nil
Expand All @@ -133,20 +142,22 @@ public class IdentityMap {
/// - Parameter named: the alias to look for
public func find<T: Identifiable>(named: AliasKey<T>) -> AliasObserver<T> {
identityQueue.sync {
AliasObserver(alias: refAliases[named], queue: observeQueue)
AliasObserver(alias: refAliases[named], registry: registry)
}
}

/// Try to find a collected registered under `named` alias
/// - Returns: an observer returning the alias value. Note that the value will be an Array
public func find<C: Collection>(named: AliasKey<C>) -> AliasObserver<[C.Element]> {
identityQueue.sync {
AliasObserver(alias: refAliases[named], queue: observeQueue)
AliasObserver(alias: refAliases[named], registry: registry)
}
}

func nodeStore<T: Identifiable>(entity: T, modifiedAt: Stamp?) -> EntityNode<T> {
let node = storage[entity, new: EntityNode(entity, modifiedAt: nil)]
let node = storage[entity, new: EntityNode(entity, modifiedAt: nil) { [registry] in
registry.postNotification(for: $0)
}]

do {
try node.updateEntity(entity, modifiedAt: modifiedAt)
Expand All @@ -160,7 +171,9 @@ public class IdentityMap {
}

func nodeStore<T: Aggregate>(entity: T, modifiedAt: Stamp?) -> EntityNode<T> {
let node = storage[entity, new: EntityNode(entity, modifiedAt: nil)]
let node = storage[entity, new: EntityNode(entity, modifiedAt: nil) { [registry] in
registry.postNotification(for: $0)
}]

// disable changes while doing the entity update
node.applyChildrenChanges = false
Expand Down Expand Up @@ -207,6 +220,8 @@ extension IdentityMap {

_ = nodeStore(entity: entity, modifiedAt: modifiedAt)

self.registry.postNotifications()

return true
}
}
Expand All @@ -228,6 +243,8 @@ extension IdentityMap {

_ = nodeStore(entity: entity, modifiedAt: modifiedAt)

self.registry.postNotifications()

return true
}
}
Expand All @@ -250,6 +267,8 @@ extension IdentityMap {
// ref might have changed
refAliases.insert(node, key: named)

self.registry.postNotifications()

return true
}
}
Expand All @@ -272,6 +291,8 @@ extension IdentityMap {
// ref might have changed
refAliases.insert(node, key: named)

self.registry.postNotifications()

return true
}
}
Expand All @@ -296,6 +317,8 @@ extension IdentityMap {
// update alias because `update` may have added/removed entities
refAliases.insert(nodes, key: named)

self.registry.postNotifications()

return true
}
}
Expand All @@ -320,6 +343,8 @@ extension IdentityMap {
// update alias because `update` may have added/removed entities
refAliases.insert(nodes, key: named)

self.registry.postNotifications()

return true
}
}
Expand Down
28 changes: 13 additions & 15 deletions Sources/CohesionKit/Observer/AliasObserver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ public struct AliasObserver<T>: Observer {
let createObserve: (@escaping OnChangeClosure) -> Subscription

/// create an observer for a single entity node ref
init(alias: Observable<EntityNode<T>?>, queue: DispatchQueue) {
init(alias: Observable<EntityNode<T>?>, registry: ObserverRegistry) {
self.value = alias.value?.ref.value
self.createObserve = {
Self.createObserve(for: alias, queue: queue, onChange: $0)
Self.createObserve(for: alias, registry: registry, onChange: $0)
}
}

/// create an observer for a list of node ref
init<E>(alias: Observable<[EntityNode<E>]?>, queue: DispatchQueue) where T == Array<E> {
init<E>(alias: Observable<[EntityNode<E>]?>, registry: ObserverRegistry) where T == Array<E> {
self.value = alias.value?.map(\.ref.value)
self.createObserve = {
Self.createObserve(for: alias, queue: queue, onChange: $0)
Self.createObserve(for: alias, registry: registry, onChange: $0)
}
}

Expand All @@ -35,21 +35,19 @@ extension AliasObserver {
/// - the ref node value change
private static func createObserve(
for alias: Observable<EntityNode<T>?>,
queue: DispatchQueue,
registry: ObserverRegistry,
onChange: @escaping OnChangeClosure
) -> Subscription {
var entityChangesSubscription: Subscription? = alias
.value
.map { node in EntityObserver(node: node, queue: .main) }?
.observe(onChange: onChange)
.map { node in registry.addObserver(node: node, onChange: onChange) }

// subscribe to alias changes
let subscription = alias.addObserver { node in
let nodeObserver = node.map { EntityObserver(node: $0, queue: queue) }

queue.async { onChange(nodeObserver?.value) }
// update entity changes subscription
entityChangesSubscription = nodeObserver?.observe(onChange: onChange)
entityChangesSubscription = node.map { registry.addObserver(node: $0) { onChange($0) }}

registry.queue.async { onChange(node?.ref.value) }
}

return Subscription {
Expand All @@ -63,19 +61,19 @@ extension AliasObserver {
/// - any of the ref node element change
private static func createObserve<E>(
for alias: Observable<[EntityNode<E>]?>,
queue: DispatchQueue,
registry: ObserverRegistry,
onChange: @escaping OnChangeClosure
) -> Subscription where T == Array<E> {
var entitiesChangesSubscriptions: Subscription? = alias
.value
.map { nodes in nodes.map { EntityObserver(node: $0, queue: queue) } }?
.map { nodes in nodes.map { EntityObserver(node: $0, registry: registry) } }?
.observe(onChange: onChange)

// Subscribe to alias ref changes and to any changes made on the ref collection nodes.
let subscription = alias.addObserver { nodes in
let nodeObservers = nodes?.map { EntityObserver(node: $0, queue: queue) }
let nodeObservers = nodes?.map { EntityObserver(node: $0, registry: registry) }

queue.async { onChange(nodeObservers?.value) }
registry.queue.async { onChange(nodeObservers?.value) }

// update collection changes subscription
entitiesChangesSubscriptions = nodeObservers?.observe(onChange: onChange)
Expand Down
22 changes: 6 additions & 16 deletions Sources/CohesionKit/Observer/EntityObserver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,16 @@ import Foundation
/// A type registering observers on a given entity from identity storage
public struct EntityObserver<T>: Observer {
let node: EntityNode<T>
let queue: DispatchQueue
let registry: ObserverRegistry
public let value: T
init(node: EntityNode<T>, queue: DispatchQueue) {
self.queue = queue

init(node: EntityNode<T>, registry: ObserverRegistry) {
self.registry = registry
self.node = node
self.value = node.value as! T
}

public func observe(onChange: @escaping (T) -> Void) -> Subscription {
let subscription = node.ref.addObserver { newValue in
queue.async {
onChange(newValue)
}
}
let retain = Unmanaged.passRetained(node)

return Subscription {
subscription.unsubscribe()
retain.release()
}
registry.addObserver(node: node, onChange: onChange)
}
}
76 changes: 76 additions & 0 deletions Sources/CohesionKit/Observer/ObserverRegistry.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import Foundation

class ObserverRegistry {
typealias Observer = (Any) -> Void
private typealias ObserverID = Int
private typealias EntityNodeKey = Int

let queue: DispatchQueue
/// registered observers per node
private var observers: [EntityNodeKey: [ObserverID: Observer]] = [:]
/// next available id for an observer
private var nextObserverID: ObserverID = 0
/// nodes waiting for notifiying their observes about changes
private var pendingChangedNodes: Set<AnyHashable> = []

init(queue: DispatchQueue) {
self.queue = queue
}

/// register an observer to observe changes on an entity node. Everytime `ObserverRegistry` is notified about changes
/// to this node `onChange` will be called.
func addObserver<T>(node: EntityNode<T>, onChange: @escaping (T) -> Void) -> Subscription {
let observerID = generateID()

observers[node.hashValue, default: [:]][observerID] = {
guard let newValue = $0 as? T else {
return
}

onChange(newValue)
}

// subscription keeps a strong ref to node, avoiding it from being released somehow while suscription is running
return Subscription { [node] in
self.observers[node.hashValue]?.removeValue(forKey: observerID)
}
}

func postNotification<T>(for node: EntityNode<T>) {
self.observers[node.hashValue]?.forEach { (_, observer) in
observer(node.value)
}
}

/// Queue a notification for given node. Notification won't be sent until ``postNotifications`` is called
func enqueueNotification<T>(for node: EntityNode<T>) {
pendingChangedNodes.insert(AnyHashable(node))
}

/// Notify observers of all queued changes. Once notified pending changes are cleared out.
func postNotifications() {
/// keep notifications as-is when queue was triggered
queue.async { [weak self] in
guard let self else {
return
}

let changes = self.pendingChangedNodes

self.pendingChangedNodes = []

for hash in changes {
let node = hash.base as! AnyEntityNode

self.observers[hash.hashValue]?.forEach { (_, observer) in
observer(node.value)
}
}
}
}

private func generateID() -> ObserverID {
defer { nextObserverID &+= 1 }
return nextObserverID
}
}
15 changes: 11 additions & 4 deletions Sources/CohesionKit/Storage/EntityNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,26 @@ class EntityNode<T>: AnyEntityNode {

/// An observable entity reference
let ref: Observable<T>
private var subscription: Subscription?
/// last time the ref.value was changed. Any subsequent change must have a higher value to be applied
/// if nil ref has no stamp and any change will be accepted
private var modifiedAt: Stamp?
/// entity children
private(set) var children: [PartialKeyPath<T>: SubscribedChild] = [:]

init(ref: Observable<T>, modifiedAt: Stamp?) {
init(ref: Observable<T>, modifiedAt: Stamp?, onRefChange: ((EntityNode<T>) -> Void)? = nil) {
self.ref = ref
self.modifiedAt = modifiedAt

if let onRefChange {
self.subscription = ref.addObserver { [unowned self] _ in
onRefChange(self)
}
}
}

convenience init(_ entity: T, modifiedAt: Stamp?) {
self.init(ref: Observable(value: entity), modifiedAt: modifiedAt)
convenience init(_ entity: T, modifiedAt: Stamp?, onRefChange: ((EntityNode<T>) -> Void)? = nil) {
self.init(ref: Observable(value: entity), modifiedAt: modifiedAt, onRefChange: onRefChange)
}

/// change the entity to a new value. If modifiedAt is nil or > to previous date update the value will be changed
Expand Down Expand Up @@ -98,6 +105,6 @@ extension EntityNode: Hashable {
}

func hash(into hasher: inout Hasher) {
hasher.combine(ObjectIdentifier(self).hashValue)
hasher.combine(ObjectIdentifier(self))
}
}
Loading
Loading