Skip to content

Commit

Permalink
Fix JS stdout/stderr line scanning
Browse files Browse the repository at this point in the history
  • Loading branch information
05nelsonm committed Mar 6, 2024
1 parent 82fd911 commit 8228e0a
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,8 @@ abstract class ProcessBaseTest {
.waitForAsync()

assertEquals(0, exitCode)

// Node.js is awful and adds blank lines at dumb intervals.
// We just want to ensure that the data made it to the other
// end.
assertEquals((expected.size * 2) + if (IsNodeJs) 4 else 0, actual.size)
assertEquals(expected.size * 2, actual.size)
assertContentEquals(expected + expected, actual)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,12 @@ import io.matthewnelson.kmp.file.IOException
import kotlin.jvm.JvmSynthetic

@Suppress("UNUSED")
internal class BufferedLineScanner private constructor(
internal class StreamLineScanner private constructor(
readBufferSize: Int,
stream: ReadStream,
dispatchLine: (line: String) -> Unit,
onStopped: () -> Unit,
) {

private val overflow = ArrayList<ByteArray>(1)

private fun ArrayList<ByteArray>.consumeAndJoin(append: String): String {
if (isEmpty()) return append

var size = append.length
forEach { size += it.size }
val sb = StringBuilder(size)

while (isNotEmpty()) {
val segment = removeAt(0)
sb.append(segment.decodeToString())
segment.fill(0)
}

sb.append(append)

val s = sb.toString()
sb.clear()
repeat(size) { sb.append(' ') }

return s
}
stopped: () -> Unit,
): BufferedLineScanner(dispatchLine) {

init {
val buf = ByteArray(readBufferSize)
Expand All @@ -65,27 +41,12 @@ internal class BufferedLineScanner private constructor(
// and we can end early (before process destruction).
if (read <= 0) break

var iNext = 0
for (i in 0 until read) {
if (buf[i] != N) continue
val line = overflow.consumeAndJoin(append = buf.decodeToString(iNext, i))
iNext = i + 1

dispatchLine(line)
}

if (iNext == read) continue
overflow.add(buf.copyOfRange(iNext, read))
}

if (overflow.isNotEmpty()) {
val s = overflow.consumeAndJoin(append = "")
dispatchLine(s)
onData(buf, read)
}

buf.fill(0)

onStopped()
buf.fill(0)
stopped()
}

internal companion object {
Expand All @@ -102,8 +63,6 @@ internal class BufferedLineScanner private constructor(
readBufferSize: Int,
dispatchLine: (line: String) -> Unit,
onStopped: () -> Unit,
) { BufferedLineScanner(readBufferSize, this, dispatchLine, onStopped) }

private const val N = '\n'.code.toByte()
) { StreamLineScanner(readBufferSize, this, dispatchLine, onStopped) }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2024 Matthew Nelson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package io.matthewnelson.kmp.process.internal

internal abstract class BufferedLineScanner(
private val dispatchLine: (line: String) -> Unit,
) {

private val overflow = ArrayList<ByteArray>(1)

private fun ArrayList<ByteArray>.consumeAndJoin(append: String): String {
if (isEmpty()) return append

var size = append.length
forEach { size += it.size }
val sb = StringBuilder(size)

while (isNotEmpty()) {
val segment = removeAt(0)
sb.append(segment.decodeToString())
segment.fill(0)
}

sb.append(append)

val s = sb.toString()
sb.clear()
repeat(size) { sb.append(' ') }

return s
}

protected fun onData(data: ByteArray, len: Int = data.size) {
var iNext = 0
for (i in 0 until len) {
if (data[i] != N) continue
val line = overflow.consumeAndJoin(append = data.decodeToString(iNext, i))
iNext = i + 1

dispatchLine(line)
}

if (iNext == len) return
overflow.add(data.copyOfRange(iNext, len))
}

protected fun onStopped() {
if (overflow.isEmpty()) return
val s = overflow.consumeAndJoin(append = "")
dispatchLine(s)
}

internal companion object {

internal const val N: Byte = '\n'.code.toByte()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package io.matthewnelson.kmp.process.internal

import io.matthewnelson.kmp.file.*
import io.matthewnelson.kmp.process.internal.BufferedLineScanner.Companion.N
import org.khronos.webgl.ArrayBufferView
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
Expand Down Expand Up @@ -80,25 +81,24 @@ internal inline fun stream_Readable.onClose(

@Suppress("NOTHING_TO_INLINE")
internal inline fun stream_Readable.onData(
noinline block: (data: String) -> Unit,
noinline block: (data: ByteArray) -> Unit,
): stream_Readable {
val cb: (chunk: dynamic) -> Unit = { chunk ->
// can be either a String or a Buffer (fucking stupid...)

val result = try {
val buf = Buffer.wrap(chunk)
val utf8 = buf.toUtf8()
buf.fill()
utf8
} catch (_: IllegalArgumentException) {
try {
chunk as String
} catch (_: ClassCastException) {
null
val result = Buffer.wrap(chunk).let { buf ->
val len = buf.length.toInt()
if (len == 0) return@let null

val b = ByteArray(len)
for (i in b.indices) {
b[i] = buf.readInt8(i)
}
buf.fill()
b
}

if (!result.isNullOrEmpty()) block(result)
if (result != null) block(result)
}

return on("data", cb)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,34 +80,27 @@ internal class NodeJsProcess internal constructor(
}

override fun startStdout() {
jsProcess.stdout
?.onClose(::onStdoutStopped)
?.onData { data ->
data.dispatchLinesTo(::dispatchStdout)
val stdout = jsProcess.stdout ?: return

object : BufferedLineScanner(::dispatchStdout) {
init {
stdout.onClose {
onStopped()
onStdoutStopped()
}.onData(::onData)
}
}
}

override fun startStderr() {
jsProcess.stderr
?.onClose(::onStderrStopped)
?.onData { data ->
data.dispatchLinesTo(::dispatchStderr)
}
}
val stderr = jsProcess.stderr ?: return

@Suppress("NOTHING_TO_INLINE")
private inline fun String.dispatchLinesTo(
dispatch: (line: String) -> Unit,
) {
val lines = lines()
val iLast = lines.lastIndex
for (i in lines.indices) {
val line = lines[i]
if (i == iLast && line.isEmpty()) {
// If data ended with a return, skip it
continue
} else {
dispatch(line)
object : BufferedLineScanner(::dispatchStderr) {
init {
stderr.onClose {
onStopped()
onStderrStopped()
}.onData(::onData)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.matthewnelson.kmp.process.Output
import io.matthewnelson.kmp.process.Process
import io.matthewnelson.kmp.process.Signal
import io.matthewnelson.kmp.process.Stdio
import io.matthewnelson.kmp.process.internal.BufferedLineScanner.Companion.N
import org.khronos.webgl.Int8Array

// jsMain
Expand Down Expand Up @@ -239,8 +240,6 @@ internal actual class PlatformBuilder private actual constructor() {
return result
}

private const val N = '\n'.code.toByte()

@Suppress("NOTHING_TO_INLINE")
private inline fun Buffer.toUtf8Trimmed(): String {
var limit = length.toInt()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package io.matthewnelson.kmp.process.internal
import io.matthewnelson.kmp.file.ANDROID
import io.matthewnelson.kmp.file.File
import io.matthewnelson.kmp.process.AsyncWriteStream
import io.matthewnelson.kmp.process.internal.BufferedLineScanner.Companion.scanLines
import io.matthewnelson.kmp.process.internal.StreamLineScanner.Companion.scanLines
import io.matthewnelson.kmp.process.Process
import io.matthewnelson.kmp.process.Signal
import io.matthewnelson.kmp.process.Stdio
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
package io.matthewnelson.kmp.process.internal

import io.matthewnelson.kmp.file.IOException
import io.matthewnelson.kmp.process.internal.BufferedLineScanner.Companion.scanLines
import io.matthewnelson.kmp.process.internal.StreamLineScanner.Companion.scanLines
import kotlin.math.min
import kotlin.test.Test
import kotlin.test.assertEquals

class BufferedLineScannerUnitTest {
class StreamLineScannerUnitTest {

private class TestStream(text: String): ReadStream() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import io.matthewnelson.kmp.file.IOException
import io.matthewnelson.kmp.process.AsyncWriteStream
import io.matthewnelson.kmp.process.Process
import io.matthewnelson.kmp.process.Signal
import io.matthewnelson.kmp.process.internal.BufferedLineScanner.Companion.scanLines
import io.matthewnelson.kmp.process.internal.StreamLineScanner.Companion.scanLines
import io.matthewnelson.kmp.process.internal.Closeable.Companion.tryCloseSuppressed
import io.matthewnelson.kmp.process.internal.stdio.StdioHandle
import kotlinx.cinterop.*
Expand Down

0 comments on commit 8228e0a

Please sign in to comment.