Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: add block streaming interface #44

Merged
merged 3 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions bitswap-fetcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import defer from 'p-defer'
import { pipe } from 'it-pipe'
import * as lp from 'it-length-prefixed'
import { base58btc } from 'multiformats/bases/base58'
import { identity } from 'multiformats/hashes/identity'
import debug from 'debug'
import { Entry, Message, BlockPresenceType } from './message.js'
import * as Prefix from './prefix.js'
Expand Down Expand Up @@ -91,6 +92,10 @@ export class BitswapFetcher {
throw options.signal.reason || abortError()
}

if (cid.code === identity.code) {
return { cid, bytes: cid.multihash.digest }
}

// ensure we can hash the data when we receive the block
if (!this.#hashers[cid.multihash.code]) {
throw new Error(`missing hasher: 0x${cid.multihash.code.toString(16)} for wanted block: ${cid}`)
Expand Down Expand Up @@ -123,6 +128,35 @@ export class BitswapFetcher {
return deferred.promise
}

/**
* @param {import('multiformats').UnknownLink} cid
* @param {{ range?: import('./index').Range, signal?: AbortSignal }} [options]
*/
async stream (cid, options) {
const block = await this.get(cid, options)
if (!block) return

return /** @type {ReadableStream<Uint8Array>} */ (new ReadableStream({
pull (controller) {
const { range } = options ?? {}
const bytes = range
? block.bytes.slice(range[0], range[1] && (range[1] + 1))
: block.bytes
controller.enqueue(bytes)
controller.close()
}
}))
}

/**
* @param {import('multiformats').UnknownLink} cid
* @param {{ signal?: AbortSignal }} [options]
*/
async stat (cid, options) {
const block = await this.get(cid, options)
if (block) return { size: block.bytes.length }
}

/** @type {import('@libp2p/interface-registrar').StreamHandler} */
async handler ({ stream }) {
log('incoming stream')
Expand Down
154 changes: 105 additions & 49 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import type { Stream } from '@libp2p/interface-connection'
import type { StreamHandler } from '@libp2p/interface-registrar'
import type { PeerId } from '@libp2p/interface-peer-id'

export type { AbortOptions }

export interface BlockDecoders {
[code: number]: BlockDecoder<any, any>
}
Expand All @@ -21,8 +23,26 @@ export interface Block {
bytes: Uint8Array
}

export interface Blockstore {
get: (cid: UnknownLink, options?: { signal?: AbortSignal }) => Promise<Block | undefined>
export interface BlockStat {
/** Total size in bytes of the block. */
size: number
}

export interface Blockstore extends BlockGetter, BlockStreamer, BlockInspecter {}

export interface BlockGetter {
/** Retrieve a block. */
get: (cid: UnknownLink, options?: AbortOptions) => Promise<Block|undefined>
}

export interface BlockStreamer {
/** Stream bytes from a block. */
stream: (cid: UnknownLink, options?: AbortOptions & RangeOptions) => Promise<ReadableStream<Uint8Array>|undefined>
}

export interface BlockInspecter {
/** Retrieve information about a block. */
stat: (cid: UnknownLink, options?: AbortOptions) => Promise<BlockStat|undefined>
}

export interface Network {
Expand Down Expand Up @@ -63,27 +83,66 @@ export interface DagScopeOptions {
}

/**
* Specifies a range of bytes.
* - `*` can be substituted for end-of-file
* - `{ from: 0, to: '*' }` is the entire file.
* - Negative numbers can be used for referring to bytes from the end of a file
* - `{ from: -1024, to: '*' }` is the last 1024 bytes of a file.
* - It is also permissible to ask for the range of 500 bytes from the
* beginning of the file to 1000 bytes from the end: `{ from: 499, to: -1000 }`
* An absolute byte range to extract - always an array of two values
* corresponding to the first and last bytes (both inclusive). e.g.
*
* ```
* [100, 200]
* ```
*/
export interface ByteRange {
/** Byte-offset of the first byte in a range (inclusive) */
from: number
/** Byte-offset of the last byte in the range (inclusive) */
to: number|'*'
}
export type AbsoluteRange = [first: number, last: number]

/**
* An suffix byte range - always an array of one value corresponding to the
* first byte to start extraction from (inclusive). e.g.
*
* ```
* [900]
* ```
*
* If it is unknown how large a resource is, the last `n` bytes
* can be requested by specifying a negative value:
*
* ```
* [-100]
* ```
*/
export type SuffixRange = [first: number]

/**
* Byte range to extract - an array of one or two values corresponding to the
* first and last bytes (both inclusive). e.g.
*
* ```
* [100, 200]
* ```
*
* Omitting the second value requests all remaining bytes of the resource. e.g.
*
* ```
* [900]
* ```
*
* Alternatively, if it's unknown how large a resource is, the last `n` bytes
* can be requested by specifying a negative value:
*
* ```
* [-100]
* ```
*/
export type Range = AbsoluteRange | SuffixRange

export interface EntityBytesOptions {
/**
* A specific byte range from the entity. Setting entity bytes implies DAG
* scope: entity.
*/
entityBytes?: ByteRange
entityBytes?: Range
}

export interface RangeOptions {
/** Extracts a specific byte range from the resource. */
range?: Range
}

/**
Expand All @@ -110,49 +169,46 @@ export interface BlockOrderOptions {
order?: BlockOrder
}

export interface IDagula {
/**
* Get a complete DAG by root CID.
*/
/** @deprecated Use `BlockService`, `DagService` and `UnixfsService` interface instead. */
export interface IDagula extends BlockService, DagService, UnixfsService {}

export interface BlockService {
/** Get a single block. */
getBlock (cid: UnknownLink|string, options?: AbortOptions): Promise<Block>
/** Retrieve information about a block. */
statBlock (cid: UnknownLink|string, options?: AbortOptions): Promise<BlockStat>
/** Stream bytes from a single block. */
streamBlock (cid: UnknownLink|string, options?: AbortOptions & RangeOptions): Promise<ReadableStream<Uint8Array>>
}

export interface DagService {
/** Get a complete DAG by root CID. */
get (cid: UnknownLink|string, options?: AbortOptions & BlockOrderOptions): AsyncIterableIterator<Block>
/**
* Get a DAG for a cid+path.
*/
/** Get a DAG for a cid+path. */
getPath (cidPath: string, options?: AbortOptions & DagScopeOptions & EntityBytesOptions & BlockOrderOptions): AsyncIterableIterator<Block>
/**
* Get a single block.
*/
getBlock (cid: UnknownLink|string, options?: AbortOptions): Promise<Block>
/**
* Get UnixFS files and directories.
*/
}

export interface UnixfsService {
/** Get UnixFS files and directories. */
getUnixfs (path: UnknownLink|string, options?: AbortOptions): Promise<UnixFSEntry>
/**
* Emit nodes for all path segements and get UnixFS files and directories.
*/
/** Emit nodes for all path segements and get UnixFS files and directories. */
walkUnixfsPath (path: UnknownLink|string, options?: AbortOptions): AsyncIterableIterator<UnixFSEntry>
}

export declare class Dagula implements IDagula {
export declare class Dagula implements BlockService, DagService, UnixfsService {
Copy link
Member Author

@alanshaw alanshaw May 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: I'm planning on separating this into multiple classes that implement the respective interfaces. However it's going to take a bit of time because the unixfs specific stuff is all mixed up with the generic DAG traversal stuff.

WIP: #45

constructor (blockstore: Blockstore, options?: { decoders?: BlockDecoders, hashers?: MultihashHashers })
/**
* Get a complete DAG by root CID.
*/
/** Get a complete DAG by root CID. */
get (cid: UnknownLink|string, options?: AbortOptions & BlockOrderOptions): AsyncIterableIterator<Block>
/**
* Get a DAG for a cid+path.
*/
/** Get a DAG for a cid+path. */
getPath (cidPath: string, options?: AbortOptions & DagScopeOptions & EntityBytesOptions & BlockOrderOptions): AsyncIterableIterator<Block>
/**
* Get a single block.
*/
/** Get a single block. */
getBlock (cid: UnknownLink|string, options?: AbortOptions): Promise<Block>
/**
* Get UnixFS files and directories.
*/
/** Retrieve information about a block. */
statBlock (cid: UnknownLink|string, options?: AbortOptions): Promise<BlockStat>
/** Stream bytes from a single block. */
streamBlock (cid: UnknownLink|string, options?: AbortOptions & RangeOptions): Promise<ReadableStream<Uint8Array>>
/** Get UnixFS files and directories. */
getUnixfs (path: UnknownLink|string, options?: AbortOptions): Promise<UnixFSEntry>
/**
* Emit nodes for all path segements and get UnixFS files and directories.
*/
/** Emit nodes for all path segements and get UnixFS files and directories. */
walkUnixfsPath (path: UnknownLink|string, options?: AbortOptions): AsyncIterableIterator<UnixFSEntry>
}
Loading
Loading