Skip to content

Commit

Permalink
chore: simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Nov 30, 2023
1 parent 81e3fcb commit fb11afc
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 348 deletions.
2 changes: 1 addition & 1 deletion packages/libp2p-daemon-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@
"@libp2p/peer-id": "^4.0.0",
"@libp2p/tcp": "^9.0.0",
"@multiformats/multiaddr": "^12.1.3",
"it-stream-types": "^2.0.1",
"it-protobuf-stream": "^1.1.1",
"multiformats": "^12.0.1",
"uint8arraylist": "^2.4.3"
},
Expand Down
118 changes: 33 additions & 85 deletions packages/libp2p-daemon-client/src/dht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import {
} from '@libp2p/daemon-protocol'
import { CodeError } from '@libp2p/interface'
import { isPeerId, type PeerId, type PeerInfo } from '@libp2p/interface'
import { logger } from '@libp2p/logger'
import { peerIdFromBytes } from '@libp2p/peer-id'
import { multiaddr } from '@multiformats/multiaddr'
import { CID } from 'multiformats/cid'
import type { DaemonClient } from './index.js'

const log = logger('libp2p:daemon-client:dht')

export class DHT {
private readonly client: DaemonClient

Expand Down Expand Up @@ -39,15 +42,11 @@ export class DHT {
}
})

const message = await sh.read()

if (message == null) {
throw new CodeError('Empty response from remote', 'ERR_EMPTY_RESPONSE')
}
const response = await sh.read(Response)

const response = Response.decode(message)
log('read', response)

await sh.close()
await sh.unwrap().close()

if (response.type !== Response.Type.OK) {
throw new CodeError(response.error?.msg ?? 'DHT put failed', 'ERR_DHT_PUT_FAILED')
Expand All @@ -70,15 +69,9 @@ export class DHT {
}
})

const message = await sh.read()
const response = await sh.read(Response)

if (message == null) {
throw new CodeError('Empty response from remote', 'ERR_EMPTY_RESPONSE')
}

const response = Response.decode(message)

await sh.close()
await sh.unwrap().close()

if (response.type !== Response.Type.OK) {
throw new CodeError(response.error?.msg ?? 'DHT get failed', 'ERR_DHT_GET_FAILED')
Expand Down Expand Up @@ -107,15 +100,9 @@ export class DHT {
}
})

const message = await sh.read()

if (message == null) {
throw new CodeError('Empty response from remote', 'ERR_EMPTY_RESPONSE')
}

const response = Response.decode(message)
const response = await sh.read(Response)

await sh.close()
await sh.unwrap().close()

if (response.type !== Response.Type.OK) {
throw new CodeError(response.error?.msg ?? 'DHT find peer failed', 'ERR_DHT_FIND_PEER_FAILED')
Expand All @@ -127,8 +114,7 @@ export class DHT {

return {
id: peerIdFromBytes(response.dht.peer.id),
multiaddrs: response.dht.peer.addrs.map((a) => multiaddr(a)),
protocols: []
multiaddrs: response.dht.peer.addrs.map((a) => multiaddr(a))
}
}

Expand All @@ -148,15 +134,9 @@ export class DHT {
}
})

const message = await sh.read()

if (message == null) {
throw new CodeError('Empty response from remote', 'ERR_EMPTY_RESPONSE')
}
const response = await sh.read(Response)

const response = Response.decode(message)

await sh.close()
await sh.unwrap().close()

if (response.type !== Response.Type.OK) {
throw new CodeError(response.error?.msg ?? 'DHT provide failed', 'ERR_DHT_PROVIDE_FAILED')
Expand All @@ -180,45 +160,32 @@ export class DHT {
}
})

let message = await sh.read()

if (message == null) {
throw new CodeError('Empty response from remote', 'ERR_EMPTY_RESPONSE')
}

// stream begin message
const response = Response.decode(message)
const response = await sh.read(Response)

if (response.type !== Response.Type.OK) {
await sh.close()
await sh.unwrap().close()
throw new CodeError(response.error?.msg ?? 'DHT find providers failed', 'ERR_DHT_FIND_PROVIDERS_FAILED')
}

while (true) {
message = await sh.read()

if (message == null) {
throw new CodeError('Empty response from remote', 'ERR_EMPTY_RESPONSE')
}

const response = DHTResponse.decode(message)
const dhtResponse = await sh.read(DHTResponse)

// Stream end
if (response.type === DHTResponse.Type.END) {
await sh.close()
if (dhtResponse.type === DHTResponse.Type.END) {
await sh.unwrap().close()
return
}

// Stream values
if (response.type === DHTResponse.Type.VALUE && response.peer != null && response.peer?.addrs != null) {
if (dhtResponse.type === DHTResponse.Type.VALUE && dhtResponse.peer != null && dhtResponse.peer?.addrs != null) {
yield {
id: peerIdFromBytes(response.peer.id),
multiaddrs: response.peer.addrs.map((a) => multiaddr(a)),
protocols: []
id: peerIdFromBytes(dhtResponse.peer.id),
multiaddrs: dhtResponse.peer.addrs.map((a) => multiaddr(a))
}
} else {
// Unexpected message received
await sh.close()
await sh.unwrap().close()
throw new CodeError('unexpected message received', 'ERR_UNEXPECTED_MESSAGE_RECEIVED')
}
}
Expand All @@ -241,46 +208,33 @@ export class DHT {
})

// stream begin message
let message = await sh.read()

if (message == null) {
throw new CodeError('Empty response from remote', 'ERR_EMPTY_RESPONSE')
}

const response = Response.decode(message)
const response = await sh.read(Response)

if (response.type !== Response.Type.OK) {
await sh.close()
await sh.unwrap().close()
throw new CodeError(response.error?.msg ?? 'DHT find providers failed', 'ERR_DHT_FIND_PROVIDERS_FAILED')
}

while (true) {
message = await sh.read()

if (message == null) {
throw new CodeError('Empty response from remote', 'ERR_EMPTY_RESPONSE')
}

const response = DHTResponse.decode(message)
const dhtResponse = await sh.read(DHTResponse)

// Stream end
if (response.type === DHTResponse.Type.END) {
await sh.close()
if (dhtResponse.type === DHTResponse.Type.END) {
await sh.unwrap().close()
return
}

// Stream values
if (response.type === DHTResponse.Type.VALUE && response.value != null) {
const peerId = peerIdFromBytes(response.value)
if (dhtResponse.type === DHTResponse.Type.VALUE && dhtResponse.value != null) {
const peerId = peerIdFromBytes(dhtResponse.value)

yield {
id: peerId,
multiaddrs: [],
protocols: []
multiaddrs: []
}
} else {
// Unexpected message received
await sh.close()
await sh.unwrap().close()
throw new CodeError('unexpected message received', 'ERR_UNEXPECTED_MESSAGE_RECEIVED')
}
}
Expand All @@ -302,15 +256,9 @@ export class DHT {
}
})

const message = await sh.read()

if (message == null) {
throw new CodeError('Empty response from remote', 'ERR_EMPTY_RESPONSE')
}

const response = Response.decode(message)
const response = await sh.read(Response)

await sh.close()
await sh.unwrap().close()

if (response.type !== Response.Type.OK) {
throw new CodeError(response.error?.msg ?? 'DHT get public key failed', 'ERR_DHT_GET_PUBLIC_KEY_FAILED')
Expand Down
Loading

0 comments on commit fb11afc

Please sign in to comment.