Skip to content

Commit

Permalink
rename bufferSize and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
aajtodd committed Sep 8, 2023
1 parent ee38aef commit b0bc151
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import aws.smithy.kotlin.runtime.io.*
import aws.smithy.kotlin.runtime.io.internal.SdkDispatchers
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.job
import kotlinx.coroutines.launch

/**
Expand Down Expand Up @@ -112,19 +111,19 @@ public fun ByteStream.cancel() {
/**
* Return a [Flow] that consumes the underlying [ByteStream] when collected.
*
* @param bufferSizeHint the size of the buffers to emit from the flow. All buffers emitted
* @param bufferSize the size of the buffers to emit from the flow. All buffers emitted
* will be of this size except for the last one which may be less than the requested buffer size.
* This parameter has no effect for the [ByteStream.Buffer] variant. The emitted [ByteArray]
* will be whatever size the in-memory buffer already is in that case.
*/
public fun ByteStream.toFlow(bufferSizeHint: Long = 8192): Flow<ByteArray> = when (this) {
public fun ByteStream.toFlow(bufferSize: Long = 8192): Flow<ByteArray> = when (this) {
is ByteStream.Buffer -> flowOf(bytes())
is ByteStream.ChannelStream -> readFrom().toFlow(bufferSizeHint)
is ByteStream.SourceStream -> readFrom().toFlow(bufferSizeHint).flowOn(SdkDispatchers.IO)
is ByteStream.ChannelStream -> readFrom().toFlow(bufferSize)
is ByteStream.SourceStream -> readFrom().toFlow(bufferSize).flowOn(SdkDispatchers.IO)
}

/**
* Create a [ByteStream] from a [Flow] of individual [ByteArray]'s.
* Create a [ByteStream] from a [Flow] of byte arrays.
*
* @param scope the [CoroutineScope] to use for launching a coroutine to do the collection in.
* @param contentLength the overall content length of the [Flow] (if known). If set this will be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ abstract class ByteStreamFlowTest(
val buffers = mutableListOf<ByteArray>()
flow.toList(buffers)

val totalCollected = buffers.fold(0) { acc, bytes -> acc + bytes.size }
val totalCollected = buffers.sumOf { it.size }
assertEquals(data.size, totalCollected)

if (byteStream is ByteStream.Buffer) {
Expand All @@ -50,12 +50,12 @@ abstract class ByteStreamFlowTest(
}

class FlowToByteStreamTest {
private fun randomByteArray(size: Int): ByteArray = ByteArray(size) { i -> i.toByte() }
private fun testByteArray(size: Int): ByteArray = ByteArray(size) { i -> i.toByte() }

val data = listOf(
randomByteArray(576),
randomByteArray(9172),
randomByteArray(3278),
testByteArray(576),
testByteArray(9172),
testByteArray(3278),
)

@Test
Expand All @@ -79,7 +79,7 @@ abstract class ByteStreamFlowTest(

@Test
fun testContentLengthUnderflow() = runTest {
val advertisedContentLength = data.fold(0L) { acc, bytes -> acc + bytes.size } + 100L
val advertisedContentLength = data.sumOf { it.size } + 100L
testInvalidContentLength(advertisedContentLength, "expected 13126 bytes collected from flow, got 13026")
}

Expand Down Expand Up @@ -110,10 +110,10 @@ abstract class ByteStreamFlowTest(
// cancelling the scope should close/cancel the channel
val waiter = Channel<Unit>(1)
val flow = flow {
emit(randomByteArray(128))
emit(randomByteArray(277))
emit(testByteArray(128))
emit(testByteArray(277))
waiter.receive()
emit(randomByteArray(97))
emit(testByteArray(97))
}

val job = Job()
Expand All @@ -139,10 +139,10 @@ abstract class ByteStreamFlowTest(
// cancelling the channel should cancel the scope (via write failing)
val waiter = Channel<Unit>(1)
val flow = flow {
emit(randomByteArray(128))
emit(randomByteArray(277))
emit(testByteArray(128))
emit(testByteArray(277))
waiter.receive()
emit(randomByteArray(97))
emit(testByteArray(97))
}

val uncaughtExceptions = mutableListOf<Throwable>()
Expand Down

0 comments on commit b0bc151

Please sign in to comment.