Skip to content

Commit

Permalink
add test for buffer reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
seigert committed Oct 20, 2023
1 parent aea8f85 commit 71bd067
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 5 deletions.
36 changes: 36 additions & 0 deletions io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,42 @@ class IoPlatformSuite extends Fs2Suite {
// This suite runs for a long time, this avoids timeouts in CI.
override def munitIOTimeout: Duration = 2.minutes

group("readInputStream") {
test("reuses internal buffer on smaller chunks") {
forAllF { (bytes: Array[Byte], chunkSize0: Int) =>
val chunkSize = (chunkSize0 % 20).abs + 1
fs2.Stream
.chunk(Chunk.array(bytes))
.chunkN(chunkSize / 3 + 1)
.unchunks
.covary[IO]
// we know that '.toInputStream' reads by chunk
.through(fs2.io.toInputStream)
.flatMap(is => io.readInputStream(IO(is), chunkSize))
.chunks
.zipWithPrevious
.assertForall {
case (None, _) => true // skip first element
case (_, _: Chunk.Singleton[_]) => true // skip singleton bytes
case (Some(_: Chunk.Singleton[_]), _) => true // skip singleton bytes
case (Some(Chunk.ArraySlice(bs1, o1, l1)), Chunk.ArraySlice(bs2, o2, _)) =>
{
// if first slice buffer is not 'full'
(bs1.length != (o1 + l1)) &&
// we expect that next slice will wrap same buffer
((bs2 eq bs1) && (o2 == o1 + l1))
} || {
// if first slice buffer is 'full'
(bs2.length == (o1 + l1)) &&
// we expect new buffer allocated for next slice
((bs2 ne bs1) && (o2 == 0))
}
case _ => false // unexpected chunk subtype
}
}
}
}

group("readOutputStream") {
test("writes data and terminates when `f` returns") {
forAllF { (bytes: Array[Byte], chunkSize0: Int) =>
Expand Down
7 changes: 2 additions & 5 deletions io/shared/src/main/scala/fs2/io/io.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,8 @@ package object io extends ioplatform {
): F[Option[(Chunk[Byte], Option[(Array[Byte], Int)])]] =
read(is, buf, offset).map { numBytes =>
if (numBytes < 0) None
else if (numBytes == 0) Some(Chunk.empty -> Some(buf -> offset))
else {
if (offset + numBytes == buf.size) Some(Chunk.array(buf, offset, numBytes) -> None)
else Some(Chunk.array(buf, offset, numBytes) -> Some(buf -> (offset + numBytes)))
}
else if (offset + numBytes == buf.size) Some(Chunk.array(buf, offset, numBytes) -> None)
else Some(Chunk.array(buf, offset, numBytes) -> Some(buf -> (offset + numBytes)))
}

private[fs2] def readInputStreamGeneric[F[_]](
Expand Down

0 comments on commit 71bd067

Please sign in to comment.