Skip to content

Commit

Permalink
Remove buffer from pool on write failure (#11951)
Browse files Browse the repository at this point in the history
* Experiment with removable buffer from pool
* Changed remove return to be release boolean
* Fixes to avoid double release
* Tracking ByteBufferPool handles remove
* Adding assert on _channelState.isLockHeldByCurrentThread()

---------

Co-authored-by: Joakim Erdfelt <joakim.erdfelt@gmail.com>
  • Loading branch information
gregw and joakime authored Jun 27, 2024
1 parent f89e3b8 commit 5e8cc22
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.IntUnaryOperator;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -205,24 +205,49 @@ public RetainableByteBuffer acquire(int size, boolean direct)

// No bucket, return non-pooled.
if (bucket == null)
return newRetainableByteBuffer(size, direct, null);
return RetainableByteBuffer.wrap(BufferUtil.allocate(size, direct));

bucket.recordAcquire();

// Try to acquire a pooled entry.
Pool.Entry<RetainableByteBuffer> entry = bucket.getPool().acquire();
if (entry != null)
if (entry == null)
{
ByteBuffer buffer = BufferUtil.allocate(bucket.getCapacity(), direct);
return new ReservedBuffer(buffer, bucket);
}

bucket.recordPooled();
RetainableByteBuffer buffer = entry.getPooled();
((Buffer)buffer).acquire();
return buffer;
}

@Override
public boolean removeAndRelease(RetainableByteBuffer buffer)
{
RetainableByteBuffer actual = buffer;
while (actual instanceof RetainableByteBuffer.Wrapper wrapper)
actual = wrapper.getWrapped();

if (actual instanceof ReservedBuffer reservedBuffer)
{
// remove the actual reserved buffer, but release the wrapped buffer
reservedBuffer.remove();
return buffer.release();
}

if (actual instanceof Buffer poolBuffer)
{
bucket.recordPooled();
RetainableByteBuffer buffer = entry.getPooled();
((Buffer)buffer).acquire();
return buffer;
// remove the actual pool buffer, but release the wrapped buffer
poolBuffer.remove();
return buffer.release();
}

return newRetainableByteBuffer(bucket.getCapacity(), direct, buffer -> reserve(bucket, buffer));
return ByteBufferPool.super.removeAndRelease(buffer);
}

private void reserve(RetainedBucket bucket, RetainableByteBuffer buffer)
private void reserve(RetainedBucket bucket, ByteBuffer byteBuffer)
{
bucket.recordRelease();

Expand All @@ -235,12 +260,11 @@ private void reserve(RetainedBucket bucket, RetainableByteBuffer buffer)
}

// Add the buffer to the new entry.
ByteBuffer byteBuffer = buffer.getByteBuffer();
BufferUtil.reset(byteBuffer);
Buffer pooledBuffer = new Buffer(byteBuffer, b -> release(bucket, entry));
Buffer pooledBuffer = new Buffer(byteBuffer, bucket, entry);
if (entry.enable(pooledBuffer, false))
{
checkMaxMemory(bucket, buffer.isDirect());
checkMaxMemory(bucket, byteBuffer.isDirect());
return;
}

Expand Down Expand Up @@ -270,6 +294,13 @@ private void release(RetainedBucket bucket, Pool.Entry<RetainableByteBuffer> ent
entry.remove();
}

private boolean remove(RetainedBucket bucket, Pool.Entry<RetainableByteBuffer> entry)
{
// Cannot release, discard this buffer.
bucket.recordRemove();
return entry.remove();
}

private void checkMaxMemory(RetainedBucket bucket, boolean direct)
{
long max = direct ? _maxDirectMemory : _maxHeapMemory;
Expand Down Expand Up @@ -309,14 +340,6 @@ private void evict(long excessMemory, boolean direct)
}
}

private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direct, Consumer<RetainableByteBuffer> releaser)
{
ByteBuffer buffer = BufferUtil.allocate(capacity, direct);
Buffer retainableByteBuffer = new Buffer(buffer, releaser);
retainableByteBuffer.acquire();
return retainableByteBuffer;
}

public Pool<RetainableByteBuffer> poolFor(int capacity, boolean direct)
{
RetainedBucket bucket = bucketFor(capacity, direct);
Expand Down Expand Up @@ -581,29 +604,61 @@ private Pool.Entry<RetainableByteBuffer> evict()
}
}

private static class Buffer extends AbstractRetainableByteBuffer
private class ReservedBuffer extends AbstractRetainableByteBuffer
{
private final Consumer<RetainableByteBuffer> _releaser;
private final RetainedBucket _bucket;
private final AtomicBoolean _removed = new AtomicBoolean();

private ReservedBuffer(ByteBuffer buffer, RetainedBucket bucket)
{
super(buffer);
_bucket = Objects.requireNonNull(bucket);
acquire();
}

@Override
public boolean release()
{
boolean released = super.release();
if (released && _removed.compareAndSet(false, true))
reserve(_bucket, getByteBuffer());
return released;
}

boolean remove()
{
// Buffer never added to pool, so just prevent future reservation
return _removed.compareAndSet(false, true);
}
}

private class Buffer extends AbstractRetainableByteBuffer
{
private final RetainedBucket _bucket;
private final Pool.Entry<RetainableByteBuffer> _entry;
private int _usages;

private Buffer(ByteBuffer buffer, Consumer<RetainableByteBuffer> releaser)
private Buffer(ByteBuffer buffer, RetainedBucket bucket, Pool.Entry<RetainableByteBuffer> entry)
{
super(buffer);
this._releaser = releaser;
_bucket = Objects.requireNonNull(bucket);
_entry = Objects.requireNonNull(entry);
}

@Override
public boolean release()
{
boolean released = super.release();
if (released)
{
if (_releaser != null)
_releaser.accept(this);
}
ArrayByteBufferPool.this.release(_bucket, _entry);
return released;
}

boolean remove()
{
return ArrayByteBufferPool.this.remove(_bucket, _entry);
}

private int use()
{
if (++_usages < 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,20 @@ public interface ByteBufferPool
*/
RetainableByteBuffer acquire(int size, boolean direct);

/**
* {@link RetainableByteBuffer#release() Release} the buffer in a way that will remove it from any pool that it may be in.
* If the buffer is not in a pool, calling this method is equivalent to calling {@link RetainableByteBuffer#release()}.
* Calling this method satisfies any contract that requires a call to {@link RetainableByteBuffer#release()}.
* @return {@code true} if a call to {@link RetainableByteBuffer#release()} would have returned {@code true}.
* @see RetainableByteBuffer#release()
* @deprecated This API is experimental and may be removed in future releases
*/
@Deprecated
default boolean removeAndRelease(RetainableByteBuffer buffer)
{
return buffer != null && buffer.release();
}

/**
* <p>Removes all {@link RetainableByteBuffer#isRetained() non-retained}
* pooled instances from this pool.</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -444,4 +445,43 @@ public void testReleaseExcessMemory()
assertThat(compoundPool.getPrimaryPool().size(), is(ConcurrentPool.OPTIMAL_MAX_SIZE));
assertThat(compoundPool.getSecondaryPool().size(), is(0));
}

@Test
public void testRemoveAndRelease()
{
ArrayByteBufferPool pool = new ArrayByteBufferPool();

RetainableByteBuffer reserved0 = pool.acquire(1024, false);
RetainableByteBuffer reserved1 = pool.acquire(1024, false);

RetainableByteBuffer acquired0 = pool.acquire(1024, false);
acquired0.release();
acquired0 = pool.acquire(1024, false);
RetainableByteBuffer acquired1 = pool.acquire(1024, false);
acquired1.release();
acquired1 = pool.acquire(1024, false);

RetainableByteBuffer retained0 = pool.acquire(1024, false);
retained0.release();
retained0 = pool.acquire(1024, false);
retained0.retain();
RetainableByteBuffer retained1 = pool.acquire(1024, false);
retained1.release();
retained1 = pool.acquire(1024, false);
retained1.retain();

assertTrue(pool.removeAndRelease(reserved1));
assertTrue(pool.removeAndRelease(acquired1));
assertFalse(pool.removeAndRelease(retained1));
assertTrue(retained1.release());

assertThat(pool.getHeapByteBufferCount(), is(2L));
assertTrue(reserved0.release());
assertThat(pool.getHeapByteBufferCount(), is(3L));
assertTrue(acquired0.release());
assertThat(pool.getHeapByteBufferCount(), is(3L));
assertFalse(retained0.release());
assertTrue(retained0.release());
assertThat(pool.getHeapByteBufferCount(), is(3L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.eclipse.jetty.http.HttpException;
Expand All @@ -39,14 +40,15 @@
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.http.QuotedQualityCSV;
import org.eclipse.jetty.io.ByteBufferOutputStream;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.Retainable;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
Expand Down Expand Up @@ -198,7 +200,8 @@ else if (charsets.contains(StandardCharsets.ISO_8859_1))

int bufferSize = request.getConnectionMetaData().getHttpConfiguration().getOutputBufferSize();
bufferSize = Math.min(8192, bufferSize); // TODO ?
RetainableByteBuffer buffer = request.getComponents().getByteBufferPool().acquire(bufferSize, false);
ByteBufferPool byteBufferPool = request.getComponents().getByteBufferPool();
RetainableByteBuffer buffer = byteBufferPool.acquire(bufferSize, false);

try
{
Expand Down Expand Up @@ -251,13 +254,14 @@ else if (charsets.contains(StandardCharsets.ISO_8859_1))
}

response.getHeaders().put(type.getContentTypeField(charset));
response.write(true, buffer.getByteBuffer(), new WriteErrorCallback(callback, buffer));
response.write(true, buffer.getByteBuffer(), new WriteErrorCallback(callback, byteBufferPool, buffer));

return true;
}
catch (Throwable x)
{
buffer.release();
if (buffer != null)
byteBufferPool.removeAndRelease(buffer);
throw x;
}
}
Expand Down Expand Up @@ -579,20 +583,33 @@ public String toString()
* when calling {@link Response#write(boolean, ByteBuffer, Callback)} to wrap the passed in {@link Callback}
* so that the {@link RetainableByteBuffer} used can be released.
*/
private static class WriteErrorCallback extends Callback.Nested
private static class WriteErrorCallback implements Callback
{
private final Retainable _retainable;
private final AtomicReference<Callback> _callback;
private final ByteBufferPool _pool;
private final RetainableByteBuffer _buffer;

public WriteErrorCallback(Callback callback, Retainable retainable)
public WriteErrorCallback(Callback callback, ByteBufferPool pool, RetainableByteBuffer retainable)
{
super(callback);
_retainable = retainable;
_callback = new AtomicReference<>(callback);
_pool = pool;
_buffer = retainable;
}

@Override
public void completed()
public void succeeded()
{
_retainable.release();
Callback callback = _callback.getAndSet(null);
if (callback != null)
ExceptionUtil.callAndThen(_buffer::release, callback::succeeded);
}

@Override
public void failed(Throwable x)
{
Callback callback = _callback.getAndSet(null);
if (callback != null)
ExceptionUtil.callAndThen(x, t -> _pool.removeAndRelease(_buffer), callback::failed);
}
}
}
Loading

0 comments on commit 5e8cc22

Please sign in to comment.