-
Notifications
You must be signed in to change notification settings - Fork 55
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #853 from ably/feature/channel-typescript
Convert Channel to TypeScript
- Loading branch information
Showing
4 changed files
with
179 additions
and
161 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
import * as Utils from '../util/utils'; | ||
import EventEmitter from '../util/eventemitter'; | ||
import Logger from '../util/logger'; | ||
import Presence from './presence'; | ||
import Crypto from 'platform-crypto'; | ||
import Message, { CipherOptions } from '../types/message'; | ||
import ErrorInfo from '../types/errorinfo'; | ||
import PaginatedResource, { PaginatedResult } from './paginatedresource'; | ||
import Http from 'platform-http'; | ||
import Resource from './resource'; | ||
import { ChannelOptions } from '../../types/channel'; | ||
import { PaginatedResultCallback } from '../../types/utils'; | ||
|
||
// TODO: Replace these when Realtime and Rest are in TypeScript | ||
type Realtime = any; | ||
type Rest = any; | ||
|
||
interface RestHistoryParams { | ||
start?: number; | ||
end?: number; | ||
direction?: string; | ||
limit?: number; | ||
} | ||
|
||
function noop() {} | ||
const MSG_ID_ENTROPY_BYTES = 9; | ||
|
||
function allEmptyIds(messages: Array<Message>) { | ||
return Utils.arrEvery(messages, function(message: Message) { | ||
return !message.id; | ||
}); | ||
} | ||
|
||
function normaliseChannelOptions(options?: ChannelOptions) { | ||
const channelOptions = options || {}; | ||
if(channelOptions.cipher) { | ||
if(!Crypto) throw new Error('Encryption not enabled; use ably.encryption.js instead'); | ||
const cipher = Crypto.getCipher(channelOptions.cipher); | ||
channelOptions.cipher = cipher.cipherParams; | ||
channelOptions.channelCipher = cipher.cipher; | ||
} else if('cipher' in channelOptions) { | ||
/* Don't deactivate an existing cipher unless options | ||
* has a 'cipher' key that's falsey */ | ||
channelOptions.cipher = null; | ||
channelOptions.channelCipher = null; | ||
} | ||
return channelOptions; | ||
} | ||
|
||
class Channel extends EventEmitter { | ||
rest: Rest | Realtime; | ||
name: string; | ||
basePath: string; | ||
presence: Presence; | ||
channelOptions: ChannelOptions; | ||
|
||
constructor(rest: Rest | Realtime, name: string, channelOptions?: ChannelOptions) { | ||
super(); | ||
Logger.logAction(Logger.LOG_MINOR, 'Channel()', 'started; name = ' + name); | ||
this.rest = rest; | ||
this.name = name; | ||
this.basePath = '/channels/' + encodeURIComponent(name); | ||
this.presence = new Presence(this); | ||
this.channelOptions = normaliseChannelOptions(channelOptions); | ||
} | ||
|
||
setOptions(options: ChannelOptions): void { | ||
this.channelOptions = normaliseChannelOptions(options); | ||
} | ||
|
||
history(params: RestHistoryParams | null, callback: PaginatedResultCallback<Message>): Promise<PaginatedResult<Message>> | void { | ||
Logger.logAction(Logger.LOG_MICRO, 'Channel.history()', 'channel = ' + this.name); | ||
/* params and callback are optional; see if params contains the callback */ | ||
if(callback === undefined) { | ||
if(typeof(params) == 'function') { | ||
callback = params; | ||
params = null; | ||
} else { | ||
if(this.rest.options.promises) { | ||
return Utils.promisify(this, 'history', arguments); | ||
} | ||
callback = noop; | ||
} | ||
} | ||
|
||
this._history(params, callback); | ||
} | ||
|
||
_history(params: RestHistoryParams | null, callback: PaginatedResultCallback<Message>): void { | ||
const rest = this.rest, | ||
format = rest.options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json, | ||
envelope = Http.supportsLinkHeaders ? undefined : format, | ||
headers = Utils.defaultGetHeaders(format); | ||
|
||
if(rest.options.headers) | ||
Utils.mixin(headers, rest.options.headers); | ||
|
||
const options = this.channelOptions; | ||
(new PaginatedResource(rest, this.basePath + '/messages', headers, envelope, function(body: any, headers: Record<string, string>, unpacked?: boolean) { | ||
return Message.fromResponseBody(body, options, unpacked ? undefined : format); | ||
})).get(params as Record<string, unknown>, callback); | ||
} | ||
|
||
publish(): void | Promise<void> { | ||
const argCount = arguments.length, | ||
first = arguments[0], | ||
second = arguments[1]; | ||
let callback = arguments[argCount - 1]; | ||
let messages: Array<Message>; | ||
let params: any; | ||
|
||
if(typeof(callback) !== 'function') { | ||
if(this.rest.options.promises) { | ||
return Utils.promisify(this, 'publish', arguments); | ||
} | ||
callback = noop; | ||
} | ||
|
||
if(typeof first === 'string' || first === null) { | ||
/* (name, data, ...) */ | ||
messages = [Message.fromValues({name: first, data: second})]; | ||
params = arguments[2]; | ||
} else if(Utils.isObject(first)) { | ||
messages = [Message.fromValues(first)]; | ||
params = arguments[1]; | ||
} else if(Utils.isArray(first)) { | ||
messages = Message.fromValuesArray(first); | ||
params = arguments[1]; | ||
} else { | ||
throw new ErrorInfo('The single-argument form of publish() expects a message object or an array of message objects', 40013, 400); | ||
} | ||
|
||
if(typeof params !== 'object' || !params) { | ||
/* No params supplied (so after-message argument is just the callback or undefined) */ | ||
params = {}; | ||
} | ||
|
||
const rest = this.rest, | ||
options = rest.options, | ||
format = options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json, | ||
idempotentRestPublishing = rest.options.idempotentRestPublishing, | ||
headers = Utils.defaultPostHeaders(format); | ||
|
||
if(options.headers) | ||
Utils.mixin(headers, options.headers); | ||
|
||
if(idempotentRestPublishing && allEmptyIds(messages)) { | ||
const msgIdBase = Utils.randomString(MSG_ID_ENTROPY_BYTES); | ||
Utils.arrForEach(messages, function(message, index) { | ||
message.id = msgIdBase + ':' + index.toString(); | ||
}); | ||
} | ||
|
||
Message.encodeArray(messages, this.channelOptions as CipherOptions, (err: Error) => { | ||
if(err) { | ||
callback(err); | ||
return; | ||
} | ||
|
||
/* RSL1i */ | ||
const size = Message.getMessagesSize(messages), | ||
maxMessageSize = options.maxMessageSize; | ||
if(size > maxMessageSize) { | ||
callback(new ErrorInfo('Maximum size of messages that can be published at once exceeded ( was ' + size + ' bytes; limit is ' + maxMessageSize + ' bytes)', 40009, 400)); | ||
return; | ||
} | ||
|
||
this._publish(Message.serialize(messages, format), headers, params, callback); | ||
}); | ||
} | ||
|
||
_publish(requestBody: unknown, headers: Record<string, string>, params: any, callback: Function): void { | ||
Resource.post(this.rest, this.basePath + '/messages', requestBody, headers, params, null, callback); | ||
} | ||
} | ||
|
||
export default Channel; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,5 +10,5 @@ export interface ChannelOptions { | |
algorithm: string; | ||
encrypt: Function; | ||
decrypt: Function; | ||
}; | ||
} | null; | ||
} |