
readInputStream(in, chunkSize) allocates a full chunkSize for each read operation #3106

Closed
jakobmerrild opened this issue Jan 10, 2023 · 22 comments

@jakobmerrild
Contributor

jakobmerrild commented Jan 10, 2023

When attempting to upgrade from version 3.2.7 to 3.2.8 we encountered an OutOfMemoryError in one of our tests.

Unfortunately we haven't been able to reproduce the error using a minimal example. An attempt can be seen here: https://scastie.scala-lang.org/NflW2ZCIRzSnw7YJ8p5HCQ

Edit: The Scastie has been updated to reflect the finding from the comments that the issue likely arises from calling fs2.io.readInputStream with a chunkSize that is (much) larger than the number of bytes the underlying stream returns when its read(buffer, offset, length) method is invoked.

Edit: The text below is not really relevant to the issue, but is kept for historical purposes.


Our code takes a java.io.InputStream and uploads it to AWS S3 using the multipart upload functionality from their SDK.

Our test generates a random kilobyte of data and then uploads payloads of varying sizes, built by repeating that kilobyte, to S3.
While obviously not exactly the same, the Scastie example reproduces the fs2 parts of our implementation as faithfully as possible.

As mentioned, the example doesn't actually exhibit the same behavior as our tests. See below for a screenshot of a heap dump captured by setting javaOptions += "-XX:+HeapDumpOnOutOfMemoryError".
[heap dump screenshot]

It looks like the accB field of the OuterRun class for whatever reason builds up a 500MB scodec.bits.ByteVector.Buffer

It might be of interest that our tests don't fail if the test data is only a few kilobytes (10-13 KB).

@mpilquist
Member

The stack trace in the screenshot seems to be referring to a stream of bytes that's compiled to an F[ByteVector] via src.compile.to(ByteVector). Do you have any code like that in the original code base?

@jakobmerrild
Contributor Author

No. The only time we .compile a stream is when we run the parallel uploads where we .compile.toList because we need to mark the multipart upload as completed and need the results from the individual parts to do that.

@diesalbla
Contributor

diesalbla commented Jan 11, 2023

@kalanzai In addition to a heap dump, it would help us if you could also run the program while monitoring it with Azul Mission Control (https://www.azul.com/products/components/azul-mission-control/#downloads), or any other tool that can record memory allocations per object and locate where in the code those rather large objects are being allocated, whether inside fs2 or outside of it.

@mpilquist
Member

The large ByteVector is the result of something in the original code base compiling a Stream[F, Byte] and accumulating a large number of bytes in the collected output. The acc field that shows up in the heap dump is this one:

def byteVector: Builder[Byte, ByteVector] =
  new Builder[Byte, ByteVector] {
    private[this] var acc = ByteVector.empty
    def +=(c: Chunk[Byte]): Unit = acc = acc ++ c.toByteVector
    def result: ByteVector = acc
  }

There are other ways to hit that code path besides src.compile.to(ByteVector) -- for example, src.compile.to(Array).

As a next step, I'd try to find the byte stream that's being compiled and see why the output is growing so large.

@jakobmerrild
Contributor Author

@diesalbla Thanks for the input. I'll have a look at the tool you linked :)

@mpilquist Further investigation suggests that the error doesn't actually happen during the upload part of our test, but rather during the re-download of the data.

// Method which returns the data from S3 as a java.io.InputStream
def getInputStreamFromS3Client(): IO[InputStream] = ???

def readBytes(): IO[Array[Byte]] =
  fs2.io.readInputStream(getInputStreamFromS3Client(), 10 * 1024).compile.to(Array)

@armanbilge
Member

armanbilge commented Jan 11, 2023

@kalanzai try 3.2.7-69-ff9d758-SNAPSHOT and see if you get the memory leak?

https://s01.oss.sonatype.org/content/repositories/snapshots/co/fs2/fs2-core_2.13/3.2.7-69-ff9d758-SNAPSHOT/

And if not, try 3.2.7-74-d83b430-SNAPSHOT and see if you do?

https://s01.oss.sonatype.org/content/repositories/snapshots/co/fs2/fs2-core_2.13/3.2.7-74-d83b430-SNAPSHOT/

@jakobmerrild
Contributor Author

@armanbilge Thanks!

Looking at it now, we are fairly certain that the issue arises when the chunkSize passed to fs2.io.readInputStream is larger than the number of bytes returned by InputStream.read(). In our case it seems that AWS returns at most 16KB of data at a time, but fs2.io.readInputStream allocates 1MB per chunk.

Then, when we try to download 10MB of data, we end up keeping references to much larger byte vectors (which are mostly empty).

We suspect the reason we didn't have the problem in 3.2.7 is that the array copy removed by PR #2892 only copied the slice of the array that had actual data in it. Thanks to that copy, the large unused portion of each buffer could then be garbage collected.
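
For reference, here is a simplified sketch of what we believe happens on each read after that change (assumed behavior for illustration only, not the actual fs2 source):

import java.io.InputStream

import cats.effect.Sync
import fs2.Chunk

// Simplified sketch: every read allocates a full chunkSize buffer and wraps
// the filled prefix as a chunk without copying, so the whole buffer stays reachable.
def readChunk[F[_]](is: InputStream, chunkSize: Int)(implicit F: Sync[F]): F[Option[Chunk[Byte]]] =
  F.blocking {
    val buf = new Array[Byte](chunkSize)
    val numBytes = is.read(buf)
    if (numBytes == -1) None
    else Some(Chunk.array(buf, 0, numBytes)) // slice view over the full chunkSize array
  }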

@armanbilge
Member

armanbilge commented Jan 11, 2023

> We suspect the reason we didn't have the problem in 3.2.7 is that the array copy removed by PR #2892 only copied the slice of the array that had actual data in it.

Yup! I came to the same conclusion. Edit: actually, not sure it was that PR, I thought it was this one:

> but fs2.io.readInputStream allocates 1MB per chunk.

Oh, it does, where is this?

@jakobmerrild
Contributor Author

Well, it allocates the size we specify (which is 1MB)

@jakobmerrild
Contributor Author

I was trying to communicate that when we specify a 1MB chunk size, but each InputStream.read(buffer) from AWS returns at most 16KB, we end up in a situation where reading a 10MB file uses considerably more memory than 10MB.

@armanbilge
Member

armanbilge commented Jan 11, 2023

Makes sense! Sorry, I thought 1MB was hard-coded somewhere, thanks for clarifying :)

So is this something we should fix in FS2? There is a trade-off here between avoiding a copy and not keeping references to mostly-unused arrays, and it sounds like in your case the best solution is to tune the chunk size.

@mpilquist
Member

Great find folks! I think we should do something in FS2 but not sure what exactly.

@jakobmerrild
Contributor Author

We will definitely tune our chunk size.
That being said, I think it would be nice if fs2.io.readInputStream(in, chunkSize).compile.to(Array) didn't have a memory footprint of chunkSize * N, where N is the number of reads performed on in. Or at least that this behavior was documented somehow.

@armanbilge
Member

armanbilge commented Jan 11, 2023

Two ideas:

  1. We can allocate a new Array and copy if we read significantly less than the chunk size. This would mean two array allocations and a copy in the worst case, but no extra allocations or copies in the best case (see the sketch after the snippet below).

  2. We can use a similar technique to Socket's, which keeps a read buffer and always copies into a new, appropriately sized array. Best and worst case are the same: one array allocation and one copy.

/** Copies the contents of the supplied buffer to a `Chunk[Byte]` and clears the buffer contents. */
private def releaseBuffer(buffer: ByteBuffer): F[Chunk[Byte]] =
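
A minimal sketch of idea 1 (a hypothetical helper, not an existing fs2 API), which copies only when the read filled significantly less than the buffer:

import fs2.Chunk

// Hypothetical helper: copy into a right-sized array only when the read
// filled much less than the requested chunk size.
def toChunk(buf: Array[Byte], numBytes: Int): Chunk[Byte] =
  if (numBytes < buf.length / 2) {
    val copy = new Array[Byte](numBytes) // second allocation, but right-sized
    System.arraycopy(buf, 0, copy, 0, numBytes)
    Chunk.array(copy) // the large buffer is no longer referenced and can be collected
  } else
    Chunk.array(buf, 0, numBytes) // mostly full: wrap without copying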

@diesalbla
Contributor

Not sure if this would be relevant in this context: #2863

@diesalbla
Contributor

> Looking at it now, we are fairly certain that the issue arises when the chunkSize passed to fs2.io.readInputStream is larger than the number of bytes returned by InputStream.read(). In our case it seems that AWS returns at most 16KB of data at a time, but fs2.io.readInputStream allocates 1MB per chunk.

Perhaps you can use smaller read sizes and then, if you want to preserve that chunk size in the output, regroup using the chunkN or chunkMin methods of the Stream class.

As another possibility, you can use the mapChunks function of Stream, and pass it a function that compacts any chunk smaller than a certain size, using the compact method of Chunk.
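
For illustration, a sketch of both suggestions; acquireIn and the sizes here are placeholders:

import java.io.InputStream

import cats.effect.IO
import fs2.Stream

def acquireIn: IO[InputStream] = ??? // however the S3 stream is obtained

// Read in small chunks, then regroup into larger chunks downstream:
val regrouped: Stream[IO, Byte] =
  fs2.io.readInputStream(acquireIn, chunkSize = 16 * 1024)
    .chunkMin(1024 * 1024) // accumulate at least 1MB worth of chunks before emitting
    .unchunks

// Or compact small chunks so they stop referencing a large backing array:
def compactSmall(in: Stream[IO, Byte]): Stream[IO, Byte] =
  in.mapChunks(c => if (c.size < 16 * 1024) c.compact else c)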

@jakobmerrild
Contributor Author

@diesalbla We're lowering our read sizes to match what we observed at runtime, namely that the underlying InputStream from AWS seems to return 16KB of data at a time.
We believe this will allow us to upgrade our fs2 version 👍

@christianharrington

This may be outside the scope of fs2, but would a PR implementing an auto-sizing variation of readInputStream have a chance of being merged?

I'm thinking it would start at some relatively small size (e.g. 1KB) and grow by some factor until the underlying InputStream's read() no longer fills the array, up to some bound (e.g. 32KB).
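
A rough sketch of what that could look like (names and the growth policy are illustrative only, not a proposed fs2 API):

import java.io.InputStream

import cats.effect.Sync
import fs2.{Chunk, Pull, Stream}

// Illustrative auto-sizing read loop: start small and double the buffer
// whenever a read fills it completely, up to a maximum size.
def readInputStreamAutoSized[F[_]](
    fis: F[InputStream],
    minChunkSize: Int = 1024,
    maxChunkSize: Int = 32 * 1024
)(implicit F: Sync[F]): Stream[F, Byte] =
  Stream.eval(fis).flatMap { is =>
    def go(size: Int): Pull[F, Byte, Unit] =
      Pull.eval(F.blocking {
        val buf = new Array[Byte](size)
        (buf, is.read(buf))
      }).flatMap {
        case (_, -1) => Pull.done
        case (buf, numBytes) =>
          // grow only when the previous buffer was completely filled
          val next = if (numBytes == size) math.min(size * 2, maxChunkSize) else size
          Pull.output(Chunk.array(buf, 0, numBytes)) >> go(next)
      }
    go(minChunkSize).stream
  }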

@mpilquist
Member

@christianharrington Definitely interested in a PR like that, especially if we can use it in Socket and perhaps places in Files. I'd love to avoid forcing all callers to pass chunk sizes as parameters if we can do a better job with either defaults or dynamic resizing. I suggest opening a draft PR first to get early feedback. Then if we find an approach that sounds good to everyone, we can polish it from there.

@jakobmerrild
Contributor Author

I have updated the Scastie so it now better represents the issue. The more interesting part for potential tests is the following somewhat hacky implementation of java.io.InputStream

import java.io.InputStream

/** An InputStream which simulates a slow stream by always only reading 1 byte
  * at a time
  * @param bytes
  *   The bytes that the simulated stream consists of
  */
class SlowTestInputStream(bytes: Array[Byte]) extends InputStream {

  var readBytes: Int = 0
  override def read(): Int = {
    if (readBytes >= bytes.size) {
      -1
    } else {
      val result = bytes(readBytes)
      readBytes += 1
      result & 0xff // mask so bytes >= 128 aren't returned as negative values (read() must return 0-255, or -1 at EOF)
    }
  }
  override def available(): Int = bytes.size - readBytes
  override def markSupported(): Boolean = false

  override def read(buffer: Array[Byte], off: Int, len: Int): Int = {
    if (len == 0) {
      0
    } else if (readBytes >= bytes.size) {
      -1
    } else {
      buffer(off) = bytes(readBytes)
      readBytes += 1
      1
    }
  }
}
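
For completeness, a sketch of how this could be wired into a test; the 1MB chunk size is only an example:

import java.io.InputStream

import cats.effect.IO

// Every 1-byte read still allocates (and retains) a full 1MB buffer
// until the compiled Array is produced.
val bytes: Array[Byte] = Array.fill[Byte](10 * 1024)(1)
val readBack: IO[Array[Byte]] =
  fs2.io.readInputStream(
    IO(new SlowTestInputStream(bytes): InputStream),
    chunkSize = 1024 * 1024
  ).compile.to(Array)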

@mpilquist changed the title from "Memory leak in 3.2.8" to "readInputStream(in, chunkSize) allocates a full chunkSize for each read operation" on Mar 25, 2023
@christianharrington

I think #3318 closes this issue, unless I'm mistaken?
