From cb483a74c2d2ab7d59922be42c199fa3acba0b59 Mon Sep 17 00:00:00 2001 From: Quan HL Date: Fri, 28 Jul 2023 17:48:35 +0700 Subject: [PATCH] add google storage writablestream --- lib/record/google-storage.js | 30 ++++++++++++++++++++++++++++-- lib/record/index.js | 17 ++--------------- lib/record/{s3.js => upload.js} | 16 ++-------------- 3 files changed, 32 insertions(+), 31 deletions(-) rename lib/record/{s3.js => upload.js} (84%) diff --git a/lib/record/google-storage.js b/lib/record/google-storage.js index 392f5096..a833504a 100644 --- a/lib/record/google-storage.js +++ b/lib/record/google-storage.js @@ -1,2 +1,28 @@ -async function upload(logger, socket) {} -module.exports = upload; +const { Storage } = require('@google-cloud/storage'); +const { Writable } = require('stream'); + +class GoogleStorageUploadStream extends Writable { + + constructor(logger, opts) { + super(opts); + this.logger = logger; + + const storage = new Storage(opts.bucketCredential); + this.gcsFile = storage.bucket(opts.bucketName).file(opts.Key); + this.writeStream = this.gcsFile.createWriteStream(); + + this.writeStream.on('error', (err) => console.error(err)); + this.writeStream.on('finish', () => console.log('Upload completed.')); + } + + _write(chunk, encoding, callback) { + this.writeStream.write(chunk, encoding, callback); + } + + _final(callback) { + this.writeStream.end(); + this.writeStream.once('finish', callback); + } +} + +module.exports = GoogleStorageUploadStream; diff --git a/lib/record/index.js b/lib/record/index.js index cabb307a..730a5de4 100644 --- a/lib/record/index.js +++ b/lib/record/index.js @@ -1,19 +1,6 @@ -const path = require('node:path'); -async function record(logger, socket, url) { - const p = path.basename(url); - const idx = p.lastIndexOf('/'); - const vendor = p.substring(idx + 1); - switch (vendor) { - case 'aws_s3': - return require('./s3')(logger, socket); - case 'google': - return require('./google-storage')(logger, socket); - default: - logger.info(`unknown bucket vendor: ${vendor}`); - socket.send(`unknown bucket vendor: ${vendor}`); - socket.close(); - } +async function record(logger, socket) { + return require('./upload')(logger, socket); } module.exports = record; diff --git a/lib/record/s3.js b/lib/record/upload.js similarity index 84% rename from lib/record/s3.js rename to lib/record/upload.js index 24297e41..7b07acea 100644 --- a/lib/record/s3.js +++ b/lib/record/upload.js @@ -1,8 +1,8 @@ const Account = require('../models/account'); const Websocket = require('ws'); const PCMToMP3Encoder = require('./encoder'); -const S3MultipartUploadStream = require('./s3-multipart-upload-stream'); const wav = require('wav'); +const { getUploader } = require('./utils'); async function upload(logger, socket) { @@ -43,19 +43,7 @@ async function upload(logger, socket) { Key += `/${day.getDate().toString().padStart(2, '0')}/${callSid}.${account[0].record_format}`; // Uploader - const uploaderOpts = { - bucketName: obj.name, - Key, - metadata, - bucketCredential: { - credentials: { - accessKeyId: obj.access_key_id, - secretAccessKey: obj.secret_access_key, - }, - region: obj.region || 'us-east-1' - } - }; - const uploadStream = new S3MultipartUploadStream(logger, uploaderOpts); + const uploadStream = getUploader(Key, metadata, obj); /**encoder */ let encoder;