Skip to content

Commit

Permalink
Merge pull request #36442 from geoand/sse-client-no-header
Browse files Browse the repository at this point in the history
Use default content type when X-SSE header not set
  • Loading branch information
geoand authored Oct 13, 2023
2 parents 01240b9 + 1814717 commit 6808004
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ public <R> Multi<R> method(String name, Entity<?> entity, GenericType<R> respons
if (!emitter.isCancelled()) {
if (response.getStatus() == 200
&& MediaType.SERVER_SENT_EVENTS_TYPE.isCompatible(response.getMediaType())) {
registerForSse(multiRequest, responseType, response, vertxResponse);
registerForSse(multiRequest, responseType, response, vertxResponse,
(String) restClientRequestContext.getProperties()
.get(RestClientRequestContext.DEFAULT_CONTENT_TYPE_PROP));
} else if (response.getStatus() == 200
&& RestMediaType.APPLICATION_STREAM_JSON_TYPE.isCompatible(response.getMediaType())) {
registerForJsonStream(multiRequest, restClientRequestContext, responseType, response,
Expand All @@ -152,12 +154,12 @@ private boolean isNewlineDelimited(ResponseImpl response) {
private <R> void registerForSse(MultiRequest<? super R> multiRequest,
GenericType<R> responseType,
Response response,
HttpClientResponse vertxResponse) {
HttpClientResponse vertxResponse, String defaultContentType) {
// honestly, isn't reconnect contradictory with completion?
// FIXME: Reconnect settings?
// For now we don't want multi to reconnect
SseEventSourceImpl sseSource = new SseEventSourceImpl(invocationBuilder.getTarget(),
invocationBuilder, Integer.MAX_VALUE, TimeUnit.SECONDS);
invocationBuilder, Integer.MAX_VALUE, TimeUnit.SECONDS, defaultContentType);

multiRequest.onCancel(sseSource::close);
sseSource.register(event -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,19 @@ public class SseEventSourceImpl implements SseEventSource, Handler<Long> {

public SseEventSourceImpl(WebTargetImpl webTarget, Invocation.Builder invocationBuilder,
long reconnectDelay, TimeUnit reconnectUnit) {
this(webTarget, invocationBuilder, reconnectDelay, reconnectUnit, null);
}

public SseEventSourceImpl(WebTargetImpl webTarget, Invocation.Builder invocationBuilder,
long reconnectDelay, TimeUnit reconnectUnit, String defaultContentType) {
// tests set a null endpoint
Objects.requireNonNull(reconnectUnit);
if (reconnectDelay <= 0)
throw new IllegalArgumentException("Delay must be > 0: " + reconnectDelay);
this.webTarget = webTarget;
this.reconnectDelay = reconnectDelay;
this.reconnectUnit = reconnectUnit;
this.sseParser = new SseParser(this);
this.sseParser = new SseParser(this, defaultContentType);
this.invocationBuilder = invocationBuilder;
}

Expand Down Expand Up @@ -136,7 +141,9 @@ private void registerOnClient(HttpClientResponse vertxClientResponse) {
vertxClientResponse.request().exceptionHandler(null);
connection = vertxClientResponse.request().connection();
String sseContentTypeHeader = vertxClientResponse.getHeader(CommonSseUtil.SSE_CONTENT_TYPE);
sseParser.setSseContentTypeHeader(sseContentTypeHeader);
if ((sseContentTypeHeader != null) && !sseContentTypeHeader.isEmpty()) {
sseParser.setSseContentTypeHeader(sseContentTypeHeader);
}
// we don't add a closeHandler handler on the connection as it can race with this handler
// and close before the emitter emits anything
// see: https://github.com/quarkusio/quarkus/pull/16438
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public class SseParser implements Handler<Buffer> {
*/
private String contentType;
/**
* The content type we're reading. Defaults to the X-Sse-Element-Type header
* The content type we're reading. If the X-Sse-Element-Type header is not set, then it defaults to the declared @Produces
* (if any)
*/
private String contentTypeHeader;
/**
Expand All @@ -67,8 +68,9 @@ public class SseParser implements Handler<Buffer> {
private long eventReconnectTime = SseEvent.RECONNECT_NOT_SET;
private SseEventSourceImpl sseEventSource;

public SseParser(SseEventSourceImpl sseEventSource) {
public SseParser(SseEventSourceImpl sseEventSource, String defaultContentType) {
this.sseEventSource = sseEventSource;
this.contentTypeHeader = defaultContentType;
}

public void setSseContentTypeHeader(String sseContentTypeHeader) {
Expand Down

0 comments on commit 6808004

Please sign in to comment.