Skip to content

Commit

Permalink
Add support for an asynchronous RandomAccessReader, and using that su…
Browse files Browse the repository at this point in the history
…pport create a reader implementation for stored entries within a zip file, thereby supporting stored zips within zips. Add a test for the new classes based on the existing range-test.

Related to thejoshwolfe#89
  • Loading branch information
brucehappy committed Dec 12, 2018
1 parent d3fa8a3 commit 880db1c
Show file tree
Hide file tree
Showing 3 changed files with 396 additions and 57 deletions.
194 changes: 137 additions & 57 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ exports.validateFileName = validateFileName;
exports.ZipFile = ZipFile;
exports.Entry = Entry;
exports.RandomAccessReader = RandomAccessReader;
exports.AsyncRandomAccessReader = AsyncRandomAccessReader;
exports.StoredEntryAsyncRandomAccessReader = StoredEntryAsyncRandomAccessReader;

function open(path, options, callback) {
if (typeof options === "function") {
Expand Down Expand Up @@ -193,6 +195,7 @@ function ZipFile(reader, centralDirectoryOffset, fileSize, entryCount, comment,
var self = this;
EventEmitter.call(self);
self.reader = reader;
self.isReaderAsync = reader instanceof AsyncRandomAccessReader;
// forward close events
self.reader.on("error", function(err) {
// error closing the fd
Expand Down Expand Up @@ -534,47 +537,65 @@ ZipFile.prototype.openReadStream = function(entry, options, callback) {
fileDataStart + " + " + entry.compressedSize + " > " + self.fileSize));
}
}
var readStream = self.reader.createReadStream({
start: fileDataStart + relativeStart,
end: fileDataStart + relativeEnd,
});
var endpointStream = readStream;
if (decompress) {
var destroyed = false;
var inflateFilter = zlib.createInflateRaw();
readStream.on("error", function(err) {
// setImmediate here because errors can be emitted during the first call to pipe()
setImmediate(function() {
if (!destroyed) inflateFilter.emit("error", err);
});
});
readStream.pipe(inflateFilter);

if (self.validateEntrySizes) {
endpointStream = new AssertByteCountStream(entry.uncompressedSize);
inflateFilter.on("error", function(err) {
// forward zlib errors to the client-visible stream
var readStreamCallback = function(err, readStream) {
if (err) {
return callback(err);
}
var endpointStream = readStream;
if (decompress) {
var destroyed = false;
var inflateFilter = zlib.createInflateRaw();
readStream.on("error", function(err) {
// setImmediate here because errors can be emitted during the first call to pipe()
setImmediate(function() {
if (!destroyed) endpointStream.emit("error", err);
if (!destroyed) inflateFilter.emit("error", err);
});
});
inflateFilter.pipe(endpointStream);
} else {
// the zlib filter is the client-visible stream
endpointStream = inflateFilter;
readStream.pipe(inflateFilter);

if (self.validateEntrySizes) {
endpointStream = new AssertByteCountStream(entry.uncompressedSize);
inflateFilter.on("error", function(err) {
// forward zlib errors to the client-visible stream
setImmediate(function() {
if (!destroyed) endpointStream.emit("error", err);
});
});
inflateFilter.pipe(endpointStream);
} else {
// the zlib filter is the client-visible stream
endpointStream = inflateFilter;
}
// this is part of yauzl's API, so implement this function on the client-visible stream
endpointStream.destroy = function() {
destroyed = true;
if (inflateFilter !== endpointStream) inflateFilter.unpipe(endpointStream);
readStream.unpipe(inflateFilter);
// TODO: the inflateFilter may cause a memory leak. see Issue #27.
readStream.destroy();
};
}
// this is part of yauzl's API, so implement this function on the client-visible stream
endpointStream.destroy = function() {
destroyed = true;
if (inflateFilter !== endpointStream) inflateFilter.unpipe(endpointStream);
readStream.unpipe(inflateFilter);
// TODO: the inflateFilter may cause a memory leak. see Issue #27.
readStream.destroy();
};
callback(null, endpointStream);
};
var createReadStreamOptions = {
start: fileDataStart + relativeStart,
end: fileDataStart + relativeEnd,
};
if (self.isReaderAsync) {
self.reader.createReadStream(createReadStreamOptions, function(err, readStream) {
try {
readStreamCallback(err, readStream);
} finally {
self.reader.unref();
}
});
} else {
readStreamCallback(null, self.reader.createReadStream(createReadStreamOptions));
}
callback(null, endpointStream);
} finally {
self.reader.unref();
if (!self.isReaderAsync) {
self.reader.unref();
}
}
});
};
Expand Down Expand Up @@ -654,15 +675,15 @@ AssertByteCountStream.prototype._flush = function(cb) {
cb();
};

util.inherits(RandomAccessReader, EventEmitter);
function RandomAccessReader() {
util.inherits(BaseRandomAccessReader, EventEmitter);
function BaseRandomAccessReader() {
EventEmitter.call(this);
this.refCount = 0;
}
RandomAccessReader.prototype.ref = function() {
BaseRandomAccessReader.prototype.ref = function() {
this.refCount += 1;
};
RandomAccessReader.prototype.unref = function() {
BaseRandomAccessReader.prototype.unref = function() {
var self = this;
self.refCount -= 1;

Expand All @@ -676,18 +697,14 @@ RandomAccessReader.prototype.unref = function() {
self.emit('close');
}
};
RandomAccessReader.prototype.createReadStream = function(options) {
var start = options.start;
var end = options.end;
if (start === end) {
var emptyStream = new PassThrough();
setImmediate(function() {
emptyStream.end();
});
return emptyStream;
}
var stream = this._readStreamForRange(start, end);

BaseRandomAccessReader.prototype._createEmptyReadStream = function() {
var emptyStream = new PassThrough();
setImmediate(function() {
emptyStream.end();
});
return emptyStream;
};
BaseRandomAccessReader.prototype._setupReadStream = function(stream, start, end) {
var destroyed = false;
var refUnrefFilter = new RefUnrefFilter(this);
stream.on("error", function(err) {
Expand Down Expand Up @@ -715,11 +732,7 @@ RandomAccessReader.prototype.createReadStream = function(options) {

return stream.pipe(refUnrefFilter).pipe(byteCounter);
};
RandomAccessReader.prototype._readStreamForRange = function(start, end) {
throw new Error("not implemented");
};
RandomAccessReader.prototype.read = function(buffer, offset, length, position, callback) {
var readStream = this.createReadStream({start: position, end: position + length});
BaseRandomAccessReader.prototype._performRead = function(readStream, buffer, offset, callback) {
var writeStream = new Writable();
var written = 0;
writeStream._write = function(chunk, encoding, cb) {
Expand All @@ -733,10 +746,77 @@ RandomAccessReader.prototype.read = function(buffer, offset, length, position, c
});
readStream.pipe(writeStream);
};
RandomAccessReader.prototype.close = function(callback) {
BaseRandomAccessReader.prototype.close = function(callback) {
setImmediate(callback);
};

util.inherits(RandomAccessReader, BaseRandomAccessReader);
function RandomAccessReader() {
BaseRandomAccessReader.call(this);
}
RandomAccessReader.prototype.createReadStream = function(options) {
var start = options.start;
var end = options.end;
if (start === end) {
return this._createEmptyReadStream();
} else {
return this._setupReadStream(this._readStreamForRange(start, end), start, end);
}
};
RandomAccessReader.prototype._readStreamForRange = function(start, end) {
throw new Error("not implemented");
};
RandomAccessReader.prototype.read = function(buffer, offset, length, position, callback) {
this._performRead(this.createReadStream({start: position, end: position + length}), buffer, offset, callback);
};

util.inherits(AsyncRandomAccessReader, BaseRandomAccessReader);
function AsyncRandomAccessReader() {
BaseRandomAccessReader.call(this);
}
AsyncRandomAccessReader.prototype.createReadStream = function(options, callback) {
var self = this;
var start = options.start;
var end = options.end;
if (start === end) {
return callback(null, this._createEmptyReadStream());
} else {
this._readStreamForRange(start, end, function(err, readStream) {
if (err) {
return callback(err);
}
return callback(null, self._setupReadStream(readStream, start, end));
});
}
};
AsyncRandomAccessReader.prototype._readStreamForRange = function(start, end, callback) {
return callback(new Error("not implemented"));
};
AsyncRandomAccessReader.prototype.read = function(buffer, offset, length, position, callback) {
var self = this;
this.createReadStream({start: position, end: position + length}, function(err, readStream) {
if (err) {
return callback(err);
}
self._performRead(readStream, buffer, offset, callback);
});
};

util.inherits(StoredEntryAsyncRandomAccessReader, AsyncRandomAccessReader);
function StoredEntryAsyncRandomAccessReader(zipfile, entry) {
AsyncRandomAccessReader.call(this);
this.zipfile = zipfile;
this.entry = entry;
};
StoredEntryAsyncRandomAccessReader.prototype._readStreamForRange = function(start, end, callback) {
if (this.entry.isCompressed()) {
return callback(new Error('Cannot read from compressed entry'));
} else if (this.entry.isEncrypted()) {
return callback(new Error('Cannot read from encrypted entry'));
}
return this.zipfile.openReadStream(this.entry, {start: start, end: end}, callback);
};

util.inherits(RefUnrefFilter, PassThrough);
function RefUnrefFilter(context) {
PassThrough.call(this);
Expand Down
4 changes: 4 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
var yauzl = require("../");
var zip64 = require("./zip64");
var rangeTest = require("./range-test");
var zipInZipTest = require("./zip-in-zip-test");
var fs = require("fs");
var path = require("path");
var Pend = require("pend");
Expand Down Expand Up @@ -347,6 +348,9 @@ pend.go(zip64.runTest);
// openReadStream with range
pend.go(rangeTest.runTest);

// openReadStream with range for files in a zip that is in another zip
pend.go(zipInZipTest.runTest);

pend.wait(function() {
// if you don't see this, something never happened.
console.log("done");
Expand Down
Loading

0 comments on commit 880db1c

Please sign in to comment.