Skip to content

Commit

Permalink
Use default content type when X-SSE header not set
Browse files Browse the repository at this point in the history
  • Loading branch information
geoand committed Oct 12, 2023
1 parent 20075d7 commit a6d9c09
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,20 @@ void shouldRestStreamElementTypeOverwriteProducesAtClassLevel() {
.containsExactly(new Dto("foo", "bar"), new Dto("chocolate", "bar")));
}

@Test
void shouldUseDefaultContentTypeWhenXSseHeaderNotPresent() {
var resultList = new CopyOnWriteArrayList<>();
QuarkusRestClientBuilder.newBuilder().baseUri(uri)
.build(SeeWithRestStreamElementTypeClientWithoutSpecialHeader.class)
.getJson()
.subscribe()
.with(resultList::add);
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(

Check failure on line 124 in extensions/resteasy-reactive/rest-client-reactive-jackson/deployment/src/test/java/io/quarkus/rest/client/reactive/jackson/test/MultiSseTest.java

View check run for this annotation

quarkus-bot / Build summary for a6d9c09f55113991fe9b6a7ba2d95a675a587658

JVM Tests - JDK 11

org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a lambda expression in io.quarkus.rest.client.reactive.jackson.test.MultiSseTest that uses java.util.concurrent.CopyOnWriteArrayList Expecting actual:
Raw output
org.awaitility.core.ConditionTimeoutException: 
Assertion condition defined as a lambda expression in io.quarkus.rest.client.reactive.jackson.test.MultiSseTest that uses java.util.concurrent.CopyOnWriteArrayList 
Expecting actual:
  []
to contain exactly (and in same order):
  [io.quarkus.rest.client.reactive.jackson.test.MultiSseTest$Dto@318bce]
but could not find the following elements:
  [io.quarkus.rest.client.reactive.jackson.test.MultiSseTest$Dto@318bce]
 within 5 seconds.
	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:985)
	at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:769)
	at io.quarkus.rest.client.reactive.jackson.test.MultiSseTest.shouldUseDefaultContentTypeWhenXSseHeaderNotPresent(MultiSseTest.java:124)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at io.quarkus.test.QuarkusUnitTest.runExtensionMethod(QuarkusUnitTest.java:499)
	at io.quarkus.test.QuarkusUnitTest.interceptTestMethod(QuarkusUnitTest.java:413)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
Caused by: org.opentest4j.AssertionFailedError: 
Expecting actual:
  []
to contain exactly (and in same order):
  [io.quarkus.rest.client.reactive.jackson.test.MultiSseTest$Dto@318bce]
but could not find the following elements:
  [io.quarkus.rest.client.reactive.jackson.test.MultiSseTest$Dto@318bce]

	at jdk.internal.reflect.GeneratedConstructorAccessor31.newInstance(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at io.quarkus.rest.client.reactive.jackson.test.MultiSseTest.lambda$shouldUseDefaultContentTypeWhenXSseHeaderNotPresent$6(MultiSseTest.java:126)
	at org.awaitility.core.AssertionCondition.lambda$new$0(AssertionCondition.java:53)
	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:248)
	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:235)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Check failure on line 124 in extensions/resteasy-reactive/rest-client-reactive-jackson/deployment/src/test/java/io/quarkus/rest/client/reactive/jackson/test/MultiSseTest.java

View check run for this annotation

quarkus-bot / Build summary for a6d9c09f55113991fe9b6a7ba2d95a675a587658

JVM Tests - JDK 17

org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a io.quarkus.rest.client.reactive.jackson.test.MultiSseTest Expecting actual:
Raw output
org.awaitility.core.ConditionTimeoutException: 
Assertion condition defined as a io.quarkus.rest.client.reactive.jackson.test.MultiSseTest 
Expecting actual:
  []
to contain exactly (and in same order):
  [io.quarkus.rest.client.reactive.jackson.test.MultiSseTest$Dto@318bce]
but could not find the following elements:
  [io.quarkus.rest.client.reactive.jackson.test.MultiSseTest$Dto@318bce]
 within 5 seconds.
	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:985)
	at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:769)
	at io.quarkus.rest.client.reactive.jackson.test.MultiSseTest.shouldUseDefaultContentTypeWhenXSseHeaderNotPresent(MultiSseTest.java:124)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at io.quarkus.test.QuarkusUnitTest.runExtensionMethod(QuarkusUnitTest.java:499)
	at io.quarkus.test.QuarkusUnitTest.interceptTestMethod(QuarkusUnitTest.java:413)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
Caused by: org.opentest4j.AssertionFailedError: 
Expecting actual:
  []
to contain exactly (and in same order):
  [io.quarkus.rest.client.reactive.jackson.test.MultiSseTest$Dto@318bce]
but could not find the following elements:
  [io.quarkus.rest.client.reactive.jackson.test.MultiSseTest$Dto@318bce]

	at jdk.internal.reflect.GeneratedConstructorAccessor31.newInstance(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
	at io.quarkus.rest.client.reactive.jackson.test.MultiSseTest.lambda$shouldUseDefaultContentTypeWhenXSseHeaderNotPresent$6(MultiSseTest.java:126)
	at org.awaitility.core.AssertionCondition.lambda$new$0(AssertionCondition.java:53)
	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:248)
	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:235)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Check failure on line 124 in extensions/resteasy-reactive/rest-client-reactive-jackson/deployment/src/test/java/io/quarkus/rest/client/reactive/jackson/test/MultiSseTest.java

View check run for this annotation

quarkus-bot / Build summary for a6d9c09f55113991fe9b6a7ba2d95a675a587658

JVM Tests - JDK 17 Windows

org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a io.quarkus.rest.client.reactive.jackson.test.MultiSseTest Expecting actual:
Raw output
org.awaitility.core.ConditionTimeoutException: 
Assertion condition defined as a io.quarkus.rest.client.reactive.jackson.test.MultiSseTest 
Expecting actual:
  []
to contain exactly (and in same order):
  [io.quarkus.rest.client.reactive.jackson.test.MultiSseTest$Dto@318bce]
but could not find the following elements:
  [io.quarkus.rest.client.reactive.jackson.test.MultiSseTest$Dto@318bce]
 within 5 seconds.
	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:985)
	at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:769)
	at io.quarkus.rest.client.reactive.jackson.test.MultiSseTest.shouldUseDefaultContentTypeWhenXSseHeaderNotPresent(MultiSseTest.java:124)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at io.quarkus.test.QuarkusUnitTest.runExtensionMethod(QuarkusUnitTest.java:499)
	at io.quarkus.test.QuarkusUnitTest.interceptTestMethod(QuarkusUnitTest.java:413)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
Caused by: org.opentest4j.AssertionFailedError: 
Expecting actual:
  []
to contain exactly (and in same order):
  [io.quarkus.rest.client.reactive.jackson.test.MultiSseTest$Dto@318bce]
but could not find the following elements:
  [io.quarkus.rest.client.reactive.jackson.test.MultiSseTest$Dto@318bce]

	at jdk.internal.reflect.GeneratedConstructorAccessor72.newInstance(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
	at io.quarkus.rest.client.reactive.jackson.test.MultiSseTest.lambda$shouldUseDefaultContentTypeWhenXSseHeaderNotPresent$6(MultiSseTest.java:126)
	at org.awaitility.core.AssertionCondition.lambda$new$0(AssertionCondition.java:53)
	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:248)
	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:235)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Check failure on line 124 in extensions/resteasy-reactive/rest-client-reactive-jackson/deployment/src/test/java/io/quarkus/rest/client/reactive/jackson/test/MultiSseTest.java

View check run for this annotation

quarkus-bot / Build summary for a6d9c09f55113991fe9b6a7ba2d95a675a587658

JVM Tests - JDK 20

org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a io.quarkus.rest.client.reactive.jackson.test.MultiSseTest Expecting actual:
Raw output
org.awaitility.core.ConditionTimeoutException: 
Assertion condition defined as a io.quarkus.rest.client.reactive.jackson.test.MultiSseTest 
Expecting actual:
  []
to contain exactly (and in same order):
  [io.quarkus.rest.client.reactive.jackson.test.MultiSseTest$Dto@318bce]
but could not find the following elements:
  [io.quarkus.rest.client.reactive.jackson.test.MultiSseTest$Dto@318bce]
 within 5 seconds.
	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:985)
	at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:769)
	at io.quarkus.rest.client.reactive.jackson.test.MultiSseTest.shouldUseDefaultContentTypeWhenXSseHeaderNotPresent(MultiSseTest.java:124)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at io.quarkus.test.QuarkusUnitTest.runExtensionMethod(QuarkusUnitTest.java:499)
	at io.quarkus.test.QuarkusUnitTest.interceptTestMethod(QuarkusUnitTest.java:413)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
Caused by: org.opentest4j.AssertionFailedError: 
Expecting actual:
  []
to contain exactly (and in same order):
  [io.quarkus.rest.client.reactive.jackson.test.MultiSseTest$Dto@318bce]
but could not find the following elements:
  [io.quarkus.rest.client.reactive.jackson.test.MultiSseTest$Dto@318bce]

	at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:67)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
	at io.quarkus.rest.client.reactive.jackson.test.MultiSseTest.lambda$shouldUseDefaultContentTypeWhenXSseHeaderNotPresent$6(MultiSseTest.java:126)
	at org.awaitility.core.AssertionCondition.lambda$new$0(AssertionCondition.java:53)
	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:248)
	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:235)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1623)
() -> assertThat(resultList)
.containsExactly(new Dto("foo", "bar")));
}

private SseClient createClient() {
return QuarkusRestClientBuilder.newBuilder()
.baseUri(uri)
Expand Down Expand Up @@ -200,6 +214,26 @@ public interface SeeWithRestStreamElementTypeClient {
Multi<Dto> getJson();
}

@Path("/sse-rest-stream-element-type-no-x-sse")
public static class SseWithRestStreamElementTypeResourceWithoutSpecialHeader {
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@Path("/json")
public Multi<String> getJson() {
return Multi.createFrom().items("{\"name\": \"foo\", \"value\": \"bar\"}");
}
}

@RegisterRestClient
@Path("/sse-rest-stream-element-type-no-x-sse")
@Produces(MediaType.APPLICATION_JSON)
public interface SeeWithRestStreamElementTypeClientWithoutSpecialHeader {
@GET
@Path("/json")
@RestStreamElementType(MediaType.APPLICATION_JSON)
Multi<Dto> getJson();
}

public static class Dto {
public String name;
public String value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ public <R> Multi<R> method(String name, Entity<?> entity, GenericType<R> respons
if (!emitter.isCancelled()) {
if (response.getStatus() == 200
&& MediaType.SERVER_SENT_EVENTS_TYPE.isCompatible(response.getMediaType())) {
registerForSse(multiRequest, responseType, response, vertxResponse);
registerForSse(multiRequest, responseType, response, vertxResponse,
(String) restClientRequestContext.getProperties()
.get(RestClientRequestContext.DEFAULT_CONTENT_TYPE_PROP));
} else if (response.getStatus() == 200
&& RestMediaType.APPLICATION_STREAM_JSON_TYPE.isCompatible(response.getMediaType())) {
registerForJsonStream(multiRequest, restClientRequestContext, responseType, response,
Expand All @@ -152,12 +154,12 @@ private boolean isNewlineDelimited(ResponseImpl response) {
private <R> void registerForSse(MultiRequest<? super R> multiRequest,
GenericType<R> responseType,
Response response,
HttpClientResponse vertxResponse) {
HttpClientResponse vertxResponse, String defaultContentType) {
// honestly, isn't reconnect contradictory with completion?
// FIXME: Reconnect settings?
// For now we don't want multi to reconnect
SseEventSourceImpl sseSource = new SseEventSourceImpl(invocationBuilder.getTarget(),
invocationBuilder, Integer.MAX_VALUE, TimeUnit.SECONDS);
invocationBuilder, Integer.MAX_VALUE, TimeUnit.SECONDS, defaultContentType);

multiRequest.onCancel(sseSource::close);
sseSource.register(event -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,19 @@ public class SseEventSourceImpl implements SseEventSource, Handler<Long> {

public SseEventSourceImpl(WebTargetImpl webTarget, Invocation.Builder invocationBuilder,
long reconnectDelay, TimeUnit reconnectUnit) {
this(webTarget, invocationBuilder, reconnectDelay, reconnectUnit, null);
}

public SseEventSourceImpl(WebTargetImpl webTarget, Invocation.Builder invocationBuilder,
long reconnectDelay, TimeUnit reconnectUnit, String defaultContentType) {
// tests set a null endpoint
Objects.requireNonNull(reconnectUnit);
if (reconnectDelay <= 0)
throw new IllegalArgumentException("Delay must be > 0: " + reconnectDelay);
this.webTarget = webTarget;
this.reconnectDelay = reconnectDelay;
this.reconnectUnit = reconnectUnit;
this.sseParser = new SseParser(this);
this.sseParser = new SseParser(this, defaultContentType);
this.invocationBuilder = invocationBuilder;
}

Expand Down Expand Up @@ -136,7 +141,9 @@ private void registerOnClient(HttpClientResponse vertxClientResponse) {
vertxClientResponse.request().exceptionHandler(null);
connection = vertxClientResponse.request().connection();
String sseContentTypeHeader = vertxClientResponse.getHeader(CommonSseUtil.SSE_CONTENT_TYPE);
sseParser.setSseContentTypeHeader(sseContentTypeHeader);
if ((sseContentTypeHeader != null) && !sseContentTypeHeader.isEmpty()) {
sseParser.setSseContentTypeHeader(sseContentTypeHeader);
}
// we don't add a closeHandler handler on the connection as it can race with this handler
// and close before the emitter emits anything
// see: https://github.com/quarkusio/quarkus/pull/16438
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public class SseParser implements Handler<Buffer> {
*/
private String contentType;
/**
* The content type we're reading. Defaults to the X-Sse-Element-Type header
* The content type we're reading. If the X-Sse-Element-Type header is not set, then it defaults to the declared @Produces
* (if any)
*/
private String contentTypeHeader;
/**
Expand All @@ -67,8 +68,9 @@ public class SseParser implements Handler<Buffer> {
private long eventReconnectTime = SseEvent.RECONNECT_NOT_SET;
private SseEventSourceImpl sseEventSource;

public SseParser(SseEventSourceImpl sseEventSource) {
public SseParser(SseEventSourceImpl sseEventSource, String defaultContentType) {
this.sseEventSource = sseEventSource;
this.contentTypeHeader = defaultContentType;
}

public void setSseContentTypeHeader(String sseContentTypeHeader) {
Expand Down

0 comments on commit a6d9c09

Please sign in to comment.