Skip to content

Commit

Permalink
Fix streaming disconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
gthea committed Dec 4, 2023
1 parent 281ff08 commit 6dbd7ab
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.split.android.client.network;

import static com.google.common.base.Preconditions.checkNotNull;

import androidx.annotation.NonNull;
import androidx.annotation.Nullable;

Expand All @@ -16,21 +18,18 @@
import java.util.HashMap;
import java.util.Map;

import io.split.android.client.utils.logger.Logger;

import static com.google.common.base.Preconditions.checkNotNull;

import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;

import io.split.android.client.utils.logger.Logger;

public class HttpStreamRequestImpl implements HttpStreamRequest {

private static final int STREAMING_READ_TIMEOUT_IN_MILLISECONDS = 80000;
private final URI mUri;
private final Map<String, String> mHeaders;
private HttpURLConnection mConnection;
private BufferedReader mResponseBufferedReader;
private InputStream mResponseInputStream;
private BufferedReader mBufferedReader;
@Nullable
private final Proxy mProxy;
@Nullable
Expand Down Expand Up @@ -68,28 +67,23 @@ public void addHeader(String name, String value) {
@Override
public void close() {
Logger.d("Closing streaming connection");
if (mResponseInputStream != null) {
try {
mResponseInputStream.close();
} catch (IOException e) {
Logger.d("Unknown error closing streaming connection: " + e.getLocalizedMessage());
} catch (Exception e) {
Logger.d("Unknown error closing stream: " + e.getLocalizedMessage());
}
}
if (mResponseBufferedReader != null) {
try {
mResponseBufferedReader.close();
} catch (IOException e) {
Logger.d("Buffer already closed");
} catch (Exception e) {
Logger.d("Unknown error closing buffer: " + e.getLocalizedMessage());
}
if (mBufferedReader != null) {
closeBufferedReader();
}
mConnection.disconnect();
Logger.d("Streaming connection closed");
}

private void closeBufferedReader() {
try {
mBufferedReader.close();
} catch (IOException e) {
Logger.d("Buffer already closed");
} catch (Exception e) {
Logger.d("Unknown error closing buffer: " + e.getLocalizedMessage());
}
}

private HttpStreamResponse getRequest() throws HttpException {
URL url;
HttpStreamResponse response;
Expand Down Expand Up @@ -162,13 +156,17 @@ private static void addHeaders(HttpURLConnection request, Map<String, String> he
private HttpStreamResponse buildResponse(HttpURLConnection connection) throws IOException {
int responseCode = connection.getResponseCode();
if (responseCode >= HttpURLConnection.HTTP_OK && responseCode < HttpURLConnection.HTTP_MULT_CHOICE) {
mResponseInputStream = connection.getInputStream();
if (mResponseInputStream != null) {
mResponseBufferedReader = new BufferedReader(new InputStreamReader(mResponseInputStream));
InputStream inputStream = connection.getInputStream();
if (inputStream != null) {
if (mBufferedReader != null) {
closeBufferedReader();
}
mBufferedReader = new BufferedReader(new InputStreamReader(inputStream));

return new HttpStreamResponseImpl(responseCode, mResponseBufferedReader);
return new HttpStreamResponseImpl(responseCode, mBufferedReader);
}
}

return new HttpStreamResponseImpl(responseCode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public class EventStreamParser {
* if the line contains any.
* @return Returns true if a blank line meaning the final of an event if found.
*/
@VisibleForTesting
public boolean parseLineAndAppendValue(String streamLine, Map<String, String> messageValues) {

if (streamLine == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ public int status() {

@Override
public void disconnect() {
mIsDisconnectCalled.set(true);
close();
if (!mIsDisconnectCalled.getAndSet(true)) {
close();
}
}

private void close() {
Expand Down Expand Up @@ -101,8 +102,8 @@ public void connect(SseJwtToken token, ConnectionListener connectionListener) {
Map<String, String> values = new HashMap<>();
while ((inputLine = bufferedReader.readLine()) != null) {
if (mEventStreamParser.parseLineAndAppendValue(inputLine, values)) {
if(!isConnectionConfirmed) {
if(mEventStreamParser.isKeepAlive(values) || mSseHandler.isConnectionConfirmed(values)) {
if (!isConnectionConfirmed) {
if (mEventStreamParser.isKeepAlive(values) || mSseHandler.isConnectionConfirmed(values)) {
Logger.d("Streaming connection success");
isConnectionConfirmed = true;
connectionListener.onConnectionSuccess();
Expand Down Expand Up @@ -130,7 +131,7 @@ public void connect(SseJwtToken token, ConnectionListener connectionListener) {
logError("An error has occurred while creating stream Url ", e);
isErrorRetryable = false;
} catch (IOException e) {
logError("An error has occurred while parsing stream from: ", e);
Logger.d("An error has occurred while parsing stream: ", e);
isErrorRetryable = true;
} catch (Exception e) {
logError("An unexpected error has occurred while receiving stream events from: ", e);
Expand Down

0 comments on commit 6dbd7ab

Please sign in to comment.