Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve message body handlers #11002

Merged
merged 12 commits into from
Jul 23, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.micronaut.context.annotation.BootstrapContextCompatible;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.Primary;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.naming.NameUtils;

import java.nio.charset.Charset;
Expand Down Expand Up @@ -59,7 +60,7 @@ public class ApplicationConfiguration {
/**
* @return The default charset to use.
*/
@SuppressWarnings("unchecked")
@NonNull
public Charset getDefaultCharset() {
return defaultCharset;
}
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/java/io/micronaut/core/type/Argument.java
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,19 @@ static <T> Argument<T> of(
return new DefaultArgument<>(type, null, AnnotationMetadata.EMPTY_METADATA, Collections.emptyMap(), Argument.ZERO_ARGUMENTS);
}

/**
* Creates a new argument for the type of the given instance.
*
* @param instance The argument instance
* @param <T> The generic type
* @return The argument instance
* @since 4.6
*/
@NonNull
static <T> Argument<T> ofInstance(@NonNull T instance) {
return Argument.of((Class<T>) instance.getClass());
}

/**
* Creates a new argument for the given type and name.
*
Expand Down
21 changes: 20 additions & 1 deletion core/src/main/java/io/micronaut/core/type/MutableHeaders.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.micronaut.core.type;


import io.micronaut.core.annotation.NonNull;

/**
* Common interface for all mutable header types.
*
Expand Down Expand Up @@ -48,9 +50,26 @@ public interface MutableHeaders extends Headers {
* @return This headers
* @since 1.3.3
*/
default MutableHeaders set(CharSequence header, CharSequence value) {
@NonNull
default MutableHeaders set(@NonNull CharSequence header, @NonNull CharSequence value) {
remove(header);
add(header, value);
return this;
}

/**
* Sets an HTTP header if missing.
*
* @param header The header
* @param value The value
* @return This headers
* @since 4.6
*/
@NonNull
default MutableHeaders setIfMissing(@NonNull CharSequence header, @NonNull CharSequence value) {
if (!contains(header.toString())) {
add(header, value);
}
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,8 @@ public void bind(
String headerValue = headerAnnotation.stringValue().orElse(null);
MutableHttpHeaders headers = request.getHeaders();

if (StringUtils.isNotEmpty(headerName) &&
StringUtils.isNotEmpty(headerValue) &&
!headers.contains(headerName)
) {
headers.set(headerName, headerValue);
if (StringUtils.isNotEmpty(headerName) && StringUtils.isNotEmpty(headerValue)) {
headers.setIfMissing(headerName, headerValue);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
import java.lang.annotation.Annotation;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ private SslContext buildWebsocketSslContext(DefaultHttpClient.RequestKey request
}
}
}
} else if (configuration.getProxyAddress().isEmpty()){
} else if (configuration.getProxyAddress().isEmpty()) {
throw decorate(new HttpClientException("Cannot send WSS request. SSL is disabled"));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import io.micronaut.http.bind.RequestBinderRegistry;
import io.micronaut.http.body.ChunkedMessageBodyReader;
import io.micronaut.http.body.ContextlessMessageBodyHandlerRegistry;
import io.micronaut.http.body.DynamicMessageBodyWriter;
import io.micronaut.http.body.MessageBodyHandlerRegistry;
import io.micronaut.http.body.MessageBodyReader;
import io.micronaut.http.client.BlockingHttpClient;
Expand Down Expand Up @@ -92,7 +91,8 @@
import io.micronaut.http.netty.NettyHttpHeaders;
import io.micronaut.http.netty.NettyHttpRequestBuilder;
import io.micronaut.http.netty.NettyHttpResponseBuilder;
import io.micronaut.http.netty.body.ByteBufRawMessageBodyHandler;
import io.micronaut.http.netty.body.NettyByteBufMessageBodyHandler;
import io.micronaut.http.netty.body.NettyCharSequenceBodyWriter;
import io.micronaut.http.netty.body.NettyJsonHandler;
import io.micronaut.http.netty.body.NettyJsonStreamHandler;
import io.micronaut.http.netty.body.NettyWritableBodyWriter;
Expand Down Expand Up @@ -1292,7 +1292,7 @@ protected NettyRequestWriter buildNettyRequest(
}
}

HttpPostRequestEncoder postRequestEncoder = null;
HttpPostRequestEncoder postRequestEncoder;
if (permitsBody) {
Optional<?> body = request.getBody();
boolean hasBody = body.isPresent();
Expand All @@ -1315,16 +1315,17 @@ protected NettyRequestWriter buildNettyRequest(
ByteBuf bodyContent;
if (hasBody) {
Object bodyValue = body.get();
DynamicMessageBodyWriter dynamicWriter = new DynamicMessageBodyWriter(handlerRegistry, List.of(requestContentType));
if (Publishers.isConvertibleToPublisher(bodyValue)) {
boolean isSingle = Publishers.isSingle(bodyValue.getClass());

Publisher<?> publisher = conversionService.convert(bodyValue, Publisher.class).orElseThrow(() ->
new IllegalArgumentException("Unconvertible reactive type: " + bodyValue)
);

Flux<HttpContent> requestBodyPublisher = Flux.from(publisher).map(o -> {
ByteBuffer<?> buffer = dynamicWriter.writeTo(Argument.OBJECT_ARGUMENT, requestContentType, o, request.getHeaders(), byteBufferFactory);
Flux<HttpContent> requestBodyPublisher = Flux.from(publisher).map(value -> {
Argument<Object> type = Argument.ofInstance(value);
ByteBuffer<?> buffer = handlerRegistry.getWriter(type, List.of(requestContentType))
.writeTo(type, requestContentType, value, request.getHeaders(), byteBufferFactory);
return new DefaultHttpContent(((ByteBuf) buffer.asNativeBuffer()));
});

Expand All @@ -1340,7 +1341,9 @@ protected NettyRequestWriter buildNettyRequest(
} else if (bodyValue instanceof CharSequence sequence) {
bodyContent = charSequenceToByteBuf(sequence, requestContentType);
} else {
ByteBuffer<?> buffer = dynamicWriter.writeTo(Argument.OBJECT_ARGUMENT, requestContentType, bodyValue, request.getHeaders(), byteBufferFactory);
Argument<Object> type = Argument.ofInstance(bodyValue);
ByteBuffer<?> buffer = handlerRegistry.getWriter(type, List.of(requestContentType))
.writeTo(type, requestContentType, bodyValue, request.getHeaders(), byteBufferFactory);
bodyContent = (ByteBuf) buffer.asNativeBuffer();
}
if (bodyContent == null) {
Expand Down Expand Up @@ -1792,9 +1795,15 @@ private static MediaTypeCodecRegistry createDefaultMediaTypeRegistry() {

private static MessageBodyHandlerRegistry createDefaultMessageBodyHandlerRegistry() {
ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration();
ContextlessMessageBodyHandlerRegistry registry = new ContextlessMessageBodyHandlerRegistry(applicationConfiguration, NettyByteBufferFactory.DEFAULT, new ByteBufRawMessageBodyHandler(), new NettyWritableBodyWriter(applicationConfiguration));
ContextlessMessageBodyHandlerRegistry registry = new ContextlessMessageBodyHandlerRegistry(
applicationConfiguration,
NettyByteBufferFactory.DEFAULT,
new NettyByteBufMessageBodyHandler(),
new NettyWritableBodyWriter(applicationConfiguration)
);
JsonMapper mapper = JsonMapper.createDefault();
registry.add(MediaType.APPLICATION_JSON_TYPE, new NettyJsonHandler<>(mapper));
registry.add(MediaType.APPLICATION_JSON_TYPE, new NettyCharSequenceBodyWriter());
registry.add(MediaType.APPLICATION_JSON_STREAM_TYPE, new NettyJsonStreamHandler<>(mapper));
return registry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public <T> Optional<T> getBody(Argument<T> type) {
}

);
if (LOG.isTraceEnabled() && !result.isPresent()) {
if (LOG.isTraceEnabled() && result.isEmpty()) {
LOG.trace("Unable to convert response body to target type {}", type.getType());
}
return result;
Expand All @@ -213,21 +213,22 @@ private <T> Optional<T> convertByteBuf(Argument<T> type) {
}
return Optional.empty();
}
// All content operation should call slice to prevent reading the buffer completely
yawkat marked this conversation as resolved.
Show resolved Hide resolved
Optional<MediaType> contentType = getContentType();
if (contentType.isPresent()) {
Optional<MessageBodyReader<T>> reader = handlerRegistry.findReader(type, List.of(contentType.get()));
if (reader.isPresent()) {
MessageBodyReader<T> r = reader.get();
MediaType ct = contentType.get();
if (r.isReadable(type, ct)) {
return Optional.of(r.read(type, ct, headers, NettyByteBufferFactory.DEFAULT.wrap(unpooledContent)));
return Optional.of(r.read(type, ct, headers, NettyByteBufferFactory.DEFAULT.wrap(unpooledContent.slice())));
}
}
} else if (LOG.isTraceEnabled()) {
LOG.trace("Missing or unknown Content-Type received from server.");
}
// last chance, try type conversion
return conversionService.convert(unpooledContent, ByteBuf.class, type);
return conversionService.convert(unpooledContent.slice(), ByteBuf.class, type);
}

@NonNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.micronaut.http.netty.body;

import io.micronaut.buffer.netty.NettyByteBufferFactory;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.BootstrapContextCompatible;
import io.micronaut.core.annotation.Experimental;
import io.micronaut.core.annotation.Internal;
Expand All @@ -26,7 +25,8 @@
import io.micronaut.core.type.Headers;
import io.micronaut.core.type.MutableHeaders;
import io.micronaut.http.MediaType;
import io.micronaut.http.body.RawMessageBodyHandler;
import io.micronaut.http.body.ChunkedMessageBodyReader;
import io.micronaut.http.body.TypedMessageBodyHandler;
import io.micronaut.http.codec.CodecException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
Expand All @@ -38,8 +38,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.List;

/**
* Handler for netty {@link ByteBuf}.
Expand All @@ -51,8 +49,13 @@
@Singleton
@Experimental
@BootstrapContextCompatible
@Bean(typed = RawMessageBodyHandler.class)
public final class ByteBufRawMessageBodyHandler implements RawMessageBodyHandler<ByteBuf> {
public final class NettyByteBufMessageBodyHandler implements TypedMessageBodyHandler<ByteBuf>, ChunkedMessageBodyReader<ByteBuf> {

@Override
public Argument<ByteBuf> getType() {
return Argument.of(ByteBuf.class);
}

@Override
public Publisher<ByteBuf> readChunked(Argument<ByteBuf> type, MediaType mediaType, Headers httpHeaders, Publisher<ByteBuffer<?>> input) {
return Flux.from(input).map(bb -> (ByteBuf) bb.asNativeBuffer());
Expand All @@ -76,6 +79,7 @@ public ByteBuf read(Argument<ByteBuf> type, MediaType mediaType, Headers httpHea
public void writeTo(Argument<ByteBuf> type, MediaType mediaType, ByteBuf object, MutableHeaders outgoingHeaders, OutputStream outputStream) throws CodecException {
try {
new ByteBufInputStream(object).transferTo(outputStream);
// ByteBufInputStream#close doesn't release properly
object.release();
} catch (IOException e) {
throw new CodecException("Failed to transfer byte buffer", e);
Expand All @@ -87,8 +91,4 @@ public ByteBuffer<?> writeTo(Argument<ByteBuf> type, MediaType mediaType, ByteBu
return NettyByteBufferFactory.DEFAULT.wrap(object);
}

@Override
public Collection<Class<ByteBuf>> getTypes() {
return List.of(ByteBuf.class);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 original authors
* Copyright 2017-2024 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,18 +18,14 @@
import io.micronaut.context.annotation.Replaces;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.type.Argument;
import io.micronaut.core.type.Headers;
import io.micronaut.core.type.MutableHeaders;
import io.micronaut.http.HttpHeaders;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpHeaders;
import io.micronaut.http.MutableHttpResponse;
import io.micronaut.http.annotation.Consumes;
import io.micronaut.http.annotation.Produces;
import io.micronaut.http.body.MessageBodyHandler;
import io.micronaut.http.body.CharSequenceBodyWriter;
import io.micronaut.http.body.MessageBodyWriter;
import io.micronaut.http.body.TextPlainHandler;
import io.micronaut.http.codec.CodecException;
import io.micronaut.http.netty.NettyHttpHeaders;
import io.netty.buffer.ByteBuf;
Expand All @@ -42,21 +38,25 @@
import io.netty.handler.codec.http.HttpVersion;
import jakarta.inject.Singleton;

import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/**
* A JSON body should not be escaped or parsed as a JSON value.
*
* @author Denis Stepanov
* @since 4.6
*/
@Singleton
@Replaces(TextPlainHandler.class)
@Produces(MediaType.TEXT_PLAIN)
@Consumes(MediaType.TEXT_PLAIN)
@Replaces(CharSequenceBodyWriter.class)
@Internal
final class NettyTextPlainHandler implements MessageBodyHandler<CharSequence>, NettyBodyWriter<CharSequence> {
private final TextPlainHandler defaultHandler = new TextPlainHandler();
public final class NettyCharSequenceBodyWriter implements MessageBodyWriter<CharSequence>, NettyBodyWriter<CharSequence> {
private final CharSequenceBodyWriter defaultHandler = new CharSequenceBodyWriter(StandardCharsets.UTF_8);

@Override
public void writeTo(HttpRequest<?> request, MutableHttpResponse<CharSequence> outgoingResponse, Argument<CharSequence> type, MediaType mediaType, CharSequence object, NettyWriteContext nettyContext) throws CodecException {
MutableHttpHeaders headers = outgoingResponse.getHeaders();
ByteBuf byteBuf = Unpooled.wrappedBuffer(object.toString().getBytes(MessageBodyWriter.getCharset(headers)));
ByteBuf byteBuf = Unpooled.copiedBuffer(object.toString(), MessageBodyWriter.getCharset(mediaType, headers));
NettyHttpHeaders nettyHttpHeaders = (NettyHttpHeaders) headers;
io.netty.handler.codec.http.HttpHeaders nettyHeaders = nettyHttpHeaders.getNettyHeaders();
if (!nettyHttpHeaders.contains(HttpHeaders.CONTENT_TYPE)) {
Expand All @@ -78,8 +78,4 @@ public void writeTo(Argument<CharSequence> type, MediaType mediaType, CharSequen
defaultHandler.writeTo(type, mediaType, object, outgoingHeaders, outputStream);
}

@Override
public String read(Argument<CharSequence> type, MediaType mediaType, Headers httpHeaders, InputStream inputStream) throws CodecException {
return defaultHandler.read(type, mediaType, httpHeaders, inputStream);
}
}
Loading
Loading