Skip to content
This repository has been archived by the owner on Jun 24, 2024. It is now read-only.

Commit

Permalink
fix: Stop Duplicate Response Handlers on Retries (#502)
Browse files Browse the repository at this point in the history
* fix: Stop duplicate reponse handlers on retries

* style: lint
  • Loading branch information
danielbankhead authored Jan 26, 2022
1 parent 47cc8a8 commit c5b3059
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 31 deletions.
18 changes: 8 additions & 10 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -761,12 +761,6 @@ export class Upload extends Pumpify {
},
});

// This should be 'once' as `startUploading` can be called again for
// multi chunk uploads and each request would have its own response.
this.once('response', resp => {
responseReceived = true;
this.responseHandler(resp);
});
let headers: GaxiosOptions['headers'] = {};

// If using multiple chunk upload, set appropriate header
Expand All @@ -791,7 +785,11 @@ export class Upload extends Pumpify {
};

try {
await this.makeRequestStream(reqOpts);
const resp = await this.makeRequestStream(reqOpts);
if (resp) {
responseReceived = true;
this.responseHandler(resp);
}
} catch (err) {
const e = err as Error;
this.destroy(e);
Expand Down Expand Up @@ -983,7 +981,7 @@ export class Upload extends Pumpify {
return res;
}

private async makeRequestStream(reqOpts: GaxiosOptions): GaxiosPromise {
private async makeRequestStream(reqOpts: GaxiosOptions) {
const controller = new AbortController();
const errorCallback = () => controller.abort();
this.once('error', errorCallback);
Expand All @@ -1002,10 +1000,10 @@ export class Upload extends Pumpify {
reqOpts
);
const res = await this.authClient.request(combinedReqOpts);
this.onResponse(res);
const successfulRequest = this.onResponse(res);
this.removeListener('error', errorCallback);

return res;
return successfulRequest ? res : null;
}

private restart() {
Expand Down
47 changes: 26 additions & 21 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import * as mockery from 'mockery';
import * as nock from 'nock';
import * as path from 'path';
import * as sinon from 'sinon';
import {PassThrough, Readable} from 'stream';
import {Readable} from 'stream';

import {ApiError, CreateUriCallback, PROTOCOL_REGEX} from '../src';
import {GaxiosOptions, GaxiosError, GaxiosResponse} from 'gaxios';
Expand Down Expand Up @@ -896,7 +896,7 @@ describe('gcs-resumable-upload', () => {

describe('#startUploading', () => {
beforeEach(() => {
up.makeRequestStream = async () => new PassThrough();
up.makeRequestStream = async () => null;
up.upstreamChunkBuffer = Buffer.alloc(16);
});

Expand Down Expand Up @@ -968,14 +968,6 @@ describe('gcs-resumable-upload', () => {
up.startUploading();
});

it("should setup a 'response' listener", async () => {
assert.equal(up.eventNames().includes('response'), false);

await up.startUploading();

assert.equal(up.eventNames().includes('response'), true);
});

it('should destroy the stream if the request failed', done => {
const error = new Error('Error.');
up.on('error', (e: Error) => {
Expand Down Expand Up @@ -1687,14 +1679,27 @@ describe('gcs-resumable-upload', () => {
up.makeRequestStream(REQ_OPTS);
});

it('should return the response', async () => {
const response = {};
it('should return the response if successful', async () => {
const response = {some: 'response'};
up.authClient = {
request: async () => response,
};
up.onResponse = () => true;

const stream = await up.makeRequestStream(REQ_OPTS);
assert.strictEqual(stream, response);
});

it('should return `null` if the response is unsuccessful', async () => {
const response = {some: 'response'};
up.authClient = {
request: async () => response,
};
up.onResponse = () => false;

const stream = await up.makeRequestStream(REQ_OPTS);
assert.strictEqual(stream, null);
});
});

describe('#restart', () => {
Expand Down Expand Up @@ -2251,7 +2256,7 @@ describe('gcs-resumable-upload', () => {
let dataReceived = 0;
let chunkWritesInRequest = 0;

await new Promise(resolve => {
const res = await new Promise(resolve => {
opts.body.on('data', (data: Buffer) => {
dataReceived += data.byteLength;
overallDataReceived += data.byteLength;
Expand All @@ -2261,14 +2266,14 @@ describe('gcs-resumable-upload', () => {
opts.body.on('end', () => {
requests.push({dataReceived, opts, chunkWritesInRequest});

up.emit('response', {
resolve({
status: 200,
data: {},
});

resolve(null);
});
});

return res;
};

up.on('error', done);
Expand Down Expand Up @@ -2390,7 +2395,7 @@ describe('gcs-resumable-upload', () => {
let dataReceived = 0;
let chunkWritesInRequest = 0;

await new Promise(resolve => {
const res = await new Promise(resolve => {
opts.body.on('data', (data: Buffer) => {
dataReceived += data.byteLength;
overallDataReceived += data.byteLength;
Expand All @@ -2405,23 +2410,23 @@ describe('gcs-resumable-upload', () => {
? overallDataReceived - 1
: 0;

up.emit('response', {
resolve({
status: RESUMABLE_INCOMPLETE_STATUS_CODE,
headers: {
range: `bytes=0-${lastByteReceived}`,
},
data: {},
});
} else {
up.emit('response', {
resolve({
status: 200,
data: {},
});
}

resolve(null);
});
});

return res;
};

up.on('error', done);
Expand Down

0 comments on commit c5b3059

Please sign in to comment.