Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suspend async computation when observable goes out of use. #72

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
5c77794
Support haxe 4.1
back2dos Jul 4, 2021
e82515f
Minor tweak.
back2dos Jul 7, 2021
8692116
Add API to check if tracking is needed.
back2dos Jul 18, 2021
cfefb49
Add retain/release
back2dos Jul 18, 2021
50a014c
Merge branch 'master' of https://github.com/haxetink/tink_state
back2dos Jul 18, 2021
aa8dd66
Improve dependency graph. Add dispose callback for TransformObservable.
back2dos Jul 18, 2021
311009f
Fix formatting in vscode.
back2dos Jul 18, 2021
318c2e4
Begin working on granular observability.
back2dos Jul 18, 2021
d749da2
Make #49 work for arrays.
back2dos Jul 18, 2021
d947b53
Minor.
back2dos Jul 18, 2021
a23a3e5
Delete unused stuff.
back2dos Jul 18, 2021
1350c45
Make maps multitype and granularly observable.
back2dos Jul 18, 2021
9fb34f6
Implement granular array tracking without leaking.
back2dos Jul 18, 2021
8745ca2
Move granularity to ObservableArrayView.
back2dos Jul 18, 2021
f1d7b8b
Non-leaky granular observability for maps. Add tests too.
back2dos Jul 18, 2021
bf329d9
Switch to next version of streams/testrunner. Should fix tests.
back2dos Jul 18, 2021
d437d37
Get rid of callback links in invalidation (for #69).
back2dos Jul 20, 2021
876654a
Get rid of subscriptions (for #69).
back2dos Jul 20, 2021
60cd663
Randomly move things around for a change.
back2dos Jul 20, 2021
4eb7e44
And yet more random refactoring.
back2dos Jul 20, 2021
5cd1e6b
Convolution beats correctness. Always.
back2dos Jul 20, 2021
383c32e
Avoid NPE.
back2dos Jul 20, 2021
213ab73
Revert "Get rid of subscriptions (for #69)."
back2dos Jul 20, 2021
07c67f7
Ditch the subscription type param.
back2dos Jul 20, 2021
1e56968
Focus benchmark on updates (rather than setup/teardown).
back2dos Jul 20, 2021
db3f026
One more level of indirection \o/
back2dos Jul 25, 2021
cb06a99
Make tests pass again.
back2dos Jul 25, 2021
d641c49
Tests seem to pass again after implementing #60.
back2dos Aug 3, 2021
1683e37
Haxe 4.1 compat.
back2dos Aug 3, 2021
f5e6bf8
Use final only where for constant locals.
back2dos Aug 3, 2021
deec843
Make tests slightly trickier.
back2dos Aug 3, 2021
810e5af
Don't use direct scheduler in tests when not necessary.
back2dos Aug 3, 2021
3658427
Add a rudimentary test for #60.
back2dos Aug 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .haxerc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"version": "4.2.0",
"version": "4.2.3",
"resolveLibs": "scoped"
}
27 changes: 15 additions & 12 deletions bench/Bench.hx
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ class Bench {
});
return todos;
}
measure('create 10000 todos', () -> makeTodos(1000), 100);
var count = 1000;
measure('creating ${count} todos', () -> makeTodos(count), 100);

var todos = makeTodos(1000);
for (mode in ['direct', 'batched', 'atomic'])
measure('toggle 1000 todos [$mode]', () -> {
var todos = makeTodos(count);
for (mode in ['direct', 'batched', 'atomic']) {
var unfinishedTodoCount = Observable.auto(() -> {
var sum = 0;
for (t in todos)
if (!t.done.value) sum++;
sum;
});

var unfinishedTodoCount = Observable.auto(() -> {
var sum = 0;
for (t in todos)
if (!t.done.value) sum++;
sum;
});
var watch = unfinishedTodoCount.bind(_ -> {}, if (mode == 'batched') null else Scheduler.direct);

var watch = unfinishedTodoCount.bind(_ -> {}, if (mode == 'batched') null else Scheduler.direct);
measure('toggling ${todos.length} todos [$mode]', () -> {

function update()
for (t in todos)
Expand All @@ -38,13 +39,15 @@ class Bench {
if (mode == 'batched')
Observable.updateAll();

watch.cancel();

}, switch mode {
case 'atomic': 1000;
case 'batched': 1000;
default: 10;
});

watch.cancel();
}
}

static function measure(name, f:()->Void, ?repeat = 1) {
Expand Down
27 changes: 15 additions & 12 deletions bench/mobx-bench.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ function measure(name, task, repeat = 1) {
console.log(`${name} took ${(Date.now() - start) / repeat}ms (avg. of ${repeat} runs)`);
}

measure('create 10000 todos', () => createTodos(1000), 100);
let count = 1000;
measure(`creating ${count} todos`, () => createTodos(count), 100);

{
let todos = createTodos(1000);
Expand All @@ -40,24 +41,26 @@ measure('create 10000 todos', () => createTodos(1000), 100);
}

['direct', 'batched', 'atomic'].forEach(mode => {
measure(`create 1000 todos, finish all [${mode}]`, () => {
let unfinishedTodoCount = computed(() => {
return todos.reduce((count, { done }) => done ? count : count + 1, 0);
});
let unfinishedTodoCount = computed(() => {
return todos.reduce((count, { done }) => done ? count : count + 1, 0);
});

let dispose =
(mode == 'batched')
? autorun(() => unfinishedTodoCount.get(), {
scheduler: scheduler()
})
: unfinishedTodoCount.observe(x => {});
let dispose =
(mode == 'batched')
? autorun(() => unfinishedTodoCount.get(), {
scheduler: scheduler()
})
: unfinishedTodoCount.observe(x => {});

measure(`toggling ${todos.length} todos [${mode}]`, () => {

let update = (mode == 'atomic') ? transaction : f => f();
update(() => {
for (let item of todos)
item.done = !item.done;
});
dispose();
}, { atomic: 1000, batched: 1000, direct: 10 }[mode]);

dispose();
});
}
4 changes: 2 additions & 2 deletions haxe_libraries/tink_core.hxml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# @install: lix --silent download "gh://github.com/haxetink/tink_core#e0ed6c33f6f396fb83397a590bee4c3d48ab2e17" into tink_core/2.0.2/github/e0ed6c33f6f396fb83397a590bee4c3d48ab2e17
-cp ${HAXE_LIBCACHE}/tink_core/2.0.2/github/e0ed6c33f6f396fb83397a590bee4c3d48ab2e17/src
# @install: lix --silent download "gh://github.com/haxetink/tink_core#33a5b72257d421c0b278973d58805c9ecefea259" into tink_core/2.0.2/github/33a5b72257d421c0b278973d58805c9ecefea259
-cp ${HAXE_LIBCACHE}/tink_core/2.0.2/github/33a5b72257d421c0b278973d58805c9ecefea259/src
-D tink_core=2.0.2
8 changes: 3 additions & 5 deletions haxe_libraries/tink_streams.hxml
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
# @install: lix --silent download "gh://github.com/haxetink/tink_streams#5066a96c4a8b483479b6a8df8893eaf8922d3bea" into tink_streams/0.4.0/github/5066a96c4a8b483479b6a8df8893eaf8922d3bea
# @install: lix --silent download "gh://github.com/haxetink/tink_streams#f4478825ef0a30df1187f02a354ec61176b47b8b" into tink_streams/0.3.3/github/f4478825ef0a30df1187f02a354ec61176b47b8b
-lib tink_core
-cp ${HAXE_LIBCACHE}/tink_streams/0.4.0/github/5066a96c4a8b483479b6a8df8893eaf8922d3bea/src
-D tink_streams=0.4.0
# temp for development, delete this file when pure branch merged
-D pure
-cp ${HAXE_LIBCACHE}/tink_streams/0.3.3/github/f4478825ef0a30df1187f02a354ec61176b47b8b/src
-D tink_streams=0.3.3
7 changes: 3 additions & 4 deletions haxe_libraries/tink_testrunner.hxml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# @install: lix --silent download "gh://github.com/haxetink/tink_testrunner#45f704215ae28c3d864755036dc2ee63f7c44e8a" into tink_testrunner/0.9.0/github/45f704215ae28c3d864755036dc2ee63f7c44e8a
# @install: lix --silent download "gh://github.com/haxetink/tink_testrunner#866de8b991be89b969825b0c0f5565d51f96a6f7" into tink_testrunner/0.8.0/github/866de8b991be89b969825b0c0f5565d51f96a6f7
-lib ansi
-lib tink_macro
-lib tink_streams
-cp ${HAXE_LIBCACHE}/tink_testrunner/0.9.0/github/45f704215ae28c3d864755036dc2ee63f7c44e8a/src
-D tink_testrunner=0.9.0
--macro addGlobalMetadata('ANSI.Attribute', "@:native('ANSIAttribute')", false)
-cp ${HAXE_LIBCACHE}/tink_testrunner/0.8.0/github/866de8b991be89b969825b0c0f5565d51f96a6f7/src
-D tink_testrunner=0.8.0
17 changes: 10 additions & 7 deletions src/tink/state/Observable.hx
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ abstract Observable<T>(ObservableObject<T>) from ObservableObject<T> to Observab
will take place and the type of the observable value will become `tink.state.Promised` or `tink.State.Promised.Predicted`
respectively. The future/promise will be automatically handled to update the value of this Observable.
**/
@:noUsing static public inline function auto<T>(compute, ?comparator #if tink_state.debug , ?toString, ?pos:haxe.PosInfos #end):Observable<T>
return new AutoObservable<T>(compute, comparator #if tink_state.debug , toString, pos #end);
@:noUsing static public inline function auto<Result>(compute, ?comparator #if tink_state.debug , ?toString, ?pos:haxe.PosInfos #end):Observable<Result>
return new AutoObservable<Result>(compute, comparator #if tink_state.debug , toString, pos #end);

/**
Create a constant Observable object from a value. Const observables are lightweight objects
Expand Down Expand Up @@ -276,6 +276,9 @@ private class ConstObservable<T> implements ObservableObject<T> {
#end
}

function retain() {}
function release() {}

public function getValue()
return value;

Expand All @@ -297,18 +300,18 @@ private class ConstObservable<T> implements ObservableObject<T> {
return EmptyIterator.DEPENDENCIES;
#end

public function onInvalidate(i:Invalidatable):CallbackLink
return null;
public function subscribe(i:Observer) {}
public function unsubscribe(i:Observer) {}
}

private class SimpleObservable<T> extends Invalidator implements ObservableObject<T> {
private class SimpleObservable<T> extends Dispatcher implements ObservableObject<T> {

var _poll:Void->Measurement<T>;
var _cache:Measurement<T> = null;
var comparator:Comparator<T>;

public function new(poll, ?comparator #if tink_state.debug , ?toString, ?pos #end) {
super(#if tink_state.debug toString, pos #end);
super(null #if tink_state.debug , toString, pos #end);
this._poll = poll;
this.comparator = comparator;
}
Expand All @@ -321,7 +324,7 @@ private class SimpleObservable<T> extends Invalidator implements ObservableObjec

function reset(_) {
_cache = null;
fire();
fire(this);
}

function poll() {
Expand Down
105 changes: 89 additions & 16 deletions src/tink/state/ObservableArray.hx
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ abstract ObservableArray<T>(ArrayImpl<T>) from ArrayImpl<T> to Observable<ArrayV
});

public function entry(index)
return Observable.auto(() -> this.get(index));
return Observable.auto(() -> get(index));

@:deprecated('use iterator instead')
public function values()
Expand All @@ -33,7 +33,7 @@ abstract ObservableArray<T>(ArrayImpl<T>) from ArrayImpl<T> to Observable<ArrayV
return 0...this.length;

@:op([]) public inline function get(index)
return this.get(index);
return view[index];

@:op([]) public inline function set(index, value)
return this.set(index, value);
Expand Down Expand Up @@ -77,8 +77,21 @@ abstract ObservableArrayView<T>(ArrayView<T>) from ArrayView<T> {
public function keys()
return 0...this.length;

@:op([]) public inline function get(index)
return this.get(index);
@:op([]) public function get(index) {
return
if (AutoObservable.needsTracking(this)) {
var wrappers = AutoObservable.currentAnnex().get(Wrappers).forSource(this);

wrappers.get(index, () -> new TransformObservable(
this,
_ -> this.get(index),
null,
() -> wrappers.remove(index)
#if tink_state.debug , () -> 'Entry $index of ${this.toString()}' #end
)).value;
}
else this.get(index);
}

public function toArray():Array<T>
return this.copy();
Expand Down Expand Up @@ -131,20 +144,29 @@ private interface ArrayView<T> extends ObservableObject<ArrayView<T>> {
function keyValueIterator():ArrayKeyValueIterator<T>;
}

private class ArrayImpl<T> extends Invalidator implements ArrayView<T> {
private class ArrayImpl<T> extends Dispatcher implements ArrayView<T> {

var valid = false;
var entries:Array<T>;
final observableLength:Observable<Int>;

public var length(get, never):Int;
function get_length()
return calc(() -> entries.length);
return observableLength.value;

public function new(entries) {
super(#if tink_state.debug id -> 'ObservableArray#$id${this.entries.toString()}' #end);
super(#if tink_state.debug id -> 'ObservableArray#$id[${this.entries.toString()}]' #end);
this.entries = entries;
this.observableLength = new TransformObservable(this, _ -> this.entries.length, null #if tink_state.debug , () -> 'length of ${toString()}' #end);
this.observableLength = new TransformObservable(
this,
_ -> {
valid = true;
this.entries.length;
},
null,
null
#if tink_state.debug , () -> 'length of ${this.toString()}' #end
);
}

public function replace(values:Array<T>)
Expand Down Expand Up @@ -192,8 +214,10 @@ private class ArrayImpl<T> extends Invalidator implements ArrayView<T> {
public function shift()
return update(() -> entries.shift());

public function get(index:Int)
return calc(() -> entries[index]);
public function get(index:Int) {
valid = true;
return entries[index];
}

public function set(index:Int, value:T)
return update(() -> entries[index] = value);
Expand Down Expand Up @@ -226,7 +250,7 @@ private class ArrayImpl<T> extends Invalidator implements ArrayView<T> {
var ret = fn();
if (valid) {
valid = false;
fire();
fire(this);
}
return ret;
}
Expand All @@ -238,11 +262,46 @@ private class ArrayImpl<T> extends Invalidator implements ArrayView<T> {
}
}

private class Wrappers {
final bySource = new Map<{}, SourceWrappers<Dynamic>>();

public function new(target:{}) {}

public function forSource<T>(source:ArrayView<T>):SourceWrappers<T>
return cast switch bySource[source] {
case null: bySource[source] = new SourceWrappers<T>(() -> bySource.remove(source));
case v: v;
}
}

private class SourceWrappers<T> {
final dispose:()->Void;
var count = 0;
final observables = new Map<Int, Observable<T>>();

public function new(dispose)
this.dispose = dispose;

public function get(index, create:() -> Observable<T>):Observable<T>
return switch observables[index] {
case null:
count++;
observables[index] = create();
case v: v;
}

public function remove(index:Int) {
if (observables.remove(index) && (--count == 0)) dispose();
}
}

private class DerivedView<T> implements ArrayView<T> {

final observableLength:Observable<Int>;

public var length(get, never):Int;
function get_length()
return o.value.length;
return observableLength.value;

final o:Observable<Array<T>>;

Expand All @@ -252,11 +311,19 @@ private class DerivedView<T> implements ArrayView<T> {
public function canFire()
return self().canFire();

public function new(o)
public function new(o) {
this.o = o;
this.observableLength = new TransformObservable(
o,
a -> a.length,
null,
null
#if tink_state.debug , () -> 'length of ${toString()}' #end
);
}

public function get(index:Int)
return o.value[index];
return self().getValue()[index];

inline function self()
return (o:ObservableObject<Array<T>>);
Expand All @@ -279,8 +346,11 @@ private class DerivedView<T> implements ArrayView<T> {
public function isValid()
return self().isValid();

public function onInvalidate(i)
return self().onInvalidate(i);
public function subscribe(i)
self().subscribe(i);

public function unsubscribe(i)
self().unsubscribe(i);

public function copy()
return o.value.copy();
Expand All @@ -297,4 +367,7 @@ private class DerivedView<T> implements ArrayView<T> {
public function keyValueIterator()
return o.value.keyValueIterator();

function retain() {}
function release() {}

}
Loading