Skip to content

Commit

Permalink
return an error when there are insufficient bytes
Browse files Browse the repository at this point in the history
add tests for multireading and fix logic
simplify logic for overreading
  • Loading branch information
jacobheun committed Jul 6, 2018
1 parent e079954 commit 364a665
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 28 deletions.
41 changes: 21 additions & 20 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,29 @@ function isFunction (f) {
return 'function' === typeof f
}

function maxDelay(fn, delay) {
if(!delay) return fn
return function (a, cb) {
var timer = setTimeout(function () {
fn(new Error('pull-reader: read exceeded timeout'), cb)
}, delay)
fn(a, function (err, value) {
clearTimeout(timer)
cb(err, value)
})

}

}

module.exports = function (timeout) {

var queue = [], read, readTimed, reading = false
var state = State(), ended, streaming, abort

function maxDelay(fn, delay) {
return function (a, cb) {
var timer
if (delay) {
timer = setTimeout(function () {
fn(new Error('pull-reader: read exceeded timeout'), cb)
}, delay)
}
fn(a, function (err, value) {
if (delay) clearTimeout(timer)
if (err === true && state.read < state.total) {
return cb(new Error('attempted to read '+state.wants+' of '+state.total+' bytes'))
}
cb(err, value)
})
}
}

function drain () {
while (queue.length) {
if(null == queue[0].length && state.has(1)) {
Expand Down Expand Up @@ -88,6 +91,9 @@ module.exports = function (timeout) {
}

reader.read = function (len, _timeout, cb) {
if(isInteger(len)) {
state.wants += len
}
if(isFunction(_timeout))
cb = _timeout, _timeout = timeout
if(isFunction(cb)) {
Expand Down Expand Up @@ -115,8 +121,3 @@ module.exports = function (timeout) {

return reader
}





16 changes: 10 additions & 6 deletions state.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,23 @@ module.exports = function () {

return {
length: length,
total: 0,
data: this,
wants: 0,
read: 0,
add: function (data) {
if(!Buffer.isBuffer(data))
throw new Error('data must be a buffer, was: ' + JSON.stringify(data))
this.length = length = length + data.length
this.total += data.length
buffers.push(data)
return this
},
has: function (n) {
if(null == n) return length > 0
return length >= n
},
get: function (n) {
_get: function (n) {
var _length
if(n == null || n === length) {
length = 0
Expand Down Expand Up @@ -62,12 +66,12 @@ module.exports = function () {
}
else
throw new Error('could not get ' + n + ' bytes')
},
get: function(n) {
var b = this._get(n)
this.read += b.length
return b
}
}

}





34 changes: 32 additions & 2 deletions test/read.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ tape('if streaming, the stream should abort', function (t) {

tape('abort stream once in streaming mode', function (t) {

var reader = Reader(), err = new Error('intended')
var reader = Reader()

pull(Hang(), reader)

Expand Down Expand Up @@ -232,7 +232,6 @@ tape('timeout does not apply to the rest of the stream', function (t) {
pull(
reader.read(),
pull.collect(function (err, ary) {
console.log(err)
t.notOk(err)
t.equal(Buffer.concat(ary).toString(), 'hello world')
t.end()
Expand All @@ -241,6 +240,37 @@ tape('timeout does not apply to the rest of the stream', function (t) {
})


tape('overreading results in an error', function (t) {
var corruptedBytes = crypto.randomBytes(10)

pull(
pull.values([corruptedBytes]),
reader = Reader(20e3)
)

reader.read(11, function(_err) {
t.ok(_err)
t.equal(_err.message, 'attempted to read 11 of 10 bytes')
t.end()
})
})


tape('overreading with multiple reads results in an error', function (t) {
var corruptedBytes = crypto.randomBytes(10)

pull(
pull.values([corruptedBytes]),
reader = Reader()
)

reader.read(1, function(err) {
t.notOk(err)
reader.read(100, function(_err) {
t.ok(_err)
t.equal(_err.message, 'attempted to read 101 of 10 bytes')
t.end()
})
})
})

0 comments on commit 364a665

Please sign in to comment.