Skip to content

Commit

Permalink
#7281 pass special content to interceptors
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
lorban committed Jan 13, 2022
1 parent 7ecab4b commit 91f29a0
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -339,34 +339,11 @@ private HttpInput.Content nextTransformedContent()

while (_transformedContent == null)
{
if (_rawContent.isSpecial())
{
// TODO does EOF need to be passed to the interceptors?

// In case the _rawContent was set by consumeAll(), check the httpChannel
// to see if it has a more precise error. Otherwise, the exact same
// special content will be returned by the httpChannel; do not do that
// if the _error flag was set, meaning the current error is definitive.
if (!_error)
{
HttpInput.Content refreshedRawContent = produceRawContent();
if (refreshedRawContent != null)
_rawContent = refreshedRawContent;
_error = _rawContent.getError() != null;
}

if (LOG.isDebugEnabled())
LOG.debug("raw content is special (with error = {}), returning it {}", _error, this);
return _rawContent;
}

if (_interceptor != null)
{
if (LOG.isDebugEnabled())
LOG.debug("using interceptor to transform raw content {}", this);
_transformedContent = intercept();
if (_error)
return _rawContent;
}
else
{
Expand All @@ -386,18 +363,40 @@ private HttpInput.Content nextTransformedContent()

if (_transformedContent == null)
{
if (_rawContent.isEmpty())
if (_rawContent.isSpecial())
{
if (LOG.isDebugEnabled())
LOG.debug("using special raw content as transformed content {}", this);

// In case the _rawContent was set by consumeAll(), check the httpChannel
// to see if it has a more precise error. Otherwise, the exact same
// special content will be returned by the httpChannel; do not do that
// if the _error flag was set, meaning the current error is definitive.
if (!_error)
{
HttpInput.Content refreshedRawContent = produceRawContent();
if (refreshedRawContent != null)
_rawContent = refreshedRawContent;
_error = _rawContent.getError() != null;
if (LOG.isDebugEnabled())
LOG.debug("refreshed raw content: {} {}", _rawContent, this);
}

_transformedContent = _rawContent;
break;
}
else if (_rawContent.isEmpty())
{
_rawContent.succeeded();
_rawContent = null;
if (LOG.isDebugEnabled())
LOG.debug("nulling depleted raw content {}", this);
_rawContent.succeeded();
_rawContent = null;
_rawContent = produceRawContent();
if (_rawContent == null)
{
if (LOG.isDebugEnabled())
LOG.debug("produced null raw content, returning null, {}", this);
return null;
break;
}
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public GzipHttpInputInterceptor(InflaterPool inflaterPool, ByteBufferPool pool,
@Override
public Content readFrom(Content content)
{
if (content.isSpecial())
return content;

_decoder.decodeChunks(content.getByteBuffer());
final ByteBuffer chunk = _chunk;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -118,6 +120,74 @@ public void testAsyncContentProducerNoInterceptorWithError() throws Exception
}
}

@Test
public void testAsyncContentProducerEofContentIsPassedToInterceptor() throws Exception
{
ByteBuffer[] buffers = new ByteBuffer[3];
buffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1));
buffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1));
buffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1));
final int totalContentBytesCount = countRemaining(buffers);
final String originalContentString = asString(buffers);

CyclicBarrier barrier = new CyclicBarrier(2);

ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier));
AccountingInterceptor interceptor = new AccountingInterceptor();
try (AutoLock lock = contentProducer.lock())
{
contentProducer.setInterceptor(interceptor);

Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier);
assertThat(error, nullValue());

HttpInput.Content lastContent = contentProducer.nextContent();
assertThat(lastContent.isSpecial(), is(true));
assertThat(lastContent.isEof(), is(true));
}

assertThat(interceptor.contents.size(), is(4));
assertThat(interceptor.contents.get(0).isSpecial(), is(false));
assertThat(interceptor.contents.get(1).isSpecial(), is(false));
assertThat(interceptor.contents.get(2).isSpecial(), is(false));
assertThat(interceptor.contents.get(3).isSpecial(), is(true));
assertThat(interceptor.contents.get(3).isEof(), is(true));
}

@Test
public void testAsyncContentProducerErrorContentIsPassedToInterceptor() throws Exception
{
ByteBuffer[] buffers = new ByteBuffer[3];
buffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1));
buffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1));
buffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1));
final int totalContentBytesCount = countRemaining(buffers);
final String originalContentString = asString(buffers);

CyclicBarrier barrier = new CyclicBarrier(2);

ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(new Throwable("testAsyncContentProducerErrorContentIsPassedToInterceptor error")), scheduledExecutorService, barrier));
AccountingInterceptor interceptor = new AccountingInterceptor();
try (AutoLock lock = contentProducer.lock())
{
contentProducer.setInterceptor(interceptor);

Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier);
assertThat(error.getMessage(), is("testAsyncContentProducerErrorContentIsPassedToInterceptor error"));

HttpInput.Content lastContent = contentProducer.nextContent();
assertThat(lastContent.isSpecial(), is(true));
assertThat(lastContent.getError().getMessage(), is("testAsyncContentProducerErrorContentIsPassedToInterceptor error"));
}

assertThat(interceptor.contents.size(), is(4));
assertThat(interceptor.contents.get(0).isSpecial(), is(false));
assertThat(interceptor.contents.get(1).isSpecial(), is(false));
assertThat(interceptor.contents.get(2).isSpecial(), is(false));
assertThat(interceptor.contents.get(3).isSpecial(), is(true));
assertThat(interceptor.contents.get(3).getError().getMessage(), is("testAsyncContentProducerErrorContentIsPassedToInterceptor error"));
}

@Test
public void testAsyncContentProducerGzipInterceptor() throws Exception
{
Expand Down Expand Up @@ -381,4 +451,17 @@ public HttpInput.Content readFrom(HttpInput.Content content)
return null;
}
}

private static class AccountingInterceptor implements HttpInput.Interceptor
{
private List<HttpInput.Content> contents = new ArrayList<>();

@Override
public HttpInput.Content readFrom(HttpInput.Content content)
{
if (!contents.contains(content))
contents.add(content);
return content;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -116,6 +118,76 @@ public void testBlockingContentProducerNoInterceptorWithError()
}
}

@Test
public void testBlockingContentProducerEofContentIsPassedToInterceptor() throws Exception
{
ByteBuffer[] buffers = new ByteBuffer[3];
buffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1));
buffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1));
buffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1));
final int totalContentBytesCount = countRemaining(buffers);
final String originalContentString = asString(buffers);

ContentListener contentListener = new ContentListener();
ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, contentListener);
ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel));
contentListener.setContentProducer(contentProducer);
AccountingInterceptor interceptor = new AccountingInterceptor();
try (AutoLock lock = contentProducer.lock())
{
contentProducer.setInterceptor(interceptor);

Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer);
assertThat(error, nullValue());

HttpInput.Content lastContent = contentProducer.nextContent();
assertThat(lastContent.isSpecial(), Matchers.is(true));
assertThat(lastContent.isEof(), Matchers.is(true));
}

assertThat(interceptor.contents.size(), Matchers.is(4));
assertThat(interceptor.contents.get(0).isSpecial(), Matchers.is(false));
assertThat(interceptor.contents.get(1).isSpecial(), Matchers.is(false));
assertThat(interceptor.contents.get(2).isSpecial(), Matchers.is(false));
assertThat(interceptor.contents.get(3).isSpecial(), Matchers.is(true));
assertThat(interceptor.contents.get(3).isEof(), Matchers.is(true));
}

@Test
public void testBlockingContentProducerErrorContentIsPassedToInterceptor() throws Exception
{
ByteBuffer[] buffers = new ByteBuffer[3];
buffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1));
buffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1));
buffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1));
final int totalContentBytesCount = countRemaining(buffers);
final String originalContentString = asString(buffers);

ContentListener contentListener = new ContentListener();
ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(new Throwable("testBlockingContentProducerErrorContentIsPassedToInterceptor error")), scheduledExecutorService, contentListener);
ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel));
contentListener.setContentProducer(contentProducer);
AccountingInterceptor interceptor = new AccountingInterceptor();
try (AutoLock lock = contentProducer.lock())
{
contentProducer.setInterceptor(interceptor);

Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer);
assertThat(error.getMessage(), Matchers.is("testBlockingContentProducerErrorContentIsPassedToInterceptor error"));

HttpInput.Content lastContent = contentProducer.nextContent();
assertThat(lastContent.isSpecial(), Matchers.is(true));
assertThat(lastContent.getError().getMessage(), Matchers.is("testBlockingContentProducerErrorContentIsPassedToInterceptor error"));
}

assertThat(interceptor.contents.size(), Matchers.is(4));
assertThat(interceptor.contents.get(0).isSpecial(), Matchers.is(false));
assertThat(interceptor.contents.get(1).isSpecial(), Matchers.is(false));
assertThat(interceptor.contents.get(2).isSpecial(), Matchers.is(false));
assertThat(interceptor.contents.get(3).isSpecial(), Matchers.is(true));
assertThat(interceptor.contents.get(3).getError().getMessage(), Matchers.is("testBlockingContentProducerErrorContentIsPassedToInterceptor error"));
}

@Test
public void testBlockingContentProducerGzipInterceptor()
{
Expand Down Expand Up @@ -382,4 +454,17 @@ public HttpInput.Content readFrom(HttpInput.Content content)
return null;
}
}

private static class AccountingInterceptor implements HttpInput.Interceptor
{
private List<HttpInput.Content> contents = new ArrayList<>();

@Override
public HttpInput.Content readFrom(HttpInput.Content content)
{
if (!contents.contains(content))
contents.add(content);
return content;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1459,6 +1459,8 @@ protected void service(HttpServletRequest request, HttpServletResponse response)
httpInput.addInterceptor(new GzipHttpInputInterceptor(new InflaterPool(-1, true), ((Request)request).getHttpChannel().getByteBufferPool(), 1024));
httpInput.addInterceptor(content ->
{
if (content.isSpecial())
return content;
ByteBuffer byteBuffer = content.getByteBuffer();
byte[] bytes = new byte[2];
bytes[1] = byteBuffer.get();
Expand Down

0 comments on commit 91f29a0

Please sign in to comment.