From 55ab1cf8b42c7bf73d636f9de4a5d1b70f7945ed Mon Sep 17 00:00:00 2001 From: Evgeny Medvedev Date: Tue, 6 Mar 2018 01:06:38 +0700 Subject: [PATCH 1/2] Fix OutOfMemory in read stream https://github.com/ethereumjs/merkle-patricia-tree/issues/36 --- baseTrie.js | 26 +++++++++++++++++------ prioritizedTaskExecutor.js | 43 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 6 deletions(-) create mode 100644 prioritizedTaskExecutor.js diff --git a/baseTrie.js b/baseTrie.js index 1e1066f..9080031 100644 --- a/baseTrie.js +++ b/baseTrie.js @@ -7,6 +7,7 @@ const ethUtil = require('ethereumjs-util') const semaphore = require('semaphore') const TrieNode = require('./trieNode') const ReadStream = require('./readStream') +const PrioritizedTaskExecutor = require('./prioritizedTaskExecutor') const matchingNibbleLength = require('./util').matchingNibbleLength const doKeysMatch = require('./util').doKeysMatch const callTogether = require('./util').callTogether @@ -461,6 +462,11 @@ Trie.prototype._walkTrie = function (root, onNode, onDone) { }) }) + // the maximum pool size should be high enough to utilise the parallelizability of reading nodes from disk and + // low enough to utilize the prioritisation of node lookup. + var maxPoolSize = 500 + var taskExecutor = new PrioritizedTaskExecutor(maxPoolSize) + function processNode (nodeRef, node, key, cb) { if (!node) return cb() if (aborted) return cb() @@ -490,17 +496,25 @@ Trie.prototype._walkTrie = function (root, onNode, onDone) { var keyExtension = childData[0] var childRef = childData[1] var childKey = key.concat(keyExtension) - self._lookupNode(childRef, function (childNode) { - processNode(childRef, childNode, childKey, cb) + var priority = childKey.length + taskExecutor.execute(priority, function (taskCallback) { + self._lookupNode(childRef, function (childNode) { + taskCallback() + processNode(childRef, childNode, childKey, cb) + }) }) }, cb) }, only: function (childIndex) { var childRef = node.getValue(childIndex) - self._lookupNode(childRef, function (childNode) { - var childKey = key.slice() - childKey.push(childIndex) - processNode(childRef, childNode, childKey, cb) + var childKey = key.slice() + childKey.push(childIndex) + var priority = childKey.length + taskExecutor.execute(priority, function (taskCallback) { + self._lookupNode(childRef, function (childNode) { + taskCallback() + processNode(childRef, childNode, childKey, cb) + }) }) } } diff --git a/prioritizedTaskExecutor.js b/prioritizedTaskExecutor.js new file mode 100644 index 0000000..e6457d3 --- /dev/null +++ b/prioritizedTaskExecutor.js @@ -0,0 +1,43 @@ +module.exports = PrioritizedTaskExecutor + +/** + * Executes tasks up to maxPoolSize at a time, other items are put in a priority queue. + * @class PrioritizedTaskExecutor + * @param {Number} maxPoolSize The maximum size of the pool + * @prop {Number} maxPoolSize The maximum size of the pool + * @prop {Number} currentPoolSize The current size of the pool + * @prop {Array} queue The task queue + */ +function PrioritizedTaskExecutor (maxPoolSize) { + this.maxPoolSize = maxPoolSize + this.currentPoolSize = 0 + this.queue = [] +} + +/** + * Executes the task. + * @param {Number} priority The priority of the task + * @param {Function} task The function that accepts the callback, which must be called upon the task completion. + */ +PrioritizedTaskExecutor.prototype.execute = function (priority, task) { + var self = this + + if (self.currentPoolSize < self.maxPoolSize) { + self.currentPoolSize++ + task(function () { + self.currentPoolSize-- + if (self.queue.length > 0) { + self.queue.sort(function (a, b) { + return b.priority - a.priority + }) + var item = self.queue.shift() + self.execute(item.priority, item.task) + } + }) + } else { + self.queue.push({ + priority: priority, + task: task + }) + } +} From 6731209be025bcc9585c2b7adcde0d97058a7273 Mon Sep 17 00:00:00 2001 From: Evgeny Medvedev Date: Thu, 8 Mar 2018 04:33:16 +0700 Subject: [PATCH 2/2] Add tests for PrioritizedTaskExecutor --- test/prioritizedTaskExecutor.js | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 test/prioritizedTaskExecutor.js diff --git a/test/prioritizedTaskExecutor.js b/test/prioritizedTaskExecutor.js new file mode 100644 index 0000000..bdd8e8d --- /dev/null +++ b/test/prioritizedTaskExecutor.js @@ -0,0 +1,23 @@ +const PrioritizedTaskExecutor = require('../prioritizedTaskExecutor.js') +const tape = require('tape') +const taskExecutor = new PrioritizedTaskExecutor(2) + +tape('prioritized task executor test', function (t) { + var tasks = [1, 2, 3, 4] + var callbacks = [] + var executionOrder = [] + tasks.forEach(function (task) { + taskExecutor.execute(task, function (cb) { + executionOrder.push(task) + callbacks.push(cb) + }) + }) + + callbacks.forEach(function (callback) { + callback() + }) + + var expectedExecutionOrder = [1, 2, 4, 3] + t.deepEqual(executionOrder, expectedExecutionOrder) + t.end() +})