Skip to content
This repository has been archived by the owner on Mar 3, 2022. It is now read-only.

Commit

Permalink
feat: Core ddp-connector and init strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
nytamin committed Feb 25, 2018
1 parent f048398 commit 93f2f60
Show file tree
Hide file tree
Showing 10 changed files with 492 additions and 51 deletions.
7 changes: 5 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@
"production"
],
"dependencies": {
"tslib": "^1.6.0",
"mos-connection": "git+https://Mfn2bXzQtEFXHKpo12x2y78QJMR7gUZz:x-oauth-basic@bitbucket.org/superflytv/mos-connection.git"
"ddp": "^0.12.1",
"ddp-ejson": "~0.8.1-3",
"ddp-random": "~0.8.1-1",
"mos-connection": "git+https://Mfn2bXzQtEFXHKpo12x2y78QJMR7gUZz:x-oauth-basic@bitbucket.org/superflytv/mos-connection.git",
"tslib": "^1.6.0"
}
}
34 changes: 0 additions & 34 deletions src/MosIntegration.ts

This file was deleted.

12 changes: 7 additions & 5 deletions src/__tests__/MosIntegration.spec.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@



import MosIntegration from "../MosIntegration"
import {Connector} from "../connector"

test('Simple test', () => {
test('Simple test', async () => {

var myMosC = new MosIntegration();
var c = new Connector();

MosIntegration.init();
await c.init()

expect(myMosC).toBeInstanceOf(MosIntegration)
expect(c).toBeInstanceOf(Connector)

return 1;
})
44 changes: 44 additions & 0 deletions src/connector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@

import {MosHandler} from "./mosHandler"
import {CoreHandler} from "./coreHandler"



export class Connector {

private mosHandler:MosHandler;
private coreHandler:CoreHandler;


async init():Promise<number> {

await this.initCore()



await this.initMos();



return 0;

}
initCore() {

this.coreHandler = new CoreHandler();

return this.coreHandler.init();

}
initMos():Promise<number> {


// TODO: maybe get some config data from core here?

this.mosHandler = new MosHandler();

return this.mosHandler.init();

}
}

215 changes: 215 additions & 0 deletions src/connectors/ddpConnector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
let ddp: any = require("ddp");

import {EventEmitter} from 'events';




export interface DDPConnectorOptions {
host: string;
port: number;
path?: string;
ssl?: boolean;
debug?: boolean;
}
export interface Observer {
added: (id:string) => void;
changed:(id:string, oldFields:any, clearedFields:any, newFields:any) => void;
removed:(id:string, oldValue:any) => void;
}
export interface DDPClient {
on:(event:string, data?:any) => void,
close:() => void,
connect:(callback?:(error:Error, wasReconnect:boolean) => void) => void,

call:(methodName:string, data:Array<any>, callback:(err:Error, result:any) => void) => void
subscribe: (subscriptionName:string, data:Array<any>, callback:() => void) => void
observe: (collectionName:string) => Observer

collections: {
[collectionName:string]: {
[id:string]: {
_id: string,
[attr:string]: any
}
}
}

socket:any,
session:string,

host: string,
port: number,
path: string,
ssl: boolean,
useSockJS: boolean;
autoReconnect: boolean;
autoReconnectTimer: number;
ddpVersion: any;
}




export class DDPConnector extends EventEmitter {
private _options:DDPConnectorOptions;
public ddpClient:DDPClient;
private _connected:boolean = false;
private _connecting:boolean = false;
private _connectionId:string;

onConnectionChanged?: (connected:boolean) => void;
onConnected?: () => void;
onDisconnected?: () => void;


constructor(options:DDPConnectorOptions) {
super();

this._options = options;

}
createClient() {
var o = {
host: this._options.host,
port: this._options.port,
path: this._options.path || '',
ssl: this._options.ssl || false,
useSockJS: true,
autoReconnect: true,
autoReconnectTimer: 1000,
maintain_collections: true,
ddpVersion: "1"
};
if (!this.ddpClient) {

this.ddpClient = new ddp(o);
this.ddpClient.on("socket-close", () => {

this._onclientConnectionChange(false)

});
this.ddpClient.on("message", (message: any) => this._onClientMessage(message));
this.ddpClient.on("socket-error", (error: any) => this._onClientError(error));



} else {


if (this.ddpClient.socket) {
this.ddpClient.close();
}


this.ddpClient.host = o.host;
this.ddpClient.port = o.port;
this.ddpClient.path = o.path;
this.ddpClient.ssl = o.ssl;
this.ddpClient.useSockJS = o.useSockJS;
this.ddpClient.autoReconnect = o.autoReconnect;
this.ddpClient.autoReconnectTimer = o.autoReconnectTimer ;
this.ddpClient.ddpVersion = o.ddpVersion ;


this.ddpClient.connect();

}

this.ddpClient.on("connected", () => {

this._onclientConnectionChange(true);
});
this.ddpClient.on("failed", (error: any) => this._onClientConnectionFailed(error));
}
public connect() {
return new Promise((resolve, reject) => {

if (!this.ddpClient) {
this.createClient();
}

if (this.ddpClient && !this._connecting) {

this._connecting = true;

this.ddpClient.connect((error: Object, isReconnecting: boolean) => {
this._connecting = false;


if (error) {
reject(error);
} else {
resolve();
}
});
}
});
}
public close() {
if (this.ddpClient) {
this.ddpClient.close();
delete this.ddpClient;
}
}
public get connected(): boolean{
return this._connected;
}
public forceReconnect(): void {
this.createClient();
}

private _onclientConnectionChange(connected:boolean) {
if (connected !== this._connected) {
this._connected = connected;

if (connected) {
this._connectionId = this.ddpClient.session;
}

//log.debug("DDP: _onclientConnectionChange "+connected);

if (this.onConnectionChanged) {
this.onConnectionChanged(this._connected);
}
if (this.onConnected && this._connected) {
this.onConnected();
}
if (this.onDisconnected && !this._connected) {
this.onDisconnected();
}

/*if(!this._connected && this.autoReconnect){
this._createClient();
this.handleAutoReconnect();
}
if (this._connected) {
this._failedConnectionAttempts = 0;
}
*/
}
}
private _onClientConnectionFailed(error:Error) {

if (this.listenerCount('failed') > 0) {

this.emit('failed', error);
} else {
console.log('failed',error);
// last resort retry strategy:
setTimeout(() => {
if (!this._connected) {
this.forceReconnect();
}
}, 5000);
}
}

private _onClientMessage(message:any) {
//console.log('message',message);
message;
}
private _onClientError(error:Error) {
this.emit('error', error);
}
}
18 changes: 18 additions & 0 deletions src/coreHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import {DDPConnector} from "./connectors/ddpConnector"


export class CoreHandler {

private ddp:DDPConnector;

init() {

this.ddp = new DDPConnector({
host: '127.0.0.1',
port: 3000,
});

this.ddp.createClient();

}
}
38 changes: 38 additions & 0 deletions src/ddp.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/// <reference path="meteor.d.ts" />

declare module DDP {
interface DDPStatic {
subscribe(name: string, ...rest: any[]): Meteor.SubscriptionHandle;
call(method: string, ...parameters: any[]): void;
apply(method: string, ...parameters: any[]): void;
methods(IMeteorMethodsDictionary: any): any;
status(): DDPStatus;
reconnect(): void;
disconnect(): void;
onReconnect(): void;
}

function _allSubscriptionsReady(): boolean;

type Status = 'connected' | 'connecting' | 'failed' | 'waiting' | 'offline';

interface DDPStatus {
connected: boolean;
status: Status;
retryCount: number;
retryTime?: number;
reason?: string;
}

function connect(url: string): DDPStatic;
}

declare module DDPCommon {
interface MethodInvocation {
new (options: {}): MethodInvocation;

unblock(): void;

setUserId(userId: number): void;
}
}
Loading

0 comments on commit 93f2f60

Please sign in to comment.