ContentSourceByteBuffer, ContentSourceString, ChunkAccumulator potential resource leaks #11755

Open
scrat98 opened this issue May 6, 2024 · 2 comments

Contributor

scrat98 commented May 6, 2024

Jetty version(s)
12.0.8

Enhancement Description
The ContentSourceByteBuffer, ContentSourceString, ChunkAccumulator, and ContentSourceConsumer implementations take a user-provided Promise and call promise.failed/succeeded on it, but they do not take into account that the user-defined callback may throw. When it does, the chunk is never released and the source is never failed.

if (Content.Chunk.isFailure(chunk))
{
    promise.failed(chunk.getFailure()); // if this throws, source.fail() below is skipped
    if (!chunk.isLast())
        source.fail(chunk.getFailure());
    return;
}
// ... some action that throws an exception ...
chunk.release(); // never reached, so the chunk leaks

From my point of view, a good reference implementation is ContentCopier, which always releases resources on both succeeded and failed events.
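To make the suggested fix concrete, here is a minimal Java sketch of the "always release" idea using try/finally. The `Chunk`, `Source`, and `Promise` interfaces below are hypothetical stand-ins written for this example, not the Jetty types, and `onChunk` is an illustrative name rather than an existing Jetty method.

```java
// Hypothetical stand-ins for illustration; these are NOT the Jetty types.
final class SafeCompletion {
    interface Chunk {
        Throwable getFailure(); // null for a normal chunk
        boolean isLast();
        void release();
    }

    interface Source {
        void fail(Throwable failure);
    }

    interface Promise<T> {
        void succeeded(T result);
        void failed(Throwable failure);
    }

    // Handles one chunk; the cleanup runs even if the user promise throws.
    static void onChunk(Chunk chunk, Source source, Promise<String> promise) {
        Throwable failure = chunk.getFailure();
        if (failure != null) {
            try {
                promise.failed(failure); // user code, may throw
            } finally {
                // Mirrors the snippet above: only a non-last failure fails the source.
                if (!chunk.isLast())
                    source.fail(failure);
            }
            return;
        }
        try {
            promise.succeeded("done"); // user code, may throw
        } finally {
            chunk.release(); // reached on both the normal and the exceptional path
        }
    }
}
```

The exception thrown by the callback still propagates to the caller; the only change is that the release or the source failure happens first.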

Contributor

gregw commented May 7, 2024

@scrat98 again, note #11598, which deprecates ChunkAccumulator. It is still in draft status, but we are working on improving these areas of the code.

gregw self-assigned this May 7, 2024
Contributor Author

scrat98 commented May 7, 2024

@gregw I reviewed that issue briefly, but as far as I understand, its main goal is to simplify and unify the RBB public API. The current issue is intended to show that multiple implementations run the same "chunk iterating loop" (start, demand, read, process chunk, repeat) and share the same potential resource leak.

This issue is related to #11758. The suggestion there was to use the same "ChunkIterator" for all "asXXX" functions (asString, asByteArray, and so on). ChunkIterator could be the current Flow.Publisher implementation, for example. To show the basic idea:

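import java.nio.charset.StandardCharsets
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Consumer
import org.eclipse.jetty.io.Content
import org.eclipse.jetty.util.Promise

fun asString(source: Content.Source): CompletableFuture<String> {
  val buffer = StringBuilder()
  val callback = Promise.Completable<String>()
  iterate(source,
    // Decode as UTF-8; asCharBuffer() would reinterpret the raw bytes as UTF-16 code units.
    // A multi-byte character split across two chunks is not handled in this sketch.
    onChunk = { buffer.append(StandardCharsets.UTF_8.decode(it.byteBuffer)) },
    onComplete = {
      val result = buffer.toString()
      callback.succeeded(result)
    },
    onError = {
      callback.failed(it)
    }
  )
  return callback
}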

fun iterate(
  source: Content.Source,
  onChunk: Consumer<Content.Chunk>,
  onComplete: Runnable,
  onError: Consumer<Throwable>,
) {
  ChunkIterator(source, onChunk, onComplete, onError).iterate()
}

private class ChunkIterator(
  private val source: Content.Source,
  private val onChunk: Consumer<Content.Chunk>,
  private val onComplete: Runnable,
  private val onError: Consumer<Throwable>,
) {
  private val started = AtomicBoolean(false)

  fun iterate() {
    if (!started.compareAndSet(false, true)) {
      throw IllegalStateException("Iterator already started")
    }
    processNextChunks()
  }

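  private fun processNextChunks() {
    while (true) {
      val chunk = source.read()
      if (chunk == null) {
        // Nothing available yet: ask to be called back and leave the loop.
        source.demand(this::processNextChunks)
        return
      }
      val last = chunk.isLast
      val result = runCatching { processNextChunk(chunk) }
      // Always release the chunk, whether or not the callbacks threw.
      chunk.release()
      result.onFailure {
        source.fail(it)
        onError.accept(it)
        return // stop iterating after a failure
      }
      // Stop after the last chunk; reading again would return a last chunk again.
      if (last) return
    }
  }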

  private fun processNextChunk(chunk: Content.Chunk) {
    if (Content.Chunk.isFailure(chunk)) {
      throw chunk.failure
    }
    onChunk.accept(chunk)
    if (chunk.isLast) {
      onComplete.run()
    }
  }
}
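For readers who prefer Java, the same loop can be sketched with try/finally. The `Chunk` and `Source` classes below are hypothetical in-memory stand-ins written for this example, not the Jetty types; because this stand-in source always has its chunks available, the demand step is left out and a null read simply ends the loop.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration; these are NOT the Jetty types.
final class ChunkLoopSketch {
    static final class Chunk {
        final String data;
        final boolean last;
        int releases; // counts release() calls so that leaks are observable

        Chunk(String data, boolean last) {
            this.data = data;
            this.last = last;
        }

        void release() {
            releases++;
        }
    }

    // In-memory source: read() returns null only when no chunks remain.
    static final class Source {
        final Queue<Chunk> chunks = new ArrayDeque<>();
        Throwable failure;

        Chunk read() {
            return chunks.poll();
        }

        void fail(Throwable cause) {
            failure = cause;
        }
    }

    static void iterate(Source source, Consumer<Chunk> onChunk,
                        Runnable onComplete, Consumer<Throwable> onError) {
        Chunk chunk;
        while ((chunk = source.read()) != null) {
            try {
                onChunk.accept(chunk); // user code, may throw
                if (chunk.last)
                    onComplete.run(); // user code, may throw
            } catch (Throwable x) {
                source.fail(x);
                onError.accept(x);
                return; // the finally block still releases the chunk
            } finally {
                chunk.release();
            }
            if (chunk.last)
                return; // do not read past the last chunk
        }
    }
}
```

With this shape, each chunk that was read is released exactly once, and a throwing callback fails the source and reports the error instead of leaking the chunk.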
