Skip to content
This repository has been archived by the owner on Apr 14, 2023. It is now read-only.

fix(NA): Non forced close without connection terminate and also some connection's flow improvements #197

Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 74 additions & 20 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,12 @@ export class SubscriptionClient {
private connectionCallback: any;
private eventEmitter: EventEmitter;
private lazy: boolean;
private forceClose: boolean;
private closedByUser: boolean;
private wsImpl: any;
private wasKeepAliveReceived: boolean;
private checkConnectionTimeoutId: any;
private tryReconnectTimeoutId: any;
private checkConnectionIntervalId: any;
private maxConnectTimeoutId: any;
private middlewares: Middleware[];

constructor(url: string, options?: ClientOptions, webSocketImpl?: any) {
Expand Down Expand Up @@ -103,7 +105,7 @@ export class SubscriptionClient {
this.reconnecting = false;
this.reconnectionAttempts = reconnectionAttempts;
this.lazy = !!lazy;
this.forceClose = false;
this.closedByUser = false;
this.backoff = new Backoff({ jitter: 0.5 });
this.eventEmitter = new EventEmitter();
this.middlewares = [];
Expand All @@ -122,12 +124,24 @@ export class SubscriptionClient {
return this.client.readyState;
}

public close(isForced = true) {
public close(isForced = true, closedByUser = true) {
if (this.client !== null) {
this.forceClose = isForced;
this.sendMessage(undefined, MessageTypes.GQL_CONNECTION_TERMINATE, null);
this.closedByUser = closedByUser;

if (isForced) {
this.clearCheckConnectionInterval();
this.clearMaxConnectTimeout();
this.clearTryReconnectTimeout();
this.sendMessage(undefined, MessageTypes.GQL_CONNECTION_TERMINATE, null);
}

this.client.close();
this.client = null;
this.eventEmitter.emit('disconnected');

if (!isForced) {
this.tryReconnect();
}
}
}

Expand Down Expand Up @@ -277,6 +291,27 @@ export class SubscriptionClient {
return this;
}

private clearCheckConnectionInterval() {
if (this.checkConnectionIntervalId) {
clearInterval(this.checkConnectionIntervalId);
this.checkConnectionIntervalId = null;
}
}

private clearMaxConnectTimeout() {
if (this.maxConnectTimeoutId) {
clearTimeout(this.maxConnectTimeoutId);
this.maxConnectTimeoutId = null;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reformat :)


private clearTryReconnectTimeout() {
if (this.tryReconnectTimeoutId) {
clearTimeout(this.tryReconnectTimeoutId);
this.tryReconnectTimeoutId = null;
}
}

private logWarningOnNonProductionEnv(warning: string) {
if (process && process.env && process.env.NODE_ENV !== 'production') {
console.warn(warning);
Expand Down Expand Up @@ -369,7 +404,7 @@ export class SubscriptionClient {
// send message, or queue it if connection is not open
private sendMessageRaw(message: Object) {
switch (this.status) {
case this.client.OPEN:
case this.wsImpl.OPEN:
let serializedMessage: string = JSON.stringify(message);
let parsedMessage: any;
try {
Expand All @@ -380,7 +415,7 @@ export class SubscriptionClient {

this.client.send(serializedMessage);
break;
case this.client.CONNECTING:
case this.wsImpl.CONNECTING:
this.unsentMessagesQueue.push(message);

break;
Expand All @@ -397,7 +432,7 @@ export class SubscriptionClient {
}

private tryReconnect() {
if (!this.reconnect || this.backoff.attempts > this.reconnectionAttempts) {
if (!this.reconnect || this.backoff.attempts >= this.reconnectionAttempts) {
return;
}

Expand All @@ -410,8 +445,10 @@ export class SubscriptionClient {
this.reconnecting = true;
}

this.clearTryReconnectTimeout();

const delay = this.backoff.duration();
setTimeout(() => {
this.tryReconnectTimeoutId = setTimeout(() => {
this.connect();
}, delay);
}
Expand All @@ -424,14 +461,34 @@ export class SubscriptionClient {
}

private checkConnection() {
this.wasKeepAliveReceived ? this.wasKeepAliveReceived = false : this.close(false);
if (this.wasKeepAliveReceived) {
this.wasKeepAliveReceived = false;
return;
}

this.close(false, true);
}

private checkMaxConnectTimeout() {
this.clearMaxConnectTimeout();

// Max timeout trying to connect
this.maxConnectTimeoutId = setTimeout(() => {
if (this.status !== this.wsImpl.OPEN) {
this.close(false, true);
}
}, this.wsTimeout);
}

private connect() {
this.client = new this.wsImpl(this.url, GRAPHQL_WS);

this.checkMaxConnectTimeout();

this.client.onopen = () => {
this.closedByUser = false;
this.eventEmitter.emit(this.reconnecting ? 'reconnecting' : 'connecting');
this.clearMaxConnectTimeout();

const payload: ConnectionParams = typeof this.connectionParams === 'function' ? this.connectionParams() : this.connectionParams;

Expand All @@ -441,12 +498,8 @@ export class SubscriptionClient {
};

this.client.onclose = () => {
this.eventEmitter.emit('disconnected');

if (this.forceClose) {
this.forceClose = false;
} else {
this.tryReconnect();
if ( !this.closedByUser ) {
this.close(false, false);
}
};

Expand Down Expand Up @@ -522,10 +575,11 @@ export class SubscriptionClient {
this.checkConnection();
}

if (this.checkConnectionTimeoutId) {
clearTimeout(this.checkConnectionTimeoutId);
if (this.checkConnectionIntervalId) {
clearInterval(this.checkConnectionIntervalId);
this.checkConnection();
}
this.checkConnectionTimeoutId = setTimeout(this.checkConnection.bind(this), this.wsTimeout);
this.checkConnectionIntervalId = setInterval(this.checkConnection.bind(this), this.wsTimeout);
break;

default:
Expand Down
Loading