Skip to content

Commit

Permalink
Merge pull request #834 from ably/feature/protocol-typescript
Browse files Browse the repository at this point in the history
Convert Protocol to TypeScript
  • Loading branch information
owenpearson authored Nov 5, 2021
2 parents 5dab0ec + ba842e1 commit eaa4498
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 42 deletions.
3 changes: 1 addition & 2 deletions common/lib/transport/connectionmanager.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import ProtocolMessage from '../types/protocolmessage';
import * as Utils from '../util/utils';
import Protocol from './protocol';
import Protocol, { PendingMessage } from './protocol';
import Defaults from '../util/defaults';
import Platform from 'platform';
import EventEmitter from '../util/eventemitter';
Expand All @@ -23,7 +23,6 @@ var ConnectionManager = (function() {
var haveWebStorage = !!(typeof(WebStorage) !== 'undefined' && WebStorage.get);
var haveSessionStorage = !!(typeof(WebStorage) !== 'undefined' && WebStorage.getSession);
var actions = ProtocolMessage.Action;
var PendingMessage = Protocol.PendingMessage;
var noop = function() {};
var transportPreferenceOrder = Defaults.transportPreferenceOrder;
var optimalTransport = transportPreferenceOrder[transportPreferenceOrder.length - 1];
Expand Down
3 changes: 1 addition & 2 deletions common/lib/transport/messagequeue.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import ErrorInfo from '../types/errorinfo';
import EventEmitter from '../util/eventemitter';
import Logger from '../util/logger';

type PendingMessage = any;
import { PendingMessage } from './protocol';

class MessageQueue extends EventEmitter {
messages: Array<PendingMessage>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,63 @@ import EventEmitter from '../util/eventemitter';
import Logger from '../util/logger';
import MessageQueue from './messagequeue';
import ErrorInfo from '../types/errorinfo';
import Transport from './transport';
import { ErrCallback } from '../../types/utils';

var Protocol = (function() {
var actions = ProtocolMessage.Action;
const actions = ProtocolMessage.Action;

function Protocol(transport) {
EventEmitter.call(this);
export class PendingMessage {
message: ProtocolMessage;
callback?: ErrCallback;
merged: boolean;
sendAttempted: boolean;
ackRequired: boolean;

constructor(message: ProtocolMessage, callback?: ErrCallback) {
this.message = message;
this.callback = callback;
this.merged = false;
const action = message.action;
this.sendAttempted = false;
this.ackRequired = (action == actions.MESSAGE || action == actions.PRESENCE);
}
}

class Protocol extends EventEmitter {
transport: Transport;
messageQueue: MessageQueue;

constructor(transport: Transport) {
super();
this.transport = transport;
this.messageQueue = new MessageQueue();
var self = this;
transport.on('ack', function(serial, count) { self.onAck(serial, count); });
transport.on('nack', function(serial, count, err) { self.onNack(serial, count, err); });
transport.on('ack', (serial: number, count: number) => { this.onAck(serial, count); });
transport.on('nack', (serial: number, count: number, err: ErrorInfo) => { this.onNack(serial, count, err); });
}
Utils.inherits(Protocol, EventEmitter);

Protocol.prototype.onAck = function(serial, count) {
onAck(serial: number, count: number): void {
Logger.logAction(Logger.LOG_MICRO, 'Protocol.onAck()', 'serial = ' + serial + '; count = ' + count);
this.messageQueue.completeMessages(serial, count);
};
}

Protocol.prototype.onNack = function(serial, count, err) {
onNack(serial: number, count: number, err: ErrorInfo): void {
Logger.logAction(Logger.LOG_ERROR, 'Protocol.onNack()', 'serial = ' + serial + '; count = ' + count + '; err = ' + Utils.inspectError(err));
if(!err) {
err = new ErrorInfo('Unable to send message; channel not responding', 50001, 500);
}
this.messageQueue.completeMessages(serial, count, err);
};
}

Protocol.prototype.onceIdle = function(listener) {
var messageQueue = this.messageQueue;
onceIdle(listener: ErrCallback): void {
const messageQueue = this.messageQueue;
if(messageQueue.count() === 0) {
listener();
return;
}
messageQueue.once('idle', listener);
};
}

Protocol.prototype.send = function(pendingMessage) {
send(pendingMessage: PendingMessage): void {
if(pendingMessage.ackRequired) {
this.messageQueue.push(pendingMessage);
}
Expand All @@ -49,38 +69,26 @@ var Protocol = (function() {
}
pendingMessage.sendAttempted = true;
this.transport.send(pendingMessage.message);
};
}

Protocol.prototype.getTransport = function() {
getTransport(): Transport {
return this.transport;
};
}

Protocol.prototype.getPendingMessages = function() {
getPendingMessages(): PendingMessage[] {
return this.messageQueue.copyAll();
};
}

Protocol.prototype.clearPendingMessages = function() {
clearPendingMessages(): void {
return this.messageQueue.clear();
};
}

Protocol.prototype.finish = function() {
var transport = this.transport;
finish(): void {
const transport = this.transport;
this.onceIdle(function() {
transport.disconnect();
});
};

function PendingMessage(message, callback) {
this.message = message;
this.callback = callback;
this.merged = false;
var action = message.action;
this.sendAttempted = false;
this.ackRequired = (action == actions.MESSAGE || action == actions.PRESENCE);
}
Protocol.PendingMessage = PendingMessage;

return Protocol;
})();
}

export default Protocol;
1 change: 1 addition & 0 deletions common/types/utils.d.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export type StandardCallback<T> = (err?: ErrorInfo | null, result?: T) => void;
export type ErrCallback = (err?: ErrorInfo | null) => void;

0 comments on commit eaa4498

Please sign in to comment.