-
-
Notifications
You must be signed in to change notification settings - Fork 32.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
6b4942b
commit bbf062f
Showing
16 changed files
with
348 additions
and
29 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
// Inspired by http://caolan.github.io/async/docs.html#cargo | ||
// The main difference is that we have a timeout. | ||
class Batcher { | ||
pendingEntries = []; | ||
|
||
timeout = null; | ||
|
||
context = {}; | ||
|
||
constructor(worker, options = {}) { | ||
// max waiting time before flushing the pending entries (process them) | ||
this.maxWait = options.maxWait || 1000; | ||
// max number of entries in the queue before flushing them (process them) | ||
this.maxItems = options.maxItems || 100; | ||
this.worker = worker; | ||
} | ||
|
||
// public method | ||
push(entries, contextItem) { | ||
this.context = contextItem; | ||
this.pendingEntries = this.pendingEntries.concat(entries); | ||
|
||
if (this.pendingEntries.length >= this.maxItems) { | ||
return this.sendItems(); | ||
} | ||
|
||
clearTimeout(this.timeout); | ||
this.timeout = setTimeout(() => { | ||
this.sendItems(); | ||
}, this.maxWait); | ||
|
||
return null; | ||
} | ||
|
||
sendItems() { | ||
const pendingEntries = this.pendingEntries.splice(0); // Transfer the item to the job. | ||
clearTimeout(this.timeout); | ||
return this.worker(pendingEntries, this.context); | ||
} | ||
|
||
clear() { | ||
clearTimeout(this.timeout); | ||
this.pendingEntries = []; | ||
} | ||
} | ||
|
||
export default Batcher; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
import waitUntil from './waitUntil'; | ||
|
||
class Queue { | ||
pendingEntries = []; | ||
|
||
inFlight = 0; | ||
|
||
err = null; | ||
|
||
constructor(worker, options = {}) { | ||
this.worker = worker; | ||
this.concurrency = options.concurrency || 1; | ||
} | ||
|
||
push = entries => { | ||
this.pendingEntries = this.pendingEntries.concat(entries); | ||
this.process(); | ||
}; | ||
|
||
process = () => { | ||
const scheduled = this.pendingEntries.splice(0, this.concurrency - this.inFlight); | ||
this.inFlight += scheduled.length; | ||
scheduled.forEach(async task => { | ||
try { | ||
await this.worker(task); | ||
} catch (err) { | ||
this.err = err; | ||
} finally { | ||
this.inFlight -= 1; | ||
} | ||
|
||
if (this.pendingEntries.length > 0) { | ||
this.process(); | ||
} | ||
}); | ||
}; | ||
|
||
wait = (options = {}) => { | ||
return waitUntil( | ||
() => { | ||
if (this.err) { | ||
this.pendingEntries = []; | ||
throw this.err; | ||
} | ||
|
||
return { | ||
predicate: options.empty | ||
? this.inFlight === 0 && this.pendingEntries.length === 0 | ||
: this.concurrency > this.pendingEntries.length, | ||
}; | ||
}, | ||
{ | ||
delay: 50, | ||
}, | ||
); | ||
}; | ||
} | ||
|
||
export default Queue; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
# Waterfall | ||
|
||
A set of utility functions for handling async/await at scale. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
async function forEach(array, iteratee) { | ||
for (let i = 0; i < array.length; i += 1) { | ||
// eslint-disable-next-line no-await-in-loop | ||
await iteratee(array[i], i); | ||
} | ||
} | ||
|
||
export default forEach; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
// The API is inspired by console.time | ||
// The implementation is isomorphic. | ||
import warning from 'warning'; | ||
|
||
const times = new Map(); | ||
|
||
const implementations = { | ||
mark: { | ||
start: name => { | ||
times.set(name, performance.now()); | ||
performance.mark(`metric_${name}_start`); | ||
}, | ||
end: name => { | ||
const endMark = `metric_${name}_end`; | ||
performance.mark(endMark); | ||
const startMark = `metric_${name}_start`; | ||
performance.measure(name, startMark, endMark); | ||
const duration = performance.getEntriesByName(name)[0].duration; | ||
return duration; | ||
}, | ||
}, | ||
now: { | ||
start: name => { | ||
times.set(name, performance.now()); | ||
}, | ||
end: name => { | ||
const time = times.get(name); | ||
const duration = performance.now() - time; | ||
return duration; | ||
}, | ||
}, | ||
hrtime: { | ||
start: name => { | ||
// https://nodejs.org/api/process.html#process_process_hrtime_time | ||
times.set(name, process.hrtime()); | ||
}, | ||
end: name => { | ||
const time = times.get(name); | ||
const durations = process.hrtime(time); | ||
const duration = durations[0] / 1e3 + durations[1] / 1e6; | ||
return duration; | ||
}, | ||
}, | ||
}; | ||
|
||
let getImplementationCache; | ||
|
||
function getImplementation() { | ||
if (getImplementationCache) { | ||
return getImplementationCache; | ||
} | ||
|
||
if (typeof performance !== 'undefined' && performance.mark) { | ||
getImplementationCache = implementations.mark; | ||
} else if (typeof performance !== 'undefined' && performance.now) { | ||
getImplementationCache = implementations.now; | ||
} else if (process.hrtime) { | ||
getImplementationCache = implementations.hrtime; | ||
} else { | ||
throw new Error('No performance API available'); | ||
} | ||
|
||
return getImplementationCache; | ||
} | ||
|
||
class Metric { | ||
/** | ||
* Call to begin a measurement. | ||
*/ | ||
static start(name) { | ||
warning(!times.get(name), 'Recording already started'); | ||
getImplementation().start(name); | ||
} | ||
|
||
/** | ||
* Returns the duration of the timing metric. The unit is milliseconds. | ||
* @type {number} | ||
*/ | ||
static end(name) { | ||
if (!times.get(name)) { | ||
throw new Error(`No such name '${name}' for metric`); | ||
} | ||
|
||
const duration = getImplementation().end(name); | ||
times.delete(name); | ||
return duration; | ||
} | ||
|
||
name = ''; | ||
|
||
/** | ||
* @param {string} name A name for the metric. | ||
*/ | ||
constructor(name) { | ||
if (!name) { | ||
throw new Error('Please provide a metric name'); | ||
} | ||
|
||
this.name = name; | ||
} | ||
|
||
/** | ||
* Call to begin a measurement. | ||
*/ | ||
start(name) { | ||
if (name) { | ||
throw new Error('The name argument is not supported'); | ||
} | ||
|
||
Metric.start(this.name); | ||
} | ||
|
||
/** | ||
* Returns the duration of the timing metric. The unit is milliseconds. | ||
* @type {number} | ||
*/ | ||
end(name) { | ||
if (name) { | ||
throw new Error('The name argument is not supported'); | ||
} | ||
|
||
return Metric.end(this.name); | ||
} | ||
} | ||
|
||
export default Metric; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
// Inspired by https://github.com/zeit/async-retry | ||
// Without the retry dependency (1 kB gzipped +) | ||
async function retry(tryFunction, options = {}) { | ||
const { retries = 3 } = options; | ||
|
||
let tries = 0; | ||
let output = null; | ||
let exitErr = null; | ||
|
||
const bail = err => { | ||
exitErr = err; | ||
}; | ||
|
||
while (tries < retries) { | ||
tries += 1; | ||
try { | ||
// eslint-disable-next-line no-await-in-loop | ||
output = await tryFunction({ tries, bail }); | ||
break; | ||
} catch (err) { | ||
if (tries >= retries) { | ||
throw err; | ||
} | ||
} | ||
} | ||
|
||
if (exitErr) { | ||
throw exitErr; | ||
} | ||
|
||
return output; | ||
} | ||
|
||
export default retry; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
function sleep(delay = 0) { | ||
return new Promise(resolve => { | ||
setTimeout(resolve, delay); | ||
}); | ||
} | ||
|
||
export default sleep; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
import sleep from './sleep'; | ||
|
||
export default async function waitUntil(test, options = {}) { | ||
const { delay = 5e3, tries = -1 } = options; | ||
const { predicate, result } = await test(); | ||
|
||
if (predicate) { | ||
return result; | ||
} | ||
|
||
if (tries - 1 === 0) { | ||
throw new Error('tries limit reached'); | ||
} | ||
|
||
await sleep(delay); | ||
return waitUntil(test, { ...options, tries: tries > 0 ? tries - 1 : tries }); | ||
} |
Oops, something went wrong.