Skip to content
This repository has been archived by the owner on Jan 19, 2021. It is now read-only.

Fix OutOfMemory in read stream #38

Merged
merged 2 commits into from
Mar 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions baseTrie.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const ethUtil = require('ethereumjs-util')
const semaphore = require('semaphore')
const TrieNode = require('./trieNode')
const ReadStream = require('./readStream')
const PrioritizedTaskExecutor = require('./prioritizedTaskExecutor')
const matchingNibbleLength = require('./util').matchingNibbleLength
const doKeysMatch = require('./util').doKeysMatch
const callTogether = require('./util').callTogether
Expand Down Expand Up @@ -461,6 +462,11 @@ Trie.prototype._walkTrie = function (root, onNode, onDone) {
})
})

// The maximum pool size should be high enough to exploit the parallelism of reading nodes from disk, and
// low enough that the prioritisation of node lookups still takes effect.
var maxPoolSize = 500
var taskExecutor = new PrioritizedTaskExecutor(maxPoolSize)

function processNode (nodeRef, node, key, cb) {
if (!node) return cb()
if (aborted) return cb()
Expand Down Expand Up @@ -490,17 +496,25 @@ Trie.prototype._walkTrie = function (root, onNode, onDone) {
var keyExtension = childData[0]
var childRef = childData[1]
var childKey = key.concat(keyExtension)
self._lookupNode(childRef, function (childNode) {
processNode(childRef, childNode, childKey, cb)
var priority = childKey.length
taskExecutor.execute(priority, function (taskCallback) {
self._lookupNode(childRef, function (childNode) {
taskCallback()
processNode(childRef, childNode, childKey, cb)
})
})
}, cb)
},
only: function (childIndex) {
var childRef = node.getValue(childIndex)
self._lookupNode(childRef, function (childNode) {
var childKey = key.slice()
childKey.push(childIndex)
processNode(childRef, childNode, childKey, cb)
var childKey = key.slice()
childKey.push(childIndex)
var priority = childKey.length
taskExecutor.execute(priority, function (taskCallback) {
self._lookupNode(childRef, function (childNode) {
taskCallback()
processNode(childRef, childNode, childKey, cb)
})
})
}
}
Expand Down
43 changes: 43 additions & 0 deletions prioritizedTaskExecutor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
module.exports = PrioritizedTaskExecutor

/**
 * Runs at most `maxPoolSize` tasks at a time. Tasks submitted while the pool is full are put in a
 * queue and started, highest priority first, as running tasks complete.
 * @class PrioritizedTaskExecutor
 * @param {Number} maxPoolSize The maximum size of the pool
 * @prop {Number} maxPoolSize The maximum size of the pool
 * @prop {Number} currentPoolSize The current size of the pool (number of tasks started but not yet completed)
 * @prop {Array} queue The task queue; each item is `{priority, task}`
 */
function PrioritizedTaskExecutor (maxPoolSize) {
  this.maxPoolSize = maxPoolSize
  this.currentPoolSize = 0
  this.queue = []
}

/**
 * Executes the task immediately if the pool has a free slot, otherwise queues it.
 * @param {Number} priority The priority of the task; queued tasks with a larger number are started first.
 * @param {Function} task The function that accepts the callback, which must be called upon the task completion.
 *   Only the first call of that callback has an effect; repeated calls are ignored.
 */
PrioritizedTaskExecutor.prototype.execute = function (priority, task) {
  var self = this

  if (self.currentPoolSize < self.maxPoolSize) {
    self.currentPoolSize++
    var completed = false
    task(function () {
      // Release the slot exactly once per task. Without this guard a task that reports completion
      // twice would decrement the pool size twice and start an extra queued task, letting the
      // number of running tasks exceed maxPoolSize.
      if (completed) return
      completed = true

      self.currentPoolSize--
      if (self.queue.length > 0) {
        // Descending order, so the highest-priority pending task sits at the front.
        self.queue.sort(function (a, b) {
          return b.priority - a.priority
        })
        var next = self.queue.shift()
        // A slot was freed just above, so this starts the task rather than re-queueing it.
        self.execute(next.priority, next.task)
      }
    })
  } else {
    self.queue.push({
      priority: priority,
      task: task
    })
  }
}
23 changes: 23 additions & 0 deletions test/prioritizedTaskExecutor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
const PrioritizedTaskExecutor = require('../prioritizedTaskExecutor.js')
const tape = require('tape')
const taskExecutor = new PrioritizedTaskExecutor(2)

// Checks that tasks beyond the pool capacity are held back and then started in
// descending priority order as earlier tasks complete.
tape('prioritized task executor test', function (t) {
  var priorities = [1, 2, 3, 4]
  var pendingCallbacks = []
  var startedOrder = []

  // Submits one task whose priority doubles as its identity in the recorded order.
  function submit (priority) {
    taskExecutor.execute(priority, function (done) {
      startedOrder.push(priority)
      pendingCallbacks.push(done)
    })
  }
  priorities.forEach(submit)

  // Complete only the tasks that have started so far. Completing them starts queued tasks,
  // which append further callbacks; those are deliberately left uncalled.
  var startedSoFar = pendingCallbacks.length
  for (var i = 0; i < startedSoFar; i++) {
    pendingCallbacks[i]()
  }

  t.deepEqual(startedOrder, [1, 2, 4, 3])
  t.end()
})