From c55bf06989f65e28571f86834b28773f0cb37291 Mon Sep 17 00:00:00 2001 From: Romain Grecourt Date: Fri, 19 May 2023 10:41:12 -0700 Subject: [PATCH] Avoid calling MimeParser.offer with empty buffers (#6851) Fixes #6828 --- .../helidon/media/multipart/MimeParser.java | 2 +- .../media/multipart/MultiPartDecoder.java | 7 ++- .../media/multipart/MultiPartDecoderTest.java | 50 +++++++++++++------ 3 files changed, 42 insertions(+), 17 deletions(-) diff --git a/media/multipart/src/main/java/io/helidon/media/multipart/MimeParser.java b/media/multipart/src/main/java/io/helidon/media/multipart/MimeParser.java index 0bf3a4cc5f7..f5aacb9e58d 100644 --- a/media/multipart/src/main/java/io/helidon/media/multipart/MimeParser.java +++ b/media/multipart/src/main/java/io/helidon/media/multipart/MimeParser.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2022 Oracle and/or its affiliates. + * Copyright (c) 2020, 2023 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/media/multipart/src/main/java/io/helidon/media/multipart/MultiPartDecoder.java b/media/multipart/src/main/java/io/helidon/media/multipart/MultiPartDecoder.java index 1220ea02717..3ed4e8955f7 100644 --- a/media/multipart/src/main/java/io/helidon/media/multipart/MultiPartDecoder.java +++ b/media/multipart/src/main/java/io/helidon/media/multipart/MultiPartDecoder.java @@ -139,7 +139,12 @@ public void onNext(DataChunk chunk) { try { ByteBuffer[] byteBuffers = chunk.data(); for (int i = 0; i < byteBuffers.length; i++) { - int id = parser.offer(byteBuffers[i]); + ByteBuffer byteBuffer = byteBuffers[i]; + if (!byteBuffer.hasRemaining()) { + // skip if empty + continue; + } + int id = parser.offer(byteBuffer); // record the chunk using the id of the last buffer if (i == byteBuffers.length - 1) { // drain() cannot be invoked concurrently, it is safe to use HashMap diff --git a/media/multipart/src/test/java/io/helidon/media/multipart/MultiPartDecoderTest.java b/media/multipart/src/test/java/io/helidon/media/multipart/MultiPartDecoderTest.java index 4b678c84bf0..5106bf5b041 100644 --- a/media/multipart/src/test/java/io/helidon/media/multipart/MultiPartDecoderTest.java +++ b/media/multipart/src/test/java/io/helidon/media/multipart/MultiPartDecoderTest.java @@ -59,7 +59,7 @@ public void testOnePartInOneChunk() { final CountDownLatch latch = new CountDownLatch(2); final CompletableFuture testDone = new CompletableFuture<>(); - Consumer consumer = (part) -> { + Consumer consumer = (part) -> { latch.countDown(); assertThat(part.headers().values("Content-Id"), hasItems("part1")); @@ -102,7 +102,7 @@ public void testTwoPartsInOneChunk() { + "--" + boundary + "--").getBytes(); final CountDownLatch latch = new CountDownLatch(4); - Consumer consumer = (part) -> { + Consumer consumer = (part) -> { latch.countDown(); if (latch.getCount() == 3) { assertThat(part.headers().values("Content-Id"), hasItems("part1")); @@ -144,7 +144,7 @@ public void testContentAcrossChunks() { + "--" + boundary + "--").getBytes(); final CountDownLatch latch = new CountDownLatch(2); - Consumer consumer = (part) -> { + Consumer consumer = (part) -> { latch.countDown(); assertThat(part.headers().values("Content-Id"), hasItems("part1")); DataChunkSubscriber subscriber = new DataChunkSubscriber(); @@ -180,7 +180,7 @@ public void testContentAcrossChunksAsyncRequest() { + "--" + boundary + "--").getBytes(); final CountDownLatch latch = new CountDownLatch(2); - Consumer consumer = (part) -> { + Consumer consumer = (part) -> { latch.countDown(); assertThat(part.headers().values("Content-Id"), hasItems("part1")); DataChunkSubscriber subscriber = new DataChunkSubscriber(); @@ -220,7 +220,7 @@ public void testMultipleChunksBeforeContent() { + "--" + boundary + "--").getBytes(); final CountDownLatch latch = new CountDownLatch(2); - Consumer consumer = (part) -> { + Consumer consumer = (part) -> { latch.countDown(); assertThat(part.headers().values("Content-Id"), hasItems("part1")); assertThat(part.headers().values("Content-Type"), hasItems("text/plain")); @@ -257,7 +257,7 @@ public void testMultiplePartsWithOneByOneSubscriber() { + "--" + boundary + "--").getBytes(); final CountDownLatch latch = new CountDownLatch(4); - Consumer consumer = (part) -> { + Consumer consumer = (part) -> { latch.countDown(); if (latch.getCount()== 3) { assertThat(part.headers().values("Content-Id"), hasItems("part1")); @@ -302,7 +302,7 @@ public void testSubscriberCancelAfterOnePart() { + "--" + boundary + "--").getBytes(); final CountDownLatch latch = new CountDownLatch(2); - Consumer consumer = (part) -> { + Consumer consumer = (part) -> { latch.countDown(); if (latch.getCount()== 1) { assertThat(part.headers().values("Content-Id"), hasItems("part1")); @@ -376,7 +376,7 @@ public void testPartContentSubscriberThrottling() { + "--" + boundary + "--").getBytes(); final CountDownLatch latch = new CountDownLatch(3); - Consumer consumer = (part) -> { + Consumer consumer = (part) -> { latch.countDown(); if (latch.getCount() == 2) { assertThat(part.headers().values("Content-Id"), hasItems("part1")); @@ -453,7 +453,7 @@ public void testLateSubscriber() { + "body 1\n" + "--" + boundary + "--").getBytes(); - // setup the decoder in an initialized state (upstream and downstream) + // set up the decoder in an initialized state (upstream and downstream) MultiPartDecoder decoder = decoder(boundary); List parts = new ArrayList<>(); Multi.create(decoder).subscribe(parts::add); @@ -471,6 +471,26 @@ public void testLateSubscriber() { } } + @Test + public void testLastEmptyChunk() { + String boundary = "boundary"; + final byte[] chunk1 = ("--" + boundary + "\n" + + "Content-Id: part1\n" + + "\n" + + "body 1\n" + + "--" + boundary + "--").getBytes(); + + BodyPartSubscriber testSubscriber = new BodyPartSubscriber( + SUBSCRIBER_TYPE.INFINITE, ReadableBodyPart::drain); + partsPublisher(boundary, List.of(chunk1, new byte[0])).subscribe(testSubscriber); + try { + boolean b = testSubscriber.complete.orTimeout(200, TimeUnit.MILLISECONDS).join(); + assertThat(b, is(equalTo(true))); + } catch(CompletionException error) { + assertThat(error, is(nullValue())); + } + } + /** * Types of test subscribers. */ @@ -483,15 +503,15 @@ enum SUBSCRIBER_TYPE { /** * A part test subscriber. */ - static class BodyPartSubscriber implements Subscriber{ + static class BodyPartSubscriber implements Subscriber{ private final SUBSCRIBER_TYPE subscriberType; - private final Consumer consumer; + private final Consumer consumer; private Subscription subscription; public CompletableFuture complete = new CompletableFuture<>(); public CompletableFuture cancelled = new CompletableFuture<>(); - BodyPartSubscriber(SUBSCRIBER_TYPE subscriberType, Consumer consumer) { + BodyPartSubscriber(SUBSCRIBER_TYPE subscriberType, Consumer consumer) { this.subscriberType = subscriberType; this.consumer = consumer; } @@ -507,7 +527,7 @@ public void onSubscribe(Subscription subscription) { } @Override - public void onNext(BodyPart item) { + public void onNext(ReadableBodyPart item) { if (consumer == null){ return; } @@ -547,7 +567,7 @@ static MultiPartDecoder decoder(String boundary) { * @param data data for the chunk * @return publisher of body parts */ - static Publisher partsPublisher(String boundary, byte[] data) { + static Publisher partsPublisher(String boundary, byte[] data) { return partsPublisher(boundary, List.of(data)); } @@ -557,7 +577,7 @@ static Publisher partsPublisher(String boundary, byte[] data * @param data data for the chunks * @return publisher of body parts */ - static Publisher partsPublisher(String boundary, List data) { + static Publisher partsPublisher(String boundary, List data) { MultiPartDecoder decoder = decoder(boundary); chunksPublisher(data).subscribe(decoder); return decoder;