From 876654ad0fe3a46f7404af5c6d4300a3062f4612 Mon Sep 17 00:00:00 2001 From: Juraj Kirchheim Date: Tue, 20 Jul 2021 10:49:05 +0200 Subject: [PATCH] Get rid of subscriptions (for #69). --- src/tink/state/internal/AutoObservable.hx | 157 +++++++++------------- 1 file changed, 67 insertions(+), 90 deletions(-) diff --git a/src/tink/state/internal/AutoObservable.hx b/src/tink/state/internal/AutoObservable.hx index 898479e..c90a9f0 100644 --- a/src/tink/state/internal/AutoObservable.hx +++ b/src/tink/state/internal/AutoObservable.hx @@ -67,61 +67,12 @@ private abstract Computation((a:AutoObservable,?Noise)->T) { } } -private typedef Subscription = SubscriptionTo; - -private class SubscriptionTo { - - public final source:ObservableObject; - var last:T; - var lastRev:Revision; - final owner:Invalidatable; - - public var used = true; - - public function new(source, cur, owner) { - this.source = source; - this.last = cur; - this.lastRev = source.getRevision(); - this.owner = owner; - } - - public inline function isValid() - return source.getRevision() == lastRev; - - public function hasChanged():Bool { - var nextRev = source.getRevision(); - if (nextRev == lastRev) return false; - lastRev = nextRev; - var before = last; - last = source.getValue(); - return !source.getComparator().eq(last, before); - } - - public inline function reuse(value:T) { - used = true; - last = value; - } - - public inline function disconnect():Void { - #if tink_state.debug - logger.disconnected(source, cast owner); - #end - source.unsubscribe(owner); - } - - public inline function connect():Void { - #if tink_state.debug - logger.connected(source, cast owner); - #end - source.subscribe(owner); - } -} - private enum abstract AutoObservableStatus(Int) { var Dirty; var Computed; } +private typedef Source = ObservableObject; class AutoObservable extends Invalidator implements Invalidatable implements Derived implements ObservableObject { @@ -138,19 +89,20 @@ class AutoObservable extends Invalidator final annex:Annex<{}>; var status = Dirty; var last:T = null; - var subscriptions:Array; - var dependencies = new ObjectMap, Subscription>(); + var sources:Array; + var lastValues = new ObjectMap(); + var lastRevisions = new ObjectMap(); var comparator:Comparator; override function getRevision() { if (hot) return revision; - if (subscriptions == null) + if (sources == null) getValue(); - for (s in subscriptions) - if (s.source.getRevision() > revision) + for (s in sources) + if (s.getRevision() > lastRevisions[s]) return revision = new Revision(); return revision; @@ -160,11 +112,11 @@ class AutoObservable extends Invalidator return annex; function subsValid() { - if (subscriptions == null) + if (sources == null) return false; - for (s in subscriptions) - if (!s.isValid()) + for (s in sources) + if (s.getRevision() != lastRevisions[s]) return false; return true; @@ -183,18 +135,33 @@ class AutoObservable extends Invalidator this.annex = new Annex<{}>(this); } + inline function connect(s:Source) { + #if tink_state.debug + logger.connected(s, this); + #end + s.subscribe(this); + } + + inline function disconnect(s:Source):Void { + #if tink_state.debug + logger.disconnected(s, this); + #end + s.unsubscribe(this); + } + function wakeup() { getValue(); getRevision(); - if (subscriptions != null) - for (s in subscriptions) s.connect(); + if (sources != null) + for (s in sources) connect(s); hot = true; } + function sleep() { hot = false; - if (subscriptions != null) - for (s in subscriptions) s.disconnect(); + if (sources != null) + for (s in sources) disconnect(s); } static public inline function computeFor(o:Derived, fn:()->T) { @@ -235,19 +202,19 @@ class AutoObservable extends Invalidator function doCompute() { status = Computed; - if (subscriptions != null) - for (s in subscriptions) s.used = false; - subscriptions = []; + if (sources != null) + lastValues.clear();// TODO: this might actually cause some churn ... who knows + sources = []; sync = true; last = computeFor(this, () -> compute(this)); sync = false; #if tink_state.debug logger.revalidated(this, false); #end - if (subscriptions.length == 0) dispose(); + if (sources.length == 0) dispose(); } - var prevSubs = subscriptions, + var prevSources = sources, count = 0; while (!isValid()) { @@ -256,11 +223,11 @@ class AutoObservable extends Invalidator #end if (++count == Observable.MAX_ITERATIONS) throw 'no result after ${Observable.MAX_ITERATIONS} attempts'; - else if (subscriptions != null) { + else if (sources != null) { var valid = true; - for (s in subscriptions) - if (s.hasChanged()) { + for (s in sources) + if (hasChanged(s)) { valid = false; break; } @@ -273,14 +240,14 @@ class AutoObservable extends Invalidator } else { doCompute(); - if (prevSubs != null) { - for (s in prevSubs) - if (!s.used) { - if (hot) s.disconnect(); - dependencies.remove(s.source); - s.source.release(); + if (prevSources != null) { + for (s in prevSources) + if (!isUsed(s)) { + if (hot) disconnect(s); + lastRevisions.remove(s); + s.release(); #if tink_state.debug - logger.unsubscribed(s.source, this); + logger.unsubscribed(s, this); #end } } @@ -291,6 +258,16 @@ class AutoObservable extends Invalidator return last; } + public function hasChanged(s:ObservableObject):Bool { + var nextRev = s.getRevision(); + if (nextRev == lastRevisions[s]) return false; + lastRevisions[s] = nextRev; + var before:R = lastValues[s]; + var last = s.getValue(); + lastValues[s] = last; + return !s.getComparator().eq(last, before); + } + var sync = true; function update(value) if (!sync) { @@ -298,29 +275,29 @@ class AutoObservable extends Invalidator fire(); } + inline function isUsed(s:Source) + return lastValues.exists(s); + public function subscribeTo(source:ObservableObject, cur:R):Void - switch dependencies.get(source) { + switch lastRevisions[source] { case null: #if tink_state.debug logger.subscribed(source, this); #end - var sub:Subscription = cast new SubscriptionTo(source, cur, this); + lastRevisions[source] = source.getRevision(); + lastValues[source] = cur; source.retain(); - if (hot) sub.connect(); - dependencies.set(source, sub); - subscriptions.push(sub); + if (hot) connect(source); + sources.push(source); case v: - if (!v.used) { - v.reuse(cur); - subscriptions.push(v); + if (!isUsed(source)) { + lastValues[source] = cur; + sources.push(source); } } public function isSubscribedTo(source:ObservableObject) - return switch dependencies.get(source) { - case null: false; - case s: s.used; - } + return isUsed(source); public function invalidate() if (status == Computed) { @@ -330,7 +307,7 @@ class AutoObservable extends Invalidator #if tink_state.debug public function getDependencies() - return cast dependencies.keys(); + return sources.iterator(); #end }