diff --git a/fluent/ConnectionRune.ts b/fluent/ConnectionRune.ts index 72e7f8810..8fd81c0d5 100644 --- a/fluent/ConnectionRune.ts +++ b/fluent/ConnectionRune.ts @@ -3,6 +3,7 @@ import { Connection, ConnectionError, RpcSubscriptionMessage, ServerError } from import { is, MetaRune, Run, Rune, RunicArgs, Runner, RunStream } from "../rune/mod.ts" class RunConnection extends Run { + controller = new AbortController() constructor( ctx: Runner, readonly initConnection: (signal: AbortSignal) => Connection | Promise, @@ -12,7 +13,11 @@ class RunConnection extends Run { connection?: Connection async _evaluate(): Promise { - return this.connection ??= await this.initConnection(this.signal) + return this.connection ??= await this.initConnection(this.controller.signal) + } + + override cleanup(): void { + this.controller.abort() } } @@ -61,6 +66,7 @@ export class ConnectionRune extends Rune { } class RunRpcSubscription extends RunStream { + controller = new AbortController() constructor( ctx: Runner, connection: Connection, @@ -74,7 +80,11 @@ class RunRpcSubscription extends RunStream { unsubscribeMethod, params, (value) => this.push(value), - this.signal, + this.controller.signal, ) } + + override cleanup(): void { + this.controller.abort() + } } diff --git a/rune/MetaRune.ts b/rune/MetaRune.ts index 74ef1d21c..2759be2b6 100644 --- a/rune/MetaRune.ts +++ b/rune/MetaRune.ts @@ -46,11 +46,11 @@ export class OrthoRune extends Rune, U2> { class RunFlat extends Run { child constructor( - readonly runner: Runner, + runner: Runner, child: Rune, U2>, ) { super(runner) - this.child = runner.prime(child, this.signal) + this.child = this.use(child) } lastChildReceipt = new Receipt() @@ -62,10 +62,12 @@ class RunFlat extends Run { const rune = await this.child.evaluate(time, receipt) if (!receipt.ready) return null! if (receipt.novel) { - // TODO: prime before aborting? - this.innerController.abort() - this.innerController = new AbortController() - this.currentInner = this.runner.prime(rune, this.innerController.signal) + // TODO: prime before dereferencing? + if (this.currentInner) { + this.currentInner.dereference() + } + this.currentInner = this.runner.prime(rune) + this.currentInner.reference() } const _receipt = new Receipt() try { @@ -84,7 +86,9 @@ class RunFlat extends Run { } override cleanup(): void { - this.innerController.abort() + if (this.currentInner) { + this.dependencies.push(this.currentInner) + } super.cleanup() } } @@ -112,13 +116,20 @@ class OrthoRunner extends Runner { this.parent.memo.set(rune._prime, run) return run } + + override onCleanup(run: Run): void { + super.onCleanup(run) + if (run._sources.length) { + this.parent.onCleanup(run) + } + } } class RunFlatSingular extends Run { child - constructor(readonly runner: Runner, child: Rune, U2>) { + constructor(runner: Runner, child: Rune, U2>) { super(runner) - this.child = runner.prime(child, this.signal) + this.child = this.use(child) } async _evaluate(time: number, receipt: Receipt) { @@ -129,22 +140,24 @@ class RunFlatSingular extends Run { class RunAsOrtho extends Run, U2> { child - constructor(readonly runner: Runner, child: Rune, U2>) { + constructor(runner: Runner, child: Rune, U2>) { super(runner) - this.child = runner.prime(child, this.signal) + this.child = this.use(child) } async _evaluate(time: number, receipt: Receipt) { const orthoRunner = new OrthoRunner(this.runner, time) const rune = await this.child.evaluate(time, receipt) - return orthoRunner.prime(rune, undefined) + return orthoRunner.prime(rune) } } class RunWrapOrtho extends Run { - constructor(readonly runner: OrthoRunner, readonly child: Run) { + declare runner: OrthoRunner + + constructor(runner: OrthoRunner, readonly child: Run) { super(runner) - child.reference(this.signal) + this.useRun(child) } _evaluate(): Promise { diff --git a/rune/Rune.ts b/rune/Rune.ts index 66df043a6..9669b7481 100644 --- a/rune/Rune.ts +++ b/rune/Rune.ts @@ -15,24 +15,27 @@ export abstract class Runner { protected abstract _prime(rune: Rune): Run memo = new Map<(runner: Runner) => Run, Run>() - prime(rune: _.Rune, signal: AbortSignal | undefined): Run { + prime(rune: Rune): Run { const run = getOrInit(this.memo, rune._prime, () => { const old = this._currentTrace this._currentTrace = rune._trace const run = this._prime(rune) this._currentTrace = old - run.signal.addEventListener("abort", () => { - this.memo.delete(rune._prime) - }) + run._sources.push(rune._prime) return run }) - if (signal) run.reference(signal) return run } - getPrimed(rune: _.Rune): Run | undefined { + getPrimed(rune: Rune): Run | undefined { return this.memo.get(rune._prime) } + + onCleanup(run: Run) { + for (const source of run._sources) { + this.memo.delete(source) + } + } } class RootRunner extends Runner { @@ -84,8 +87,8 @@ export class Rune { } async *iter(runner: Runner = globalRunner) { - const abortController = new AbortController() - const primed = runner.prime(this, abortController.signal) + const primed = runner.prime(this) + primed.reference() let time = runner.timeline.current try { while (time !== Infinity) { @@ -98,7 +101,7 @@ export class Rune { time = receipt.nextTime } } finally { - abortController.abort() + primed.dereference() } } @@ -172,7 +175,7 @@ export class Rune { ) } - static asyncIter(fn: (signal: AbortSignal) => AsyncIterable): Rune.ValueRune { + static asyncIter(fn: () => AsyncIterable): Rune.ValueRune { return Rune.ValueRune.new(RunAsyncIter, fn) } @@ -203,8 +206,8 @@ export class Rune { static pin(rune: Rune, pinned: Rune): Rune { return new Rune((runner) => { - const run = runner.prime(rune, undefined) - runner.prime(pinned, run.signal) + const run = runner.prime(rune) + run.use(pinned) return run }) } @@ -216,27 +219,56 @@ export abstract class Run { order: number timeline - abortController = new AbortController() - signal = this.abortController.signal - constructor(runner: Runner) { - this.signal.addEventListener("abort", () => this.cleanup()) + constructor(readonly runner: Runner) { this.trace = runner._currentTrace ?? new Trace(`execution of the ${new.target.name} instantiated`) this.order = runner.order this.timeline = runner.timeline } + dependencies: Run[] = [] + + use(rune: Rune): Run { + const run = this.runner.prime(rune) + this.useRun(run) + return run + } + + useRun(run: Run) { + run.reference() + this.dependencies.push(run) + } + referenceCount = 0 - reference(signal: AbortSignal) { + alive = true + + reference() { if (!this.alive) throw new Error("cannot reference a dead rune") this.referenceCount++ - signal.addEventListener("abort", () => { - if (!--this.referenceCount) { - this.abortController.abort() + } + + _sources: Array<(runner: Runner) => Run> = [] + dereference(cleanupBatches?: Run[][]) { + if (!--this.referenceCount) { + this.alive = false + this.cleanup() + this.runner.onCleanup(this) + if (cleanupBatches) { + cleanupBatches.push(this.dependencies) + } else { + const cleanupBatches = [this.dependencies] + while (cleanupBatches.length) { + const batch = cleanupBatches.pop()! + for (const run of batch) { + run.dereference(cleanupBatches) + } + } } - }) + } } + cleanup() {} + _currentTime = -1 _currentPromise: Promise = null! _currentReceipt = new Receipt() @@ -258,11 +290,6 @@ export abstract class Run { } } abstract _evaluate(time: number, receipt: Receipt): Promise - - alive = true - cleanup() { - this.alive = false - } } class RunConstant extends Run { @@ -279,7 +306,7 @@ class RunLs extends Run { children constructor(runner: Runner, children: Rune[]) { super(runner) - this.children = children.map((child) => runner.prime(child, this.signal)) + this.children = children.map((child) => this.use(child)) } _evaluate(time: number, receipt: Receipt) { @@ -335,10 +362,10 @@ export abstract class RunStream extends Run { } class RunAsyncIter extends RunStream { - constructor(runner: Runner, fn: (signal: AbortSignal) => AsyncIterable) { + constructor(runner: Runner, fn: () => AsyncIterable) { super(runner) ;(async () => { - for await (const value of fn(this.signal)) { + for await (const value of fn()) { this.push(value) } this.finish() @@ -356,7 +383,7 @@ class RunBubbleUnhandled extends Run { child constructor(runner: Runner, child: Rune, readonly symbol: symbol) { super(runner) - this.child = runner.prime(child, this.signal) + this.child = this.use(child) } async _evaluate(time: number, receipt: Receipt) { @@ -375,7 +402,7 @@ class RunCaptureUnhandled extends Run { child constructor(runner: Runner, child: Rune, readonly symbol: symbol) { super(runner) - this.child = runner.prime(child, this.signal) + this.child = this.use(child) } async _evaluate(time: number, receipt: Receipt) { diff --git a/rune/ValueRune.ts b/rune/ValueRune.ts index 600aa563a..3eb431c0c 100644 --- a/rune/ValueRune.ts +++ b/rune/ValueRune.ts @@ -182,10 +182,8 @@ class RunMatch extends Run { conditions: [(x: M) => boolean, ValueRune][], ) { super(runner) - this.value = runner.prime(child, this.signal) - this.conditions = conditions.map(([cond, val]) => - [cond, runner.prime(val, this.signal)] as const - ) + this.value = this.use(child) + this.conditions = conditions.map(([cond, val]) => [cond, this.use(val)] as const) } async _evaluate(time: number, receipt: Receipt) { @@ -207,7 +205,7 @@ class RunMap extends Run { readonly fn: (value: T1) => T2 | Promise, ) { super(runner) - this.child = runner.prime(child, this.signal) + this.child = this.use(child) } lastValue: T2 = null! @@ -231,8 +229,8 @@ class RunHandle extends Run | T3, U | alt: Rune, ) { super(runner) - this.child = runner.prime(child, this.signal) - this.alt = runner.prime(alt, this.signal) + this.child = this.use(child) + this.alt = this.use(alt) } async _evaluate(time: number, receipt: Receipt) { @@ -251,7 +249,7 @@ class RunUnhandle extends Run { readonly guard: Guard, ) { super(runner) - this.child = runner.prime(child, this.signal) + this.child = this.use(child) } async _evaluate(time: number, receipt: Receipt) { @@ -269,7 +267,7 @@ class RunThrows extends Run { readonly guards: Array>, ) { super(runner) - this.child = runner.prime(child, this.signal) + this.child = this.use(child) } async _evaluate(time: number, receipt: Receipt) { @@ -288,7 +286,7 @@ class RunGetUnhandled extends Run | null, never> { child constructor(runner: Runner, child: Rune) { super(runner) - this.child = runner.prime(child, this.signal) + this.child = this.use(child) } async _evaluate(time: number, receipt: Receipt) { @@ -314,8 +312,8 @@ class RunRehandle extends Run, ) { super(runner) - this.child = runner.prime(child, this.signal) - this.alt = runner.prime(alt, this.signal) + this.child = this.use(child) + this.alt = this.use(alt) } async _evaluate(time: number, receipt: Receipt) { @@ -334,7 +332,7 @@ class RunLazy extends Run { child constructor(runner: Runner, child: Rune) { super(runner) - this.child = runner.prime(child, this.signal) + this.child = this.use(child) } async _evaluate(time: number, _receipt: Receipt): Promise { @@ -346,7 +344,7 @@ class RunFilter extends Run { child constructor(runner: Runner, child: Rune, readonly fn: (value: T) => boolean) { super(runner) - this.child = runner.prime(child, this.signal) + this.child = this.use(child) } first = true @@ -375,7 +373,7 @@ class RunFinal extends Run { child constructor(runner: Runner, child: Rune) { super(runner) - this.child = runner.prime(child, this.signal) + this.child = this.use(child) } async _evaluate(time: number, receipt: Receipt): Promise { @@ -395,7 +393,7 @@ class RunReduce extends Run { readonly fn: (last: T2, value: T1) => T2 | Promise, ) { super(runner) - this.child = runner.prime(child, this.signal) + this.child = this.use(child) } async _evaluate(time: number, receipt: Receipt) { @@ -415,8 +413,8 @@ class RunChain extends Run { second: Rune, ) { super(runner) - this.first = runner.prime(first, this.signal) - this.second = runner.prime(second, this.signal) + this.first = this.use(first) + this.second = this.use(second) } lastValue: T2 = null!