Skip to content

Commit

Permalink
feat: add debounce and repeating task to utils
Browse files Browse the repository at this point in the history
This functionality is required in multiple places so add it to the
utils module.
  • Loading branch information
achingbrain committed Nov 2, 2024
1 parent ad5cfd6 commit 101c808
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 0 deletions.
8 changes: 8 additions & 0 deletions packages/utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@
"types": "./dist/src/close-source.d.ts",
"import": "./dist/src/close-source.js"
},
"./debounce": {
"types": "./dist/src/debounce.d.ts",
"import": "./dist/src/debounce.js"
},
"./filters": {
"types": "./dist/src/filters/index.d.ts",
"import": "./dist/src/filters/index.js"
Expand Down Expand Up @@ -112,6 +116,10 @@
"types": "./dist/src/rate-limiter.d.ts",
"import": "./dist/src/rate-limiter.js"
},
"./repeating-task": {
"types": "./dist/src/repeating-task.d.ts",
"import": "./dist/src/repeating-task.js"
},
"./stream-to-ma-conn": {
"types": "./dist/src/stream-to-ma-conn.d.ts",
"import": "./dist/src/stream-to-ma-conn.js"
Expand Down
28 changes: 28 additions & 0 deletions packages/utils/src/debounce.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
export interface CancelableFunction {
(): void
stop(): void
}

/**
* Returns a function wrapper that will only call the passed function once
*
* Important - the passed function should not throw or reject
*/
export function debounce (func: () => void | Promise<void>, wait: number): CancelableFunction {
let timeout: ReturnType<typeof setTimeout> | undefined

const output = function (): void {
const later = function (): void {
timeout = undefined
void func()
}

clearTimeout(timeout)
timeout = setTimeout(later, wait)
}
output.stop = () => {
clearTimeout(timeout)
}

return output
}
82 changes: 82 additions & 0 deletions packages/utils/src/repeating-task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { setMaxListeners } from '@libp2p/interface'
import { anySignal } from 'any-signal'
import type { AbortOptions } from '@libp2p/interface'

export interface RepeatingTask {
start(): void
stop(): void
}

export interface RepeatingTaskOptions {
/**
* How long the task is allowed to run before the passed AbortSignal fires an
* abort event
*/
timeout?: number

/**
* Whether to schedule the task to run immediately
*/
runImmediately?: boolean
}

export function repeatingTask (fn: (options?: AbortOptions) => void | Promise<void>, interval: number, options?: RepeatingTaskOptions): RepeatingTask {
let timeout: ReturnType<typeof setTimeout>
let shutdownController: AbortController

function runTask (): void {
const opts: AbortOptions = {
signal: shutdownController.signal
}

if (options?.timeout != null) {
const signal = anySignal([shutdownController.signal, AbortSignal.timeout(options.timeout)])
setMaxListeners(Infinity, signal)

opts.signal = signal
}

Promise.resolve().then(async () => {
await fn(opts)
})
.catch(() => {})
.finally(() => {
if (shutdownController.signal.aborted) {
// task has been cancelled, bail
return
}

Check warning on line 47 in packages/utils/src/repeating-task.ts

View check run for this annotation

Codecov / codecov/patch

packages/utils/src/repeating-task.ts#L45-L47

Added lines #L45 - L47 were not covered by tests

// reschedule
timeout = setTimeout(runTask, interval)
})
}

let started = false

return {
start: () => {
if (started) {
return
}

Check warning on line 60 in packages/utils/src/repeating-task.ts

View check run for this annotation

Codecov / codecov/patch

packages/utils/src/repeating-task.ts#L59-L60

Added lines #L59 - L60 were not covered by tests

started = true
shutdownController = new AbortController()
setMaxListeners(Infinity, shutdownController.signal)

// run now
if (options?.runImmediately === true) {
queueMicrotask(() => {
runTask()
})
} else {
// run later
timeout = setTimeout(runTask, interval)
}
},
stop: () => {
clearTimeout(timeout)
shutdownController?.abort()
started = false
}
}
}
45 changes: 45 additions & 0 deletions packages/utils/test/debounce.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { expect } from 'aegir/chai'
import delay from 'delay'
import { debounce } from '../src/debounce.js'

describe('debounce', () => {
it('should debounce function', async () => {
let invocations = 0
const fn = (): void => {
invocations++
}

const debounced = debounce(fn, 10)

debounced()
debounced()
debounced()
debounced()
debounced()

await delay(500)

expect(invocations).to.equal(1)
})

it('should cancel debounced function', async () => {
let invocations = 0
const fn = (): void => {
invocations++
}

Check warning on line 29 in packages/utils/test/debounce.spec.ts

View check run for this annotation

Codecov / codecov/patch

packages/utils/test/debounce.spec.ts#L28-L29

Added lines #L28 - L29 were not covered by tests

const debounced = debounce(fn, 10000)

debounced()
debounced()
debounced()
debounced()
debounced()

debounced.stop()

await delay(500)

expect(invocations).to.equal(0)
})
})
70 changes: 70 additions & 0 deletions packages/utils/test/repeating-task.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { expect } from 'aegir/chai'
import delay from 'delay'
import pDefer from 'p-defer'
import { repeatingTask } from '../src/repeating-task.js'

describe('repeating-task', () => {
it('should repeat a task', async () => {
let count = 0

const task = repeatingTask(() => {
count++
}, 100)
task.start()

await delay(1000)

task.stop()

expect(count).to.be.greaterThan(1)
})

it('should run a task immediately', async () => {
let count = 0

const task = repeatingTask(() => {
count++
}, 60000, {
runImmediately: true
})
task.start()

await delay(10)

task.stop()

expect(count).to.equal(1)
})

it('should time out a task', async () => {
const deferred = pDefer()

const task = repeatingTask((opts) => {
opts?.signal?.addEventListener('abort', () => {
deferred.resolve()
})
}, 100, {
timeout: 10
})
task.start()

await deferred.promise
task.stop()
})

it('should repeat a task that throws', async () => {
let count = 0

const task = repeatingTask(() => {
count++
throw new Error('Urk!')
}, 100)
task.start()

await delay(1000)

task.stop()

expect(count).to.be.greaterThan(1)
})
})
2 changes: 2 additions & 0 deletions packages/utils/typedoc.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"./src/array-equals.ts",
"./src/close.ts",
"./src/close-source.ts",
"./src/debounce.ts",
"./src/filters/index.ts",
"./src/ip-port-to-multiaddr.ts",
"./src/is-promise.ts",
Expand All @@ -18,6 +19,7 @@
"./src/private-ip.ts",
"./src/queue/index.ts",
"./src/rate-limiter.ts",
"./src/repeating-task.ts",
"./src/stream-to-ma-conn.ts",
"./src/tracked-list.ts",
"./src/tracked-map.ts"
Expand Down

0 comments on commit 101c808

Please sign in to comment.