Skip to content

Commit

Permalink
fix: cleaning up and errors
Browse files Browse the repository at this point in the history
[ci skip]
  • Loading branch information
tegefaulkes committed Feb 24, 2023
1 parent 5da726e commit d096ce2
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 86 deletions.
82 changes: 54 additions & 28 deletions src/clientRPC/ClientClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { PromiseCancellable } from '@matrixai/async-cancellable';
import { Timer } from '@matrixai/timer';
import { Validator } from 'ip-num';
import * as clientRpcUtils from './utils';
import * as clientRPCErrors from './errors';
import { promise } from '../utils';

const timeoutSymbol = Symbol('TimedOutSymbol');
Expand Down Expand Up @@ -69,7 +70,7 @@ class ClientClient {
} else if (Validator.isValidIPv6String(host)[0]) {
this.host = `[${host}]`;
} else {
throw Error('TMP Invalid host');
throw new clientRPCErrors.ErrorClientInvalidHost();
}
}

Expand All @@ -86,7 +87,7 @@ class ClientClient {
this.logger.info(`Destroyed ${this.constructor.name}`);
}

@createDestroy.ready(Error('TMP destroyed'))
@createDestroy.ready(new clientRPCErrors.ErrorClientDestroyed())
public async startConnection({
timeoutTimer,
}: {
Expand Down Expand Up @@ -125,7 +126,11 @@ class ClientClient {
);
// Handle connection failure
const openErrorHandler = (e) => {
connectProm.rejectP(Error('TMP ERROR Connection failure', { cause: e }));
connectProm.rejectP(
new clientRPCErrors.ErrorClientConnectionFailed(undefined, {
cause: e,
}),
);
};
ws.once('error', openErrorHandler);
// Authenticate server's certificates
Expand Down Expand Up @@ -153,7 +158,9 @@ class ClientClient {
timer?.then(() => timeoutSymbol) ?? new Promise(() => {}),
await Promise.all([authenticateProm.p, connectProm.p]),
]);
if (result === timeoutSymbol) throw Error('TMP timed out');
if (result === timeoutSymbol) {
throw new clientRPCErrors.ErrorClientConnectionTimedOut();
}
} catch (e) {
// Clean up
// unregister handlers
Expand All @@ -177,52 +184,67 @@ class ClientClient {
const readableStream = new ReadableStream<Uint8Array>(
{
start: (controller) => {
readableLogger.info('STARTING');
readableLogger.info('Starting');
const messageHandler = (data) => {
// ReadableLogger.debug(`message: ${data.toString()}`);
readableLogger.debug(`Received ${data.toString()}`);
if (controller.desiredSize == null) {
controller.error(Error('NEVER'));
return;
}
if (controller.desiredSize < 0) {
// ReadableLogger.debug('PAUSING');
readableLogger.debug('Applying readable backpressure');
ws.pause();
}
const message = data as Buffer;
if (message.length === 0) {
readableLogger.info('CLOSING, NULL MESSAGE');
readableLogger.debug('Null message received');
ws.removeListener('message', messageHandler);
if (!readableClosed) {
controller.close();
readableClosed = true;
readableLogger.debug('Closing');
controller.close();
}
if (writableClosed) {
this.logger.debug('Closing socket');
ws.close();
}
return;
}
controller.enqueue(message);
};
readableLogger.debug('Registering socket message handler');
ws.on('message', messageHandler);
ws.once('close', () => {
readableLogger.info('CLOSED, WS CLOSED');
ws.once('close', (code, reason) => {
this.logger.info('Socket closed');
ws.removeListener('message', messageHandler);
if (!readableClosed) {
controller.error(Error('TMP WebSocket Closed early CR'));
readableClosed = true;
readableLogger.debug(
`Closed early, ${code}, ${reason.toString()}`,
);
controller.error(
new clientRPCErrors.ErrorClientConnectionEndedEarly(),
);
}
});
ws.once('error', (e) => {
if (!readableClosed) {
readableClosed = true;
readableLogger.error(e);
controller.error(e);
}
});
ws.once('error', (e) => readableLogger.error(e));
},
cancel: () => {
readableLogger.info('CANCELLED');
readableLogger.debug('Cancelled');
if (!readableClosed) {
ws.close();
readableLogger.debug('Closing socket');
readableClosed = true;
ws.close();
}
},
pull: () => {
// ReadableLogger.debug('RESUMING');
readableLogger.debug('Releasing backpressure');
ws.resume();
},
},
Expand All @@ -233,43 +255,47 @@ class ClientClient {
);
const writableStream = new WritableStream<Uint8Array>({
start: (controller) => {
writableLogger.info('STARTING');
writableLogger.info('Starting');
ws.once('error', (e) => {
writableLogger.error(`error: ${e}`);
if (!writableClosed) {
controller.error(e);
writableClosed = true;
writableLogger.error(e.toString());
controller.error(e);
}
});
ws.once('close', (code, reason) => {
if (!writableClosed) {
writableLogger.info(
`ws closing early! with code: ${code} and reason: ${reason.toString()}`,
writableClosed = true;
writableLogger.debug(`Closed early, ${code}, ${reason.toString()}`);
controller.error(
new clientRPCErrors.ErrorClientConnectionEndedEarly(),
);
controller.error(Error('TMP WebSocket Closed early CW'));
}
});
},
close: () => {
writableLogger.info('CLOSING');
writableLogger.debug('Closing, sending null message');
ws.send(Buffer.from([]));
writableClosed = true;
if (readableClosed) {
writableLogger.debug('Closing socket');
ws.close();
}
},
abort: () => {
writableLogger.info('ABORTED');
writableLogger.debug('Aborted');
writableClosed = true;
if (readableClosed) {
writableLogger.debug('Closing socket');
ws.close();
}
},
write: async (chunk, controller) => {
// WritableLogger.debug(`writing: ${chunk?.toString()}`);
writableLogger.debug(`Sending ${chunk?.toString()}`);
const wait = promise<void>();
ws.send(chunk, (e) => {
if (e != null) {
writableLogger.error(e.toString());
controller.error(e);
}
wait.resolveP();
Expand All @@ -283,15 +309,15 @@ class ClientClient {
ws.ping();
}, this.pingInterval);
const pingTimeoutTimer = setTimeout(() => {
this.logger.debug('PING TIMED OUT');
this.logger.debug('Ping timed out');
ws.close(4002, 'Timed out');
}, this.pingTimeout);
ws.on('ping', () => {
this.logger.debug('received ping');
this.logger.debug('Received ping');
ws.pong();
});
ws.on('pong', () => {
this.logger.debug('received pong');
this.logger.debug('Received pong');
pingTimeoutTimer.refresh();
});
ws.once('close', () => {
Expand Down
Loading

0 comments on commit d096ce2

Please sign in to comment.