Skip to content

Commit

Permalink
fix: duplicate CompletedFileUpload in publisher if route is delayed (#…
Browse files Browse the repository at this point in the history
…10713)

Fixes #10578

* Fix NPE when part conversion fails
Reactor does not like null elements, so if the refCountAwareConvert fails, there would be an NPE.

* Fix duplicate CompletedFileUpload in publisher if route is delayed
NettyPublisherPartUploadBinder is designed to emit a HttpData every time a piece of the input data is received. The data is then converted to the specified publisher type. For CompletedFileUpload, NettyConvertersSpi checks whether the HttpData is completed, so the output should only include the final HttpData for the particular part.

However, when the route has another argument that delays its execution until all parts have been fully received, this assumption fails. The publisher will buffer all the HttpData, and when the conversion happens, all of these HttpData are isComplete (since at that point the full part has been received). This leads to the same CompletedFileUpload being emitted many times.

This patch changes NettyPublisherPartUploadBinder to use claimFieldsComplete instead of claimFieldsRaw to ensure that the same part does not appear multiple times in the first place.
  • Loading branch information
yawkat authored Apr 16, 2024
1 parent 95efa06 commit f2797d9
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.micronaut.core.type.Argument;
import io.micronaut.http.MediaType;
import io.micronaut.http.bind.binders.TypedRequestArgumentBinder;
import io.micronaut.http.multipart.CompletedPart;
import io.micronaut.http.multipart.PartData;
import io.micronaut.http.multipart.StreamingFileUpload;
import io.micronaut.http.server.netty.MicronautHttpData;
Expand Down Expand Up @@ -76,7 +77,16 @@ public BindingResult<Publisher<?>> bindForNettyRequest(ArgumentConversionContext
publisher = request.formRouteCompleter()
.claimFields(inputName, (data, flux) -> flux.mapNotNull(partData -> conversionService.convert(partData, nestedType).orElse(null)));
} else {
Flux<? extends MicronautHttpData<?>> raw = request.formRouteCompleter().claimFieldsRaw(inputName);
Flux<? extends MicronautHttpData<?>> raw;
if (CompletedPart.class.isAssignableFrom(contentTypeClass)) {
// For CompletedPart, only include completed fields. Otherwise, if the publisher is
// only subscribed to after all components have been received (e.g. because another
// argument delays execution of the controller), each component will have
// isCompleted=true, so the part will show up many times in the publisher.
raw = request.formRouteCompleter().claimFieldsComplete(inputName);
} else {
raw = request.formRouteCompleter().claimFieldsRaw(inputName);
}
Flux<?> mnTypeIfNecessary;
if (contentTypeClass == PartData.class || ClassUtils.isJavaLangType(contentTypeClass)) {
mnTypeIfNecessary = raw
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.micronaut.http.server.netty
import io.micronaut.context.ApplicationContext
import io.micronaut.context.annotation.Requires
import io.micronaut.core.annotation.Nullable
import io.micronaut.core.async.annotation.SingleResult
import io.micronaut.http.HttpRequest
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
Expand All @@ -13,8 +14,13 @@ import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.multipart.MultipartBody
import io.micronaut.http.multipart.CompletedFileUpload
import io.micronaut.runtime.server.EmbeddedServer
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import spock.lang.Issue
import spock.lang.Specification

import java.nio.charset.StandardCharsets

class FileUploadSpec extends Specification {
def 'leak with inputstream getter'() {
given:
Expand Down Expand Up @@ -56,6 +62,48 @@ class FileUploadSpec extends Specification {
server.stop()
}

@Issue("https://github.com/micronaut-projects/micronaut-core/issues/10578")
def 'publisher of completed'() {
given:
def ctx = ApplicationContext.run(['spec.name': 'FileUploadSpec'])
def server = ctx.getBean(EmbeddedServer)
server.start()
def client = ctx.createBean(HttpClient, server.URI).toBlocking()

byte[] body1 = ("--------------------------76f6e44be9b3575a\r\n" +
"Content-Disposition: form-data; name=\"file\"; filename=\"image1.jpeg\"\r\n" +
"Content-Type: image/jpeg\r\n\r\n" +
"foo\r\n" +
"--------------------------76f6e44be9b3575a\r\n" +
"Content-Disposition: form-data; name=\"file\"; filename=\"image2.jpeg\"\r\n" +
"Content-Type: image/jpeg\r\n\r\n" +
("bar" * 10000)).getBytes(StandardCharsets.UTF_8)
byte[] body2 = ("baz\r\n" +
"--------------------------76f6e44be9b3575a--").getBytes(StandardCharsets.UTF_8)

when:
def connection = (HttpURLConnection) new URL("http://$server.host:$server.port/multipart/publisher-completed").openConnection()
connection.setRequestMethod("POST")
connection.addRequestProperty("Content-Type", "multipart/form-data; boundary=------------------------76f6e44be9b3575a")
connection.setDoOutput(true)
connection.setDoInput(true)
connection.setChunkedStreamingMode(0)
connection.connect()

connection.outputStream.write(body1)
connection.outputStream.flush()
connection.outputStream.write(body2)
connection.outputStream.close()

def response = new String(connection.inputStream.readAllBytes(), StandardCharsets.UTF_8)
then:
response == 'Files: 2'

cleanup:
client.close()
server.stop()
}

@Controller('/multipart')
@Requires(property = 'spec.name', value = 'FileUploadSpec')
@Produces(MediaType.TEXT_PLAIN)
Expand All @@ -70,6 +118,17 @@ class FileUploadSpec extends Specification {
String completeFileUpload(@Nullable @Part("metadata") Metadata metadata) {
return "Metadata: " + metadata
}

@Post(value = '/publisher-completed', consumes = MediaType.MULTIPART_FORM_DATA)
@SingleResult
Publisher<String> publisherCompleted(@Nullable @Part("file") Publisher<CompletedFileUpload> files, @Nullable @Part("metadata") Metadata metadata) {
return Flux.from(files)
.collectList()
.map { l ->
l.forEach { it.discard() }
"Files: " + l.size()
}
}
}

record Metadata(@Nullable String foo) {}
Expand Down

0 comments on commit f2797d9

Please sign in to comment.