Skip to content
This repository has been archived by the owner on Sep 14, 2023. It is now read-only.

Commit

Permalink
fix: flatten cleanup (#1100)
Browse files Browse the repository at this point in the history
  • Loading branch information
tjjfvi committed Jun 23, 2023
1 parent 27f08d5 commit 67b2e59
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 65 deletions.
14 changes: 12 additions & 2 deletions fluent/ConnectionRune.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Connection, ConnectionError, RpcSubscriptionMessage, ServerError } from
import { is, MetaRune, Run, Rune, RunicArgs, Runner, RunStream } from "../rune/mod.ts"

class RunConnection extends Run<Connection, never> {
controller = new AbortController()
constructor(
ctx: Runner,
readonly initConnection: (signal: AbortSignal) => Connection | Promise<Connection>,
Expand All @@ -12,7 +13,11 @@ class RunConnection extends Run<Connection, never> {

connection?: Connection
async _evaluate(): Promise<Connection> {
return this.connection ??= await this.initConnection(this.signal)
return this.connection ??= await this.initConnection(this.controller.signal)
}

override cleanup(): void {
this.controller.abort()
}
}

Expand Down Expand Up @@ -61,6 +66,7 @@ export class ConnectionRune<U> extends Rune<Connection, U> {
}

class RunRpcSubscription extends RunStream<RpcSubscriptionMessage> {
controller = new AbortController()
constructor(
ctx: Runner,
connection: Connection,
Expand All @@ -74,7 +80,11 @@ class RunRpcSubscription extends RunStream<RpcSubscriptionMessage> {
unsubscribeMethod,
params,
(value) => this.push(value),
this.signal,
this.controller.signal,
)
}

override cleanup(): void {
this.controller.abort()
}
}
41 changes: 27 additions & 14 deletions rune/MetaRune.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ export class OrthoRune<T, U1, U2> extends Rune<Run<T, U1>, U2> {
class RunFlat<T, U1, U2> extends Run<T, U1 | U2> {
child
constructor(
readonly runner: Runner,
runner: Runner,
child: Rune<Rune<T, U1>, U2>,
) {
super(runner)
this.child = runner.prime(child, this.signal)
this.child = this.use(child)
}

lastChildReceipt = new Receipt()
Expand All @@ -62,10 +62,12 @@ class RunFlat<T, U1, U2> extends Run<T, U1 | U2> {
const rune = await this.child.evaluate(time, receipt)
if (!receipt.ready) return null!
if (receipt.novel) {
// TODO: prime before aborting?
this.innerController.abort()
this.innerController = new AbortController()
this.currentInner = this.runner.prime(rune, this.innerController.signal)
// TODO: prime before dereferencing?
if (this.currentInner) {
this.currentInner.dereference()
}
this.currentInner = this.runner.prime(rune)
this.currentInner.reference()
}
const _receipt = new Receipt()
try {
Expand All @@ -84,7 +86,9 @@ class RunFlat<T, U1, U2> extends Run<T, U1 | U2> {
}

override cleanup(): void {
this.innerController.abort()
if (this.currentInner) {
this.dependencies.push(this.currentInner)
}
super.cleanup()
}
}
Expand Down Expand Up @@ -112,13 +116,20 @@ class OrthoRunner extends Runner {
this.parent.memo.set(rune._prime, run)
return run
}

override onCleanup(run: Run<unknown, unknown>): void {
super.onCleanup(run)
if (run._sources.length) {
this.parent.onCleanup(run)
}
}
}

class RunFlatSingular<T, U1, U2> extends Run<T, U1 | U2> {
child
constructor(readonly runner: Runner, child: Rune<Run<T, U1>, U2>) {
constructor(runner: Runner, child: Rune<Run<T, U1>, U2>) {
super(runner)
this.child = runner.prime(child, this.signal)
this.child = this.use(child)
}

async _evaluate(time: number, receipt: Receipt) {
Expand All @@ -129,22 +140,24 @@ class RunFlatSingular<T, U1, U2> extends Run<T, U1 | U2> {

class RunAsOrtho<T, U1, U2> extends Run<Run<T, U1>, U2> {
child
constructor(readonly runner: Runner, child: Rune<Rune<T, U1>, U2>) {
constructor(runner: Runner, child: Rune<Rune<T, U1>, U2>) {
super(runner)
this.child = runner.prime(child, this.signal)
this.child = this.use(child)
}

async _evaluate(time: number, receipt: Receipt) {
const orthoRunner = new OrthoRunner(this.runner, time)
const rune = await this.child.evaluate(time, receipt)
return orthoRunner.prime(rune, undefined)
return orthoRunner.prime(rune)
}
}

class RunWrapOrtho<T, U> extends Run<T, U> {
constructor(readonly runner: OrthoRunner, readonly child: Run<T, U>) {
declare runner: OrthoRunner

constructor(runner: OrthoRunner, readonly child: Run<T, U>) {
super(runner)
child.reference(this.signal)
this.useRun(child)
}

_evaluate(): Promise<T> {
Expand Down
89 changes: 58 additions & 31 deletions rune/Rune.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,27 @@ export abstract class Runner {
protected abstract _prime<T, U>(rune: Rune<T, U>): Run<T, U>

memo = new Map<(runner: Runner) => Run<any, any>, Run<any, any>>()
prime<T, U>(rune: _.Rune<T, U>, signal: AbortSignal | undefined): Run<T, U> {
prime<T, U>(rune: Rune<T, U>): Run<T, U> {
const run = getOrInit(this.memo, rune._prime, () => {
const old = this._currentTrace
this._currentTrace = rune._trace
const run = this._prime(rune)
this._currentTrace = old
run.signal.addEventListener("abort", () => {
this.memo.delete(rune._prime)
})
run._sources.push(rune._prime)
return run
})
if (signal) run.reference(signal)
return run
}

getPrimed<T, U>(rune: _.Rune<T, U>): Run<T, U> | undefined {
getPrimed<T, U>(rune: Rune<T, U>): Run<T, U> | undefined {
return this.memo.get(rune._prime)
}

onCleanup(run: Run<unknown, unknown>) {
for (const source of run._sources) {
this.memo.delete(source)
}
}
}

class RootRunner extends Runner {
Expand Down Expand Up @@ -84,8 +87,8 @@ export class Rune<out T, out U = never> {
}

async *iter(runner: Runner = globalRunner) {
const abortController = new AbortController()
const primed = runner.prime(this, abortController.signal)
const primed = runner.prime(this)
primed.reference()
let time = runner.timeline.current
try {
while (time !== Infinity) {
Expand All @@ -98,7 +101,7 @@ export class Rune<out T, out U = never> {
time = receipt.nextTime
}
} finally {
abortController.abort()
primed.dereference()
}
}

Expand Down Expand Up @@ -172,7 +175,7 @@ export class Rune<out T, out U = never> {
)
}

static asyncIter<T>(fn: (signal: AbortSignal) => AsyncIterable<T>): Rune.ValueRune<T, never> {
static asyncIter<T>(fn: () => AsyncIterable<T>): Rune.ValueRune<T, never> {
return Rune.ValueRune.new(RunAsyncIter, fn)
}

Expand Down Expand Up @@ -203,8 +206,8 @@ export class Rune<out T, out U = never> {

static pin<T, U>(rune: Rune<T, U>, pinned: Rune<unknown, unknown>): Rune<T, U> {
return new Rune((runner) => {
const run = runner.prime(rune, undefined)
runner.prime(pinned, run.signal)
const run = runner.prime(rune)
run.use(pinned)
return run
})
}
Expand All @@ -216,27 +219,56 @@ export abstract class Run<T, U> {
order: number
timeline

abortController = new AbortController()
signal = this.abortController.signal
constructor(runner: Runner) {
this.signal.addEventListener("abort", () => this.cleanup())
constructor(readonly runner: Runner) {
this.trace = runner._currentTrace
?? new Trace(`execution of the ${new.target.name} instantiated`)
this.order = runner.order
this.timeline = runner.timeline
}

dependencies: Run<unknown, unknown>[] = []

use<T, U>(rune: Rune<T, U>): Run<T, U> {
const run = this.runner.prime(rune)
this.useRun(run)
return run
}

useRun(run: Run<unknown, unknown>) {
run.reference()
this.dependencies.push(run)
}

referenceCount = 0
reference(signal: AbortSignal) {
alive = true

reference() {
if (!this.alive) throw new Error("cannot reference a dead rune")
this.referenceCount++
signal.addEventListener("abort", () => {
if (!--this.referenceCount) {
this.abortController.abort()
}

_sources: Array<(runner: Runner) => Run<any, any>> = []
dereference(cleanupBatches?: Run<unknown, unknown>[][]) {
if (!--this.referenceCount) {
this.alive = false
this.cleanup()
this.runner.onCleanup(this)
if (cleanupBatches) {
cleanupBatches.push(this.dependencies)
} else {
const cleanupBatches = [this.dependencies]
while (cleanupBatches.length) {
const batch = cleanupBatches.pop()!
for (const run of batch) {
run.dereference(cleanupBatches)
}
}
}
})
}
}

cleanup() {}

_currentTime = -1
_currentPromise: Promise<T> = null!
_currentReceipt = new Receipt()
Expand All @@ -258,11 +290,6 @@ export abstract class Run<T, U> {
}
}
abstract _evaluate(time: number, receipt: Receipt): Promise<T>

alive = true
cleanup() {
this.alive = false
}
}

class RunConstant<T> extends Run<T, never> {
Expand All @@ -279,7 +306,7 @@ class RunLs<T, U> extends Run<T[], U> {
children
constructor(runner: Runner, children: Rune<T, U>[]) {
super(runner)
this.children = children.map((child) => runner.prime(child, this.signal))
this.children = children.map((child) => this.use(child))
}

_evaluate(time: number, receipt: Receipt) {
Expand Down Expand Up @@ -335,10 +362,10 @@ export abstract class RunStream<T> extends Run<T, never> {
}

class RunAsyncIter<T> extends RunStream<T> {
constructor(runner: Runner, fn: (signal: AbortSignal) => AsyncIterable<T>) {
constructor(runner: Runner, fn: () => AsyncIterable<T>) {
super(runner)
;(async () => {
for await (const value of fn(this.signal)) {
for await (const value of fn()) {
this.push(value)
}
this.finish()
Expand All @@ -356,7 +383,7 @@ class RunBubbleUnhandled<T, U> extends Run<T, never> {
child
constructor(runner: Runner, child: Rune<T, U>, readonly symbol: symbol) {
super(runner)
this.child = runner.prime(child, this.signal)
this.child = this.use(child)
}

async _evaluate(time: number, receipt: Receipt) {
Expand All @@ -375,7 +402,7 @@ class RunCaptureUnhandled<T, U1, U2> extends Run<T, U1 | U2> {
child
constructor(runner: Runner, child: Rune<T, U1>, readonly symbol: symbol) {
super(runner)
this.child = runner.prime(child, this.signal)
this.child = this.use(child)
}

async _evaluate(time: number, receipt: Receipt) {
Expand Down
Loading

0 comments on commit 67b2e59

Please sign in to comment.