Skip to content

Commit

Permalink
Get rid of subscriptions (for #69).
Browse files Browse the repository at this point in the history
  • Loading branch information
back2dos committed Jul 20, 2021
1 parent d437d37 commit 876654a
Showing 1 changed file with 67 additions and 90 deletions.
157 changes: 67 additions & 90 deletions src/tink/state/internal/AutoObservable.hx
Original file line number Diff line number Diff line change
Expand Up @@ -67,61 +67,12 @@ private abstract Computation<T>((a:AutoObservable<T>,?Noise)->T) {
}
}

private typedef Subscription = SubscriptionTo<Any>;

private class SubscriptionTo<T> {

public final source:ObservableObject<T>;
var last:T;
var lastRev:Revision;
final owner:Invalidatable;

public var used = true;

public function new(source, cur, owner) {
this.source = source;
this.last = cur;
this.lastRev = source.getRevision();
this.owner = owner;
}

public inline function isValid()
return source.getRevision() == lastRev;

public function hasChanged():Bool {
var nextRev = source.getRevision();
if (nextRev == lastRev) return false;
lastRev = nextRev;
var before = last;
last = source.getValue();
return !source.getComparator().eq(last, before);
}

public inline function reuse(value:T) {
used = true;
last = value;
}

public inline function disconnect():Void {
#if tink_state.debug
logger.disconnected(source, cast owner);
#end
source.unsubscribe(owner);
}

public inline function connect():Void {
#if tink_state.debug
logger.connected(source, cast owner);
#end
source.subscribe(owner);
}
}

private enum abstract AutoObservableStatus(Int) {
var Dirty;
var Computed;
}

private typedef Source = ObservableObject<Dynamic>;
class AutoObservable<T> extends Invalidator
implements Invalidatable implements Derived implements ObservableObject<T> {

Expand All @@ -138,19 +89,20 @@ class AutoObservable<T> extends Invalidator
final annex:Annex<{}>;
var status = Dirty;
var last:T = null;
var subscriptions:Array<Subscription>;
var dependencies = new ObjectMap<ObservableObject<Dynamic>, Subscription>();
var sources:Array<Source>;
var lastValues = new ObjectMap<Source, Dynamic>();
var lastRevisions = new ObjectMap<Source, Revision>();

var comparator:Comparator<T>;

override function getRevision() {
if (hot)
return revision;
if (subscriptions == null)
if (sources == null)
getValue();

for (s in subscriptions)
if (s.source.getRevision() > revision)
for (s in sources)
if (s.getRevision() > lastRevisions[s])
return revision = new Revision();

return revision;
Expand All @@ -160,11 +112,11 @@ class AutoObservable<T> extends Invalidator
return annex;

function subsValid() {
if (subscriptions == null)
if (sources == null)
return false;

for (s in subscriptions)
if (!s.isValid())
for (s in sources)
if (s.getRevision() != lastRevisions[s])
return false;

return true;
Expand All @@ -183,18 +135,33 @@ class AutoObservable<T> extends Invalidator
this.annex = new Annex<{}>(this);
}

inline function connect(s:Source) {
#if tink_state.debug
logger.connected(s, this);
#end
s.subscribe(this);
}

inline function disconnect(s:Source):Void {
#if tink_state.debug
logger.disconnected(s, this);
#end
s.unsubscribe(this);
}

function wakeup() {
getValue();
getRevision();
if (subscriptions != null)
for (s in subscriptions) s.connect();
if (sources != null)
for (s in sources) connect(s);
hot = true;
}


function sleep() {
hot = false;
if (subscriptions != null)
for (s in subscriptions) s.disconnect();
if (sources != null)
for (s in sources) disconnect(s);
}

static public inline function computeFor<T>(o:Derived, fn:()->T) {
Expand Down Expand Up @@ -235,19 +202,19 @@ class AutoObservable<T> extends Invalidator

function doCompute() {
status = Computed;
if (subscriptions != null)
for (s in subscriptions) s.used = false;
subscriptions = [];
if (sources != null)
lastValues.clear();// TODO: this might actually cause some churn ... who knows
sources = [];
sync = true;
last = computeFor(this, () -> compute(this));
sync = false;
#if tink_state.debug
logger.revalidated(this, false);
#end
if (subscriptions.length == 0) dispose();
if (sources.length == 0) dispose();
}

var prevSubs = subscriptions,
var prevSources = sources,
count = 0;

while (!isValid()) {
Expand All @@ -256,11 +223,11 @@ class AutoObservable<T> extends Invalidator
#end
if (++count == Observable.MAX_ITERATIONS)
throw 'no result after ${Observable.MAX_ITERATIONS} attempts';
else if (subscriptions != null) {
else if (sources != null) {
var valid = true;

for (s in subscriptions)
if (s.hasChanged()) {
for (s in sources)
if (hasChanged(s)) {
valid = false;
break;
}
Expand All @@ -273,14 +240,14 @@ class AutoObservable<T> extends Invalidator
}
else {
doCompute();
if (prevSubs != null) {
for (s in prevSubs)
if (!s.used) {
if (hot) s.disconnect();
dependencies.remove(s.source);
s.source.release();
if (prevSources != null) {
for (s in prevSources)
if (!isUsed(s)) {
if (hot) disconnect(s);
lastRevisions.remove(s);
s.release();
#if tink_state.debug
logger.unsubscribed(s.source, this);
logger.unsubscribed(s, this);
#end
}
}
Expand All @@ -291,36 +258,46 @@ class AutoObservable<T> extends Invalidator
return last;
}

public function hasChanged<R>(s:ObservableObject<R>):Bool {
var nextRev = s.getRevision();
if (nextRev == lastRevisions[s]) return false;
lastRevisions[s] = nextRev;
var before:R = lastValues[s];
var last = s.getValue();
lastValues[s] = last;
return !s.getComparator().eq(last, before);
}

var sync = true;

function update(value) if (!sync) {
last = value;
fire();
}

inline function isUsed(s:Source)
return lastValues.exists(s);

public function subscribeTo<R>(source:ObservableObject<R>, cur:R):Void
switch dependencies.get(source) {
switch lastRevisions[source] {
case null:
#if tink_state.debug
logger.subscribed(source, this);
#end
var sub:Subscription = cast new SubscriptionTo(source, cur, this);
lastRevisions[source] = source.getRevision();
lastValues[source] = cur;
source.retain();
if (hot) sub.connect();
dependencies.set(source, sub);
subscriptions.push(sub);
if (hot) connect(source);
sources.push(source);
case v:
if (!v.used) {
v.reuse(cur);
subscriptions.push(v);
if (!isUsed(source)) {
lastValues[source] = cur;
sources.push(source);
}
}

public function isSubscribedTo<R>(source:ObservableObject<R>)
return switch dependencies.get(source) {
case null: false;
case s: s.used;
}
return isUsed(source);

public function invalidate()
if (status == Computed) {
Expand All @@ -330,7 +307,7 @@ class AutoObservable<T> extends Invalidator

#if tink_state.debug
public function getDependencies()
return cast dependencies.keys();
return sources.iterator();
#end
}

Expand Down

0 comments on commit 876654a

Please sign in to comment.