Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(lib-storage): call AbortMultipartUpload when failing to CompleteMultipartUpload #6112

Merged
merged 3 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions lib/lib-storage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,23 @@ try {
client: new S3({}) || new S3Client({}),
params: { Bucket, Key, Body },

// optional tags
tags: [
/*...*/
], // optional tags
queueSize: 4, // optional concurrency configuration
partSize: 1024 * 1024 * 5, // optional size of each part, in bytes, at least 5MB
leavePartsOnError: false, // optional manually handle dropped parts
],

// additional optional fields show default values below:

// (optional) concurrency configuration
queueSize: 4,

// (optional) size of each part, in bytes, at least 5MB
partSize: 1024 * 1024 * 5,

// (optional) when true, do not automatically call AbortMultipartUpload when
// a multipart upload fails to complete. You should then manually handle
// the leftover parts.
leavePartsOnError: false,
});

parallelUploads3.on("httpUploadProgress", (progress) => {
Expand Down
12 changes: 12 additions & 0 deletions lib/lib-storage/src/Upload.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -711,4 +711,16 @@ describe(Upload.name, () => {
expect(error).toBeDefined();
}
});

it("should reject calling .done() more than once on an instance", async () => {
const upload = new Upload({
params,
client: new S3({}),
});

await upload.done();
expect(() => upload.done()).rejects.toEqual(
new Error("@aws-sdk/lib-storage: this instance of Upload has already executed .done(). Create a new instance.")
);
});
});
247 changes: 145 additions & 102 deletions lib/lib-storage/src/Upload.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
AbortMultipartUploadCommand,
CompletedPart,
CompleteMultipartUploadCommand,
CompleteMultipartUploadCommandOutput,
Expand Down Expand Up @@ -36,18 +37,18 @@ const MIN_PART_SIZE = 1024 * 1024 * 5;

export class Upload extends EventEmitter {
/**
* S3 multipart upload does not allow more than 10000 parts.
* S3 multipart upload does not allow more than 10,000 parts.
*/
private MAX_PARTS = 10000;
private MAX_PARTS = 10_000;

// Defaults.
private queueSize = 4;
private partSize = MIN_PART_SIZE;
private leavePartsOnError = false;
private tags: Tag[] = [];
private readonly queueSize: number = 4;
private readonly partSize = MIN_PART_SIZE;
private readonly leavePartsOnError: boolean = false;
private readonly tags: Tag[] = [];

private client: S3Client;
private params: PutObjectCommandInput;
private readonly client: S3Client;
private readonly params: PutObjectCommandInput;

// used for reporting progress.
private totalBytes?: number;
Expand All @@ -57,13 +58,19 @@ export class Upload extends EventEmitter {
private abortController: IAbortController;
private concurrentUploaders: Promise<void>[] = [];
private createMultiPartPromise?: Promise<CreateMultipartUploadCommandOutput>;
private abortMultipartUploadCommand: AbortMultipartUploadCommand | null = null;

private uploadedParts: CompletedPart[] = [];
private uploadId?: string;
uploadEvent?: string;
private uploadEnqueuedPartsCount = 0;
/**
* Last UploadId if the upload was done with MultipartUpload and not PutObject.
*/
public uploadId?: string;
public uploadEvent?: string;

private isMultiPart = true;
private singleUploadResult?: CompleteMultipartUploadCommandOutput;
private sent = false;

constructor(options: Options) {
super();
Expand Down Expand Up @@ -94,6 +101,12 @@ export class Upload extends EventEmitter {
}

public async done(): Promise<CompleteMultipartUploadCommandOutput> {
if (this.sent) {
throw new Error(
"@aws-sdk/lib-storage: this instance of Upload has already executed .done(). Create a new instance."
);
}
this.sent = true;
return await Promise.race([this.__doMultipartUpload(), this.__abortTimeout(this.abortController.signal)]);
}

Expand Down Expand Up @@ -184,104 +197,64 @@ export class Upload extends EventEmitter {
private async __createMultipartUpload(): Promise<CreateMultipartUploadCommandOutput> {
if (!this.createMultiPartPromise) {
const createCommandParams = { ...this.params, Body: undefined };
this.createMultiPartPromise = this.client.send(new CreateMultipartUploadCommand(createCommandParams));
this.createMultiPartPromise = this.client
.send(new CreateMultipartUploadCommand(createCommandParams))
.then((createMpuResponse) => {
// We use the parameter Bucket/Key rather than the information from
// createMultipartUpload response in case the Bucket is an access point arn.
this.abortMultipartUploadCommand = new AbortMultipartUploadCommand({
Bucket: this.params.Bucket,
Key: this.params.Key,
UploadId: createMpuResponse.UploadId,
});
return createMpuResponse;
});
}
return this.createMultiPartPromise;
}

private async __doConcurrentUpload(dataFeeder: AsyncGenerator<RawDataPart, void, undefined>): Promise<void> {
for await (const dataPart of dataFeeder) {
if (this.uploadedParts.length > this.MAX_PARTS) {
if (this.uploadEnqueuedPartsCount > this.MAX_PARTS) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reason for this change: this.uploadedParts.length is updated after an async operation (UploadPart), which causes a delay.

Due to request concurrency, this.uploadedParts.length can be far behind the actual number of parts that have been iterated and submitted.

throw new Error(
`Exceeded ${this.MAX_PARTS} as part of the upload to ${this.params.Key} and ${this.params.Bucket}.`
`Exceeded ${this.MAX_PARTS} parts in multipart upload to Bucket: ${this.params.Bucket} Key: ${this.params.Key}.`
);
}

try {
if (this.abortController.signal.aborted) {
return;
}
if (this.abortController.signal.aborted) {
return;
}

// Use put instead of multi-part for one chunk uploads.
if (dataPart.partNumber === 1 && dataPart.lastPart) {
return await this.__uploadUsingPut(dataPart);
}
// Use put instead of multipart for one chunk uploads.
if (dataPart.partNumber === 1 && dataPart.lastPart) {
return await this.__uploadUsingPut(dataPart);
}

if (!this.uploadId) {
const { UploadId } = await this.__createMultipartUpload();
this.uploadId = UploadId;
if (this.abortController.signal.aborted) {
return;
}
if (!this.uploadId) {
const { UploadId } = await this.__createMultipartUpload();
this.uploadId = UploadId;
if (this.abortController.signal.aborted) {
return;
}
}

const partSize: number = byteLength(dataPart.data) || 0;

const requestHandler = this.client.config.requestHandler;
const eventEmitter: EventEmitter | null = requestHandler instanceof EventEmitter ? requestHandler : null;

let lastSeenBytes = 0;
const uploadEventListener = (event: ProgressEvent, request: HttpRequest) => {
const requestPartSize = Number(request.query["partNumber"]) || -1;

if (requestPartSize !== dataPart.partNumber) {
// ignored, because the emitted event is not for this part.
return;
}

if (event.total && partSize) {
this.bytesUploadedSoFar += event.loaded - lastSeenBytes;
lastSeenBytes = event.loaded;
}

this.__notifyProgress({
loaded: this.bytesUploadedSoFar,
total: this.totalBytes,
part: dataPart.partNumber,
Key: this.params.Key,
Bucket: this.params.Bucket,
});
};
const partSize: number = byteLength(dataPart.data) || 0;

if (eventEmitter !== null) {
// The requestHandler is the xhr-http-handler.
eventEmitter.on("xhr.upload.progress", uploadEventListener);
}
const requestHandler = this.client.config.requestHandler;
const eventEmitter: EventEmitter | null = requestHandler instanceof EventEmitter ? requestHandler : null;

const partResult = await this.client.send(
new UploadPartCommand({
...this.params,
UploadId: this.uploadId,
Body: dataPart.data,
PartNumber: dataPart.partNumber,
})
);
let lastSeenBytes = 0;
const uploadEventListener = (event: ProgressEvent, request: HttpRequest) => {
const requestPartSize = Number(request.query["partNumber"]) || -1;

if (eventEmitter !== null) {
eventEmitter.off("xhr.upload.progress", uploadEventListener);
}

if (this.abortController.signal.aborted) {
if (requestPartSize !== dataPart.partNumber) {
// ignored, because the emitted event is not for this part.
return;
}

if (!partResult.ETag) {
throw new Error(
`Part ${dataPart.partNumber} is missing ETag in UploadPart response. Missing Bucket CORS configuration for ETag header?`
);
}

this.uploadedParts.push({
PartNumber: dataPart.partNumber,
ETag: partResult.ETag,
...(partResult.ChecksumCRC32 && { ChecksumCRC32: partResult.ChecksumCRC32 }),
...(partResult.ChecksumCRC32C && { ChecksumCRC32C: partResult.ChecksumCRC32C }),
...(partResult.ChecksumSHA1 && { ChecksumSHA1: partResult.ChecksumSHA1 }),
...(partResult.ChecksumSHA256 && { ChecksumSHA256: partResult.ChecksumSHA256 }),
});

if (eventEmitter === null) {
this.bytesUploadedSoFar += partSize;
if (event.total && partSize) {
this.bytesUploadedSoFar += event.loaded - lastSeenBytes;
lastSeenBytes = event.loaded;
}

this.__notifyProgress({
Expand All @@ -291,33 +264,89 @@ export class Upload extends EventEmitter {
Key: this.params.Key,
Bucket: this.params.Bucket,
});
} catch (e) {
// Failed to create multi-part or put
if (!this.uploadId) {
throw e;
}
// on leavePartsOnError throw an error so users can deal with it themselves,
// otherwise swallow the error.
if (this.leavePartsOnError) {
throw e;
}
};

if (eventEmitter !== null) {
// The requestHandler is the xhr-http-handler.
eventEmitter.on("xhr.upload.progress", uploadEventListener);
}

this.uploadEnqueuedPartsCount += 1;

const partResult = await this.client.send(
new UploadPartCommand({
...this.params,
UploadId: this.uploadId,
Body: dataPart.data,
PartNumber: dataPart.partNumber,
})
);

if (eventEmitter !== null) {
eventEmitter.off("xhr.upload.progress", uploadEventListener);
}

if (this.abortController.signal.aborted) {
return;
}

if (!partResult.ETag) {
throw new Error(
`Part ${dataPart.partNumber} is missing ETag in UploadPart response. Missing Bucket CORS configuration for ETag header?`
);
}

this.uploadedParts.push({
PartNumber: dataPart.partNumber,
ETag: partResult.ETag,
...(partResult.ChecksumCRC32 && { ChecksumCRC32: partResult.ChecksumCRC32 }),
...(partResult.ChecksumCRC32C && { ChecksumCRC32C: partResult.ChecksumCRC32C }),
...(partResult.ChecksumSHA1 && { ChecksumSHA1: partResult.ChecksumSHA1 }),
...(partResult.ChecksumSHA256 && { ChecksumSHA256: partResult.ChecksumSHA256 }),
});

if (eventEmitter === null) {
this.bytesUploadedSoFar += partSize;
}

this.__notifyProgress({
loaded: this.bytesUploadedSoFar,
total: this.totalBytes,
part: dataPart.partNumber,
Key: this.params.Key,
Bucket: this.params.Bucket,
});
}
}

private async __doMultipartUpload(): Promise<CompleteMultipartUploadCommandOutput> {
// Set up data input chunks.
const dataFeeder = getChunk(this.params.Body, this.partSize);
const concurrentUploaderFailures: Error[] = [];

// Create and start concurrent uploads.
for (let index = 0; index < this.queueSize; index++) {
const currentUpload = this.__doConcurrentUpload(dataFeeder);
const currentUpload = this.__doConcurrentUpload(dataFeeder).catch((err) => {
concurrentUploaderFailures.push(err);
});
this.concurrentUploaders.push(currentUpload);
}

// Create and start concurrent uploads.
await Promise.all(this.concurrentUploaders);
if (concurrentUploaderFailures.length >= 1) {
await this.markUploadAsAborted();
/**
* Previously, each promise in concurrentUploaders could potentially throw
* and immediately return control to user code. However, we want to wait for
* all uploaders to finish before calling AbortMultipartUpload to avoid
* stranding uploaded parts.
*
* We throw only the first error to be consistent with prior behavior,
* but may consider combining the errors into a report in the future.
*/
throw concurrentUploaderFailures[0];
}

if (this.abortController.signal.aborted) {
await this.markUploadAsAborted();
throw Object.assign(new Error("Upload aborted."), { name: "AbortError" });
}

Expand All @@ -341,6 +370,8 @@ export class Upload extends EventEmitter {
result = this.singleUploadResult!;
}

this.abortMultipartUploadCommand = null;

// Add tags to the object after it's completed the upload.
if (this.tags.length) {
await this.client.send(
Expand All @@ -356,6 +387,18 @@ export class Upload extends EventEmitter {
return result;
}

/**
* Abort the last multipart upload in progress
* if we know the upload id, the user did not specify to leave the parts, and
* we have a prepared AbortMultipartUpload command.
*/
private async markUploadAsAborted(): Promise<void> {
if (this.uploadId && !this.leavePartsOnError && null !== this.abortMultipartUploadCommand) {
await this.client.send(this.abortMultipartUploadCommand);
this.abortMultipartUploadCommand = null;
}
}

private __notifyProgress(progress: Progress): void {
if (this.uploadEvent) {
this.emit(this.uploadEvent, progress);
Expand Down
Loading
Loading