Skip to content

Commit

Permalink
feat!: update to libp2p@2.x.x
Browse files Browse the repository at this point in the history
Incorporates API changes appearing in libp2p@2.x.x.

BREAKING CHANGE: can only be used with libp2p@2.x.x
  • Loading branch information
achingbrain committed Sep 2, 2024
1 parent a60d9d7 commit a4689b9
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 33 deletions.
13 changes: 7 additions & 6 deletions packages/libp2p-daemon-client/src/dht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import {
import { CodeError } from '@libp2p/interface'
import { isPeerId, type PeerId, type PeerInfo } from '@libp2p/interface'
import { logger } from '@libp2p/logger'
import { peerIdFromBytes } from '@libp2p/peer-id'
import { peerIdFromMultihash } from '@libp2p/peer-id'
import { multiaddr } from '@multiformats/multiaddr'
import { CID } from 'multiformats/cid'
import * as Digest from 'multiformats/hashes/digest'
import type { DaemonClient } from './index.js'

const log = logger('libp2p:daemon-client:dht')
Expand Down Expand Up @@ -96,7 +97,7 @@ export class DHT {
type: Request.Type.DHT,
dht: {
type: DHTRequest.Type.FIND_PEER,
peer: peerId.toBytes()
peer: peerId.toMultihash().bytes
}
})

Expand All @@ -113,7 +114,7 @@ export class DHT {
}

return {
id: peerIdFromBytes(response.dht.peer.id),
id: peerIdFromMultihash(Digest.decode(response.dht.peer.id)),
multiaddrs: response.dht.peer.addrs.map((a) => multiaddr(a))
}
}
Expand Down Expand Up @@ -180,7 +181,7 @@ export class DHT {
// Stream values
if (dhtResponse.type === DHTResponse.Type.VALUE && dhtResponse.peer?.addrs != null) {
yield {
id: peerIdFromBytes(dhtResponse.peer.id),
id: peerIdFromMultihash(Digest.decode(dhtResponse.peer.id)),
multiaddrs: dhtResponse.peer.addrs.map((a) => multiaddr(a))
}
} else {
Expand Down Expand Up @@ -226,7 +227,7 @@ export class DHT {

// Stream values
if (dhtResponse.type === DHTResponse.Type.VALUE && dhtResponse.value != null) {
const peerId = peerIdFromBytes(dhtResponse.value)
const peerId = peerIdFromMultihash(Digest.decode(dhtResponse.value))

yield {
id: peerId,
Expand All @@ -252,7 +253,7 @@ export class DHT {
type: Request.Type.DHT,
dht: {
type: DHTRequest.Type.GET_PUBLIC_KEY,
peer: peerId.toBytes()
peer: peerId.toMultihash().bytes
}
})

Expand Down
11 changes: 6 additions & 5 deletions packages/libp2p-daemon-client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ import { StreamHandler } from '@libp2p/daemon-protocol/stream-handler'
import { passThroughUpgrader } from '@libp2p/daemon-protocol/upgrader'
import { CodeError, isPeerId } from '@libp2p/interface'
import { defaultLogger, logger } from '@libp2p/logger'
import { peerIdFromBytes } from '@libp2p/peer-id'
import { peerIdFromMultihash } from '@libp2p/peer-id'
import { tcp } from '@libp2p/tcp'
import { multiaddr, isMultiaddr } from '@multiformats/multiaddr'
import { pbStream, type ProtobufStream } from 'it-protobuf-stream'
import * as Digest from 'multiformats/hashes/digest'
import { DHT } from './dht.js'
import { Pubsub } from './pubsub.js'
import type { Stream, PeerId, MultiaddrConnection, PeerInfo, Transport } from '@libp2p/interface'
Expand Down Expand Up @@ -82,7 +83,7 @@ class Client implements DaemonClient {
const sh = await this.send({
type: Request.Type.CONNECT,
connect: {
peer: peerId.toBytes(),
peer: peerId.toMultihash().bytes,
addrs: addrs.map((a) => a.bytes)
}
})
Expand Down Expand Up @@ -121,7 +122,7 @@ class Client implements DaemonClient {
throw new CodeError('Invalid response', 'ERR_IDENTIFY_FAILED')
}

const peerId = peerIdFromBytes(response.identify?.id)
const peerId = peerIdFromMultihash(Digest.decode(response.identify?.id))
const addrs = response.identify.addrs.map((a) => multiaddr(a))

await sh.unwrap().close()
Expand All @@ -145,7 +146,7 @@ class Client implements DaemonClient {

await sh.unwrap().close()

return response.peers.map((peer) => peerIdFromBytes(peer.id))
return response.peers.map((peer) => peerIdFromMultihash(Digest.decode(peer.id)))
}

/**
Expand All @@ -163,7 +164,7 @@ class Client implements DaemonClient {
const sh = await this.send({
type: Request.Type.STREAM_OPEN,
streamOpen: {
peer: peerId.toBytes(),
peer: peerId.toMultihash().bytes,
proto: [protocol]
}
})
Expand Down
5 changes: 3 additions & 2 deletions packages/libp2p-daemon-client/src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import {
PSMessage
} from '@libp2p/daemon-protocol'
import { CodeError } from '@libp2p/interface'
import { peerIdFromBytes } from '@libp2p/peer-id'
import { peerIdFromMultihash } from '@libp2p/peer-id'
import * as Digest from 'multiformats/hashes/digest'
import type { DaemonClient, Subscription } from './index.js'
import type { PeerId } from '@libp2p/interface'

Expand Down Expand Up @@ -138,6 +139,6 @@ export class Pubsub {
throw new CodeError('Invalid response', 'ERR_PUBSUB_GET_SUBSCRIBERS_FAILED')
}

return response.pubsub.peerIDs.map(buf => peerIdFromBytes(buf))
return response.pubsub.peerIDs.map(buf => peerIdFromMultihash(Digest.decode(buf)))
}
}
6 changes: 3 additions & 3 deletions packages/libp2p-daemon-server/src/dht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class DHTOperations {
if (event.name === 'PEER_RESPONSE') {
yield * event.closer.map(peer => DHTResponse.encode({
type: DHTResponse.Type.VALUE,
value: peer.id.toBytes()
value: peer.id.toMultihash().bytes
}))
}
}
Expand Down Expand Up @@ -97,7 +97,7 @@ export class DHTOperations {
dht: {
type: DHTResponse.Type.VALUE,
peer: {
id: event.peer.id.toBytes(),
id: event.peer.id.toMultihash().bytes,
addrs: event.peer.multiaddrs.map(m => m.bytes)
}
}
Expand Down Expand Up @@ -131,7 +131,7 @@ export class DHTOperations {
yield DHTResponse.encode({
type: DHTResponse.Type.VALUE,
peer: {
id: provider.id.toBytes(),
id: provider.id.toMultihash().bytes,
addrs: (provider.multiaddrs ?? []).map(m => m.bytes)
}
})
Expand Down
25 changes: 11 additions & 14 deletions packages/libp2p-daemon-server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ import {
} from '@libp2p/daemon-protocol'
import { passThroughUpgrader } from '@libp2p/daemon-protocol/upgrader'
import { defaultLogger, logger } from '@libp2p/logger'
import { peerIdFromBytes } from '@libp2p/peer-id'
import { peerIdFromMultihash } from '@libp2p/peer-id'
import { tcp } from '@libp2p/tcp'
import { multiaddr, protocols } from '@multiformats/multiaddr'
import * as lp from 'it-length-prefixed'
import { lpStream } from 'it-length-prefixed-stream'
import { pipe } from 'it-pipe'
import { pbStream } from 'it-protobuf-stream'
import { CID } from 'multiformats/cid'
import * as Digest from 'multiformats/hashes/digest'
import { DHTOperations } from './dht.js'
import { PubSubOperations } from './pubsub.js'
import { ErrorResponse, OkResponse } from './responses.js'
Expand Down Expand Up @@ -87,7 +88,7 @@ export class Server implements Libp2pServer {

const peer = request.connect.peer
const addrs = request.connect.addrs.map((a) => multiaddr(a))
const peerId = peerIdFromBytes(peer)
const peerId = peerIdFromMultihash(Digest.decode(peer))

log('connect - adding multiaddrs %a to peer %p', addrs, peerId)
await this.libp2p.peerStore.merge(peerId, {
Expand All @@ -107,21 +108,19 @@ export class Server implements Libp2pServer {
}

const { peer, proto } = request.streamOpen
const peerId = peerIdFromBytes(peer)
const peerId = peerIdFromMultihash(Digest.decode(peer))

log('openStream - dial %p', peerId)
const connection = await this.libp2p.dial(peerId)

log('openStream - open stream for protocol %s', proto)
const stream = await connection.newStream(proto, {
runOnTransientConnection: true,
// @ts-expect-error this has not been released yet
runOnLimitedConnection: true
})

return {
streamInfo: {
peer: peerId.toBytes(),
peer: peerId.toMultihash().bytes,
addr: connection.remoteAddr.bytes,
proto: stream.protocol ?? ''
},
Expand Down Expand Up @@ -155,7 +154,7 @@ export class Server implements Libp2pServer {
})

const message = StreamInfo.encode({
peer: connection.remotePeer.toBytes(),
peer: connection.remotePeer.toMultihash().bytes,
addr: connection.remoteAddr.bytes,
proto: stream.protocol ?? ''
})
Expand Down Expand Up @@ -194,8 +193,6 @@ export class Server implements Libp2pServer {
}
})
}, {
runOnTransientConnection: true,
// @ts-expect-error this has not been released yet
runOnLimitedConnection: true
})
}
Expand Down Expand Up @@ -257,7 +254,7 @@ export class Server implements Libp2pServer {
throw new Error('Invalid request')
}

const peerId = peerIdFromBytes(request.id) // eslint-disable-line no-case-declarations
const peerId = peerIdFromMultihash(Digest.decode(request.id)) // eslint-disable-line no-case-declarations
const peer = await this.libp2p.peerStore.get(peerId) // eslint-disable-line no-case-declarations
const protos = peer.protocols // eslint-disable-line no-case-declarations
yield OkResponse({ peerStore: { protos } })
Expand Down Expand Up @@ -331,7 +328,7 @@ export class Server implements Libp2pServer {
throw new Error('Invalid request')
}

yield * this.dhtOperations.findPeer(peerIdFromBytes(request.peer))
yield * this.dhtOperations.findPeer(peerIdFromMultihash(Digest.decode(request.peer)))
return
case DHTRequest.Type.FIND_PROVIDERS:
if (request.cid == null) {
Expand Down Expand Up @@ -359,7 +356,7 @@ export class Server implements Libp2pServer {
throw new Error('Invalid request')
}

yield * this.dhtOperations.getPublicKey(peerIdFromBytes(request.peer))
yield * this.dhtOperations.getPublicKey(peerIdFromMultihash(Digest.decode(request.peer)))
return
case DHTRequest.Type.GET_VALUE:
if (request.key == null) {
Expand Down Expand Up @@ -414,7 +411,7 @@ export class Server implements Libp2pServer {
await pb.write({
type: Response.Type.OK,
identify: {
id: daemon.libp2p.peerId.toBytes(),
id: daemon.libp2p.peerId.toMultihash().bytes,
addrs: daemon.libp2p.getMultiaddrs().map(ma => ma.decapsulateCode(protocols('p2p').code)).map(m => m.bytes)
}
}, Response)
Expand All @@ -436,7 +433,7 @@ export class Server implements Libp2pServer {
seen.add(peerId)

peers.push({
id: connection.remotePeer.toBytes(),
id: connection.remotePeer.toMultihash().bytes,
addrs: [connection.remoteAddr.bytes]
})
}
Expand Down
7 changes: 4 additions & 3 deletions packages/libp2p-daemon-server/src/pubsub.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/* eslint max-depth: ["error", 6] */

import { publicKeyToProtobuf } from '@libp2p/crypto/keys'
import {
PSMessage
} from '@libp2p/daemon-protocol'
Expand Down Expand Up @@ -52,12 +53,12 @@ export class PubSubOperations {

if (msg.type === 'signed') {
onMessage.push(PSMessage.encode({
from: msg.from.toBytes(),
from: msg.from.toMultihash().bytes,
data: msg.data,
seqno: msg.sequenceNumber == null ? undefined : uint8ArrayFromString(msg.sequenceNumber.toString(16).padStart(16, '0'), 'base16'),
topicIDs: [msg.topic],
signature: msg.signature,
key: msg.key
key: publicKeyToProtobuf(msg.key)
}).subarray())
} else {
onMessage.push(PSMessage.encode({
Expand Down Expand Up @@ -90,7 +91,7 @@ export class PubSubOperations {
yield OkResponse({
pubsub: {
topics: [topic],
peerIDs: this.pubsub.getSubscribers(topic).map(peer => peer.toBytes())
peerIDs: this.pubsub.getSubscribers(topic).map(peer => peer.toMultihash().bytes)
}
})
} catch (err: any) {
Expand Down

0 comments on commit a4689b9

Please sign in to comment.