Skip to content

Commit

Permalink
refactor(repeater): streamline repeater connection management (#514)
Browse files Browse the repository at this point in the history
  • Loading branch information
derevnjuk authored Feb 24, 2024
1 parent 08e303b commit 9c62d83
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/Config/CliBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ export class CliBuilder {

private initSentry(dsn: string) {
init({
attachStacktrace: true,
dsn,
attachStacktrace: true,
release: process.env.VERSION,
beforeSend(event) {
if (event.contexts.args) {
Expand Down
26 changes: 15 additions & 11 deletions src/Repeater/DefaultRepeaterServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ export class DefaultRepeaterServer implements RepeaterServer {
'deployed'
);

this.createPingTimer();

return result;
}

public connect(hostname: string) {
public async connect(hostname: string) {
this._socket = io(this.options.uri, {
parser,
path: '/api/ws/v1',
Expand All @@ -90,21 +92,23 @@ export class DefaultRepeaterServer implements RepeaterServer {
}
});

this.socket.on('connect_error', (error: Error) =>
logger.debug(`Unable to connect to the %s host`, this.options.uri, error)
);
this.socket.on('disconnect', (reason) => {
if (reason === 'io server disconnect') {
// the disconnection was initiated by the server, you need to reconnect manually
this.socket.connect();
}
});

this.createPingTimer();
await Promise.race([
once(this.socket, 'connect'),
once(this.socket, 'connect_error').then(([error]: Error[]) => {
throw error;
})
]);

logger.debug('Event bus connected to %s', this.options.uri);
}

public connected(handler: () => void | Promise<void>): void {
this.socket.on('connect', () =>
this.processEventHandler('connect', undefined, handler)
);
}

public requestReceived(
handler: (
event: RepeaterServerRequestEvent
Expand Down
5 changes: 2 additions & 3 deletions src/Repeater/RepeaterServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export interface RepeaterServerReconnectionAttemptedEvent {
}

export interface RepeaterServerErrorEvent {
eventName: string;
message: string;
}

Expand Down Expand Up @@ -89,7 +90,7 @@ export interface DeploymentRuntime {
export interface RepeaterServer {
disconnect(): void;

connect(hostname: string): void;
connect(hostname: string): Promise<void>;

deploy(
options: DeployCommandOptions,
Expand Down Expand Up @@ -132,8 +133,6 @@ export interface RepeaterServer {

reconnectionSucceeded(handler: () => void | Promise<void>): void;

connected(handler: () => void | Promise<void>): void;

errorOccurred(
handler: (event: RepeaterServerErrorEvent) => void | Promise<void>
): void;
Expand Down
39 changes: 22 additions & 17 deletions src/Repeater/ServerRepeaterLauncher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,23 @@ export class ServerRepeaterLauncher implements RepeaterLauncher {
logger.log('Starting the Repeater (%s)...', this.info.version);

this.repeaterId = repeaterId;
this.repeaterServer.connect(repeaterId);
this.subscribeToEvents();

process.nextTick(() => this.subscribeToEvents());

await this.repeaterServer.connect(repeaterId);

await this.repeaterServer.deploy(
{
repeaterId: this.repeaterId
},
this.getRuntime()
);

if (!this.repeaterStarted) {
this.repeaterStarted = true;

logger.log('The Repeater (%s) started', this.info.version);
}
}

private getRuntime(): DeploymentRuntime {
Expand All @@ -117,23 +132,13 @@ export class ServerRepeaterLauncher implements RepeaterLauncher {
}

private subscribeToEvents() {
this.repeaterServer.connected(async () => {
await this.repeaterServer.deploy(
{
repeaterId: this.repeaterId
},
this.getRuntime()
);

if (!this.repeaterStarted) {
this.repeaterStarted = true;

logger.log('The Repeater (%s) started', this.info.version);
this.repeaterServer.errorOccurred(({ eventName, message }) => {
if (['deploy', 'undeploy'].includes(eventName)) {
logger.error(`%s: %s`, chalk.red('(!) CRITICAL'), message);
this.close().catch(logger.error);
process.exitCode = 1;
}
});
this.repeaterServer.errorOccurred(({ message }) => {
logger.error(`%s: %s`, chalk.red('(!) CRITICAL'), message);
});
this.repeaterServer.reconnectionFailed((payload) =>
this.reconnectionFailed(payload)
);
Expand Down

0 comments on commit 9c62d83

Please sign in to comment.