Skip to content

Commit

Permalink
feat: add pinning API (#36)
Browse files Browse the repository at this point in the history
Adds the pinning API with the reference counting implementation as
specified in #28 - see that issue for discussion

Benchmarks prove reference counting is faster than DAG traversal during
gc, see [this comment for results &
discussion](#36 (comment)).

Closes: #28
  • Loading branch information
achingbrain authored Feb 24, 2023
1 parent 97da23e commit 270bb98
Show file tree
Hide file tree
Showing 25 changed files with 1,861 additions and 51 deletions.
36 changes: 36 additions & 0 deletions benchmarks/gc/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"name": "benchmarks-gc",
"version": "1.0.0",
"main": "index.js",
"private": true,
"type": "module",
"scripts": {
"clean": "aegir clean",
"build": "aegir build --bundle false",
"lint": "aegir lint",
"dep-check": "aegir dep-check",
"start": "npm run build && node dist/src/index.js"
},
"devDependencies": {
"@chainsafe/libp2p-noise": "^11.0.0",
"@chainsafe/libp2p-yamux": "^3.0.5",
"@ipld/dag-pb": "^4.0.2",
"@libp2p/websockets": "^5.0.3",
"aegir": "^38.1.5",
"blockstore-datastore-adapter": "^5.0.0",
"datastore-core": "^8.0.4",
"datastore-fs": "^8.0.0",
"datastore-level": "^9.0.4",
"execa": "^7.0.0",
"go-ipfs": "^0.18.1",
"helia": "~0.0.0",
"ipfs-core": "^0.18.0",
"ipfsd-ctl": "^13.0.0",
"it-all": "^2.0.0",
"it-drain": "^2.0.0",
"kubo-rpc-client": "^3.0.1",
"libp2p": "^0.42.2",
"multiformats": "^11.0.1",
"tinybench": "^2.3.1"
}
}
47 changes: 47 additions & 0 deletions benchmarks/gc/src/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# GC Benchmark

Benchmarks Helia GC performance against js-ipfs and Kubo

- Removes any existing pins
- Creates 10000 DAGs with two nodes linked to by a root node that is pinned
- Creates 10000 unpinned blocks
- Runs GC to delete the unpinned blocks leaving the others intact

All three implementations use on-disk block/datastores to ensure a reasonable basis for comparison.

Warning! It can take a long time with realistic pinset sizes - on the order of a whole day.

To run:

1. Add `benchmarks/*` to the `workspaces` entry in the root `package.json` of this repo
2. Run
```console
$ npm run reset
$ npm i
$ npm run build
$ cd benchmarks/gc
$ npm start

> benchmarks-gc@1.0.0 start
> npm run build && node dist/src/index.js


> benchmarks-gc@1.0.0 build
> aegir build --bundle false

[14:51:28] tsc [started]
[14:51:33] tsc [completed]
generating Ed25519 keypair...
┌─────────┬────────────────┬─────────┬───────────┬──────┐
│ (index) │ Implementation │ ops/s │ ms/op │ runs │
├─────────┼────────────────┼─────────┼───────────┼──────┤
//... results here
```

## Graph

To output stats for a graph run:

```console
$ npm run build && node dist/src/graph.js
```
18 changes: 18 additions & 0 deletions benchmarks/gc/src/graph.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { execa } from 'execa'

// Number of benchmark iterations requested from each child run
const ITERATIONS = 2
// Step by which the block/pin count grows between child runs
const INCREMENT = 1000
// Largest block/pin count to benchmark
const MAX = 10000

// Run the benchmark entry point once per step, passing the settings for
// that step through environment variables and sharing this process's
// stdout/stderr with the child so its output ends up in one stream.
const totalRuns = MAX / INCREMENT
let run = 1

while (run <= totalRuns) {
  const env = {
    ...process.env,
    INCREMENT: (run * INCREMENT).toString(),
    ITERATIONS: ITERATIONS.toString(),
    // the child prints the CSV header only when ITERATION is '1'
    ITERATION: run.toString()
  }

  await execa('node', ['dist/src/index.js'], {
    env,
    stdout: 'inherit',
    stderr: 'inherit'
  })

  run++
}
86 changes: 86 additions & 0 deletions benchmarks/gc/src/helia.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { createHelia, DAGWalker } from 'helia'
import { createLibp2p } from 'libp2p'
import { webSockets } from '@libp2p/websockets'
import { noise } from '@chainsafe/libp2p-noise'
import { yamux } from '@chainsafe/libp2p-yamux'
import type { GcBenchmark } from './index.js'
import * as dagPb from '@ipld/dag-pb'
import all from 'it-all'
import os from 'node:os'
import path from 'node:path'
import { LevelDatastore } from 'datastore-level'
import { BlockstoreDatastoreAdapter } from 'blockstore-datastore-adapter'
import { ShardingDatastore } from 'datastore-core/sharding'
import { NextToLast } from 'datastore-core/shard'
import { FsDatastore } from 'datastore-fs'
import drain from 'it-drain'

/**
 * DAG walker for dag-pb blocks: decodes the block and yields the CID of
 * every link it contains
 */
const dagPbWalker: DAGWalker = {
  codec: dagPb.code,
  async * walk (block) {
    const { Links: links } = dagPb.decode(block)

    for (const link of links) {
      yield link.Hash
    }
  }
}

/**
 * Creates a Helia node backed by on-disk stores and wraps it in the
 * `GcBenchmark` interface used by the benchmark suite.
 *
 * The node is created with `start: false`.
 */
export async function createHeliaBenchmark (): Promise<GcBenchmark> {
  // every invocation gets its own randomly named repo under the OS temp dir
  const repoPath = path.join(os.tmpdir(), `helia-${Math.random()}`)

  // blocks are stored in a sharded filesystem datastore
  const blockstore = new BlockstoreDatastoreAdapter(
    new ShardingDatastore(
      new FsDatastore(`${repoPath}/blocks`, {
        extension: '.data'
      }),
      new NextToLast(2)
    )
  )

  // everything else goes into a LevelDB datastore
  const datastore = new LevelDatastore(`${repoPath}/data`)

  const libp2p = await createLibp2p({
    transports: [
      webSockets()
    ],
    connectionEncryption: [
      noise()
    ],
    streamMuxers: [
      yamux()
    ]
  })

  const helia = await createHelia({
    blockstore,
    datastore,
    libp2p,
    dagWalkers: [
      dagPbWalker
    ],
    start: false
  })

  const benchmark: GcBenchmark = {
    async gc () {
      await helia.gc()
    },
    async putBlocks (blocks) {
      // putMany returns an async iterable, consume it so every block is written
      await drain(helia.blockstore.putMany(blocks))
    },
    async pin (cid) {
      await helia.pins.add(cid)
    },
    async teardown () {
      await helia.stop()
    },
    async clearPins () {
      // collect the full pin list first, then remove the pins one by one
      const existing = await all(helia.pins.ls())

      for (const { cid } of existing) {
        await helia.pins.rm(cid)
      }

      return existing.length
    },
    isPinned: (cid) => helia.pins.isPinned(cid),
    hasBlock: (cid) => helia.blockstore.has(cid)
  }

  return benchmark
}
193 changes: 193 additions & 0 deletions benchmarks/gc/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import { Bench } from 'tinybench'
import { CID } from 'multiformats/cid'
import { createHeliaBenchmark } from './helia.js'
import { createIpfsBenchmark } from './ipfs.js'
import { createKuboBenchmark } from './kubo.js'
import * as dagPb from '@ipld/dag-pb'
import crypto from 'node:crypto'
import { sha256 } from 'multiformats/hashes/sha2'

/**
 * Reads a non-negative integer setting from the environment.
 *
 * @param name - the environment variable to read
 * @param fallback - the value to use when the variable is not set
 * @returns the parsed value, or `fallback` when the variable is unset
 * @throws Error when the variable is set but does not parse as a
 * non-negative base-10 integer - previously such a value silently became
 * `NaN`, which made the benchmark generate no blocks and report `NaN`
 */
function readCountFromEnv (name: string, fallback: number): number {
  const raw = process.env[name]

  if (raw == null) {
    return fallback
  }

  // always parse as base 10 so the result does not depend on prefixes like "0x"
  const parsed = Number.parseInt(raw, 10)

  if (Number.isNaN(parsed) || parsed < 0) {
    throw new Error(`Invalid value for ${name}: expected a non-negative integer but got "${raw}"`)
  }

  return parsed
}

// how many pinned three-block DAGs to generate, taken from INCREMENT
const PINNED_DAG_COUNT = readCountFromEnv('INCREMENT', 10000)
// how many unpinned garbage blocks to generate, also taken from INCREMENT
const GARBAGE_BLOCK_COUNT = readCountFromEnv('INCREMENT', 10000)
// iteration count handed to the benchmark suite
const ITERATIONS = readCountFromEnv('ITERATIONS', 5)
// decimal places used when printing results
const RESULT_PRECISION = 2

/**
 * The operations each implementation under test exposes so the benchmark
 * suite can drive all of them in the same way
 */
export interface GcBenchmark {
  /** Stores the passed blocks */
  putBlocks: (blocks: Array<{ key: CID, value: Uint8Array }>) => Promise<void>

  /** Resolves to whether a block with the passed CID is stored */
  hasBlock: (cid: CID) => Promise<boolean>

  /** Pins the passed CID */
  pin: (cid: CID) => Promise<void>

  /** Resolves to whether the passed CID is pinned */
  isPinned: (cid: CID) => Promise<boolean>

  /** Removes pins, resolving to a pin count (the suite treats 0 as "nothing was cleared") */
  clearPins: () => Promise<number>

  /** Runs garbage collection - this is the operation being timed */
  gc: () => Promise<void>

  /** Releases the implementation once its benchmark task has finished */
  teardown: () => Promise<void>
}

// blocks belonging to the DAGs whose roots get pinned
const blocks = new Array<{ key: CID, value: Uint8Array }>()
// unpinned blocks
const garbageBlocks = new Array<{ key: CID, value: Uint8Array }>()
// root CIDs to pin
const pins = new Array<CID>()

/**
 * Fills the module-level `blocks`, `garbageBlocks` and `pins` arrays with
 * the data used by every benchmark run
 */
async function generateBlocks (): Promise<void> {
  // encodes a dag-pb node and derives a CIDv1 from the sha2-256 digest of the encoded bytes
  const toBlock = async (node: Parameters<typeof dagPb.encode>[0]): Promise<{ key: CID, value: Uint8Array }> => {
    const value = dagPb.encode(node)
    const digest = await sha256.digest(value)

    return { key: CID.createV1(dagPb.code, digest), value }
  }

  // a node with no links and five random bytes of data
  const randomLeaf = async (): Promise<{ key: CID, value: Uint8Array }> => {
    return await toBlock({
      Data: crypto.randomBytes(5),
      Links: []
    })
  }

  // each pinned DAG is two leaves plus a root that links to both of them
  for (let dag = 0; dag < PINNED_DAG_COUNT; dag++) {
    const first = await randomLeaf()
    const second = await randomLeaf()
    const root = await toBlock({
      Links: [first, second].map(({ key, value }) => ({
        Hash: key,
        Tsize: value.length
      }))
    })

    blocks.push(first, second, root)
    // only the root is pinned
    pins.push(root.key)
  }

  // standalone blocks that nothing links to and that never get pinned
  for (let n = 0; n < GARBAGE_BLOCK_COUNT; n++) {
    garbageBlocks.push(await randomLeaf())
  }
}

/**
 * Stores every generated block in the passed implementation - first the
 * blocks of the DAGs that get pinned, then the garbage blocks
 */
async function addBlocks (benchmark: GcBenchmark): Promise<void> {
  for (const batch of [blocks, garbageBlocks]) {
    await benchmark.putBlocks(batch)
  }
}

/**
 * Pins every generated root CID in the passed implementation, one at a time
 */
async function pinBlocks (benchmark: GcBenchmark): Promise<void> {
  for (const root of pins.values()) {
    await benchmark.pin(root)
  }
}

// The implementations under test. Each entry carries its display name, a
// factory for its benchmark subject and the per-step timings (in ms) that
// are collected while the suite runs.
const impls: Array<{ name: string, create: () => Promise<GcBenchmark>, results: { gc: number[], clearedPins: number[], addedBlocks: number[], pinnedBlocks: number[] }}> = ([
  ['helia', createHeliaBenchmark],
  ['ipfs', createIpfsBenchmark],
  ['kubo', createKuboBenchmark]
] as const).map(([name, factory]) => ({
  name,
  create: () => factory(),
  // every implementation starts with its own set of empty sample lists
  results: {
    gc: [],
    clearedPins: [],
    addedBlocks: [],
    pinnedBlocks: []
  }
}))

/**
 * Generates the test data, benchmarks `gc()` for every implementation and
 * prints the results - as CSV rows when the `INCREMENT` environment
 * variable is set, otherwise as a table
 */
async function main (): Promise<void> {
  let subject: GcBenchmark

  // arithmetic mean of the samples formatted for output, an empty list gives "NaN"
  const mean = (samples: number[]): string => {
    let total = 0

    for (const sample of samples) {
      total += sample
    }

    return (total / samples.length).toFixed(RESULT_PRECISION)
  }

  // runs the step and resolves to how many milliseconds it took
  const elapsedMs = async (step: () => Promise<void>): Promise<number> => {
    const began = Date.now()
    await step()

    return Date.now() - began
  }

  await generateBlocks()

  const suite = new Bench({
    iterations: ITERATIONS,
    time: 1
  })

  for (const impl of impls) {
    const { results } = impl

    suite.add(impl.name, async () => {
      // the measured operation
      results.gc.push(await elapsedMs(async () => { await subject.gc() }))
    }, {
      beforeAll: async () => {
        subject = await impl.create()
      },
      beforeEach: async () => {
        // reset to a known state: no pins, all blocks present, all roots pinned
        const began = Date.now()
        const pinCount = await subject.clearPins()

        // only record a sample when there was something to clear
        if (pinCount > 0) {
          results.clearedPins.push(Date.now() - began)
        }

        results.addedBlocks.push(await elapsedMs(async () => { await addBlocks(subject) }))
        results.pinnedBlocks.push(await elapsedMs(async () => { await pinBlocks(subject) }))
      },
      afterAll: async () => {
        await subject.teardown()
      }
    })
  }

  await suite.run()

  const increment = process.env.INCREMENT

  if (increment == null) {
    // standalone run - print the suite's own statistics
    console.table(suite.tasks.map(({ name, result }) => ({
      'Implementation': name,
      'ops/s': result?.hz.toFixed(RESULT_PRECISION),
      'ms/op': result?.period.toFixed(RESULT_PRECISION),
      'runs': result?.samples.length
    })))

    return
  }

  // CSV output - the header is only printed when ITERATION is '1'
  if (process.env.ITERATION === '1') {
    console.info('implementation, count, clear pins (ms), add blocks (ms), add pins (ms), gc (ms)')
  }

  for (const { name, results } of impls) {
    console.info(
      `${name},`,
      `${increment},`,
      `${mean(results.clearedPins)},`,
      `${mean(results.addedBlocks)},`,
      `${mean(results.pinnedBlocks)},`,
      `${mean(results.gc)}`
    )
  }
}

// Entry point: log any failure and exit with a non-zero status code
main().catch((error) => {
  console.error(error) // eslint-disable-line no-console
  process.exit(1)
})
Loading

0 comments on commit 270bb98

Please sign in to comment.