Skip to content

Commit

Permalink
Parse output data for CLRF
Browse files Browse the repository at this point in the history
  • Loading branch information
05nelsonm committed Apr 1, 2024
1 parent 7802550 commit 937133d
Show file tree
Hide file tree
Showing 20 changed files with 758 additions and 268 deletions.
28 changes: 28 additions & 0 deletions library/process/api/process.api
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,34 @@ public final class io/matthewnelson/kmp/process/Process$Current {
public static final fun pid ()I
}

public final class io/matthewnelson/kmp/process/ReadBuffer {
public static final field Companion Lio/matthewnelson/kmp/process/ReadBuffer$Companion;
public static final synthetic fun box-impl (Ljava/lang/Object;)Lio/matthewnelson/kmp/process/ReadBuffer;
public static final fun capacity-impl (Ljava/lang/Object;)I
public static final fun decodeToUtf8-impl (Ljava/lang/Object;II)Ljava/lang/String;
public fun equals (Ljava/lang/Object;)Z
public static fun equals-impl (Ljava/lang/Object;Ljava/lang/Object;)Z
public static final fun equals-impl0 (Ljava/lang/Object;Ljava/lang/Object;)Z
public static final fun get-impl (Ljava/lang/Object;I)B
public static final fun getBuf-impl (Ljava/lang/Object;)[B
public fun hashCode ()I
public static fun hashCode-impl (Ljava/lang/Object;)I
public fun toString ()Ljava/lang/String;
public static fun toString-impl (Ljava/lang/Object;)Ljava/lang/String;
public final synthetic fun unbox-impl ()Ljava/lang/Object;
}

public final class io/matthewnelson/kmp/process/ReadBuffer$Companion {
public final fun allocate-VWWedcE ()Ljava/lang/Object;
public final fun lineOutputFeed (Lkotlin/jvm/functions/Function1;)Lio/matthewnelson/kmp/process/ReadBuffer$LineOutputFeed;
public final fun of-Armsq0Q ([B)Ljava/lang/Object;
}

public abstract class io/matthewnelson/kmp/process/ReadBuffer$LineOutputFeed {
public abstract fun close ()V
public abstract fun onData-zry8YOI (Ljava/lang/Object;I)V
}

public final class io/matthewnelson/kmp/process/Signal : java/lang/Enum {
public static final field SIGKILL Lio/matthewnelson/kmp/process/Signal;
public static final field SIGTERM Lio/matthewnelson/kmp/process/Signal;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright (c) 2024 Matthew Nelson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING")

package io.matthewnelson.kmp.process

import io.matthewnelson.kmp.process.internal.RealLineOutputFeed
import kotlin.jvm.JvmInline

/**
* For internal usage only.
*
* Wrapper class for buffering data to utilize in platform specific
* implementations along with [LineOutputFeed], primarily in order
* to mitigate unnecessarily copying a `Node.js` Buffer to a ByteArray.
*
* @see [LineOutputFeed]
* */
@JvmInline
public actual value class ReadBuffer private actual constructor(private actual val _buf: Any) {

/**
* Public, platform specific access to the underlying array.
* */
public val buf: ByteArray get() = _buf as ByteArray

internal actual fun capacity(): Int = buf.size

@Throws(IndexOutOfBoundsException::class)
internal actual operator fun get(
index: Int,
): Byte = buf[index]

@Throws(IllegalArgumentException::class, IndexOutOfBoundsException::class)
internal actual fun decodeToUtf8(
startIndex: Int,
endIndex: Int,
): String = buf.decodeToString(startIndex, endIndex)

/**
* Scans buffered input and dispatches lines, disregarding
* line breaks CR (`\r`), LF (`\n`), & CRLF (`\r\n`).
*
* After reading operations are exhausted, calling [close] will
* assume any buffered input remaining is terminating and dispatch
* it as a line, and then dispatch `null` to indicate End Of Stream.
*
* This is **NOT** thread safe.
*
* @see [lineOutputFeed]
* */
public actual abstract class LineOutputFeed internal actual constructor() {

@Throws(IllegalArgumentException::class, IndexOutOfBoundsException::class, IllegalStateException::class)
public actual abstract fun onData(buf: ReadBuffer, len: Int)

public actual abstract fun close()
}

public actual companion object {

/**
* Allocates a new buffer with capacity of (8 * 1024) bytes
* */
@InternalProcessApi
public actual fun allocate(): ReadBuffer {
return ReadBuffer(ByteArray(8 * 1024))
}

/**
* Creates a new [LineOutputFeed]
*
* e.g. (Jvm)
*
* val feed = ReadBuffer.lineOutputFeed { line ->
* println(line ?: "--EOS--")
* }
*
* myInputStream.use { iStream ->
* val buf = ReadBuffer.allocate()
*
* try {
* while(true) {
* val read = iStream.read(buf.buf)
* if (read == -1) break
* feed.onData(buf, read)
* }
* } finally {
*
* // done reading, dispatch last line
* // (if buffered), and dispatch null
* // to indicate EOS
* feed.close()
* }
* }
*
* **NOTE:** [dispatch] should not throw exception
* */
@InternalProcessApi
public actual fun lineOutputFeed(
dispatch: (line: String?) -> Unit,
): LineOutputFeed = RealLineOutputFeed(dispatch)

/**
* Wraps a [ByteArray] to use as [ReadBuffer].
*
* @see [buf]
* */
@InternalProcessApi
public fun of(buf: ByteArray): ReadBuffer = ReadBuffer(buf)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ package io.matthewnelson.kmp.process.internal
import io.matthewnelson.kmp.file.File
import io.matthewnelson.kmp.file.IOException
import io.matthewnelson.kmp.file.InterruptedException
import io.matthewnelson.kmp.process.Output
import io.matthewnelson.kmp.process.Signal
import io.matthewnelson.kmp.process.Stdio
import io.matthewnelson.kmp.process.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

Expand Down Expand Up @@ -133,3 +131,39 @@ internal fun PlatformBuilder.blockingOutput(
p.destroySignal,
)
}

internal fun ReadStream.scanLines(
dispatch: (line: String?) -> Unit,
) { scanLines(1024 * 8, dispatch) }

@OptIn(InternalProcessApi::class)
@Throws(IllegalArgumentException::class)
internal fun ReadStream.scanLines(
bufferSize: Int,
dispatch: (line: String?) -> Unit,
) {

val buf = ReadBuffer.of(ByteArray(bufferSize))
val feed = ReadBuffer.lineOutputFeed(dispatch)

try {
while (true) {
val read = try {
read(buf.buf)
} catch (_: IOException) {
break
}

// If a pipe has no write ends open (i.e. the
// child process exited), a zero read is returned,
// and we can end early (before process destruction).
if (read <= 0) break

feed.onData(buf, read)
}
} finally {
// Process.destroy closes the streams
buf.buf.fill(0)
feed.close()
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2024 Matthew Nelson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package io.matthewnelson.kmp.process

import kotlin.test.*

@OptIn(InternalProcessApi::class)
class ReadBufferLineLineOutputFeedUnitTest {

@Test
fun givenData_whenCRorCRLF_thenAreIgnored() {
val buf = ReadBuffer.allocate()

val b = buildString {
append("Hello\r\n")
append("World\n")
append("Hello\n")
append("World\r\n")
}.encodeToByteArray()

b.forEachIndexed { index, byte -> buf.buf[index] = byte }

var i = 0
val expected = listOf("Hello", "World", "Hello", "World", null)
val scanner = ReadBuffer.lineOutputFeed { line ->
assertEquals(expected[i++], line)
}

scanner.onData(buf, b.size)
scanner.close()
assertEquals(expected.size, i)
}

@Test
fun givenUnterminated_whenDoFinal_thenAssumesTermination() {
val expected = "Not terminated"
val buf = ReadBuffer.allocate()
val b = expected.encodeToByteArray()
b.forEachIndexed { index, byte -> buf.buf[index] = byte }

var actual: String? = null
var nullTerminated = false
val scanner = ReadBuffer.lineOutputFeed { line ->
if (line == null) {
nullTerminated = true
} else {
actual = line
}
}

scanner.onData(buf, b.size)
assertNull(actual)
assertFalse(nullTerminated)

scanner.close()
assertNotNull(actual)
assertTrue(nullTerminated)
assertEquals(expected, actual)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ public fun interface OutputFeed {
}
}

protected fun dispatchStdout(line: String) { stdoutFeeds.dispatch(line) }
protected fun dispatchStderr(line: String) { stderrFeeds.dispatch(line) }
protected fun dispatchStdout(line: String?) { stdoutFeeds.dispatch(line) }
protected fun dispatchStderr(line: String?) { stderrFeeds.dispatch(line) }

protected fun onStdoutStopped() { stdoutFeeds.withLock { clear(); stdoutStopped = true } }
protected fun onStderrStopped() { stderrFeeds.withLock { clear(); stderrStopped = true } }
Expand Down Expand Up @@ -231,13 +231,14 @@ public fun interface OutputFeed {
private inline val This: Process get() = this as Process

@Suppress("NOTHING_TO_INLINE")
private inline fun SynchronizedSet<OutputFeed>.dispatch(line: String) {
withLock {
forEach { feed ->
try {
feed.onOutput(line)
} catch (_: Throwable) {}
}
private inline fun SynchronizedSet<OutputFeed>.dispatch(line: String?) {
// TODO: dispatch null to indicate end of stream
if (line == null) return

withLock { toSet() }.forEach { feed ->
try {
feed.onOutput(line)
} catch (_: Throwable) {}
}
}

Expand Down
Loading

0 comments on commit 937133d

Please sign in to comment.