Skip to content

Commit

Permalink
Avoid calling MimeParser.offer with empty buffers (#6851)
Browse files Browse the repository at this point in the history
Fixes #6828
  • Loading branch information
romain-grecourt authored May 19, 2023
1 parent 7c4ecc3 commit 8a6a1c0
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
* Copyright (c) 2020, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,12 @@ public void onNext(DataChunk chunk) {
try {
ByteBuffer[] byteBuffers = chunk.data();
for (int i = 0; i < byteBuffers.length; i++) {
int id = parser.offer(byteBuffers[i]);
ByteBuffer byteBuffer = byteBuffers[i];
if (!byteBuffer.hasRemaining()) {
// skip if empty
continue;
}
int id = parser.offer(byteBuffer);
// record the chunk using the id of the last buffer
if (i == byteBuffers.length - 1) {
// drain() cannot be invoked concurrently, it is safe to use HashMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testOnePartInOneChunk() {
final CountDownLatch latch = new CountDownLatch(2);
final CompletableFuture<Void> testDone = new CompletableFuture<>();

Consumer<BodyPart> consumer = (part) -> {
Consumer<ReadableBodyPart> consumer = (part) -> {
latch.countDown();
assertThat(part.headers().values("Content-Id"),
hasItems("part1"));
Expand Down Expand Up @@ -102,7 +102,7 @@ public void testTwoPartsInOneChunk() {
+ "--" + boundary + "--").getBytes();

final CountDownLatch latch = new CountDownLatch(4);
Consumer<BodyPart> consumer = (part) -> {
Consumer<ReadableBodyPart> consumer = (part) -> {
latch.countDown();
if (latch.getCount() == 3) {
assertThat(part.headers().values("Content-Id"), hasItems("part1"));
Expand Down Expand Up @@ -144,7 +144,7 @@ public void testContentAcrossChunks() {
+ "--" + boundary + "--").getBytes();

final CountDownLatch latch = new CountDownLatch(2);
Consumer<BodyPart> consumer = (part) -> {
Consumer<ReadableBodyPart> consumer = (part) -> {
latch.countDown();
assertThat(part.headers().values("Content-Id"), hasItems("part1"));
DataChunkSubscriber subscriber = new DataChunkSubscriber();
Expand Down Expand Up @@ -180,7 +180,7 @@ public void testContentAcrossChunksAsyncRequest() {
+ "--" + boundary + "--").getBytes();

final CountDownLatch latch = new CountDownLatch(2);
Consumer<BodyPart> consumer = (part) -> {
Consumer<ReadableBodyPart> consumer = (part) -> {
latch.countDown();
assertThat(part.headers().values("Content-Id"), hasItems("part1"));
DataChunkSubscriber subscriber = new DataChunkSubscriber();
Expand Down Expand Up @@ -220,7 +220,7 @@ public void testMultipleChunksBeforeContent() {
+ "--" + boundary + "--").getBytes();

final CountDownLatch latch = new CountDownLatch(2);
Consumer<BodyPart> consumer = (part) -> {
Consumer<ReadableBodyPart> consumer = (part) -> {
latch.countDown();
assertThat(part.headers().values("Content-Id"), hasItems("part1"));
assertThat(part.headers().values("Content-Type"), hasItems("text/plain"));
Expand Down Expand Up @@ -257,7 +257,7 @@ public void testMultiplePartsWithOneByOneSubscriber() {
+ "--" + boundary + "--").getBytes();

final CountDownLatch latch = new CountDownLatch(4);
Consumer<BodyPart> consumer = (part) -> {
Consumer<ReadableBodyPart> consumer = (part) -> {
latch.countDown();
if (latch.getCount()== 3) {
assertThat(part.headers().values("Content-Id"), hasItems("part1"));
Expand Down Expand Up @@ -302,7 +302,7 @@ public void testSubscriberCancelAfterOnePart() {
+ "--" + boundary + "--").getBytes();

final CountDownLatch latch = new CountDownLatch(2);
Consumer<BodyPart> consumer = (part) -> {
Consumer<ReadableBodyPart> consumer = (part) -> {
latch.countDown();
if (latch.getCount()== 1) {
assertThat(part.headers().values("Content-Id"), hasItems("part1"));
Expand Down Expand Up @@ -376,7 +376,7 @@ public void testPartContentSubscriberThrottling() {
+ "--" + boundary + "--").getBytes();

final CountDownLatch latch = new CountDownLatch(3);
Consumer<BodyPart> consumer = (part) -> {
Consumer<ReadableBodyPart> consumer = (part) -> {
latch.countDown();
if (latch.getCount() == 2) {
assertThat(part.headers().values("Content-Id"), hasItems("part1"));
Expand Down Expand Up @@ -453,7 +453,7 @@ public void testLateSubscriber() {
+ "body 1\n"
+ "--" + boundary + "--").getBytes();

// setup the decoder in an initialized state (upstream and downstream)
// set up the decoder in an initialized state (upstream and downstream)
MultiPartDecoder decoder = decoder(boundary);
List<ReadableBodyPart> parts = new ArrayList<>();
Multi.create(decoder).subscribe(parts::add);
Expand All @@ -471,6 +471,26 @@ public void testLateSubscriber() {
}
}

@Test
public void testLastEmptyChunk() {
String boundary = "boundary";
final byte[] chunk1 = ("--" + boundary + "\n"
+ "Content-Id: part1\n"
+ "\n"
+ "body 1\n"
+ "--" + boundary + "--").getBytes();

BodyPartSubscriber testSubscriber = new BodyPartSubscriber(
SUBSCRIBER_TYPE.INFINITE, ReadableBodyPart::drain);
partsPublisher(boundary, List.of(chunk1, new byte[0])).subscribe(testSubscriber);
try {
boolean b = testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
assertThat(b, is(equalTo(true)));
} catch(CompletionException error) {
assertThat(error, is(nullValue()));
}
}

/**
* Types of test subscribers.
*/
Expand All @@ -483,15 +503,15 @@ enum SUBSCRIBER_TYPE {
/**
* A part test subscriber.
*/
static class BodyPartSubscriber implements Subscriber<BodyPart>{
static class BodyPartSubscriber implements Subscriber<ReadableBodyPart>{

private final SUBSCRIBER_TYPE subscriberType;
private final Consumer<BodyPart> consumer;
private final Consumer<ReadableBodyPart> consumer;
private Subscription subscription;
public CompletableFuture<Boolean> complete = new CompletableFuture<>();
public CompletableFuture<Void> cancelled = new CompletableFuture<>();

BodyPartSubscriber(SUBSCRIBER_TYPE subscriberType, Consumer<BodyPart> consumer) {
BodyPartSubscriber(SUBSCRIBER_TYPE subscriberType, Consumer<ReadableBodyPart> consumer) {
this.subscriberType = subscriberType;
this.consumer = consumer;
}
Expand All @@ -507,7 +527,7 @@ public void onSubscribe(Subscription subscription) {
}

@Override
public void onNext(BodyPart item) {
public void onNext(ReadableBodyPart item) {
if (consumer == null){
return;
}
Expand Down Expand Up @@ -547,7 +567,7 @@ static MultiPartDecoder decoder(String boundary) {
* @param data data for the chunk
* @return publisher of body parts
*/
static Publisher<? extends BodyPart> partsPublisher(String boundary, byte[] data) {
static Publisher<? extends ReadableBodyPart> partsPublisher(String boundary, byte[] data) {
return partsPublisher(boundary, List.of(data));
}

Expand All @@ -557,7 +577,7 @@ static Publisher<? extends BodyPart> partsPublisher(String boundary, byte[] data
* @param data data for the chunks
* @return publisher of body parts
*/
static Publisher<? extends BodyPart> partsPublisher(String boundary, List<byte[]> data) {
static Publisher<? extends ReadableBodyPart> partsPublisher(String boundary, List<byte[]> data) {
MultiPartDecoder decoder = decoder(boundary);
chunksPublisher(data).subscribe(decoder);
return decoder;
Expand Down

0 comments on commit 8a6a1c0

Please sign in to comment.