Skip to content

Commit

Permalink
#1: Implemented peek function
Browse files Browse the repository at this point in the history
  • Loading branch information
Borewit committed Aug 11, 2017
1 parent 0781d59 commit de22cf9
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 5 deletions.
60 changes: 56 additions & 4 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ export class StreamReader {

private endOfStream = false;

/**
* Store peeked data
* @type {Array}
*/
private peekQueue: Buffer[] = [];

public constructor(private s: stream.Readable) {
this.s.once("end", () => {
this.endOfStream = true;
Expand All @@ -48,8 +54,55 @@ export class StreamReader {
});
}

// Read chunk from stream
public read(buffer: Buffer | Uint8Array, offset: number, length: number, position: number = null): Promise<number> {
/**
* Read ahead from stream. Subsequent read will return the same data
* @param buffer Buffer to store data read from stream in
* @param offset Offset buffer
* @param length Number of bytes to read
* @param position
* @returns {any}
*/
public peek(buffer: Buffer | Uint8Array, offset: number, length: number): Promise<number> {
return this._read(buffer, offset, length).then((bytesRead) => {
this.peekQueue.push(buffer.slice(offset, length) as Buffer);
return bytesRead;
});
}

/**
* Read chunk from stream
* @param buffer Buffer to store data read from stream in
* @param offset Offset buffer
* @param length Number of bytes to read
* @returns {any}
*/
public read(buffer: Buffer | Uint8Array, offset: number, length: number): Promise<number> {
if (this.peekQueue.length > 0) {
const peekData = this.peekQueue.shift();
if (length === peekData.length) {
peekData.copy(buffer as Buffer, offset);
return Promise.resolve(length);
} else if (peekData.length < length) {
peekData.copy(buffer as Buffer, offset);
return this.read(buffer, offset + peekData.length, length - peekData.length).then((bytesRead) => {
return peekData.length + bytesRead;
});
} else {
throw new Error("Not implemented yet");
}
} else {
return this._read(buffer, offset, length);
}
}

/**
* Read chunk from stream
* @param buffer Buffer to store data read from stream in
* @param offset Offset buffer
* @param length Number of bytes to read
* @returns {any}
*/
private _read(buffer: Buffer | Uint8Array, offset: number, length: number): Promise<number> {

if (this.request)
throw new Error("Concurrent read operation");
Expand All @@ -64,7 +117,6 @@ export class StreamReader {
buffer,
offset,
length,
position,
deferred: new Deferred<number>()
};
this.s.once("readable", () => {
Expand All @@ -73,7 +125,7 @@ export class StreamReader {
return this.request.deferred.promise.then((n) => {
this.request = null;
return n;
}).catch( (err) => {
}).catch((err) => {
this.request = null;
throw err;
});
Expand Down
45 changes: 44 additions & 1 deletion test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ describe("ReadStreamTokenizer", () => {

const buf = new Buffer(4);

const run = (): Promise<void> => {
const run = (): Promise<void> => {
return sb.read(buf, 0, 4).then((bytesRead) => {
assert.equal(bytesRead, 4);
assert.equal(buf.readInt32BE(0), 16909060);
Expand All @@ -148,4 +148,47 @@ describe("ReadStreamTokenizer", () => {

});

describe("peek", () => {

it("should be able to read a peeked chunk", () => {

const sourceStream = new SourceStream("\x05peter");
const streamReader = new StreamReader(sourceStream);

const buf = new Buffer(1);

return streamReader.peek(buf, 0, 1)
.then((bytesRead) => {
assert.equal(bytesRead, 1, "Should peek exactly one byte");
assert.equal(buf[0], 5, "0x05 == 5");
})
.then(() => {
return streamReader.read(buf, 0, 1).then((bytesRead) => {
assert.equal(bytesRead, 1, "Should re-read the peaked byte");
assert.equal(buf[0], 5, "0x05 == 5");
});
});
});

it("should be able to read a larger chunk overlapping the peeked chunk", () => {

const sourceStream = new SourceStream("\x05peter");
const streamReader = new StreamReader(sourceStream);

const buf = new Buffer(6);

return streamReader.peek(buf, 0, 1)
.then((bytesRead) => {
assert.equal(bytesRead, 1, "Should peek exactly one byte");
assert.equal(buf[0], 5, "0x05 == 5");
})
.then(() => {
return streamReader.read(buf, 0, 6).then((bytesRead) => {
assert.equal(bytesRead, 6, "Should overlap the peaked byte");
assert.equal(buf, "\x05peter");
});
});
});
});

});

0 comments on commit de22cf9

Please sign in to comment.