Skip to content

Commit

Permalink
Improvement
Browse files Browse the repository at this point in the history
- Fix problem concurent "Possible EventEmitter memory leak detected. 11 error listeners added"
- Use own @type ssh2-sftp-client
  • Loading branch information
rolldone committed Sep 21, 2021
1 parent 98f85b6 commit b123179
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 67 deletions.
152 changes: 92 additions & 60 deletions app/devsync2/compute/Download.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import BaseModel, { BaseModelInterface } from "@root/base/BaseModel";
import sftpClient from 'ssh2-sftp-client';
import sftpClient, { sftp } from '@root/tool/ssh2-sftp-client';
import { existsSync, mkdirSync, readFileSync, rmdirSync, statSync, unlink, unlinkSync } from "fs";
import { debounce, DebouncedFunc } from "lodash";
import { CliInterface } from "../services/CliService";
Expand All @@ -8,17 +8,26 @@ import { SftpOptions } from "./SyncPull";
import upath from 'upath';
import { MasterDataInterface } from "@root/bootstrap/StartMasterData";
import { join as pathJoin } from "path";
import { SFTPWrapper } from "ssh2";

declare var masterData: MasterDataInterface;

const DOWNLOAD_ACTION = {
DELETE_IS_FOLDER: 1
}

/* Use this if want to debugging */
/* process.on('warning', (warning) => {
console.warn(warning.name); // Print the warning name
console.warn(warning.message); // Print the warning message
console.warn(warning.stack); // Print the stack trace
process.exit();
}); */

export interface DownloadInterface extends BaseModelInterface {
tempFolder: string
status: downloadStatus
_client?: sftpClient
_client?: sftp
_sftpOptions?: SftpOptions
returnSftpConfig: { (props: SftpOptions): SftpOptions }
returnClient: {
Expand Down Expand Up @@ -55,6 +64,7 @@ export interface DownloadInterface extends BaseModelInterface {
_pendingTimeoutStop?: { (stop?: boolean): void }
_deleteCacheFile?: { (path: string, isFolder?: number): void }
_removeSameString: { (fullPath: string, basePath: string): string }
_ssh2SftpWrapper?: SFTPWrapper
}

export const STATUS_UPLOAD = {
Expand All @@ -76,9 +86,13 @@ const Download = BaseModel.extend<Omit<DownloadInterface, 'model'>>({
returnSftpConfig(props) {
return this._sftpOptions = props;
},

returnClient: async function (props) {
this._client = new sftpClient();
await this._client.connect(props);
await this._client.connect({
...props
});
this._ssh2SftpWrapper = await this._client.getSftpChannel();
return this._client;
},
construct(cli, config) {
Expand Down Expand Up @@ -121,7 +135,7 @@ const Download = BaseModel.extend<Omit<DownloadInterface, 'model'>>({
this._pendingUpload[entry.path].cancel();
}
/* Mengikuti kelipatan concurent */
let _debouncePendingOut = first_time_out == null ? (100 * (entry.queue_no == 0 ? 1 : entry.queue_no + 1)) : first_time_out;
let _debouncePendingOut = 100;//first_time_out == null ? (100 * (entry.queue_no == 0 ? 1 : entry.queue_no + 1)) : first_time_out;
this._pendingUpload[entry.path] = debounce((entry: any) => {
this._pendingTimeoutStop();
var remote = entry.path;
Expand All @@ -139,75 +153,88 @@ const Download = BaseModel.extend<Omit<DownloadInterface, 'model'>>({
delete this._pendingUpload[entry.path];
delete this._orders[entry.queue_no];
}
this._client.stat(remote).then((data) => {
this._ssh2SftpWrapper.stat(remote, (err, data) => {
deleteQueue();
let size_limit = this._config.size_limit;
if (size_limit == null) {
size_limit = 5;
}
size_limit = size_limit * 1000000;
if (data.size > size_limit) {
// console.log('size_limit', size_limit);
// console.log('stats', stats);
this.onListener('WARNING', {
return: 'File size more than ' + this._config.size_limit + 'MB : ' + upath.normalizeSafe(fileName)
})
if (err) {
this.onListener('REJECTED_DOWNLOAD', err.message);
reject({
message: 'File size over than limit.', // upath.normalizeSafe(fileName)
message: err.message, // upath.normalizeSafe(fileName)
error: ""
});
return;
}
/* Dont let file edited by server upload to server again! */
let fileEditFromServer: any = masterData.getData('file_edit_from_server', {});
if (fileEditFromServer[upath.normalizeSafe(fileName)] != null) {
if (fileEditFromServer[upath.normalizeSafe(fileName)] == true) {
this.onListener('REJECTED', {
return: 'File edited by system dont let uploaded.' // upath.normalizeSafe(fileName)
} else {
let size_limit = this._config.size_limit;
if (size_limit == null) {
size_limit = 5;
}
size_limit = size_limit * 1000000;
if (data.size > size_limit) {
// console.log('size_limit', size_limit);
// console.log('stats', stats);
this.onListener('WARNING', {
return: 'File size more than ' + this._config.size_limit + 'MB : ' + upath.normalizeSafe(fileName)
})
delete this._pendingQueue[remote];
masterData.updateData('file_edit_from_server', {
[upath.normalizeSafe(fileName)]: false
});
reject({
message: 'File edited by system dont let uploaded.', // upath.normalizeSafe(fileName)
message: 'File size over than limit.', // upath.normalizeSafe(fileName)
error: ""
});
return;
}
}
// Download the file
this._client.fastGet(remote, fileName).then((data) => {
this.onListener('DOWNLOADED', upath.normalizeSafe(fileName));
/* Dont let file edited by server upload to server again! */
let fileEditFromServer: any = masterData.getData('file_edit_from_server', {});
if (fileEditFromServer[upath.normalizeSafe(fileName)] != null) {
if (fileEditFromServer[upath.normalizeSafe(fileName)] == true) {
this.onListener('REJECTED', {
return: 'File edited by system dont let uploaded.' // upath.normalizeSafe(fileName)
})
delete this._pendingQueue[remote];
masterData.updateData('file_edit_from_server', {
[upath.normalizeSafe(fileName)]: false
});
reject({
message: 'File edited by system dont let uploaded.', // upath.normalizeSafe(fileName)
error: ""
});
return;
}
}

/* This is use for prevent upload to remote. */
/* Is use on watcher */
let fileDownoadRecord = masterData.getData('FILE_DOWNLOAD_RECORD', {}) as any;
fileDownoadRecord[fileName] = true;
masterData.saveData('FILE_DOWNLOAD_RECORD', fileDownoadRecord);
// Download the file
this._ssh2SftpWrapper.fastGet(remote, fileName, {
concurrency: 1
}, (err) => {
if (err) {
this.onListener('REJECTED_DOWNLOAD', err.message);
reject({
message: err.message, // upath.normalizeSafe(fileName)
error: ""
})
return;
}
this.onListener('DOWNLOADED', upath.normalizeSafe(fileName));

let firstKey = Object.keys(this._pendingQueue)[entry.queue_no];
if (firstKey == null) {
firstKey = Object.keys(this._pendingQueue)[0];
/* This is use for prevent upload to remote. */
/* Is use on watcher */
let fileDownoadRecord = masterData.getData('FILE_DOWNLOAD_RECORD', {}) as any;
fileDownoadRecord[fileName] = true;
masterData.saveData('FILE_DOWNLOAD_RECORD', fileDownoadRecord);

let firstKey = Object.keys(this._pendingQueue)[entry.queue_no];
if (firstKey == null) {
_closeIfPossible(this._client, upath.normalizeSafe(fileName));
resolve(remote);
return;
firstKey = Object.keys(this._pendingQueue)[0];
if (firstKey == null) {
_closeIfPossible(this._client, upath.normalizeSafe(fileName));
resolve(remote);
return;
}
}
}
let oo = Object.assign({}, this._pendingQueue[firstKey]);
delete this._pendingQueue[firstKey];
if (firstKey != null && oo.path == null) { }
this._exeHandlePush(oo);
resolve(remote);
}).catch((err) => {
this.onListener('REJECTED_DOWNLOAD', err.message);
})
}).catch((err) => {
deleteQueue();
/* Debug here */
let oo = Object.assign({}, this._pendingQueue[firstKey]);
delete this._pendingQueue[firstKey];
if (firstKey != null && oo.path == null) { }
resolve(remote);
this._exeHandlePush(oo);
})
}
})

}, _debouncePendingOut);
this._pendingUpload[entry.path](entry);
}
Expand Down Expand Up @@ -253,6 +280,7 @@ const Download = BaseModel.extend<Omit<DownloadInterface, 'model'>>({
unlinkSync(pathJoin('', props));

delete this._folderQueue[props];
this.onListener('DELETED', props)
} catch (ex: any) {
this.onListener('REJECTED', ex.message)
}
Expand Down Expand Up @@ -283,11 +311,12 @@ const Download = BaseModel.extend<Omit<DownloadInterface, 'model'>>({
delete this._folderQueue[local_path];
try {
rmdirSync(pathJoin('', local_path));
this.onListener('DELETED_FOLDER', local_path)
} catch (ex: any) {
setTimeout(() => {
if (oportunity > 0) {
this.deleteFolder(originPath, oportunity -= 1);
}else{
} else {
this.onListener('REJECTED', ex.message)
}
}, 1000);
Expand All @@ -298,6 +327,9 @@ const Download = BaseModel.extend<Omit<DownloadInterface, 'model'>>({
startWaitingDownloads(path) {
return new Promise((resolve, reject) => {
try {
if (this._index == null) {
this._index = 0;
}
/* Transalte to local path */
let local_path = this.getLocalPath(path);
if (this._index == this._concurent) {
Expand Down Expand Up @@ -384,7 +416,6 @@ const Download = BaseModel.extend<Omit<DownloadInterface, 'model'>>({
console.log('SFTP CLIENT CONNECTION :: ', err);
process.exit(0)
})
this._index = 0;
this._exeHandlePush = this._handlePush();
}
} catch (ex) {
Expand Down Expand Up @@ -412,6 +443,7 @@ const Download = BaseModel.extend<Omit<DownloadInterface, 'model'>>({
stop(mode) {
if (mode == this.status.SILENT) {
if (this._client != null) {
this._index = 0;
this._client.end();
this._client = null;
}
Expand Down
22 changes: 16 additions & 6 deletions app/devsync2/services/DevRsyncService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,19 +252,29 @@ const DevRsyncService = BaseService.extend<DevRsyncServiceInterface>({
case 'REJECTED_DOWNLOAD':
this._task['REJECTED_DOWNLOAD'] = observatory.add("Download Failed :: ");
this._task['REJECTED_DOWNLOAD'].fail(props);
this._task['REJECTED_DOWNLOAD'] = null;
break;
case 'ONGOING':
break;
case 'DELETED_FOLDER':
this._task['DELETED_FOLDER'] = observatory.add("DELETED_FOLDER :: ");
this._task['DELETED_FOLDER'].done(props);
break;
case 'DELETED':
this._task['DELETED'] = observatory.add("DELETED :: ");
this._task['DELETED'].done(props);
break;
case 'DOWNLOADED_DONE':
// this._task['DOWNLOADED'].done();
// this._task['DOWNLOADED'] = null;
this._task['DOWNLOADED'] = observatory.add("FINISH :: ");
this._task['DOWNLOADED'].done();
this._task['DOWNLOADED'] = null;
break;
case 'DOWNLOADED':
if (this._task['DOWNLOADED'] == null) {
this._task['DOWNLOADED'] = observatory.add("DOWNLOADED :: ");
}
this._task['DOWNLOADED'].status(props);
// if (this._task['DOWNLOADED'] == null) {
// this._task['DOWNLOADED'] = observatory.add("DOWNLOADED :: ");
// }
this._task['DOWNLOADED'] = observatory.add("DOWNLOADED :: ");
this._task['DOWNLOADED'].done(props);
break;
case 'TRYING_STOP':
if (this._task['STOP_DOWNLOAD'] == null) {
Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
"@types/node": "^14.14.37",
"@types/node-redis-pubsub": "^3.0.0",
"@types/ssh2": "^0.5.48",
"@types/ssh2-sftp-client": "^7.0.0",
"@types/yaml": "^1.9.7",
"async": "^3.2.0",
"chalk": "^4.1.1",
Expand Down
Loading

0 comments on commit b123179

Please sign in to comment.