-
-
Notifications
You must be signed in to change notification settings - Fork 95
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* feat(GridFS): Added Interfaces for the GridFS Basic API * feat(GridFS): Swapped ReadableStream <-> WritableStream * feat(GridFS): Added Chunks and Files interfaces (was missing) * feat(GridFS): Added Delete * feat(GridFS): Added find * feat(GridFS): Added Download * feat(GridFS): WIP Merged #1 * feat(GridFS): Fixed missing await in upload (and fixed missing await in tests) * feat(GridFS): Added First Two Echo Tests * feat(GridFS): Better compare arrayBuffers * Modified tests and chunking algorithm * feat(GridFS): Added more Tests * feat(GridFS): Fixed missing cleanup step * feat(GridFS): Added Index checking to Download * chore(CI): Switched to ubuntu-latest Ubuntu-16 will be deprecated in 8 Days. * Changed to iterate instead of recursion * feat(GridFS): Reformat file Co-authored-by: Manuel Aguirre <aguirre.manuel7@gmail.com>
- Loading branch information
1 parent
891761d
commit 7aaa297
Showing
13 changed files
with
684 additions
and
33 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,185 @@ | ||
import { assert, ObjectId } from "../../deps.ts"; | ||
import { Collection } from "../collection/collection.ts"; | ||
import { FindCursor } from "../collection/commands/find.ts"; | ||
import { Database } from "../database.ts"; | ||
import { Filter } from "../types.ts"; | ||
import { | ||
Chunk, | ||
File, | ||
FileId, | ||
GridFSBucketOptions, | ||
GridFSFindOptions, | ||
GridFSUploadOptions, | ||
} from "../types/gridfs.ts"; | ||
import { checkIndexes } from "./indexes.ts"; | ||
import { createUploadStream } from "./upload.ts"; | ||
|
||
export class GridFSBucket { | ||
#chunksCollection: Collection<Chunk>; | ||
#filesCollection: Collection<File>; | ||
#chunkSizeBytes: number; | ||
#checkedIndexes: boolean = false; | ||
|
||
private readonly getBucketData = () => ({ | ||
filesCollection: this.#filesCollection, | ||
chunksCollection: this.#chunksCollection, | ||
chunkSizeBytes: this.#chunkSizeBytes, | ||
}); | ||
|
||
/** | ||
* Create a new GridFSBucket object on @db with the given @options. | ||
*/ | ||
constructor( | ||
db: Database, | ||
options: GridFSBucketOptions = {}, | ||
) { | ||
const newLocal = options.bucketName ?? "fs"; | ||
this.#chunksCollection = db.collection(`${newLocal}.chunks`); | ||
this.#filesCollection = db.collection(`${newLocal}.files`); | ||
this.#chunkSizeBytes = options.chunkSizeBytes ?? 255 * 1024; | ||
} | ||
|
||
/** | ||
* Opens a Stream that the application can write the contents of the file to. | ||
* The driver generates the file id. | ||
* | ||
* Returns a Stream to which the application will write the contents. | ||
* | ||
* Note: this method is provided for backward compatibility. In languages | ||
* that use generic type parameters, this method may be omitted since | ||
* the TFileId type might not be an ObjectId. | ||
*/ | ||
openUploadStream( | ||
filename: string, | ||
options?: GridFSUploadOptions, | ||
) { | ||
return this.openUploadStreamWithId( | ||
new ObjectId(), | ||
filename, | ||
options, | ||
); | ||
} | ||
|
||
/** | ||
* Opens a Stream that the application can write the contents of the file to. | ||
* The application provides a custom file id. | ||
* | ||
* Returns a Stream to which the application will write the contents. | ||
*/ | ||
openUploadStreamWithId( | ||
id: FileId, | ||
filename: string, | ||
options?: GridFSUploadOptions, | ||
) { | ||
if (!this.#checkedIndexes) this.#checkIndexes(); | ||
return createUploadStream(this.getBucketData(), filename, id, options); | ||
} | ||
|
||
/** | ||
* Uploads a user file to a GridFS bucket. The driver generates the file id. | ||
* | ||
* Reads the contents of the user file from the @source Stream and uploads it | ||
* as chunks in the chunks collection. After all the chunks have been uploaded, | ||
* it creates a files collection document for @filename in the files collection. | ||
* | ||
* Returns the id of the uploaded file. | ||
* | ||
* Note: this method is provided for backward compatibility. In languages | ||
* that use generic type parameters, this method may be omitted since | ||
* the TFileId type might not be an ObjectId. | ||
*/ | ||
uploadFromStream( | ||
filename: string, | ||
source: ReadableStream, | ||
options?: GridFSUploadOptions, | ||
): ObjectId { | ||
const objectid = ObjectId.generate(); | ||
source.pipeTo(this.openUploadStreamWithId(objectid, filename, options)); | ||
return objectid; | ||
} | ||
|
||
/** | ||
* Uploads a user file to a GridFS bucket. The application supplies a custom file id. | ||
* | ||
* Reads the contents of the user file from the @source Stream and uploads it | ||
* as chunks in the chunks collection. After all the chunks have been uploaded, | ||
* it creates a files collection document for @filename in the files collection. | ||
* | ||
* Note: there is no need to return the id of the uploaded file because the application | ||
* already supplied it as a parameter. | ||
*/ | ||
uploadFromStreamWithId( | ||
id: FileId, | ||
filename: string, | ||
source: ReadableStream, | ||
options: GridFSUploadOptions, | ||
): void { | ||
source.pipeTo(this.openUploadStreamWithId(id, filename, options)); | ||
} | ||
|
||
/** Opens a Stream from which the application can read the contents of the stored file | ||
* specified by @id. | ||
* | ||
* Returns a Stream. | ||
*/ | ||
openDownloadStream(id: FileId) { | ||
if (!this.#checkedIndexes) this.#checkIndexes(); | ||
|
||
return new ReadableStream<Uint8Array>({ | ||
start: async (controller) => { | ||
const collection = this.#chunksCollection.find({ files_id: id }); | ||
await collection.forEach((value) => | ||
controller.enqueue(value?.data.buffer) | ||
); | ||
controller.close(); | ||
}, | ||
}); | ||
} | ||
|
||
/** | ||
* Downloads the contents of the stored file specified by @id and writes | ||
* the contents to the @destination Stream. | ||
*/ | ||
async downloadToStream( | ||
id: FileId, | ||
destination: WritableStream<Uint8Array>, | ||
) { | ||
this.openDownloadStream(id).pipeTo(destination); | ||
} | ||
|
||
/** | ||
* Given a @id, delete this stored file’s files collection document and | ||
* associated chunks from a GridFS bucket. | ||
*/ | ||
async delete(id: FileId) { | ||
await this.#filesCollection.deleteOne({ _id: id }); | ||
const response = await this.#chunksCollection.deleteMany({ files_id: id }); | ||
assert(response, `File not found for id ${id}`); | ||
} | ||
|
||
/** | ||
* Find and return the files collection documents that match @filter. | ||
*/ | ||
find( | ||
filter: Filter<File>, | ||
options: GridFSFindOptions = {}, | ||
): FindCursor<File> { | ||
return this.#filesCollection.find(filter ?? {}, options); | ||
} | ||
|
||
/** | ||
* Drops the files and chunks collections associated with | ||
* this bucket. | ||
*/ | ||
async drop() { | ||
await this.#filesCollection.drop(); | ||
await this.#chunksCollection.drop(); | ||
} | ||
|
||
#checkIndexes = () => | ||
checkIndexes( | ||
this.#filesCollection, | ||
this.#chunksCollection, | ||
(value) => this.#checkedIndexes = value, | ||
); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
import { Document } from "../../deps.ts"; | ||
import { Collection } from "../collection/collection.ts"; | ||
import { Chunk, File } from "../types/gridfs.ts"; | ||
|
||
export function createFileIndex(collection: Collection<File>) { | ||
const index = { filename: 1, uploadDate: 1 }; | ||
|
||
return collection.createIndexes({ | ||
indexes: [{ | ||
name: "gridFSFiles", | ||
key: index, | ||
background: false, | ||
}], | ||
}); | ||
} | ||
export function createChunksIndex(collection: Collection<Chunk>) { | ||
const index = { "files_id": 1, n: 1 }; | ||
|
||
return collection.createIndexes({ | ||
indexes: [{ | ||
name: "gridFSFiles", | ||
key: index, | ||
unique: true, | ||
background: false, | ||
}], | ||
}); | ||
} | ||
|
||
export async function checkIndexes( | ||
filesCollection: Collection<File>, | ||
chunksCollection: Collection<Chunk>, | ||
hasCheckedIndexes: (value: boolean) => void, | ||
) { | ||
const filesCollectionIsEmpty = !await filesCollection.findOne({}, { | ||
projection: { _id: 1 }, | ||
}); | ||
|
||
const chunksCollectionIsEmpty = !await chunksCollection.findOne({}, { | ||
projection: { _id: 1 }, | ||
}); | ||
|
||
if (filesCollectionIsEmpty || chunksCollectionIsEmpty) { | ||
// At least one collection is empty so we create indexes | ||
createFileIndex(filesCollection); | ||
createChunksIndex(chunksCollection); | ||
hasCheckedIndexes(true); | ||
return; | ||
} | ||
|
||
// Now check that the right indexes are there | ||
const fileIndexes = await filesCollection.listIndexes().toArray(); | ||
let hasFileIndex = false; | ||
|
||
if (fileIndexes) { | ||
fileIndexes.forEach((index) => { | ||
const keys = Object.keys(index.key); | ||
if ( | ||
keys.length === 2 && index.key.filename === 1 && | ||
index.key.uploadDate === 1 | ||
) { | ||
hasFileIndex = true; | ||
} | ||
}); | ||
} | ||
|
||
if (!hasFileIndex) { | ||
createFileIndex(filesCollection); | ||
} | ||
|
||
const chunkIndexes = await chunksCollection.listIndexes().toArray(); | ||
let hasChunksIndex = false; | ||
|
||
if (chunkIndexes) { | ||
chunkIndexes.forEach((index: Document) => { | ||
const keys = Object.keys(index.key); | ||
if ( | ||
keys.length === 2 && index.key.filename === 1 && | ||
index.key.uploadDate === 1 && index.options.unique | ||
) { | ||
hasChunksIndex = true; | ||
} | ||
}); | ||
} | ||
|
||
if (!hasChunksIndex) { | ||
createChunksIndex(chunksCollection); | ||
} | ||
hasCheckedIndexes(true); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
import { Binary, ObjectId } from "../../deps.ts"; | ||
import { Collection } from "../../mod.ts"; | ||
import { Chunk, File, GridFSUploadOptions } from "../types/gridfs.ts"; | ||
|
||
export interface BucketInfo { | ||
filesCollection: Collection<File>; | ||
chunksCollection: Collection<Chunk>; | ||
chunkSizeBytes: number; | ||
} | ||
|
||
export function createUploadStream( | ||
{ chunkSizeBytes, chunksCollection, filesCollection }: BucketInfo, | ||
filename: string, | ||
id: ObjectId, | ||
options?: GridFSUploadOptions, | ||
) { | ||
const chunkSizeBytesCombined = options?.chunkSizeBytes ?? chunkSizeBytes; | ||
const uploadBuffer = new Uint8Array(new ArrayBuffer(chunkSizeBytesCombined)); | ||
let bufferPosition = 0; | ||
let chunksInserted = 0; | ||
let fileSizeBytes = 0; | ||
return new WritableStream<Uint8Array>({ | ||
write: async (chunk: Uint8Array) => { | ||
let remaining = chunk; | ||
while (remaining.byteLength) { | ||
const availableBuffer = chunkSizeBytesCombined - bufferPosition; | ||
if (remaining.byteLength < availableBuffer) { | ||
uploadBuffer.set(remaining, bufferPosition); | ||
bufferPosition += remaining.byteLength; | ||
fileSizeBytes += remaining.byteLength; | ||
break; | ||
} | ||
const sliced = remaining.slice(0, availableBuffer); | ||
remaining = remaining.slice(availableBuffer); | ||
uploadBuffer.set(sliced, bufferPosition); | ||
|
||
await chunksCollection.insertOne({ | ||
files_id: id, | ||
n: chunksInserted, | ||
data: new Binary(uploadBuffer), | ||
}); | ||
|
||
bufferPosition = 0; | ||
fileSizeBytes += sliced.byteLength; | ||
++chunksInserted; | ||
} | ||
}, | ||
close: async () => { | ||
// Write the last bytes that are left in the buffer | ||
if (bufferPosition) { | ||
await chunksCollection.insertOne({ | ||
files_id: id, | ||
n: chunksInserted, | ||
data: new Binary(uploadBuffer.slice(0, bufferPosition)), | ||
}); | ||
} | ||
|
||
await filesCollection.insertOne({ | ||
_id: id, | ||
length: fileSizeBytes, | ||
chunkSize: chunkSizeBytesCombined, | ||
uploadDate: new Date(), | ||
filename: filename, | ||
metadata: options?.metadata, | ||
}); | ||
}, | ||
}); | ||
} |
Oops, something went wrong.