Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove now-unnecessary OutputFeed.Handler.onStopped* functions #96

Merged
merged 1 commit into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions library/process/api/process.api
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ public abstract class io/matthewnelson/kmp/process/OutputFeed$Handler : io/matth
public synthetic fun <init> (Lio/matthewnelson/kmp/process/Stdio$Config;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
protected final fun dispatchStderr (Ljava/lang/String;)V
protected final fun dispatchStdout (Ljava/lang/String;)V
protected final fun onStderrStopped ()V
protected final fun onStdoutStopped ()V
protected abstract fun startStderr ()V
protected abstract fun startStdout ()V
public final fun stderrFeed (Lio/matthewnelson/kmp/process/OutputFeed;)Lio/matthewnelson/kmp/process/Process;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,13 @@ internal fun PlatformBuilder.blockingOutput(

internal fun ReadStream.scanLines(
dispatch: (line: String?) -> Unit,
onCompletion: () -> Unit,
) { scanLines(1024 * 8, dispatch, onCompletion) }
) { scanLines(1024 * 8, dispatch) }

@OptIn(InternalProcessApi::class)
@Throws(IllegalArgumentException::class)
internal fun ReadStream.scanLines(
bufferSize: Int,
dispatch: (line: String?) -> Unit,
onCompletion: () -> Unit,
) {

val stream = this
Expand All @@ -167,6 +165,4 @@ internal fun ReadStream.scanLines(
// Process.destroy closes the streams
buf.buf.fill(0)
feed.close()

onCompletion()
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import kotlinx.coroutines.delay
import kotlin.concurrent.Volatile
import kotlin.jvm.JvmField
import kotlin.jvm.JvmSynthetic
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

/**
Expand Down Expand Up @@ -197,11 +196,12 @@ public fun interface OutputFeed {
}
}

protected fun dispatchStdout(line: String?) { stdoutFeeds.dispatch(line) }
protected fun dispatchStderr(line: String?) { stderrFeeds.dispatch(line) }

protected fun onStdoutStopped() { stdoutFeeds.withLock { clear(); stdoutStopped = true } }
protected fun onStderrStopped() { stderrFeeds.withLock { clear(); stderrStopped = true } }
protected fun dispatchStdout(line: String?) {
stdoutFeeds.dispatch(line, onClosed = { stdoutStopped = true })
}
protected fun dispatchStderr(line: String?) {
stderrFeeds.dispatch(line, onClosed = { stderrStopped = true })
}

protected abstract fun startStdout()
protected abstract fun startStderr()
Expand Down Expand Up @@ -233,12 +233,22 @@ public fun interface OutputFeed {
private inline val This: Process get() = this as Process

@Suppress("NOTHING_TO_INLINE")
private inline fun SynchronizedSet<OutputFeed>.dispatch(line: String?) {
private inline fun SynchronizedSet<OutputFeed>.dispatch(
line: String?,
crossinline onClosed: () -> Unit,
) {
withLock { toSet() }.forEach { feed ->
try {
feed.onOutput(line)
} catch (_: Throwable) {}
} catch (_: Throwable) {
// TODO: exception handler
}
}

if (line != null) return

// null was dispatched indicating that the stream stopped.
withLock { clear(); onClosed() }
}

@JvmSynthetic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ internal class NodeJsProcess internal constructor(
ReadBuffer.lineOutputFeed(::dispatchStdout).apply {
stdout.onClose {
close()
onStdoutStopped()
}.onData { data ->
onData(data, data.capacity())
}
Expand All @@ -97,7 +96,6 @@ internal class NodeJsProcess internal constructor(
ReadBuffer.lineOutputFeed(::dispatchStderr).apply {
stderr.onClose {
close()
onStderrStopped()
}.onData { data ->
onData(data, data.capacity())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,13 @@ internal class JvmProcess private constructor(

override fun startStdout() {
Runnable {
jProcess.inputStream.scanLines(::dispatchStdout, ::onStdoutStopped)
jProcess.inputStream.scanLines(::dispatchStdout)
}.execute(stdio = "stdout")
}

override fun startStderr() {
Runnable {
jProcess.errorStream.scanLines(::dispatchStderr, ::onStderrStopped)
jProcess.errorStream.scanLines(::dispatchStderr)
}.execute(stdio = "stderr")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,30 +63,25 @@ class ScanLinesUnitTest {
append("END")
})

var completion = 0
var nullOutput = false
s.scanLines(
bufferSize,
dispatch = { line ->
// anymore output should fail
assertFalse(nullOutput)
s.scanLines(bufferSize) { line ->
// anymore output should fail
assertFalse(nullOutput)

if (line == null) {
nullOutput = true
return@scanLines
}
if (line == null) {
nullOutput = true
return@scanLines
}

lines.add(line)
},
onCompletion = { completion++; assertTrue(nullOutput) }
)
lines.add(line)
}


assertEquals(3, lines.size)
assertEquals(lengthFirst, lines.first().length)
assertEquals("Hello World!", lines[1])
assertEquals("END", lines.last())
assertEquals(1, completion)
assertTrue(nullOutput)
assertEquals((lengthFirst / bufferSize) + 1, s.numReads)
}

Expand All @@ -99,27 +94,23 @@ class ScanLinesUnitTest {
appendLine("END")
})

var completion = 0
var nullOutput = false
s.scanLines(
dispatch = { line ->
// anymore output should fail
assertFalse(nullOutput)
s.scanLines { line ->
// anymore output should fail
assertFalse(nullOutput)

if (line == null) {
nullOutput = true
return@scanLines
}
if (line == null) {
nullOutput = true
return@scanLines
}

lines.add(line)
},
onCompletion = { completion++; assertTrue(nullOutput) }
)
lines.add(line)
}

assertEquals(2, lines.size)
assertEquals("Hello World!", lines.first())
assertEquals("END", lines.last())
assertEquals(1, completion)
assertTrue(nullOutput)
assertEquals(1, s.numReads)
}

Expand All @@ -133,30 +124,26 @@ class ScanLinesUnitTest {
appendLine("END")
})

var completion = 0
var nullOutput = false
s.scanLines(
dispatch = { line ->
// anymore output should fail
assertFalse(nullOutput)
s.scanLines { line ->
// anymore output should fail
assertFalse(nullOutput)

if (line == null) {
nullOutput = true
return@scanLines
}
if (line == null) {
nullOutput = true
return@scanLines
}

lines.add(line)
},
onCompletion = { completion++; assertTrue(nullOutput) }
)
lines.add(line)
}

assertEquals(6, lines.size)
for (i in 1..4) {
assertEquals(0, lines[i].length)
}
assertEquals("Hello World!", lines.first())
assertEquals("END", lines.last())
assertEquals(1, completion)
assertTrue(nullOutput)
assertEquals(1, s.numReads)
}

Expand All @@ -167,27 +154,22 @@ class ScanLinesUnitTest {
val expected = "1234"
val s = TestStream(text = expected)

var completion = 0
var nullOutput = false
s.scanLines(
expected.length,
dispatch = { line ->
// anymore output should fail
assertFalse(nullOutput)

if (line == null) {
nullOutput = true
return@scanLines
}

lines.add(line)
},
onCompletion = { completion++; assertTrue(nullOutput) },
)
s.scanLines(expected.length) { line ->
// anymore output should fail
assertFalse(nullOutput)

if (line == null) {
nullOutput = true
return@scanLines
}

lines.add(line)
}

assertEquals(1, lines.size)
assertEquals(expected, lines.first())
assertEquals(1, completion)
assertTrue(nullOutput)
assertEquals(1, s.numReads)
}

Expand All @@ -199,25 +181,21 @@ class ScanLinesUnitTest {
// as it will throw IOException.
val s = TestStream(text = "")

var completion = 0
var nullOutput = false
s.scanLines(
dispatch = { line ->
// anymore output should fail
assertFalse(nullOutput)
s.scanLines { line ->
// anymore output should fail
assertFalse(nullOutput)

if (line == null) {
nullOutput = true
return@scanLines
}
if (line == null) {
nullOutput = true
return@scanLines
}

lines.add(line)
},
onCompletion = { completion++; assertTrue(nullOutput) }
)
lines.add(line)
}

assertEquals(0, lines.size)
assertEquals(1, completion)
assertTrue(nullOutput)
assertEquals(0, s.numReads)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ internal constructor(
if (isDestroyed) return@Instance null
val reader = handle.stdoutReader() ?: return@Instance null

Worker.start("stdout", reader, ::dispatchStdout, ::onStdoutStopped)
Worker.execute("stdout", reader, ::dispatchStdout)
})

private val stderrWorker = Instance(create = {
if (isDestroyed) return@Instance null
val reader = handle.stderrReader() ?: return@Instance null

Worker.start("stderr", reader, ::dispatchStderr, ::onStderrStopped)
Worker.execute("stderr", reader, ::dispatchStderr)
})

override fun destroy(): Process = destroyLock.withLock {
Expand All @@ -94,7 +94,7 @@ internal constructor(
try {
handle.close()
} catch (_: IOException) {
// TODO: Error handler
// TODO: exception handler
}

if (!hasBeenDestroyed) {
Expand Down Expand Up @@ -154,16 +154,15 @@ internal constructor(
override fun startStdout() { stdoutWorker.getOrCreate() }
override fun startStderr() { stderrWorker.getOrCreate() }

private fun Worker.Companion.start(
private fun Worker.Companion.execute(
name: String,
r: ReadStream,
d: (line: String?) -> Unit,
s: () -> Unit,
): Worker {
val w = start(name = "Process[pid=$pid, stdio=$name]")

w.execute(TransferMode.SAFE, { Triple(r, d, s) }) { (reader, dispatch, onStopped) ->
reader.scanLines(dispatch, onStopped)
w.execute(TransferMode.SAFE, { Pair(r, d) }) { (reader, dispatch) ->
reader.scanLines(dispatch)
}

return w
Expand Down
Loading