From 6dbd7ab4c4007844051f4f4137e0897b74b8180c Mon Sep 17 00:00:00 2001 From: Gaston Thea Date: Mon, 4 Dec 2023 19:59:16 -0300 Subject: [PATCH] Fix streaming disconnection --- .../client/network/HttpStreamRequestImpl.java | 52 +++++++++---------- .../service/sseclient/EventStreamParser.java | 1 - .../sseclient/sseclient/SseClientImpl.java | 11 ++-- 3 files changed, 31 insertions(+), 33 deletions(-) diff --git a/src/main/java/io/split/android/client/network/HttpStreamRequestImpl.java b/src/main/java/io/split/android/client/network/HttpStreamRequestImpl.java index 82dd770ba..c0f288b74 100644 --- a/src/main/java/io/split/android/client/network/HttpStreamRequestImpl.java +++ b/src/main/java/io/split/android/client/network/HttpStreamRequestImpl.java @@ -1,5 +1,7 @@ package io.split.android.client.network; +import static com.google.common.base.Preconditions.checkNotNull; + import androidx.annotation.NonNull; import androidx.annotation.Nullable; @@ -16,21 +18,18 @@ import java.util.HashMap; import java.util.Map; -import io.split.android.client.utils.logger.Logger; - -import static com.google.common.base.Preconditions.checkNotNull; - import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLSocketFactory; +import io.split.android.client.utils.logger.Logger; + public class HttpStreamRequestImpl implements HttpStreamRequest { private static final int STREAMING_READ_TIMEOUT_IN_MILLISECONDS = 80000; private final URI mUri; private final Map mHeaders; private HttpURLConnection mConnection; - private BufferedReader mResponseBufferedReader; - private InputStream mResponseInputStream; + private BufferedReader mBufferedReader; @Nullable private final Proxy mProxy; @Nullable @@ -68,28 +67,23 @@ public void addHeader(String name, String value) { @Override public void close() { Logger.d("Closing streaming connection"); - if (mResponseInputStream != null) { - try { - mResponseInputStream.close(); - } catch (IOException e) { - Logger.d("Unknown error closing streaming connection: " + e.getLocalizedMessage()); - } catch (Exception e) { - Logger.d("Unknown error closing stream: " + e.getLocalizedMessage()); - } - } - if (mResponseBufferedReader != null) { - try { - mResponseBufferedReader.close(); - } catch (IOException e) { - Logger.d("Buffer already closed"); - } catch (Exception e) { - Logger.d("Unknown error closing buffer: " + e.getLocalizedMessage()); - } + if (mBufferedReader != null) { + closeBufferedReader(); } mConnection.disconnect(); Logger.d("Streaming connection closed"); } + private void closeBufferedReader() { + try { + mBufferedReader.close(); + } catch (IOException e) { + Logger.d("Buffer already closed"); + } catch (Exception e) { + Logger.d("Unknown error closing buffer: " + e.getLocalizedMessage()); + } + } + private HttpStreamResponse getRequest() throws HttpException { URL url; HttpStreamResponse response; @@ -162,13 +156,17 @@ private static void addHeaders(HttpURLConnection request, Map he private HttpStreamResponse buildResponse(HttpURLConnection connection) throws IOException { int responseCode = connection.getResponseCode(); if (responseCode >= HttpURLConnection.HTTP_OK && responseCode < HttpURLConnection.HTTP_MULT_CHOICE) { - mResponseInputStream = connection.getInputStream(); - if (mResponseInputStream != null) { - mResponseBufferedReader = new BufferedReader(new InputStreamReader(mResponseInputStream)); + InputStream inputStream = connection.getInputStream(); + if (inputStream != null) { + if (mBufferedReader != null) { + closeBufferedReader(); + } + mBufferedReader = new BufferedReader(new InputStreamReader(inputStream)); - return new HttpStreamResponseImpl(responseCode, mResponseBufferedReader); + return new HttpStreamResponseImpl(responseCode, mBufferedReader); } } + return new HttpStreamResponseImpl(responseCode); } } diff --git a/src/main/java/io/split/android/client/service/sseclient/EventStreamParser.java b/src/main/java/io/split/android/client/service/sseclient/EventStreamParser.java index 12fc4a9bf..bd2f61da9 100644 --- a/src/main/java/io/split/android/client/service/sseclient/EventStreamParser.java +++ b/src/main/java/io/split/android/client/service/sseclient/EventStreamParser.java @@ -22,7 +22,6 @@ public class EventStreamParser { * if the line contains any. * @return Returns true if a blank line meaning the final of an event if found. */ - @VisibleForTesting public boolean parseLineAndAppendValue(String streamLine, Map messageValues) { if (streamLine == null) { diff --git a/src/main/java/io/split/android/client/service/sseclient/sseclient/SseClientImpl.java b/src/main/java/io/split/android/client/service/sseclient/sseclient/SseClientImpl.java index c94739e61..d598f393e 100644 --- a/src/main/java/io/split/android/client/service/sseclient/sseclient/SseClientImpl.java +++ b/src/main/java/io/split/android/client/service/sseclient/sseclient/SseClientImpl.java @@ -61,8 +61,9 @@ public int status() { @Override public void disconnect() { - mIsDisconnectCalled.set(true); - close(); + if (!mIsDisconnectCalled.getAndSet(true)) { + close(); + } } private void close() { @@ -101,8 +102,8 @@ public void connect(SseJwtToken token, ConnectionListener connectionListener) { Map values = new HashMap<>(); while ((inputLine = bufferedReader.readLine()) != null) { if (mEventStreamParser.parseLineAndAppendValue(inputLine, values)) { - if(!isConnectionConfirmed) { - if(mEventStreamParser.isKeepAlive(values) || mSseHandler.isConnectionConfirmed(values)) { + if (!isConnectionConfirmed) { + if (mEventStreamParser.isKeepAlive(values) || mSseHandler.isConnectionConfirmed(values)) { Logger.d("Streaming connection success"); isConnectionConfirmed = true; connectionListener.onConnectionSuccess(); @@ -130,7 +131,7 @@ public void connect(SseJwtToken token, ConnectionListener connectionListener) { logError("An error has occurred while creating stream Url ", e); isErrorRetryable = false; } catch (IOException e) { - logError("An error has occurred while parsing stream from: ", e); + Logger.d("An error has occurred while parsing stream: ", e); isErrorRetryable = true; } catch (Exception e) { logError("An unexpected error has occurred while receiving stream events from: ", e);