Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid calling MimeParser.offer with empty buffers #6898

Merged
merged 1 commit into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
* Copyright (c) 2020, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,12 @@ public void onNext(DataChunk chunk) {
try {
ByteBuffer[] byteBuffers = chunk.data();
for (int i = 0; i < byteBuffers.length; i++) {
int id = parser.offer(byteBuffers[i]);
ByteBuffer byteBuffer = byteBuffers[i];
if (!byteBuffer.hasRemaining()) {
// skip if empty
continue;
}
int id = parser.offer(byteBuffer);
// record the chunk using the id of the last buffer
if (i == byteBuffers.length - 1) {
// drain() cannot be invoked concurrently, it is safe to use HashMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testOnePartInOneChunk() {
final CountDownLatch latch = new CountDownLatch(2);
final CompletableFuture<Void> testDone = new CompletableFuture<>();

Consumer<BodyPart> consumer = (part) -> {
Consumer<ReadableBodyPart> consumer = (part) -> {
latch.countDown();
assertThat(part.headers().values("Content-Id"),
hasItems("part1"));
Expand Down Expand Up @@ -102,7 +102,7 @@ public void testTwoPartsInOneChunk() {
+ "--" + boundary + "--").getBytes();

final CountDownLatch latch = new CountDownLatch(4);
Consumer<BodyPart> consumer = (part) -> {
Consumer<ReadableBodyPart> consumer = (part) -> {
latch.countDown();
if (latch.getCount() == 3) {
assertThat(part.headers().values("Content-Id"), hasItems("part1"));
Expand Down Expand Up @@ -144,7 +144,7 @@ public void testContentAcrossChunks() {
+ "--" + boundary + "--").getBytes();

final CountDownLatch latch = new CountDownLatch(2);
Consumer<BodyPart> consumer = (part) -> {
Consumer<ReadableBodyPart> consumer = (part) -> {
latch.countDown();
assertThat(part.headers().values("Content-Id"), hasItems("part1"));
DataChunkSubscriber subscriber = new DataChunkSubscriber();
Expand Down Expand Up @@ -180,7 +180,7 @@ public void testContentAcrossChunksAsyncRequest() {
+ "--" + boundary + "--").getBytes();

final CountDownLatch latch = new CountDownLatch(2);
Consumer<BodyPart> consumer = (part) -> {
Consumer<ReadableBodyPart> consumer = (part) -> {
latch.countDown();
assertThat(part.headers().values("Content-Id"), hasItems("part1"));
DataChunkSubscriber subscriber = new DataChunkSubscriber();
Expand Down Expand Up @@ -220,7 +220,7 @@ public void testMultipleChunksBeforeContent() {
+ "--" + boundary + "--").getBytes();

final CountDownLatch latch = new CountDownLatch(2);
Consumer<BodyPart> consumer = (part) -> {
Consumer<ReadableBodyPart> consumer = (part) -> {
latch.countDown();
assertThat(part.headers().values("Content-Id"), hasItems("part1"));
assertThat(part.headers().values("Content-Type"), hasItems("text/plain"));
Expand Down Expand Up @@ -257,7 +257,7 @@ public void testMultiplePartsWithOneByOneSubscriber() {
+ "--" + boundary + "--").getBytes();

final CountDownLatch latch = new CountDownLatch(4);
Consumer<BodyPart> consumer = (part) -> {
Consumer<ReadableBodyPart> consumer = (part) -> {
latch.countDown();
if (latch.getCount()== 3) {
assertThat(part.headers().values("Content-Id"), hasItems("part1"));
Expand Down Expand Up @@ -302,7 +302,7 @@ public void testSubscriberCancelAfterOnePart() {
+ "--" + boundary + "--").getBytes();

final CountDownLatch latch = new CountDownLatch(2);
Consumer<BodyPart> consumer = (part) -> {
Consumer<ReadableBodyPart> consumer = (part) -> {
latch.countDown();
if (latch.getCount()== 1) {
assertThat(part.headers().values("Content-Id"), hasItems("part1"));
Expand Down Expand Up @@ -376,7 +376,7 @@ public void testPartContentSubscriberThrottling() {
+ "--" + boundary + "--").getBytes();

final CountDownLatch latch = new CountDownLatch(3);
Consumer<BodyPart> consumer = (part) -> {
Consumer<ReadableBodyPart> consumer = (part) -> {
latch.countDown();
if (latch.getCount() == 2) {
assertThat(part.headers().values("Content-Id"), hasItems("part1"));
Expand Down Expand Up @@ -453,7 +453,7 @@ public void testLateSubscriber() {
+ "body 1\n"
+ "--" + boundary + "--").getBytes();

// setup the decoder in an initialized state (upstream and downstream)
// set up the decoder in an initialized state (upstream and downstream)
MultiPartDecoder decoder = decoder(boundary);
List<ReadableBodyPart> parts = new ArrayList<>();
Multi.create(decoder).subscribe(parts::add);
Expand All @@ -471,6 +471,26 @@ public void testLateSubscriber() {
}
}

@Test
public void testLastEmptyChunk() {
String boundary = "boundary";
final byte[] chunk1 = ("--" + boundary + "\n"
+ "Content-Id: part1\n"
+ "\n"
+ "body 1\n"
+ "--" + boundary + "--").getBytes();

BodyPartSubscriber testSubscriber = new BodyPartSubscriber(
SUBSCRIBER_TYPE.INFINITE, ReadableBodyPart::drain);
partsPublisher(boundary, List.of(chunk1, new byte[0])).subscribe(testSubscriber);
try {
boolean b = testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join();
assertThat(b, is(equalTo(true)));
} catch(CompletionException error) {
assertThat(error, is(nullValue()));
}
}

/**
* Types of test subscribers.
*/
Expand All @@ -483,15 +503,15 @@ enum SUBSCRIBER_TYPE {
/**
* A part test subscriber.
*/
static class BodyPartSubscriber implements Subscriber<BodyPart>{
static class BodyPartSubscriber implements Subscriber<ReadableBodyPart>{

private final SUBSCRIBER_TYPE subscriberType;
private final Consumer<BodyPart> consumer;
private final Consumer<ReadableBodyPart> consumer;
private Subscription subscription;
public CompletableFuture<Boolean> complete = new CompletableFuture<>();
public CompletableFuture<Void> cancelled = new CompletableFuture<>();

BodyPartSubscriber(SUBSCRIBER_TYPE subscriberType, Consumer<BodyPart> consumer) {
BodyPartSubscriber(SUBSCRIBER_TYPE subscriberType, Consumer<ReadableBodyPart> consumer) {
this.subscriberType = subscriberType;
this.consumer = consumer;
}
Expand All @@ -507,7 +527,7 @@ public void onSubscribe(Subscription subscription) {
}

@Override
public void onNext(BodyPart item) {
public void onNext(ReadableBodyPart item) {
if (consumer == null){
return;
}
Expand Down Expand Up @@ -547,7 +567,7 @@ static MultiPartDecoder decoder(String boundary) {
* @param data data for the chunk
* @return publisher of body parts
*/
static Publisher<? extends BodyPart> partsPublisher(String boundary, byte[] data) {
static Publisher<? extends ReadableBodyPart> partsPublisher(String boundary, byte[] data) {
return partsPublisher(boundary, List.of(data));
}

Expand All @@ -557,7 +577,7 @@ static Publisher<? extends BodyPart> partsPublisher(String boundary, byte[] data
* @param data data for the chunks
* @return publisher of body parts
*/
static Publisher<? extends BodyPart> partsPublisher(String boundary, List<byte[]> data) {
static Publisher<? extends ReadableBodyPart> partsPublisher(String boundary, List<byte[]> data) {
MultiPartDecoder decoder = decoder(boundary);
chunksPublisher(data).subscribe(decoder);
return decoder;
Expand Down